# Copyright (c) 2011, 2012 by California Institute of Technology
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the California Institute of Technology nor
# the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CALTECH
# OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
"""
----------------
Automaton Module
----------------
"""
import re, copy, os, random
import xml.etree.ElementTree as ET
from errorprint import printWarning, printError
import conxml
try:
from pygraph.classes.digraph import digraph
from pygraph.algorithms.accessibility import connected_components
except ImportError:
print "python-graph package not found.\nHence some methods in Automaton class are unavailable."
# python-graph package not found. Disable dependent methods.
digraph = None
connected_components = None
###################################################################
[docs]class AutomatonState:
"""AutomatonState class for representing a state in a finite state
automaton. An AutomatonState object contains the following
fields:
- `id`: an integer specifying the id of this AutomatonState object.
- `state`: a dictionary whose keys are the names of the variables
and whose values are the values of the variables.
- `transition`: a list of id's of the AutomatonState objects to
which this AutomatonState object can transition.
"""
def __init__(self, id=-1, state={}, transition=[]):
self.id = id
self.state = copy.copy(state)
self.transition = transition[:]
def __repr__(self):
return "State: " + str(self.state) + "\n" + \
"Transitions: " + str(self.transition)
def __copy__(self):
return AutomatonState(self.id, self.state, self.transition)
[docs] def copy(self):
"""Return copy of this automaton node."""
return self.__copy__()
###################################################################
[docs]class Automaton:
"""Automaton class for representing a finite state automaton.
An Automaton object contains the following field:
- `states`: a list of AutomatonState objects.
Automaton([states_or_file, varname, verbose]) constructs an
Automaton object based on the following input:
- `states_or_file`: a string containing the name of the aut file
to be loaded or a list of AutomatonState objects to be assigned
to the `states` of this Automaton object, or an (open) file-like
object.
- `varname`: a list of all the variable names. If it is not empty
and states_or_file is a string representing the name of the aut
file to be loaded, then this function will also check whether
the variables in aut_file are in varnames.
"""
def __init__(self, states_or_file=[], varnames=[], verbose=0):
# Construct this automaton from a list of AutomatonState objects
if (isinstance(states_or_file, list)):
self.states = copy.deepcopy(states_or_file)
# Construct this automaton from file
else:
if isinstance(states_or_file, str) and (len(states_or_file) == 0):
self.states = []
else:
self.loadFile(states_or_file, varnames=varnames, verbose=verbose)
def __eq__(self, other):
"""Automaton equality comparison.
Two instances of Automaton are said to be equal if their nodes
may be identified: there is a bijection, and nodes of equal ID
agree on outgoing edge set and state (labelling).
"""
if (not isinstance(other, Automaton)) or (len(self) != len(other)):
return False
if len(self) == 0: # Trivial case, both empty
return True
for node in self.states:
other_ind = 0
while (other.states[other_ind].id != node.id) and (other_ind < len(other)):
other_ind += 1
if other_ind >= len(other):
return False
sorted_transition = node.transition[:]
sorted_transition.sort()
other_sorted_transition = other.states[other_ind].transition[:]
other_sorted_transition.sort()
if sorted_transition != other_sorted_transition:
return False
if node.state.items() != other.states[other_ind].state.items():
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
def __len__(self):
"""Return number of nodes."""
return len(self.states)
def __copy__(self):
return Automaton(states_or_file=[s.copy() for s in self.states])
[docs] def copy(self):
"""Return copy of this Automaton."""
return self.__copy__()
def loadSPINAut(self, aut_file, varnames=[], verbose=0):
self.loadLinearAut(aut_file, '\(state (\d+)\)', '^\s*((?:\w+\(\d+\):)?\w+) = (\w+)',
"<<<<<START OF CYCLE>>>>>", varnames, verbose)
def loadSMVAut(self, aut_file, varnames=[], verbose=0):
self.loadLinearAut(aut_file, 'State: \d+\.(\d+)', '([\w.]+) = (\w+)',
"-- Loop starts here", varnames, verbose)
def loadLinearAut(self, aut_file, state_regex, assign_regex, loop_text,
varnames=[], verbose=0):
self.states = []
try:
f = open(aut_file, 'r')
closable = True
except IOError:
printWarning("Could not open " + aut_file + " for reading")
return
except TypeError:
# assume aut_file is already a file object
f = aut_file
# don't close a file we didn't open
closable = False
stateID = -1
valuation = {}
loopState = None
val_change = True
for (lineno, line) in enumerate(f, 1):
if re.search(state_regex, line) is not None:
# Only write a new state if the valuation has changed
if val_change:
# conclude a previous state
if stateID >= 0:
self.setAutStateState(stateID, valuation, verbose)
self.setAutStateTransition(stateID, [stateID+1], verbose)
stateID += 1
val_change = False
#try:
# stateID = int(stateID.group(1)) - 1 # start numbering from 0
#except:
# printWarning("SMV aut parsing failed on line " + str(lineno) +
# ":\n\t" + line)
# return
elif loop_text in line:
loopState = stateID + 1
else:
try:
# variable assignment
(var, val) = re.search(assign_regex, line).group(1,2)
except AttributeError:
# probably a comment line, ignore
continue
if varnames and not var in varnames:
printWarning('Unknown variable ' + var, obj=self)
try:
if var not in valuation or not valuation[var] == int(val):
val_change = True
valuation[var] = int(val)
except:
if var not in valuation or not valuation[var] == val:
val_change = True
valuation[var] = val
self.setAutStateState(stateID, valuation, verbose)
if loopState is not None:
self.setAutStateTransition(stateID, [loopState], verbose)
else:
self.setAutStateTransition(stateID, [], verbose)
if closable:
f.close()
[docs] def loadFile(self, aut_file, varnames=[], verbose=0):
"""
Construct an automation from aut_file.
Input:
- `aut_file`: the name of the text file containing the
automaton, or an (open) file-like object.
- `varnames`: a list of all the variable names. If it is not empty, then this
function will also check whether the variables in aut_file are in varnames.
"""
self.states = []
if isinstance(aut_file, str):
f = open(aut_file, 'r')
closable = True
else:
f = aut_file # Else, assume aut_file behaves as file object.
closable = False
stateID = -1
for line in f:
# parse states
if (line.find('State ') >= 0):
stateID = re.search('State (\d+)', line)
stateID = int(stateID.group(1))
state = dict(re.findall('(\w+):(\w+)', line))
for var, val in state.iteritems():
try:
state[var] = int(val)
except:
state[var] = val
if (len(varnames) > 0):
if not var in varnames:
printWarning('Unknown variable ' + var, obj=self)
for var in varnames:
if not var in state.keys():
printWarning('Variable ' + var + ' not assigned', obj=self)
self.setAutStateState(stateID, state, verbose)
# parse transitions
if (line.find('successors') >= 0):
transition = re.findall(' (\d+)', line)
for i in xrange(0,len(transition)):
transition[i] = int(transition[i])
self.setAutStateTransition(stateID, list(set(transition)), verbose)
if closable:
f.close()
[docs] def writeFile(self, destfile):
"""
Write an aut file that is readable by 'self.loadFile'. Note that this
is not a true automaton file.
Input:
- 'destfile': the file name to be written to.
"""
output = ""
for state in self.states:
output += 'State ' + str(state.id) + ' with rank # -> <'
for (k, v) in state.state.items():
output += str(k) + ':' + str(v) + ', '
if state.transition == []:
output += '>\n With no successors.\n'
else:
output = output[:-2] + '>\n With successors : '
output += str(state.transition)[1:-1] + '\n'
print 'Writing output to %s.' % destfile
f = open(destfile, 'w')
f.write(output)
[docs] def trimRedundantStates(self):
"""DEFUNCT UNTIL FURTHER NOTICE!
Combine states whose valuation of variables is identical
Merge and update transition listings as needed. N.B., this
method will change IDs after trimming to ensure indexing still
works (since self.states attribute is a list).
Return False on failure; True otherwise (success).
"""
if len(self.states) == 0:
return True # Empty state set; do nothing
for current_id in range(len(self.states)):
# See if flag set, i.e. this state already processed as duplicate
if self.states[current_id].id == -1:
continue
# Look for duplicates
dup_list = []
for k in range(len(self.states)):
if k == current_id:
continue
if self.states[k].state == self.states[current_id].state:
dup_list.append(k)
# Trim the fat; mark duplicate states as such (to be
# deleted after search finishes).
if len(dup_list) != 0:
for state in self.states:
for trans_ind in range(len(state.transition)):
if state.transition[trans_ind] in dup_list:
state.transition[trans_ind] = current_id
for k in dup_list:
self.states[current_id].transition.extend(self.states[k].transition)
self.states[k].id = -1 # Flag for deletion
self.states[current_id].transition = list(set(self.states[current_id].transition))
# Delete flagged (redundant) states
current_id = 0
while current_id < len(self.states):
if self.states[current_id].id == -1:
del self.states[current_id]
else:
current_id += 1
# Shift down other IDs, which are off-place due to the deletions
for current_id in range(len(self.states)):
if self.states[current_id].id == current_id:
continue
for state in self.states:
for trans_ind in range(len(state.transition)):
if state.transition[trans_ind] == self.states[current_id].id:
state.transition[trans_ind] = current_id
self.states[current_id].id = current_id
# Finally, remove any remaining redundant references in
# transition lists.
for current_id in range(len(self.states)):
self.states[current_id].transition = list(set(self.states[current_id].transition))
return True
[docs] def trimDeadStates(self):
"""
Recursively delete states with no outgoing transitions.
Merge and update transition listings as needed. N.B., this
method will change IDs after trimming to ensure indexing still
works (since self.states attribute is a list).
"""
if digraph is None:
print "WARNING: attempted to call unavailable method trimDeadStates."
return
self.createPygraphRepr()
# Delete nodes with no outbound transitions.
changed = True # Becomes False when no deletions have been made.
while changed:
changed = False
for node in self.pygraph.nodes():
if self.pygraph.neighbors(node) == []:
changed = True
self.pygraph.del_node(node)
self.loadPygraphRepr()
[docs] def trimUnconnectedStates(self, aut_state_id):
"""
Delete all states that are inaccessible from the given state.
Merge and update transition listings as needed. N.B., this
method will change IDs after trimming to ensure indexing still
works (since self.states attribute is a list).
"""
if connected_components is None:
print "WARNING: attempted to call unavailable method trimUnconnectedStates."
return
self.createPygraphRepr()
# Delete nodes that are unconnected to 'aut_state_id'.
connected = connected_components(self.pygraph)
main_component = connected[aut_state_id]
for node in self.pygraph.nodes():
if connected[node] != main_component:
self.pygraph.del_node(node)
self.loadPygraphRepr()
[docs] def createPygraphRepr(self):
"""
Generate a python-graph representation of this automaton, stored
in 'self.pygraph'
"""
if digraph is None:
print "WARNING: attempted to call unavailable method createPygraphRepr."
return
# Create directed graph in pygraph.
self.pygraph = digraph()
# Add nodes to graph.
for state in self.states:
self.pygraph.add_node(state.id, state.state.items())
# Add edges to graph.
for state in self.states:
for trans in state.transition:
self.pygraph.add_edge((state.id, trans))
[docs] def loadPygraphRepr(self):
"""
Recreate automaton states from 'self.pygraph'.
Merge and update transition listings as needed. N.B., this
method will change IDs after trimming to ensure indexing still
works (since self.states attribute is a list).
"""
if digraph is None:
print "WARNING: attempted to call unavailable method loadPygraphRepr."
return
# Reorder nodes by setting 'new_state_id'.
for (i, node) in enumerate(self.pygraph.nodes()):
self.pygraph.add_node_attribute(node, ('new_state_id', i))
# Recreate automaton from graph.
self.states = []
for node in self.pygraph.nodes():
node_attr = dict(self.pygraph.node_attributes(node))
id = node_attr.pop('new_state_id')
transition = [dict(self.pygraph.node_attributes(neighbor))['new_state_id']
for neighbor in self.pygraph.neighbors(node)]
self.states.append(AutomatonState(id=id, state=node_attr,
transition=transition))
[docs] def writeDotFileEdged(self, fname, env_vars, sys_vars,
hideZeros=False, hideAgentNames=True):
"""Write edge-labeled automaton to Graphviz DOT file.
I forked the method writeDotFile for fear of feature creep.
At least now the features will creep upon us from separate
methods, rather than a single complicated argument list.
env_vars and sys_vars should be lists of variable names (type
string) describing environment and system variables,
respectively.
The intent is to view nodes labeled with a system decision,
and edges labeled with an environment decision, thus better
expressing the game. Recall the automaton is a strategy.
Return False on failure; True otherwise (success).
"""
if len(env_vars) == 0 or len(sys_vars) == 0:
return False
# Make looping possible
agents = {"env" : env_vars,
"sys" : sys_vars}
output = "digraph A {\n"
# Prebuild sane state names
state_labels = dict()
for state in self.states:
for agent_name in agents.keys():
state_labels[str(state.id)+agent_name] = ''
for (k,v) in state.state.items():
if (not hideZeros) or (v != 0):
agent_name = None
for agent_candidate in agents.keys():
if k in agents[agent_candidate]:
agent_name = agent_candidate
break
if agent_name is None:
printWarning("variable \""+k+"\" does not belong to an agent in distinguishedTurns")
return False
if len(state_labels[str(state.id)+agent_name]) == 0:
if len(agent_name) > 0 and not hideAgentNames:
state_labels[str(state.id)+agent_name] += str(state.id)+"::"+agent_name+";\\n" + k+": "+str(v)
else:
state_labels[str(state.id)+agent_name] += str(state.id)+";\\n" + k+": "+str(v)
else:
state_labels[str(state.id)+agent_name] += ", "+k+": "+str(v)
for agent_name in agents.keys():
if len(state_labels[str(state.id)+agent_name]) == 0:
if not hideAgentNames:
state_labels[str(state.id)+agent_name] = str(state.id)+"::"+agent_name+";\\n {}"
else:
state_labels[str(state.id)+agent_name] = str(state.id)+";\\n {}"
# Initialization point
output += " \"\" [shape=circle,style=filled,color=black];\n"
# All nodes and edges
for state in self.states:
if len(self.getAutInSet(state.id)) == 0:
# Treat init nodes specially
output += " \"\" -> \"" \
+ state_labels[str(state.id)+"sys"] +"\" [label=\""
output += state_labels[str(state.id)+"env"] + "\"];\n"
for trans in state.transition:
output += " \""+ state_labels[str(state.id)+"sys"] +"\" -> \"" \
+ state_labels[str(self.states[trans].id)+"sys"] +"\" [label=\""
output += state_labels[str(self.states[trans].id)+"env"] + "\"];\n"
output += "\n}\n"
with open(fname, "w") as f:
f.write(output)
return True
[docs] def writeDotFile(self, fname, hideZeros=False,
distinguishTurns=None, turnOrder=None):
"""Write automaton to Graphviz DOT file.
In each state, the node ID and nonzero variables and their
value (in that state) are listed. This style is motivated by
Boolean variables, but applies to all variables, including
those taking arbitrary integer values.
N.B., to allow for trace memory (manifested as ``rank'' in
JTLV output), we include an ID for each node. Thus, identical
valuation of variables does *not* imply state equivalency
(confusingly).
If hideZeros is True, then for each vertex (in the DOT
diagram) variables taking the value 0 are *not* shown. This
may lead to more succinct diagrams when many boolean variables
are involved. The default if False, i.e. show all variable
values.
It is possible to break states into a linear sequence of steps
for visualization purposes using the argument
distinguishTurns. If not None, distinguishTurns should be a
dictionary with keys as strings indicating the agent
(e.g. "env" and "sys"), and values as lists of variable names
that belong to that agent. These lists should be disjoint.
Note that variable names are case sensitive!
If distinguishTurns is not None, state labels (in the DOT
digraph) now have a preface of the form ID::agent, where ID is
the original state identifier and "agent" is a key from
distinguishTurns.
N.B., if distinguishTurns is not None and has length 1, it is
ignored (i.e. treated as None).
turnOrder is only applicable if distinguishTurns is not None.
In this case, if turnOrder is None, then use whatever order is
given by default when listing keys of distinguishTurns.
Otherwise, if turnOrder is a list (or list-like), then each
element is key into distinguishTurns and state decompositions
take that order.
Return False on failure; True otherwise (success).
"""
if (distinguishTurns is not None) and (len(distinguishTurns) <= 1):
# This is a fringe case and seemingly ok to ignore.
distinguishTurns = None
output = "digraph A {\n"
# Prebuild sane state names
state_labels = dict()
for state in self.states:
if distinguishTurns is None:
state_labels[str(state.id)] = ''
else:
# If distinguishTurns is not a dictionary with
# items of the form string -> list, it should
# simulate that behavior.
for agent_name in distinguishTurns.keys():
state_labels[str(state.id)+agent_name] = ''
for (k,v) in state.state.items():
if not (hideZeros and (v == 0 or v == "FALSE")):
if distinguishTurns is None:
agent_name = ''
else:
agent_name = None
for agent_candidate in distinguishTurns.keys():
if k in distinguishTurns[agent_candidate]:
agent_name = agent_candidate
break
if agent_name is None:
printWarning("variable \""+k+"\" does not belong to an agent in distinguishedTurns")
return False
if len(state_labels[str(state.id)+agent_name]) == 0:
if len(agent_name) > 0:
state_labels[str(state.id)+agent_name] += str(state.id)+"::"+agent_name+";\\n" + k+": "+str(v)
else:
state_labels[str(state.id)+agent_name] += str(state.id)+";\\n" + k+": "+str(v)
else:
state_labels[str(state.id)+agent_name] += ", "+k+": "+str(v)
if distinguishTurns is None:
if len(state_labels[str(state.id)]) == 0:
state_labels[str(state.id)] = str(state.id)+";\\n {}"
else:
for agent_name in distinguishTurns.keys():
if len(state_labels[str(state.id)+agent_name]) == 0:
state_labels[str(state.id)+agent_name] = str(state.id)+"::"+agent_name+";\\n {}"
if (distinguishTurns is not None) and (turnOrder is None):
if distinguishTurns is not None:
turnOrder = distinguishTurns.keys()
for state in self.states:
if distinguishTurns is not None:
output += " \""+ state_labels[str(state.id)+turnOrder[0]] +"\" -> \"" \
+ state_labels[str(state.id)+turnOrder[1]] +"\";\n"
for agent_ind in range(1, len(turnOrder)-1):
output += " \""+ state_labels[str(state.id)+turnOrder[agent_ind]] +"\" -> \"" \
+ state_labels[str(state.id)+turnOrder[agent_ind+1]] +"\";\n"
for trans in state.transition:
if distinguishTurns is None:
output += " \""+ state_labels[str(state.id)] +"\" -> \"" \
+ state_labels[str(self.states[trans].id)] +"\";\n"
else:
output += " \""+ state_labels[str(state.id)+turnOrder[-1]] +"\" -> \"" \
+ state_labels[str(self.states[trans].id)+turnOrder[0]] +"\";\n"
output += "\n}\n"
with open(fname, "w") as f:
f.write(output)
return True
[docs] def dumpXML(self, pretty=True, idt_level=0):
"""Return string of automaton conforming to tulipcon XML.
If pretty is True, then use indentation and newlines to make
the resulting XML string more visually appealing. idt_level
is the base indentation level on which to create automaton
string. This level is only relevant if pretty=True.
Note that name subtags within aut tag are left blank.
"""
if pretty:
nl = "\n" # Newline
idt = " " # Indentation
else:
nl = ""
idt = ""
output = idt_level*idt+'<aut>'+nl
idt_level += 1
for node in self.states:
output += idt_level*idt+'<node>'+nl
idt_level += 1
output += idt_level*idt+'<id>' + str(node.id) + '</id><name></name>'+nl
output += idt_level*idt+conxml.taglist("child_list", node.transition)+nl
output += idt_level*idt+conxml.tagdict("state", node.state)+nl
idt_level -= 1
output += idt_level*idt+'</node>'+nl
idt_level -= 1
output += idt_level*idt+'</aut>'+nl
return output
[docs] def loadXML(self, x, namespace=""):
"""Read an automaton from given string conforming to tulipcon XML.
N.B., on a successful processing of the given string, the
original Automaton instance to which this method is attached
is replaced with the new structure. On failure, however, the
original Automaton is untouched.
The argument x can also be an instance of
xml.etree.ElementTree._ElementInterface ; this is mainly for
internal use, e.g. by the function untagpolytope and some
load/dumpXML methods elsewhere.
Return True on success; on failure, return False or raise
exception.
"""
if not isinstance(x, str) and not isinstance(x, ET._ElementInterface):
raise ValueError("given automaton XML must be a string or ElementTree._ElementInterface.")
if (namespace is None) or (len(namespace) == 0):
ns_prefix = ""
else:
ns_prefix = "{"+namespace+"}"
if isinstance(x, str):
etf = ET.fromstring(x)
else:
etf = x
if etf.tag != ns_prefix+"aut":
return False
node_list = etf.findall(ns_prefix+"node")
states = []
id_list = [] # For more convenient searching, and to catch redundancy
for node in node_list:
this_id = int(node.find(ns_prefix+"id").text)
this_name = node.find(ns_prefix+"name").text
(tag_name, this_child_list) = conxml.untaglist(node.find(ns_prefix+"child_list"),
cast_f=int)
if tag_name != ns_prefix+"child_list":
# This really should never happen and may not even be
# worth checking.
raise ValueError("failure of consistency check while processing aut XML string.")
(tag_name, this_state) = conxml.untagdict(node.find(ns_prefix+"state"),
cast_f_values=int,
namespace=namespace)
if tag_name != ns_prefix+"state":
raise ValueError("failure of consistency check while processing aut XML string.")
if this_id in id_list:
printWarning("duplicate nodes found: "+str(this_id)+"; ignoring...")
continue
id_list.append(this_id)
states.append(AutomatonState(id=this_id,
state=copy.copy(this_state),
transition=copy.copy(this_child_list)))
# Sort the mess
ordered_states = []
num_states = len(states)
for this_id in range(num_states):
ind = 0
while (ind < len(states)) and (states[ind].id != this_id):
ind += 1
if ind >= len(states):
raise ValueError("missing states in automaton.")
ordered_states.append(states.pop(ind))
self.states = ordered_states # Finally, commit.
return True
def size(self):
return self.__len__()
[docs] def addAutState(self, aut_state):
"""
Add an AutomatonState object to this automaton.
Input:
- `aut_state`: an AutomatonState object to be added to this Automaton object.
"""
if (isinstance(aut_state, AutomatonState)):
self.states.append(aut_state)
else:
printError("Input to addAutState must be of type AutomatonState", obj=self)
[docs] def getAutState(self, aut_state_id):
"""
Return an AutomatonState object stored in this Automaton object
whose id is `aut_state_id`.
Raise IndexError if such AutomatonState object does not exist.
Input:
- `aut_state_id`: an integer specifying the id of the AutomatonState object
to be returned by this function.
"""
if (aut_state_id < self.size() and self.states[aut_state_id].id == aut_state_id):
return self.states[aut_state_id]
else:
aut_state_index = self.size() - 1
while (aut_state_index >= 0 and aut_state_id != self.states[aut_state_index].id):
aut_state_index -= 1
if (aut_state_index >= 0):
return self.states[aut_state_index]
else:
raise IndexError("State ID " + str(aut_state_id) + " not found.")
[docs] def getAutInSet(self, aut_state_id):
"""Find all nodes that include given ID in their outward transitions.
If the automaton is viewed as a directed graph, and the given
ID corresponds to node v, then we are interested in all v'\in V
such that (v',v) is an edge in the graph.
(Comments on efficiency.) The automaton is stored as a tree,
where parents connect to children as in a linked list. Thus
given a node, finding the set of outward edges takes constant
time, whereas finding the inward set (as done by this method)
requires searching the set of nodes and their respective
transition sets; so, O(N*M) time, where N is the number of
nodes and M is the average number of outward edges.
Return list of nodes (instances of AutomatonState),
or None on error.
"""
try:
this_node = self.getAutState(aut_state_id)
except IndexError:
return None
if not isinstance(this_node, AutomatonState):
return None
inward_list = []
for k in range(len(self.states)):
if aut_state_id in self.states[k].transition:
inward_list.append(self.states[k])
return inward_list
[docs] def setAutStateState(self, aut_state_id, aut_state_state, verbose=0):
"""
Set the state of the AutomatonState object whose id is `aut_state_id` to
`aut_state_state`.
If such an AutomatoSstate object does not exist, an AutomatonState object
whose id is `aut_state_id` and whose state is `aut_state_state` will be added.
Input:
- `aut_state_id`: an integer that specifies the id of the AutomatonState
object to be set.
- `aut_state_state`: a dictionary that represents the new state of the
AutomatonState object.
"""
try:
aut_state = self.getAutState(aut_state_id)
aut_state.state = aut_state_state
if (verbose > 0):
print 'Setting state of AutomatonState ' + str(aut_state_id) + \
': ' + str(aut_state_state)
except IndexError:
# State does not already exist, add it
self.addAutState(AutomatonState(id=aut_state_id, state=aut_state_state, \
transition=[]))
if (verbose > 0):
print 'Adding state ' + str(aut_state_id) + ': ' + str(aut_state_state)
[docs] def setAutStateTransition(self, aut_state_id, aut_state_transition, verbose=0):
"""
Set the transition of the AutomatonState object whose id is `aut_state_id` to
`aut_state_transition`. If such automaton state does not exist, an
AutomatonState whose id is aut_state_id and whose transition is
aut_state_transition will be added.
Input:
- `aut_state_id`: an integer that specifies the id of the AutomatonState
object to be set.
- `aut_state_transition`: a list of id's of the AutomatonState objects to which
the AutomatonState object with id `aut_state_id` can transition.
"""
aut_state = self.getAutState(aut_state_id)
if (isinstance(aut_state, AutomatonState)):
aut_state.transition = aut_state_transition
if (verbose > 0):
print 'Setting transition of AutomatonState ' + str(aut_state_id) + \
': ' + str(aut_state_transition)
else:
self.addAutState(AutomatonState(id=aut_state_id, state={}, \
transition=aut_state_transition))
if (verbose > 0):
print 'Adding AutomatonState ' + str(aut_state_id) + \
' with transition ' + str(aut_state_transition)
[docs] def findAllAutState(self, state):
"""
Return all the AutomatonState objects stored in this automaton whose state
matches `state`.
Return -1 if such an AutomatonState objects is not found.
Input:
- `state`: a dictionary whose keys are the names of the variables
and whose values are the values of the variables.
"""
all_aut_states = []
for aut_state in self.states:
if (aut_state.state == state):
all_aut_states.append(aut_state)
return all_aut_states
[docs] def getAutInit(self):
"""Return list of nodes that are initial, i.e., have empty In set.
N.B., the set of initial nodes is not saved, so every time you
call getAutInit, all nodes are checked for empty inward edge
sets, which itself incurs a search cost (cf. doc for method
getAutInSet).
"""
init_nodes = []
for k in range(len(self.states)):
if len(self.getAutInSet(self.states[k].id)) == 0:
init_nodes.append(self.states[k])
return init_nodes
[docs] def findAllAutPartState(self, state_frag):
"""Return list of nodes consistent with the given fragment.
state_frag should be a dictionary. We say the state in a node
is ``consistent'' with the fragment if for every variable
appearing in state_frag, the valuations in state_frag and the
node are the same.
E.g., let aut be an instance of Automaton. Then
aut.findAllAutPartState({"foobar" : 1}) would return a list of
nodes (instances of AutomatonState) in which the variable
"foobar" is 1 (true).
"""
all_aut_states = []
for aut_state in self.states:
match_flag = True
for k in state_frag.items():
if k not in aut_state.state.items():
match_flag = False
break
if match_flag:
all_aut_states.append(aut_state)
return all_aut_states
[docs] def findAutState(self, state):
"""
Return the first AutomatonState object stored in this automaton whose state
matches `state`.
Return -1 if such an AutomatonState objects is not found.
Input:
- `state`: a dictionary whose keys are the names of the variables
and whose values are the values of the variables.
"""
for aut_state in self.states:
if (aut_state.state == state):
return aut_state
return -1
[docs] def findNextAutState(self, current_aut_state, env_state={},
deterministic_env=True):
"""
Return the next AutomatonState object based on `env_state`.
Return -1 if such an AutomatonState object is not found.
Input:
- `current_aut_state`: the current AutomatonState. Use current_aut_state = None
for unknown current or initial automaton state.
- `env_state`: a dictionary whose keys are the names of the environment
variables and whose values are the values of the variables.
- 'deterministic_env': specifies whether to choose the environment state deterministically.
"""
if (current_aut_state is None):
transition = range(self.size())
else:
transition = current_aut_state.transition[:]
def stateSatisfiesEnv(next_aut_id):
for var in env_state.keys():
if not (self.states[next_aut_id].state[var] == env_state[var]):
return False
return True
transition = filter(stateSatisfiesEnv, transition)
if len(transition) == 0:
return -1
elif (deterministic_env):
return self.states[transition[0]]
else:
return self.states[random.choice(transition)]
def stripNames(self):
for state in self.states:
for k in state.state.keys():
stripped = k.split(".")[-1]
state.state[stripped] = state.state[k]
del(state.state[k])
###################################################################
[docs]def createAut(aut_file, varnames=[], verbose=0):
"""
Construct an automation from aut_file.
Input:
- `aut_file`: the name of the text file containing the automaton.
- `varnames`: a list of all the variable names. If it is not empty, then this
function will also check whether the variables in aut_file are in varnames.
"""
automaton = Automaton(states_or_file=aut_file, varnames=[], verbose=verbose)
return automaton