Movement model for Among Us with epistemic logic

To run this notebook interactively, open it on Binder.

The design is explained in movement.pdf.

import networkx as nx
import itertools
from copy import copy, deepcopy
import matplotlib.pyplot as plt
from IPython.display import Image, display, Latex
class KripkeModel:
    """ Pointed Kripke model stored as a networkx graph

    Attributes
    ----------
    history : list
        Labels of the steps that produced this model
    G : networkx.Graph
        State graph. Every node has a 'valuation' attribute and every edge an
        'agents' attribute (set of agents that have this relation).
    point : node of G
        The pointed (actual) state
    """
    def __init__(self, history, state_graph, point):
        self.history = history
        self.G = state_graph
        self.point = point

    def relabel_nodes_to_int(self):
        """ Relabel nodes to integers

        Execution of action model turns node names into (s,j) pairs. This method
        relabels them to integers, in place, and updates the point accordingly.
        """
        relabel_map = {n : i for i,n in enumerate(self.G.nodes)}
        self.point = relabel_map[self.point]
        nx.relabel_nodes(self.G, relabel_map, copy=False)

    def remove_unreachable(self):
        """ Remove states that are not reachable from the pointed state

        The graph is undirected, so the reachable states are exactly the
        connected component that contains the point.
        """
        reachable = nx.node_connected_component(self.G, self.point)
        # Materialise the list first: nodes cannot be removed while iterating
        unreachable = [n for n in self.G.nodes if n not in reachable]
        self.G.remove_nodes_from(unreachable)

    def list_reachable(self, agent = None):
        """ Yield the states directly related to the pointed state

        Parameters
        ----------
        agent : agent name, optional
            If given, only yield states related to the point for this agent

        Yields
        ------
        node of G
        """
        for n in self.G.neighbors(self.point):
            if agent is None or agent in self.G.edges[self.point, n]['agents']:
                yield n

    def get_edges(self):
        """ Yield (state, agents) for every state related to the pointed state """
        for n in self.list_reachable():
            yield (n, self.G.edges[self.point, n]['agents'])

    def valuation(self, node = None):
        """ Get the valuation of a state (of the pointed state by default) """
        if node is None:
            node = self.point
        return self.G.nodes[node]['valuation']

    def accumulate_agent_relations(self, block_B, block_C):
        """ Collect the agents that relate any state in one block to the other

        Parameters
        ----------
        block_B, block_C: iterator of node

        Returns
        -------
        dict
            Dict with the single key 'agents', holding the union of the
            'agents' sets over all edges from block_B to block_C
        """
        agents = set()
        # Get the edge boundary
        for s,t in nx.algorithms.boundary.edge_boundary(self.G, block_B, block_C):
            agents |= self.G.edges[s,t]['agents']
        return {'agents' : agents}
class ActionModel:
    """ Pointed action model: a named graph of events with a designated event """
    def __init__(self, action_name, state_graph, point):
        # Plain container: store the three constructor arguments unchanged
        self.action_name, self.G, self.point = action_name, state_graph, point
def execute_action(km, am, remove_unreachable = True, simplify_bisim = True):
    """ Make model resulting from execution of action model

    Parameters
    ----------
    km : KripkeModel
        Pointed Kripke model of original state
    am : ActionModel
        Action to be executed in that state. Every node needs a 'pre'
        attribute (atoms that must be true, atoms that must be false) and a
        'post' attribute (atoms made true, atoms made false).
    remove_unreachable : bool
        Drop the states that cannot be reached from the new point
    simplify_bisim : bool
        Merge bisimilar states of the result

    Returns
    -------
    KripkeModel
        The new Kripke model after executing the action

    Raises
    ------
    ValueError
        If the pointed state does not satisfy the preconditions of the
        pointed action
    """
    new_point = (km.point, am.point)
    # Strong product will have Cartesian product of nodes as nodes, and
    # edges where both input graphs have edges
    product_graph = nx.Graph()
    for u in km.G.nodes:
        val = km.G.nodes[u]['valuation']
        for v in am.G.nodes:
            pre_true, pre_false = am.G.nodes[v]['pre']
            post_true, post_false = am.G.nodes[v]['post']
            # Keep (u, v) only if the state satisfies the precondition
            if pre_true <= val and pre_false.isdisjoint(val):
                product_graph.add_node((u, v), valuation = (val | post_true) - post_false)

    if new_point not in product_graph.nodes:
        raise ValueError("Preconditions of action model point not satisfied")

    prod_edges = []
    for (u,v) in product_graph.nodes:
        for x in km.G.neighbors(u):
            for y in am.G.neighbors(v):
                if (x,y) in product_graph.nodes:
                    # An agent relates two product states only if it relates
                    # both the states and the actions
                    agents = set.intersection(
                        set(km.G.edges[u, x]['agents']),
                        set(am.G.edges[v, y]['agents'])
                    )
                    if agents:
                        prod_edges.append(((u,v), (x,y), {'agents' : agents}))
    product_graph.add_edges_from(prod_edges)

    result = KripkeModel(
        km.history + [am.action_name],
        product_graph,
        new_point
    )
    if remove_unreachable:
        result.remove_unreachable()
    if simplify_bisim:
        result = kripke_merge_equivalent(result)
    # NOTE(review): the original call site of relabel_nodes_to_int is missing
    # from this copy; relabelling here keeps node names from nesting deeper
    # with every executed action. Confirm against the original notebook.
    result.relabel_nodes_to_int()
    return result
class MovementModel:
    """ Epistemic model of agents moving between connected rooms

    Atoms are (agent, room) pairs; a valuation is the set of atoms that hold,
    i.e. one room per agent. The current knowledge state is kept in self.km.
    """
    def __init__(self, rooms, agent_locations):
        """ Initialize movement model for the given rooms and agents

        Parameters
        ----------
        rooms : networkx.Graph
            Graph with nodes named by an int. A self-loop is added to every
            room in place, so that staying in a room is a legal move.
        agent_locations : dict of str to int
            Names of the agents and their locations. The dict is stored by
            reference and updated in place by move_agents.
        """
        self.rooms = rooms
        self.rooms.add_edges_from([(n,n) for n in self.rooms.nodes])
        self.agent_locations = agent_locations
        self.room_names = set(rooms.nodes)
        self.agent_names = set(agent_locations.keys())
        self.consistent_vals = self.list_allowed_valuations()
        self.num_vals = len(self.consistent_vals)
        self.km = self.make_start_km()
        self.observe_am = self.make_observe_model()
        # NOTE(review): the initial observation is assumed; the recorded
        # output shows full knowledge before any move. Confirm against the
        # original notebook.
        self.observe()

    @staticmethod
    def _ordered(items):
        """ Return the items as a list in a reproducible order """
        try:
            return sorted(items)
        except TypeError:
            # Mixed, mutually incomparable names: fall back to their repr
            return sorted(items, key=repr)

    def make_start_km(self):
        """ Construct initial Kripke model

        Every allowed valuation is a state, and no agent can tell any two
        states apart.
        """
        state_graph = nx.complete_graph(self.num_vals)
        # complete_graph has no self-loops; the relations must be reflexive,
        # otherwise the actual state is lost when an action is executed
        state_graph.add_edges_from([(n, n) for n in range(self.num_vals)])
        for i,val in enumerate(self.consistent_vals):
            state_graph.nodes[i]['valuation'] = val
        for u,v in state_graph.edges:
            state_graph.edges[u,v]['agents'] = set(self.agent_names)
        # Find start state
        return KripkeModel(["Initialize"], state_graph, self.get_val_idx())

    def make_observe_model(self):
        """ Construct action model for observation

        There is one action per allowed valuation. An agent cannot tell two
        actions apart if it sees the same atoms in its own room in both.
        """
        observations = {i: dict() for i in range(self.num_vals)}
        for i, val in enumerate(self.consistent_vals):
            room_obs = {rn: set() for rn in self.room_names}
            agent_room = dict()
            for (an, rn) in val:
                room_obs[rn].add((an, rn))
                agent_room[an] = rn
            for an in self.agent_names:
                observations[i][an] = room_obs[agent_room[an]]
        all_atoms = self.all_atoms()
        # make graph
        state_graph = nx.Graph()
        for n in range(self.num_vals):
            # Precondition is that all atoms in the valuation must be true, and
            # all other atoms must be false.
            state_graph.add_node(
                n,
                pre = (self.consistent_vals[n], all_atoms - self.consistent_vals[n]),
                post = (set(), set())
            )
        for u, v in itertools.product(range(self.num_vals), range(self.num_vals)):
            # Check for which agents these states are the same
            rel = set(an for an in self.agent_names if observations[u][an] == observations[v][an])
            if rel:
                state_graph.add_edge(u, v, agents = rel)
        return ActionModel("Observe", state_graph, self.get_val_idx())

    def observe(self):
        """ Execute the "observe" action, letting all agents observe the room they are in
        """
        # We can reuse the observe action model because only the point changes
        self.observe_am.point = self.get_val_idx()
        self.km = execute_action(self.km, self.observe_am)

    def list_allowed_valuations(self):
        """ List all allowed valuations

        Returns
        -------
        list of set of atom
            One valuation for every way of placing each agent in one room
        """
        agent_atoms = {
            a: [(a, r) for r in self.room_names]
            for a in self.agent_names
        }
        valuations = [set(atoms) for atoms in itertools.product(*agent_atoms.values())]
        return valuations

    def get_valuation(self):
        """ Get valuation of the current state
        """
        return set(self.agent_locations.items())

    def get_val_idx(self, valuation = None):
        """ Get index of a valuation in the point val list

        Parameters
        ----------
        valuation : set of atom, optional
            Defaults to the valuation of the current state

        Raises
        ------
        ValueError
            If the valuation is not one of the allowed valuations
        """
        if valuation is None:
            valuation = self.get_valuation()
        for i, val in enumerate(self.consistent_vals):
            if val == valuation:
                return i
        raise ValueError("Inconsistent valuation!")

    def all_atoms(self):
        """ Get a set of all atoms
        """
        return set(itertools.product(self.agent_names, self.room_names))

    def make_agent_move_action(self, agent, new_room):
        """ Make movement action model of a single agent moving to another room

        Parameters
        ----------
        agent : str
            Name of the agent that moves
        new_room : int
            Room the agent moves to; must be connected to its current room

        Returns
        -------
        ActionModel
            Model with one action per possible move, pointed at the real move

        Raises
        ------
        ValueError
            If the rooms graph has no edge from the current room to new_room
        """
        old_room = self.agent_locations[agent]
        if (old_room, new_room) not in self.rooms.edges:
            raise ValueError("Illegal move: {} -> {}".format(old_room, new_room))

        def move_conditions(agent, old_room, new_room):
            pre = (set([(agent, old_room)]), set())
            if old_room != new_room:
                post = (set([(agent, new_room)]), set([(agent, old_room)]))
            else:
                post = (set(), set())
            return {'pre' : pre, 'post' : post}

        state_graph = nx.Graph()
        for r1, r2 in self.rooms.edges:
            # The rooms graph is undirected: a move is possible both ways
            state_graph.add_node(
                (r1, r2),
                **move_conditions(agent, r1, r2)
            )
            state_graph.add_node(
                (r2, r1),
                **move_conditions(agent, r2, r1)
            )
        # Edges connect all states for all agents except the one making the move
        # ({agent}, not set(agent): the latter splits the name into characters)
        agent_dict = {'agents' : set(self.agent_names) - {agent}}
        edges = [(i, j, agent_dict) for i,j in itertools.product(state_graph.nodes, state_graph.nodes)]
        state_graph.add_edges_from(edges)
        # Add reflexive edges for all agents
        state_graph.add_edges_from([(n, n, {'agents' : set(self.agent_names)}) for n in state_graph.nodes])

        am = ActionModel(
            "Move ({}: {} -> {})".format(agent, old_room, new_room),
            state_graph,
            (old_room, new_room)
        )
        return am

    def move_agents(self, new_locations):
        """ Execute a series of moves followed by a simultaneous observation

        Parameters
        ----------
        new_locations : dict of str -> int
            Dict giving the new location of each agent

        Raises
        ------
        ValueError
            If the agent names don't match the model, or if any move is
            illegal. Nothing is changed in that case.
        """
        if not new_locations.keys() == self.agent_names:
            raise ValueError("Agent names in moves don't match model agent list")
        # Check every move up front so that a bad one cannot leave the model
        # with only part of the moves executed
        for agent, new_room in new_locations.items():
            old_room = self.agent_locations[agent]
            if (old_room, new_room) not in self.rooms.edges:
                raise ValueError("Illegal move: {} -> {}".format(old_room, new_room))
        for agent, new_room in new_locations.items():
            move_action = self.make_agent_move_action(agent, new_room)
            self.km = execute_action(self.km, move_action)
            self.agent_locations[agent] = new_room
        self.observe()

    def print_possible_worlds(self):
        """ Print the valuation of the actual state and of all related states
        """
        print("Valuation: ", self.km.valuation())
        for n, agents in self.km.get_edges():
            print(self.km.valuation(n), "reachable for ", agents)

    def propositional_knowledge(self, agent):
        """ Get a list of atomic propositions that the agent knows to be true or false

        Parameters
        ----------
        agent : str
            Name of the agent

        Returns
        -------
        dict of str to set of atom
            Dict with keys 'true' and 'false', each referring to a set of atomic propositions

        Raises
        ------
        ValueError
            If the agent considers no state possible
        """
        atoms = self.all_atoms()
        world_valuations = [self.km.valuation(w) for w in self.km.list_reachable(agent)]
        if not world_valuations:
            # set.intersection(*[]) would fail with an unhelpful TypeError
            raise ValueError("No states reachable for agent {}".format(agent))
        knowledge = {
            'true' : set.intersection(*world_valuations),
            'false' : atoms - set.union(*world_valuations)
        }
        return knowledge

    def make_knowledge_table(self):
        """ Create a table of propositional knowledge

        Returns
        -------
        str
            LaTeX array with one column per room, one row for the truth and
            one row per agent. Rooms, agents and atoms are listed in sorted
            order so that the output is reproducible.
        """
        room_cols = self._ordered(self.room_names)

        def get_room_propositions(props):
            """ Get propositions that are t/f by room"""
            room_props = dict()
            for room in self.room_names:
                room_props[room] = {
                    'true' : self._ordered(a for (a,r) in props['true'] if r==room),
                    'false' : self._ordered(a for (a,r) in props['false'] if r==room)
                }
            return room_props

        def format_table_row(row_name, room_cols, room_props):
            return "{} & {}\\\\\n\\hline\n".format(
                row_name,
                ' & '.join([
                    ','.join(
                        ["{}_{}".format(a, r) for a in room_props[r]['true']] + \
                        ["\\neg {}_{}".format(a, r) for a in room_props[r]['false']]
                    )
                    for r in room_cols
                ])
            )

        truth = {
            'true' : self.get_valuation(),
            'false' : self.all_atoms() - self.get_valuation()
        }
        table = "\\begin{{array}}{{|c|{}|}}\n".format('|'.join('c' for r in room_cols))
        table += "\\hline\n & {}\\\\\n\\hline\n".format('&'.join("\\text{{Room {}}}".format(r) for r in room_cols))
        table += format_table_row(
            "\\text{Truth}",
            room_cols,
            get_room_propositions(truth)
        )
        agents = self._ordered(self.agent_names)
        for a in agents:
            table += format_table_row(
                "\\text{{Agent {}}}".format(a),
                room_cols,
                get_room_propositions(self.propositional_knowledge(a))
            )
        table += "\\end{array}"
        return table
# Equivalence partition code was taken from an external source (the reference link is missing from this copy)
def equivalence_partition(iterable, relation):
    """Partitions a set of objects into equivalence classes

    Args:
        iterable: collection of objects to be partitioned
        relation: equivalence relation. I.e. relation(o1,o2) evaluates to True
            if and only if o1 and o2 are equivalent

    Returns: classes, partitions
        classes: A sequence of sets. Each one is an equivalence class
        partitions: A dictionary mapping objects to equivalence classes
    """
    classes = []
    partitions = {}
    for o in iterable:  # for each object
        # find the class it is in
        for c in classes:
            # The relation is an equivalence, so comparing with any single
            # representative of the class is enough
            if relation(next(iter(c)), o):  # is it equivalent to this class?
                c.add(o)
                partitions[o] = c
                break
        else:  # it is in a new class
            classes.append({o})
            partitions[o] = classes[-1]
    return classes, partitions

def equivalence_enumeration(iterable, relation):
    """Partitions a set of objects into equivalence classes

    Same as equivalence_partition() but also numbers the classes.

    Args:
        iterable: collection of objects to be partitioned
        relation: equivalence relation. I.e. relation(o1,o2) evaluates to True
            if and only if o1 and o2 are equivalent

    Returns: classes, partitions, ids
        classes: A sequence of sets. Each one is an equivalence class
        partitions: A dictionary mapping objects to equivalence classes
        ids: A dictionary mapping objects to the indices of their equivalence classes
    """
    classes, partitions = equivalence_partition(iterable, relation)
    # The index of a class is its position in the classes sequence
    ids = {o: i for i, c in enumerate(classes) for o in c}
    return classes, partitions, ids
def kripke_merge_equivalent(km):
    """ Refine model by replacing states by their bisimilarity class

    States are first grouped by valuation. The groups are then split
    repeatedly until all states in a group reach, for every agent, the same
    set of groups. (The reference for the algorithm is missing from this
    copy of the file.)

    Parameters
    ----------
    km : Kripke model

    Returns
    -------
    KripkeModel
        Equivalent Kripke model where bisimilarity classes are replaced with a single state.
        Its states are frozensets of the states of km.
    """
    def same_valuation(s, t, G):
        return G.nodes[s]['valuation'] == G.nodes[t]['valuation']

    def find_transition_blocks(G, block_dict):
        """ For every state, map each agent to the set of blocks it can reach """
        transition_block = {s : dict() for s in G.nodes}
        for s in G.nodes:
            for n in G.neighbors(s):
                i = block_dict[n]
                for agent in G.edges[s,n]['agents']:
                    transition_block[s].setdefault(agent, set()).add(i)
        return transition_block

    def refine_partition(G, block_dict):
        transition_block = find_transition_blocks(G, block_dict)
        # A refinement may only split blocks. Without the same-block test,
        # states with different valuations that happen to reach the same
        # blocks would be merged into one state.
        return equivalence_enumeration(
            G.nodes,
            lambda s, t: (block_dict[s] == block_dict[t]
                          and transition_block[s] == transition_block[t])
        )

    part_dict = {s : set([s]) for s in km.G.nodes}
    classes, refined_part_dict, block_dict = equivalence_enumeration(
        km.G.nodes,
        lambda s, t: same_valuation(s, t, km.G)
    )
    # Keep splitting until the partition no longer changes
    while part_dict != refined_part_dict:
        part_dict = refined_part_dict
        classes, refined_part_dict, block_dict = refine_partition(
            km.G, block_dict
        )
    part_dict = refined_part_dict
    # Create a new model where we replace all states by their equivalence classes
    new_graph = nx.algorithms.minors.quotient_graph(
        km.G,
        partition = lambda s, t: block_dict[s] == block_dict[t],
        edge_data = lambda B, C:  km.accumulate_agent_relations(B, C),
        node_data = lambda B: {'valuation' : km.G.nodes[next(iter(B))]['valuation']}
    )
    # The quotient graph has no self-loops; restore the relations that hold
    # between the states inside each class
    for c in list(new_graph.nodes):
        new_graph.add_edge(c, c, **km.accumulate_agent_relations(c, c))
    # Get the new point
    new_point = frozenset(part_dict[km.point])
    result = KripkeModel(
        km.history + ["Merge bisimilairty classes: num_nodes {} -> {}".format(
            len(km.G.nodes), len(new_graph.nodes)
        )],
        new_graph,
        new_point
    )
    return result
# Demo: four rooms in a row (1 - 2 - 3 - 4). Agent a stays in room 1 while
# agent b walks to room 4, one room per step.
# An empty rooms graph allows no valuation at all, so the rooms must be added
# before the model is built.
connectivity = nx.Graph()
connectivity.add_edges_from([(1, 2), (2, 3), (3, 4)])
locations = {'a' : 1, 'b' : 1}
m = MovementModel(connectivity, locations)
l = m.make_knowledge_table()
# NOTE(review): the rendering call is missing from this copy of the file;
# display(Latex(...)) is assumed from the imports - confirm.
display(Latex(l))
for b_room in (2, 3, 4):
    m.move_agents({'a' : 1, 'b' : b_room})
    l = m.make_knowledge_table()
    display(Latex(l))
\begin{array}{|c|c|c|c|c|}
\hline
 & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\
\hline
\text{Truth} & a_1,b_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent a} & b_1,a_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent b} & b_1,a_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\end{array}
\begin{array}{|c|c|c|c|c|} \hline & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\ \hline \text{Truth} & a_1,b_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \text{Agent a} & b_1,a_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \text{Agent b} & b_1,a_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \end{array}
\begin{array}{|c|c|c|c|c|}
\hline
 & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\
\hline
\text{Truth} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent a} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent b} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\end{array}
\begin{array}{|c|c|c|c|c|} \hline & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\ \hline \text{Truth} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \text{Agent a} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \text{Agent b} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \end{array}
\begin{array}{|c|c|c|c|c|}
\hline
 & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\
\hline
\text{Truth} & a_1,\neg b_1 & \neg a_2,\neg b_2 & b_3,\neg a_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent a} & a_1,\neg b_1 & \neg a_2 & \neg a_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent b} & \neg b_1 & \neg b_2 & b_3,\neg a_3 & \neg a_4,\neg b_4\\
\hline
\end{array}
\begin{array}{|c|c|c|c|c|} \hline & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\ \hline \text{Truth} & a_1,\neg b_1 & \neg a_2,\neg b_2 & b_3,\neg a_3 & \neg a_4,\neg b_4\\ \hline \text{Agent a} & a_1,\neg b_1 & \neg a_2 & \neg a_3 & \neg a_4,\neg b_4\\ \hline \text{Agent b} & \neg b_1 & \neg b_2 & b_3,\neg a_3 & \neg a_4,\neg b_4\\ \hline \end{array}
\begin{array}{|c|c|c|c|c|}
\hline
 & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\
\hline
\text{Truth} & a_1,\neg b_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & b_4,\neg a_4\\
\hline
\text{Agent a} & a_1,\neg b_1 & \neg a_2 & \neg a_3 & \neg a_4\\
\hline
\text{Agent b} & \neg b_1 & \neg b_2 & \neg b_3 & b_4,\neg a_4\\
\hline
\end{array}
\begin{array}{|c|c|c|c|c|} \hline & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\ \hline \text{Truth} & a_1,\neg b_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & b_4,\neg a_4\\ \hline \text{Agent a} & a_1,\neg b_1 & \neg a_2 & \neg a_3 & \neg a_4\\ \hline \text{Agent b} & \neg b_1 & \neg b_2 & \neg b_3 & b_4,\neg a_4\\ \hline \end{array}
