import networkx as nx
import itertools
from copy import copy, deepcopy
import matplotlib.pyplot as plt
from IPython.display import Image, display, Latex
class KripkeModel:
def __init__(self, history, state_graph, point):
self.history = history
self.G = state_graph
self.point = point
def relabel_nodes_to_int(self):
""" Relabel nodes to integers
Execution of action model turns node names into (s,j) pairs. This method
relabels them to integers
"""
relabel_map = {n : i for i,n in enumerate(self.G.nodes)}
self.point = relabel_map[self.point]
nx.relabel_nodes(self.G, relabel_map, copy=False)
def remove_unreachable(self):
""" Remove states that are not reachable from the pointed state
"""
self.G.remove_nodes_from(
list(nx.non_neighbors(
self.G,
self.point
))
)
def list_reachable(self, agent = None):
"""
"""
for n in self.G.neighbors(self.point):
if agent is None or agent in self.G.edges[self.point, n]['agents']:
yield n
def get_edges(self):
"""
"""
for n in self.list_reachable():
yield (n, self.G.edges[self.point, n]['agents'])
def valuation(self, node = None):
if node is None:
node = self.point
return self.G.nodes[node]['valuation']
def accumulate_agent_relations(self, block_B, block_C):
"""
Parameters
----------
block_B, block_C: iterator of node
"""
agents = set()
# Get the edge boundary
edges = list(nx.algorithms.boundary.edge_boundary(self.G, block_B, block_C))
for s,t in edges:
agents = agents | self.G.edges[s,t]['agents']
return {'agents' : agents}
class ActionModel:
def __init__(self, action_name, state_graph, point):
self.action_name = action_name
self.G = state_graph
self.point = point
def execute_action(km, am, remove_unreachable = True, simplify_bisim = True):
""" Make model resulting from execution of action model
Parameters
----------
km : KripkeModel
Pointed Kripke model of original state
am : ActionModel
Action to be executed in that state
Returns
-------
KripkeModel
The new Kripke model after executing the action
"""
new_point = (km.point, am.point)
# Strong product will have Cartesian product of nodes as nodes, and
# edges where both input graphs have edges
product_graph = nx.Graph()
for u in km.G.nodes:
val = km.G.nodes[u]['valuation']
for v in am.G.nodes:
pre_true, pre_false = am.G.nodes[v]['pre']
post_true, post_false = am.G.nodes[v]['post']
if not ((pre_true - val) or (pre_false & val)):
product_graph.add_node((u, v), valuation = (val | post_true) - post_false)
prod_edges = []
for (u,v) in product_graph.nodes:
for x in km.G.neighbors(u):
for y in am.G.neighbors(v):
if (x,y) in product_graph.nodes:
agents = set.intersection(
km.G.edges[u,x]['agents'],
am.G.edges[v,y]['agents'],
)
if agents:
prod_edges.append(((u,v), (x,y), {'agents' : agents}))
product_graph.add_edges_from(prod_edges)
result = KripkeModel(
km.history + [am.action_name],
product_graph,
new_point
)
if not new_point in result.G.nodes:
raise ValueError("Preconditions of action model point not satisfied")
result.relabel_nodes_to_int()
if remove_unreachable:
result.remove_unreachable()
if simplify_bisim:
result = kripke_merge_equivalent(result)
result.relabel_nodes_to_int()
return result
class MovementModel:
def __init__(self, rooms, agent_locations):
""" Initialize movement model for the given rooms and agents
Parameters
----------
rooms : nerworkx.Graph
Graph with nodes named by an int
agent_locations : dict of str to int
Names of the agents and their locations
"""
self.rooms = rooms
self.rooms.add_edges_from([(n,n) for n in self.rooms.nodes])
self.agent_locations = agent_locations
self.room_names = set(rooms.nodes)
self.agent_names = set(agent_locations.keys())
self.consistent_vals = self.list_allowed_valuations()
self.num_vals = len(self.consistent_vals)
self.km = self.make_start_km()
self.observe_am = self.make_observe_model()
self.observe()
def make_start_km(self):
""" Construct initial Kripke model
"""
point_val = self.get_valuation()
state_graph = nx.complete_graph(self.num_vals)
for i,val in enumerate(self.consistent_vals):
state_graph.nodes[i]['valuation'] = val
state_graph.add_edge(i,i)
for u,v in state_graph.edges:
state_graph.edges[u,v]['agents'] = set(self.agent_names)
# Find start state
return KripkeModel(["Initialize"], state_graph, self.get_val_idx())
def make_observe_model(self):
""" Construct action model for observation
"""
observations = {i: dict() for i in range(self.num_vals)}
for i, val in enumerate(self.consistent_vals):
room_obs = {rn: set() for rn in self.room_names}
agent_room = dict()
for (an, rn) in val:
room_obs[rn].add((an, rn))
agent_room[an] = rn
for an in self.agent_names:
observations[i][an] = room_obs[agent_room[an]]
all_atoms = self.all_atoms()
# make graph
state_graph = nx.Graph()
for n in range(self.num_vals):
# Precondition is that all atoms in the valuation must be true, and
# all other atoms must be false.
state_graph.add_node(
n,
pre = (self.consistent_vals[n], all_atoms - self.consistent_vals[n]),
post = (set(), set())
)
for u, v in itertools.product(range(self.num_vals), range(self.num_vals)):
# Check for which agents these states are the same
rel = set(an for an in self.agent_names if observations[u][an] == observations[v][an])
if rel:
state_graph.add_edge(u, v, agents = rel)
return ActionModel("Observe", state_graph, self.get_val_idx())
def observe(self):
""" Execute the "observe" action, letting all agents observe the room they are in
"""
# We can reuse the observe action model because only the point changes
self.observe_am.point = self.get_val_idx()
self.km = execute_action(self.km, self.observe_am)
def list_allowed_valuations(self):
""" List all allowed valuations
"""
agent_atoms = {
a: [(a, r) for r in self.room_names]
for a in self.agent_names
}
valuations = [set(atoms) for atoms in itertools.product(*agent_atoms.values())]
return valuations
def get_valuation(self):
""" Get valuation of the current state
"""
return set(self.agent_locations.items())
def get_val_idx(self, valuation = None):
""" Get index of a valuation in the point val list
"""
if valuation is None:
valuation = self.get_valuation()
for i, val in enumerate(self.consistent_vals):
if val == valuation:
return i
raise ValueError("Inconsistent valuation!")
def all_atoms(self):
""" Get a set of all atoms
"""
return set(itertools.product(self.agent_names, self.room_names))
def make_agent_move_action(self, agent, new_room):
""" Make movement action model of a single agent moving to another room
"""
old_room = self.agent_locations[agent]
if not (self.agent_locations[agent], new_room) in self.rooms.edges:
raise ValueError("Illegal move: {} -> {}".format(old_room, new_room))
def move_conditions(agent, old_room, new_room):
pre = (set([(agent, old_room)]), set())
if old_room != new_room:
post = (set([(agent, new_room)]), set([(agent, old_room)]))
else:
post = (set(), set())
return {'pre' : pre, 'post' : post}
state_graph = nx.Graph()
for r1, r2 in self.rooms.edges:
state_graph.add_node(
(r1, r2),
**move_conditions(agent, r1, r2)
)
state_graph.add_node(
(r2, r1),
**move_conditions(agent, r2, r1)
)
# Edges connect all states for all agens except the one making the move
agent_dict = {'agents' : set(self.agent_names) - set(agent)}
edges = [(i, j, agent_dict) for i,j in itertools.product(state_graph.nodes, state_graph.nodes)]
state_graph.add_edges_from(edges)
# Add reflexive edges for all agents
state_graph.add_edges_from([(n, n, {'agents' : self.agent_names}) for n in state_graph.nodes])
am = ActionModel(
"Move ({}: {} -> {})".format(agent, old_room, new_room),
state_graph,
(old_room, new_room)
)
return am
def move_agents(self, new_locations):
""" Execute a series of moves followed by a simultaneous observation
Parameters
----------
moves : dict of str -> int
Dict giving the new location of each agent
"""
if not new_locations.keys() == self.agent_names:
raise ValueError("Agent names in moves don't match model agent list")
for agent, new_room in new_locations.items():
move_action = self.make_agent_move_action(agent, new_room)
self.km = execute_action(self.km, move_action)
self.agent_locations[agent] = new_room
self.observe()
def print_possible_worlds(self):
print("Valuation: ", self.km.valuation())
for n, agents in self.km.get_edges():
print(self.km.valuation(n), "reachable for ", agents)
def propositional_knowledge(self, agent):
""" Get a list of atomic propositions that the agent knows to be true or false
Parameters
----------
agent : ste
Name of the agent
Returns
-------
dict of str to set of atom
Dict with keys 'true' and 'false', each referring to a set of atomic propositions
"""
atoms = self.all_atoms()
world_valuations = [self.km.valuation(w) for w in self.km.list_reachable(agent)]
knowledge = {
'true' : set.intersection(*world_valuations),
'false' : atoms - set.union(*world_valuations)
}
return knowledge
def make_knowledge_table(self):
""" Create a table of propositional knowledge
"""
room_cols = list(self.room_names)
room_cols.sort()
def get_room_propositions(props):
""" Get propositions that are t/f by room"""
room_props = dict()
for room in self.room_names:
room_props[room] = {
'true' : [a for (a,r) in props['true'] if r==room],
'false' : [a for (a,r) in props['false'] if r==room]
}
return room_props
def format_table_row(row_name, room_cols, room_props):
return "{} & {}\\\\\n\hline\n".format(
row_name,
' & '.join([
','.join(
["{}_{}".format(a, r) for a in room_props[r]['true']] + \
["\\neg {}_{}".format(a, r) for a in room_props[r]['false']]
)
for r in room_cols
])
)
truth = {
'true' : self.get_valuation(),
'false' : self.all_atoms() - self.get_valuation()
}
table = "\\begin{{array}}{{|c|{}|}}\n".format('|'.join('c' for r in room_cols))
table += "\hline\n & {}\\\\\n\hline\n".format('&'.join("\\text{{Room {}}}".format(r) for r in room_cols))
table += format_table_row(
"\\text{Truth}",
room_cols,
get_room_propositions(truth)
)
agents = list(self.agent_names)
agents.sort()
for a in agents:
table += format_table_row(
"\\text{{Agent {}}}".format(a),
room_cols,
get_room_propositions(
self.propositional_knowledge(a)
)
)
table += "\\end{array}"
return table
# Equivalence partition code was taken from https://stackoverflow.com/a/38924631
def equivalence_partition(iterable, relation):
"""Partitions a set of objects into equivalence classes
Args:
iterable: collection of objects to be partitioned
relation: equivalence relation. I.e. relation(o1,o2) evaluates to True
if and only if o1 and o2 are equivalent
Returns: classes, partitions
classes: A sequence of sets. Each one is an equivalence class
partitions: A dictionary mapping objects to equivalence classes
"""
classes = []
partitions = {}
for o in iterable: # for each object
# find the class it is in
found = False
for c in classes:
if relation(next(iter(c)), o): # is it equivalent to this class?
c.add(o)
partitions[o] = c
found = True
break
if not found: # it is in a new class
classes.append(set([o]))
partitions[o] = classes[-1]
return classes, partitions
def equivalence_enumeration(iterable, relation):
"""Partitions a set of objects into equivalence classes
Same as equivalence_partition() but also numbers the classes.
Args:
iterable: collection of objects to be partitioned
relation: equivalence relation. I.e. relation(o1,o2) evaluates to True
if and only if o1 and o2 are equivalent
Returns: classes, partitions, ids
classes: A sequence of sets. Each one is an equivalence class
partitions: A dictionary mapping objects to equivalence classes
ids: A dictionary mapping objects to the indices of their equivalence classes
"""
classes, partitions = equivalence_partition(iterable, relation)
ids = {}
for i, c in enumerate(classes):
for o in c:
ids[o] = i
return classes, partitions, ids
def kripke_merge_equivalent(km):
""" Refine model by replacing states by their bisimilarity class
Parameters
----------
km : Kripke model
Returns
-------
KripkeModel
Equivalent Kripke model where bisimilarity classes are replaced with a single state
Based on the algorighm given on page 26 of
https://staff.fnwi.uva.nl/d.j.n.vaneijck2/courses/lai0506/LAI11.pdf
"""
def same_valuation(s, t, G):
return G.nodes[s]['valuation'] == G.nodes[t]['valuation']
def same_tansition_blocks(s, t, transition_block):
""" Return true iff for all agents,
if s has an a-relation to an agent in a block, so does t
"""
return transition_block[s] == transition_block[t]
def find_transition_blocks(G, block_dict):
transition_block = {s : dict() for s in G.nodes}
for s in G.nodes:
for n in G.neighbors(s):
i = block_dict[n]
for agent in km.G.edges[s,n]['agents']:
if not agent in transition_block[s]:
transition_block[s][agent] = set()
transition_block[s][agent].add(i)
return transition_block
def refine_partition(G, block_dict):
transition_block = find_transition_blocks(G, block_dict)
return equivalence_enumeration(
km.G.nodes,
lambda s, t: same_tansition_blocks(s, t, transition_block)
)
part_dict = {s : set([s]) for s in km.G.nodes}
classes, refined_part_dict, block_dict = equivalence_enumeration(
km.G.nodes,
lambda s, t: same_valuation(s, t, km.G)
)
while part_dict != refined_part_dict:
part_dict = refined_part_dict
classes, refined_part_dict, block_dict = refine_partition(
km.G, block_dict
)
part_dict = refined_part_dict
# Create a new model where we replace all states by their equivalence classes
new_graph = nx.algorithms.minors.quotient_graph(
km.G,
partition = lambda s, t: block_dict[s] == block_dict[t],
edge_data = lambda B, C: km.accumulate_agent_relations(B, C),
node_data = lambda B: {'valuation' : km.G.nodes[next(iter(B))]['valuation']}
)
for c in new_graph.nodes:
new_graph.add_edge(c, c, **km.accumulate_agent_relations(c, c))
# Get the new point
new_point = frozenset(part_dict[km.point])
result = KripkeModel(
km.history + ["Merge bisimilairty classes: num_nodes {} -> {}".format(
km.G.number_of_nodes(),
new_graph.number_of_nodes()
)],
new_graph,
new_point
)
return result
connectivity = nx.Graph()
connectivity.add_edge(1,2)
connectivity.add_edge(2,3)
connectivity.add_edge(3,4)
locations = {'a' : 1, 'b' : 1}
m = MovementModel(connectivity, locations)
l = m.make_knowledge_table()
print(l)
display(Latex(l))
m.move_agents({'a' : 1, 'b' : 2})
l = m.make_knowledge_table()
print(l)
display(Latex(l))
m.move_agents({'a' : 1, 'b' : 3})
l = m.make_knowledge_table()
print(l)
display(Latex(l))
m.move_agents({'a' : 1, 'b' : 4})
l = m.make_knowledge_table()
print(l)
display(Latex(l))