Movement model for Among Us with epistemic logic

To run interactively, see Binder

The design is explained in movement.pdf.

In [1]:
import networkx as nx
import itertools
from copy import copy, deepcopy
import matplotlib.pyplot as plt
from IPython.display import Image, display, Latex
In [2]:
class KripkeModel:
    def __init__(self, history, state_graph, point):
        self.history = history
        self.G = state_graph
        self.point = point
    def relabel_nodes_to_int(self):
        """ Relabel nodes to integers
        
        Execution of action model turns node names into (s,j) pairs. This method
        relabels them to integers
        """
        relabel_map = {n : i for i,n in enumerate(self.G.nodes)}
        self.point = relabel_map[self.point]
        nx.relabel_nodes(self.G, relabel_map, copy=False)
    def remove_unreachable(self):
        """ Remove states that are not reachable from the pointed state
        """
        self.G.remove_nodes_from(
            list(nx.non_neighbors(
                self.G,
                self.point
            ))
        )
    def list_reachable(self, agent = None):
        """
        """
        for n in self.G.neighbors(self.point):
            if agent is None or agent in self.G.edges[self.point, n]['agents']:
                yield n
    def get_edges(self):
        """
        """
        for n in self.list_reachable():
            yield (n, self.G.edges[self.point, n]['agents'])
    def valuation(self, node = None):
        if node is None:
            node = self.point
        return self.G.nodes[node]['valuation']
    def accumulate_agent_relations(self, block_B, block_C):
        """
        
        Parameters
        ----------
        block_B, block_C: iterator of node
            
        """
        agents = set()
        # Get the edge boundary
        edges = list(nx.algorithms.boundary.edge_boundary(self.G, block_B, block_C))

        for s,t in edges:
            agents = agents | self.G.edges[s,t]['agents']
        return {'agents' : agents}
In [3]:
class ActionModel:
    def __init__(self, action_name, state_graph, point):
        self.action_name = action_name
        self.G = state_graph
        self.point = point
In [4]:
def execute_action(km, am, remove_unreachable = True, simplify_bisim = True):
    """ Make model resulting from execution of action model
    
    Parameters
    ----------
    km : KripkeModel
        Pointed Kripke model of original state
    am : ActionModel
        Action to be executed in that state
    
    Returns
    -------
    KripkeModel
        The new Kripke model after executing the action
    """
    new_point = (km.point, am.point)
    # Strong product will have Cartesian product of nodes as nodes, and 
    # edges where both input graphs have edges
    product_graph = nx.Graph()
    for u in km.G.nodes:
        val = km.G.nodes[u]['valuation']
        for v in am.G.nodes:

            pre_true, pre_false = am.G.nodes[v]['pre']
            post_true, post_false = am.G.nodes[v]['post']
            if not ((pre_true - val) or (pre_false & val)):
                product_graph.add_node((u, v), valuation = (val | post_true) - post_false)
    prod_edges = []
    for (u,v) in product_graph.nodes:
        for x in km.G.neighbors(u):
            for y in am.G.neighbors(v):
                if (x,y) in product_graph.nodes:
                    agents = set.intersection(
                        km.G.edges[u,x]['agents'],
                        am.G.edges[v,y]['agents'],
                    )
                    if agents:
                        prod_edges.append(((u,v), (x,y), {'agents' : agents}))
    product_graph.add_edges_from(prod_edges)
    

    result = KripkeModel(
        km.history + [am.action_name], 
        product_graph,
        new_point
    )
    if not new_point in result.G.nodes:
        raise ValueError("Preconditions of action model point not satisfied")
    result.relabel_nodes_to_int()
    if remove_unreachable:
        result.remove_unreachable()
    if simplify_bisim:
        result = kripke_merge_equivalent(result)
        result.relabel_nodes_to_int()
    return result
In [5]:
class MovementModel:
    def __init__(self, rooms, agent_locations):
        """ Initialize movement model for the given rooms and agents
        
        Parameters
        ----------
        rooms : nerworkx.Graph
            Graph with nodes named by an int
        agent_locations : dict of str to int
            Names of the agents and their locations
        """

        self.rooms = rooms
        self.rooms.add_edges_from([(n,n) for n in self.rooms.nodes])
        self.agent_locations = agent_locations
        self.room_names = set(rooms.nodes)
        self.agent_names = set(agent_locations.keys())
        self.consistent_vals = self.list_allowed_valuations()
        self.num_vals = len(self.consistent_vals)
        
        self.km = self.make_start_km()
        self.observe_am = self.make_observe_model()
        self.observe()
    def make_start_km(self):
        """ Construct initial Kripke model
        """
        point_val = self.get_valuation()
        state_graph = nx.complete_graph(self.num_vals)
        for i,val in enumerate(self.consistent_vals):
            state_graph.nodes[i]['valuation'] = val
            state_graph.add_edge(i,i)
        for u,v in state_graph.edges:
            state_graph.edges[u,v]['agents'] = set(self.agent_names)
        # Find start state
        return KripkeModel(["Initialize"], state_graph, self.get_val_idx())
    def make_observe_model(self):
        """ Construct action model for observation
        """
        observations = {i: dict() for i in range(self.num_vals)}
        for i, val in enumerate(self.consistent_vals):
            room_obs = {rn: set() for rn in self.room_names}
            agent_room = dict()
            for (an, rn) in val:
                room_obs[rn].add((an, rn))
                agent_room[an] = rn
            for an in self.agent_names:
                observations[i][an] = room_obs[agent_room[an]]
        all_atoms = self.all_atoms()
        # make graph
        state_graph = nx.Graph()
        for n in range(self.num_vals):
            # Precondition is that all atoms in the valuation must be true, and
            # all other atoms must be false.
            state_graph.add_node(
                n,
                pre = (self.consistent_vals[n], all_atoms - self.consistent_vals[n]),
                post = (set(), set())
            )
        for u, v in itertools.product(range(self.num_vals), range(self.num_vals)):
            # Check for which agents these states are the same
            rel = set(an for an in self.agent_names if observations[u][an] == observations[v][an])
            if rel:
                state_graph.add_edge(u, v, agents = rel)
        return ActionModel("Observe", state_graph, self.get_val_idx())
    def observe(self):
        """ Execute the "observe" action, letting all agents observe the room they are in
        """
        # We can reuse the observe action model because only the point changes
        self.observe_am.point = self.get_val_idx()
        self.km = execute_action(self.km, self.observe_am)
    def list_allowed_valuations(self):
        """ List all allowed valuations
        """
        agent_atoms = {
            a: [(a, r) for r in self.room_names]
            for a in self.agent_names
        }
        valuations = [set(atoms) for atoms in itertools.product(*agent_atoms.values())]
        return valuations
    def get_valuation(self):
        """ Get valuation of the current state
        """
        return set(self.agent_locations.items())
    def get_val_idx(self, valuation = None):
        """ Get index of a valuation in the point val list
        """
        if valuation is None:
            valuation = self.get_valuation()
        for i, val in enumerate(self.consistent_vals):
            if val == valuation:
                return i
        raise ValueError("Inconsistent valuation!")
    def all_atoms(self):
        """ Get a set of all atoms
        """
        return set(itertools.product(self.agent_names, self.room_names))
    def make_agent_move_action(self, agent, new_room):
        """ Make movement action model of a single agent moving to another room
        """
        old_room = self.agent_locations[agent]
        if not (self.agent_locations[agent], new_room) in self.rooms.edges:
            raise ValueError("Illegal move: {} -> {}".format(old_room, new_room))

        def move_conditions(agent, old_room, new_room):
            pre = (set([(agent, old_room)]), set())
            if old_room != new_room:
                post = (set([(agent, new_room)]), set([(agent, old_room)]))
            else:
                post = (set(), set())
            return {'pre' : pre, 'post' : post}
        state_graph = nx.Graph()
        for r1, r2 in self.rooms.edges:
            state_graph.add_node(
                (r1, r2), 
                **move_conditions(agent, r1, r2)
            )
            state_graph.add_node(
                (r2, r1), 
                **move_conditions(agent, r2, r1)
            )
        # Edges connect all states for all agens except the one making the move
        agent_dict = {'agents' : set(self.agent_names) - set(agent)}
        edges = [(i, j, agent_dict) for i,j in itertools.product(state_graph.nodes, state_graph.nodes)]
        state_graph.add_edges_from(edges)
        
        # Add reflexive edges for all agents
        state_graph.add_edges_from([(n, n, {'agents' : self.agent_names}) for n in state_graph.nodes])

        am = ActionModel(
            "Move ({}: {} -> {})".format(agent, old_room, new_room),
            state_graph,
            (old_room, new_room)
        )
        return am
    def move_agents(self, new_locations):
        """ Execute a series of moves followed by a simultaneous observation
        
        Parameters
        ----------
        moves : dict of str -> int
            Dict giving the new location of each agent
        """
        if not new_locations.keys() == self.agent_names:
            raise ValueError("Agent names in moves don't match model agent list")
        for agent, new_room in new_locations.items():
            move_action = self.make_agent_move_action(agent, new_room)
            self.km = execute_action(self.km, move_action)
            self.agent_locations[agent] = new_room 
        self.observe()
    def print_possible_worlds(self):
        print("Valuation: ", self.km.valuation())
        for n, agents in self.km.get_edges():
            print(self.km.valuation(n), "reachable for ", agents)
    def propositional_knowledge(self, agent):
        """ Get a list of atomic propositions that the agent knows to be true or false
        
        Parameters
        ----------
        agent : ste
            Name of the agent
            
        Returns
        -------
        dict of str to set of atom
            Dict with keys 'true' and 'false', each referring to a set of atomic propositions
        """
        atoms = self.all_atoms()
        world_valuations = [self.km.valuation(w) for w in self.km.list_reachable(agent)]
        knowledge = {
            'true' : set.intersection(*world_valuations),
            'false' : atoms - set.union(*world_valuations)
        }
        return knowledge
    def make_knowledge_table(self):
        """ Create a table of propositional knowledge
        """
        room_cols = list(self.room_names)
        room_cols.sort()
        def get_room_propositions(props):
            """ Get propositions that are t/f by room"""
            room_props = dict()
            for room in self.room_names:
                room_props[room] = {
                    'true' : [a for (a,r) in props['true'] if r==room],
                    'false' : [a for (a,r) in props['false'] if r==room]
                }
            return room_props
        def format_table_row(row_name, room_cols, room_props):
            return "{} & {}\\\\\n\hline\n".format(
                row_name,
                ' & '.join([
                    ','.join(
                        ["{}_{}".format(a, r) for a in room_props[r]['true']] + \
                        ["\\neg {}_{}".format(a, r) for a in room_props[r]['false']]
                    )
                    for r in room_cols
                ])
            )
        truth = {
            'true' : self.get_valuation(), 
            'false' : self.all_atoms() - self.get_valuation()
        }
        table = "\\begin{{array}}{{|c|{}|}}\n".format('|'.join('c' for r in room_cols))
        table += "\hline\n & {}\\\\\n\hline\n".format('&'.join("\\text{{Room {}}}".format(r) for r in room_cols))
        table += format_table_row(
            "\\text{Truth}",
            room_cols,
            get_room_propositions(truth)
        )
        agents = list(self.agent_names)
        agents.sort()
        for a in agents:
            table += format_table_row(
                "\\text{{Agent {}}}".format(a),
                room_cols,
                get_room_propositions(
                    self.propositional_knowledge(a)
                )
            )
        table += "\\end{array}"
        return table
In [6]:
# Equivalence partition code was taken from https://stackoverflow.com/a/38924631
def equivalence_partition(iterable, relation):
    """Partitions a set of objects into equivalence classes

    Args:
        iterable: collection of objects to be partitioned
        relation: equivalence relation. I.e. relation(o1,o2) evaluates to True
            if and only if o1 and o2 are equivalent

    Returns: classes, partitions
        classes: A sequence of sets. Each one is an equivalence class
        partitions: A dictionary mapping objects to equivalence classes
    """
    classes = []
    partitions = {}
    for o in iterable:  # for each object
        # find the class it is in
        found = False
        for c in classes:
            if relation(next(iter(c)), o):  # is it equivalent to this class?
                c.add(o)
                partitions[o] = c
                found = True
                break
        if not found:  # it is in a new class
            classes.append(set([o]))
            partitions[o] = classes[-1]
    return classes, partitions


def equivalence_enumeration(iterable, relation):
    """Partitions a set of objects into equivalence classes

    Same as equivalence_partition() but also numbers the classes.

    Args:
        iterable: collection of objects to be partitioned
        relation: equivalence relation. I.e. relation(o1,o2) evaluates to True
            if and only if o1 and o2 are equivalent

    Returns: classes, partitions, ids
        classes: A sequence of sets. Each one is an equivalence class
        partitions: A dictionary mapping objects to equivalence classes
        ids: A dictionary mapping objects to the indices of their equivalence classes
    """
    classes, partitions = equivalence_partition(iterable, relation)
    ids = {}
    for i, c in enumerate(classes):
        for o in c:
            ids[o] = i
    return classes, partitions, ids
In [7]:
def kripke_merge_equivalent(km):
    """ Refine model by replacing states by their bisimilarity class
    
    Parameters
    ----------
    km : Kripke model
    
    Returns
    -------
    KripkeModel
        Equivalent Kripke model where bisimilarity classes are replaced with a single state
    
    Based on the algorighm given on page 26 of
    https://staff.fnwi.uva.nl/d.j.n.vaneijck2/courses/lai0506/LAI11.pdf
    
    """
    def same_valuation(s, t, G):
        return G.nodes[s]['valuation'] == G.nodes[t]['valuation']
    def same_tansition_blocks(s, t, transition_block):
        """ Return true iff for all agents, 
        if s has an a-relation to an agent in a block, so does t
        """
        return transition_block[s] == transition_block[t]
    def find_transition_blocks(G, block_dict):
        transition_block = {s : dict() for s in G.nodes}
        for s in G.nodes:
            for n in G.neighbors(s):
                i = block_dict[n]
                for agent in km.G.edges[s,n]['agents']:
                    if not agent in transition_block[s]:
                        transition_block[s][agent] = set()
                    transition_block[s][agent].add(i)
        return transition_block
    def refine_partition(G, block_dict):
        transition_block = find_transition_blocks(G, block_dict)
        return equivalence_enumeration(
            km.G.nodes, 
            lambda s, t: same_tansition_blocks(s, t, transition_block)
        )
    part_dict = {s : set([s]) for s in km.G.nodes}
    classes, refined_part_dict, block_dict = equivalence_enumeration(
        km.G.nodes, 
        lambda s, t: same_valuation(s, t, km.G)
    )
    while part_dict != refined_part_dict:
        part_dict = refined_part_dict
        classes, refined_part_dict, block_dict = refine_partition(
            km.G, block_dict
        )
    part_dict = refined_part_dict
    
    # Create a new model where we replace all states by their equivalence classes
    new_graph = nx.algorithms.minors.quotient_graph(
        km.G,
        partition = lambda s, t: block_dict[s] == block_dict[t],
        edge_data = lambda B, C:  km.accumulate_agent_relations(B, C),
        node_data = lambda B: {'valuation' : km.G.nodes[next(iter(B))]['valuation']}
    )
    for c in new_graph.nodes:
        new_graph.add_edge(c, c, **km.accumulate_agent_relations(c, c))
    # Get the new point
    
    new_point = frozenset(part_dict[km.point])
    
    result = KripkeModel(
        km.history + ["Merge bisimilairty classes: num_nodes {} -> {}".format(
            km.G.number_of_nodes(),
            new_graph.number_of_nodes()
        )], 
        new_graph,
        new_point
    )
    return result
In [8]:
connectivity = nx.Graph()
connectivity.add_edge(1,2)
connectivity.add_edge(2,3)
connectivity.add_edge(3,4)
locations = {'a' : 1, 'b' : 1}
m = MovementModel(connectivity, locations)
l = m.make_knowledge_table()
print(l)
display(Latex(l))
m.move_agents({'a' : 1, 'b' : 2})
l = m.make_knowledge_table()
print(l)
display(Latex(l))
m.move_agents({'a' : 1, 'b' : 3})
l = m.make_knowledge_table()
print(l)
display(Latex(l))
m.move_agents({'a' : 1, 'b' : 4})
l = m.make_knowledge_table()
print(l)
display(Latex(l))
\begin{array}{|c|c|c|c|c|}
\hline
 & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\
\hline
\text{Truth} & a_1,b_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent a} & b_1,a_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent b} & b_1,a_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\end{array}
\begin{array}{|c|c|c|c|c|} \hline & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\ \hline \text{Truth} & a_1,b_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \text{Agent a} & b_1,a_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \text{Agent b} & b_1,a_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \end{array}
\begin{array}{|c|c|c|c|c|}
\hline
 & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\
\hline
\text{Truth} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent a} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent b} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\
\hline
\end{array}
\begin{array}{|c|c|c|c|c|} \hline & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\ \hline \text{Truth} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \text{Agent a} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \text{Agent b} & a_1,\neg b_1 & b_2,\neg a_2 & \neg a_3,\neg b_3 & \neg a_4,\neg b_4\\ \hline \end{array}
\begin{array}{|c|c|c|c|c|}
\hline
 & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\
\hline
\text{Truth} & a_1,\neg b_1 & \neg a_2,\neg b_2 & b_3,\neg a_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent a} & a_1,\neg b_1 & \neg a_2 & \neg a_3 & \neg a_4,\neg b_4\\
\hline
\text{Agent b} & \neg b_1 & \neg b_2 & b_3,\neg a_3 & \neg a_4,\neg b_4\\
\hline
\end{array}
\begin{array}{|c|c|c|c|c|} \hline & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\ \hline \text{Truth} & a_1,\neg b_1 & \neg a_2,\neg b_2 & b_3,\neg a_3 & \neg a_4,\neg b_4\\ \hline \text{Agent a} & a_1,\neg b_1 & \neg a_2 & \neg a_3 & \neg a_4,\neg b_4\\ \hline \text{Agent b} & \neg b_1 & \neg b_2 & b_3,\neg a_3 & \neg a_4,\neg b_4\\ \hline \end{array}
\begin{array}{|c|c|c|c|c|}
\hline
 & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\
\hline
\text{Truth} & a_1,\neg b_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & b_4,\neg a_4\\
\hline
\text{Agent a} & a_1,\neg b_1 & \neg a_2 & \neg a_3 & \neg a_4\\
\hline
\text{Agent b} & \neg b_1 & \neg b_2 & \neg b_3 & b_4,\neg a_4\\
\hline
\end{array}
\begin{array}{|c|c|c|c|c|} \hline & \text{Room 1}&\text{Room 2}&\text{Room 3}&\text{Room 4}\\ \hline \text{Truth} & a_1,\neg b_1 & \neg a_2,\neg b_2 & \neg a_3,\neg b_3 & b_4,\neg a_4\\ \hline \text{Agent a} & a_1,\neg b_1 & \neg a_2 & \neg a_3 & \neg a_4\\ \hline \text{Agent b} & \neg b_1 & \neg b_2 & \neg b_3 & b_4,\neg a_4\\ \hline \end{array}
In [ ]: