Source code for grsim

# !/usr/bin/env python
#
# Copyright (c) 2011 by California Institute of Technology
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 
# 3. Neither the name of the California Institute of Technology nor
#    the names of its contributors may be used to endorse or promote
#    products derived from this software without specific prior
#    written permission.
# 
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL CALTECH
# OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
# 
# $Id$
""" 
-----------------
Simulation Module
-----------------
"""

import time
import random
from multiprocessing import Process
from subprocess import call

from automaton import Automaton, AutomatonState
from errorprint import printWarning, printError
from congexf import dumpGexf, changeGexfAttvalue
from gephistream import GephiStream

activeID = 'is_active'

[docs]def grsim(aut_list, aut_trans_dict={}, env_states=[{}], num_it=20, deterministic_env=True, graph_vis=False, destfile="sim_graph.gexf", label_vars=None, delay=2, vis_depth=3): """ Simulate an execution of the given list of automata and return a sequence of automaton states. If graph_vis is True, simulate the automata live in Gephi. For the simplest use case, try something like: aut_states_list = grsim([aut], env_states=[{state: var...}]) Note that 'aut_states_list' is composed of (autID, autState) tuples, 'aut' is enclosed in a list, and 'env_states' is a list of state dictionaries. Arguments: - `aut_list` -- a list of Automaton objects containing the automata generated from jtlvint.synthesize or jtlv.computeStrategy function. - `aut_trans_dict` -- a dictionary in which the keys correspond to exit states of automata and the values are the entry states of the next automata. Both keys and values are tuples in the following format: (AutomatonID, AutomatonState) where 'AutomatonID' is an integer corresponding to the index of the current automaton and 'AutomatonState' is the current automaton state. - `env_states` -- a list of dictionaries of environment state, specifying the sequence of environment states. If the length of this sequence is less than `num_it`, then this function will automatically pick the environment states for the rest of the execution. - `num_it` -- the number of iterations. - `deterministic_env` -- specify whether this function will choose the environment state deterministically. - `graph_vis` -- specify whether to visualize the simulation in Gephi. - `destfile` -- for graph visualization, the string name of the desired destination '.gexf' file. - `label_vars` -- for graph visualization, a list of the names of the system or environment variables to be encoded as labels. - `delay` -- for graph visualization, the time between simulation steps. - `vis_depth` -- for graph visualization, set the number of previous states to continue displaying. Return: List of automaton states, corresponding to a sequence of simulated transitions. Each entry will be formatted as follows: (AutomatonID, AutomatonState) as described above under 'aut_trans_dict'. """ if not (isinstance(aut_list, list) and len(aut_list) > 0 and isinstance(aut_trans_dict, dict) and isinstance(env_states, list) and isinstance(num_it, int) and isinstance(deterministic_env, bool) and isinstance(graph_vis, bool) and isinstance(destfile, str) and (label_vars == None or isinstance(label_vars, list)) and isinstance(delay, int) and isinstance(vis_depth, int)): raise TypeError("Invalid arguments to grsim") # aut_states will hold the list of automaton states traversed. aut_states = [] # Start at the first automaton. aut_id = 0 aut = aut_list[aut_id] # Set 'aut_state' to 'None' to find a valid initial state in # 'findNextAutState'. aut_state = None if graph_vis: visualizer = visualizeGraph(aut_list, destfile, label_vars=label_vars) active_nodes = [] # Simulate automata by stepping through the sequence of environment # states. If there are no more available, randomly choose a valid # transition. for i in range(num_it): # Set current env_state, if available. if i < len(env_states): env_state = env_states[i] else: env_state = {} # Find an automaton state that satisfies 'env_state'. Note that # 'findNextAutState' returns -1 if no state is possible, and that # a 'current_aut_state' of None means to choose an initial state. aut_state = aut.findNextAutState(current_aut_state=aut_state, env_state=env_state, deterministic_env=deterministic_env) if aut_state == -1: printError('The specified sequence of environment states ' + \ 'does not satisfy the environment assumption.') return aut_states if graph_vis: # Update active_nodes list. active_nodes.append((aut_id, aut_state)) # Stream updated active_nodes. visualizer.update(active_nodes) if len(active_nodes) > vis_depth: del active_nodes[0] time.sleep(delay) aut_states.append((aut_id, aut_state)) # Transition to next automaton? If yes, change 'aut' and 'aut_state' # to the corresponding ones in the next automaton. if (aut_id, aut_state) in aut_trans_dict.keys(): new_state_tuple = aut_trans_dict[(aut_id, aut_state)] aut_id = new_state_tuple[0] aut = aut_list[aut_id] aut_state = new_state_tuple[1] if graph_vis: # Update active_nodes list. active_nodes.append((aut_id, aut_state)) # Stream updated active_nodes. visualizer.update(active_nodes) if len(active_nodes) > vis_depth: del active_nodes[0] time.sleep(delay) aut_states.append((aut_id, aut_state)) if graph_vis: visualizer.close() return aut_states ###################################################################
[docs]def writeSimStatesToFile(states, file, verbose=0): """ Write a simulation trace (sequence of states) to a text file. Arguments: - `states` -- a list of tuples of automaton states, formatted as: (AutomatonID, AutomatonState) where 'AutomatonID' is an integer corresponding to the index of the current automaton and 'AutomatonState' is the current automaton state. - `file` -- the string name of the desired destination file. Return: (nothing) """ f = open(file, 'w') if (verbose > 0): print 'Writing simulation result to ' + file for state in states: for var in state[1].state.keys(): f.write(var + ':') f.write(str(state[1].state[var]) + ', ') f.write('\n') f.close() ###################################################################
[docs]def writeStatesToFile(aut_list, destfile, aut_states_list=[], label_vars=None): """ Write the states and transitions from a list of automata to a '.gexf' graph file. If a list of simulated states is given, record the sequence of traversed states. Arguments: - `aut_list` -- a list of Automaton objects. - `destfile` -- the string name of the desired destination file. - `aut_states_list` -- a list of tuples of automaton states, formatted as: (AutomatonID, AutomatonState) where 'AutomatonID' is an integer corresponding to the index of the current automaton and 'AutomatonState' is the current automaton state. - `label_vars` -- a list of the names of the system or environment variables to be encoded as labels. Return: (nothing) """ if not (isinstance(aut_list, list) and isinstance(destfile, str) and isinstance(aut_states_list, list) and (label_vars == None or isinstance(label_vars, list))): raise TypeError("Invalid arguments to writeStatesToFile") # Generate a Gexf-formatted string of automata. output = dumpGexf(aut_list, label_vars=label_vars) # 'aut_states_list' is a list of automaton/automaton state tuples. # Transitioning from one tuple to the next should correspond to changing # automaton states in the receding horizon case. iteration = 1 for state_tuple in aut_states_list: output = changeGexfAttvalue(output, activeID, iteration, node_id=str(state_tuple[0]) + '.' + str(state_tuple[1])) iteration += 1 print "Writing graph states to " + destfile f = open(destfile, 'w') f.write(output) f.close()
[docs]class visualizeGraph: """ Open Gephi (a graph visualization application) and stream a live automaton simulation to it. Arguments: - `aut_list` -- a list of Automaton objects. - `destfile` -- the string name of a '.gexf' graph file to be opened in Gephi. - `label_vars` -- a list of the names of the system or environment variables to be encoded as labels. Fields: - `gs` -- a Gephi streaming server. - `gephi` -- an asynchronous process running Gephi. """ def __init__(self, aut_list, destfile, label_vars=None): # First write the automata to file and open them in Gephi. writeStatesToFile(aut_list, destfile, label_vars=label_vars) # Changes to the graph will be streamed from this server. self.gs = GephiStream('server') # Open Gephi in a separate thread. print "Opening " + destfile + " in Gephi." self.gephi = Process(target=lambda: call(["gephi", destfile])) self.gephi.start() # Wait for user before streaming simulation. raw_input("When Gephi has loaded, press 'return' or 'enter'\n" + \ "to start streaming the automaton simulation.\n") def close(self): # Close the graph streaming server and the Gephi thread. self.gs.close() print 'Close Gephi to exit.' self.gephi.join()
[docs] def update(self, active_nodes): """Update the graph by streaming the changed active nodes. Arguments: - `active_nodes` -- an ordered list of nodes that should be active. Return: (nothing) """ for (i, active_node) in enumerate(active_nodes): self.gs.changeNode(active_node[0], active_node[1], {activeID: i})