Source code for spec

#!/usr/bin/env python
#
# Copyright (c) 2011, 2012 by California Institute of Technology
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 
# 3. Neither the name of the California Institute of Technology nor
#    the names of its contributors may be used to endorse or promote
#    products derived from this software without specific prior
#    written permission.
# 
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL CALTECH
# OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
# 
# $Id$
""" 
Specification module
"""

import re, copy, ltl_parse

[docs]class GRSpec: """ GRSpec class for specifying a GR[1] specification of the form:: (env_init & []env_safety & []<>env_prog) -> (sys_init & []sys_safety & []<>sys_prog) A GRSpec object contains the following fields: - C{env_vars}: a list of variables (names given as strings) that are determined by the environment. - C{env_init}: a string or a list of string that specifies the assumption about the initial state of the environment. - C{env_safety}: a string or a list of string that specifies the assumption about the evolution of the environment state. - C{env_prog}: a string or a list of string that specifies the justice assumption on the environment. - C{sys_vars}: a list of variables (names given as strings) that are controlled by the system. - C{sys_init}: a string or a list of string that specifies the requirement on the initial state of the system. - C{sys_safety}: a string or a list of string that specifies the safety requirement. - C{sys_prog}: a string or a list of string that specifies the progress requirement. An empty list for any formula (e.g., if env_init = []) is marked as "True" in the specification. This corresponds to the constant Boolean function, which usually means that subformula has no effect (is non-restrictive) on the spec. """ def __init__(self, env_vars=[], sys_vars=[], env_init='', sys_init='', env_safety='', sys_safety='', env_prog='', sys_prog=''): self.env_vars = copy.copy(env_vars) self.sys_vars = copy.copy(sys_vars) self.env_init = copy.deepcopy(env_init) self.sys_init = copy.deepcopy(sys_init) self.env_safety = copy.deepcopy(env_safety) self.sys_safety = copy.deepcopy(sys_safety) self.env_prog = copy.deepcopy(env_prog) self.sys_prog = copy.deepcopy(sys_prog) # It is easier to work with lists... if isinstance(self.sys_safety, str): if len(self.sys_safety) == 0: self.sys_safety = [] else: self.sys_safety = [self.sys_safety] if isinstance(self.sys_init, str): if len(self.sys_init) == 0: self.sys_init = [] else: self.sys_init = [self.sys_init] if isinstance(self.sys_prog, str): if len(self.sys_prog) == 0: self.sys_prog = [] else: self.sys_prog = [self.sys_prog] if isinstance(self.env_safety, str): if len(self.env_safety) == 0: self.env_safety = [] else: self.env_safety = [self.env_safety] if isinstance(self.env_init, str): if len(self.env_init) == 0: self.env_init = [] else: self.env_init = [self.env_init] if isinstance(self.env_prog, str): if len(self.env_prog) == 0: self.env_prog = [] else: self.env_prog = [self.env_prog]
[docs] def importGridWorld(self, gworld, offset=(0,0), controlled_dyn=True): """Append specification describing a gridworld. Basically, call the spec method of the given GridWorld object and merge with its output. See documentation about the L{spec<gridworld.GridWorld.spec>} method of L{GridWorld<gridworld.GridWorld>} class for details. @type gworld: L{GridWorld} """ s = gworld.spec(offset=offset, controlled_dyn=controlled_dyn) for evar in s.env_vars: if evar not in self.env_vars: self.env_vars.append(evar) for svar in s.sys_vars: if svar not in self.sys_vars: self.sys_vars.append(svar) self.env_init.extend(s.env_init) self.env_safety.extend(s.env_safety) self.env_prog.extend(s.env_prog) self.sys_init.extend(s.sys_init) self.sys_safety.extend(s.sys_safety) self.sys_prog.extend(s.sys_prog)
[docs] def importDiscDynamics(self, disc_dynamics, cont_varname="cellID"): """Append results of discretization (abstraction) to specification. disc_dynamics is an instance of PropPreservingPartition, such as returned by the function discretize in module discretize. Notes ===== - The cell names are *not* mangled, in contrast to the approach taken in the createProbFromDiscDynamics method of the SynthesisProb class. - Any name in disc_dynamics.list_prop_symbol matching a system variable is removed from sys_vars, and its occurrences in the specification are replaced by a disjunction of corresponding cells. - gr1c does not (yet) support variable domains beyond Boolean, so we treat each cell as a separate Boolean variable and explicitly enforce mutual exclusion. """ if len(disc_dynamics.list_region) == 0: # Vacuous call? return cont_varname += "_" # ...to make cell number easier to read for i in range(len(disc_dynamics.list_region)): if (cont_varname+str(i)) not in self.sys_vars: self.sys_vars.append(cont_varname+str(i)) # The substitution code and transition code below are mostly # from createProbFromDiscDynamics and toJTLVInput, # respectively, in the rhtlp module, with some style updates. for prop_ind, prop_sym in enumerate(disc_dynamics.list_prop_symbol): reg = [j for j in range(len(disc_dynamics.list_region)) if disc_dynamics.list_region[j].list_prop[prop_ind] != 0] if len(reg) == 0: subformula = "False" subformula_next = "False" else: subformula = " | ".join([cont_varname+str(regID) for regID in reg]) subformula_next = " | ".join([cont_varname+str(regID)+"'" for regID in reg]) prop_sym_next = prop_sym+"'" self.sym2prop(props={prop_sym_next:subformula_next}) self.sym2prop(props={prop_sym:subformula}) # It is easier to work with lists... if isinstance(self.sys_safety, str): if len(self.sys_safety) == 0: self.sys_safety = [] else: self.sys_safety = [self.sys_safety] if isinstance(self.sys_init, str): if len(self.sys_init) == 0: self.sys_init = [] else: self.sys_init = [self.sys_init] # Transitions for from_region in range(len(disc_dynamics.list_region)): to_regions = [i for i in range(len(disc_dynamics.list_region)) if disc_dynamics.trans[i][from_region] != 0] self.sys_safety.append(cont_varname+str(from_region) + " -> (" + " | ".join([cont_varname+str(i)+"'" for i in to_regions]) + ")") # Mutex self.sys_init.append("") self.sys_safety.append("") for regID in range(len(disc_dynamics.list_region)): if len(self.sys_safety[-1]) > 0: self.sys_init[-1] += "\n| " self.sys_safety[-1] += "\n| " self.sys_init[-1] += "(" + cont_varname+str(regID) if len(disc_dynamics.list_region) > 1: self.sys_init[-1] += " & " + " & ".join(["(!"+cont_varname+str(i)+")" for i in range(len(disc_dynamics.list_region)) if i != regID]) self.sys_init[-1] += ")" self.sys_safety[-1] += "(" + cont_varname+str(regID)+"'" if len(disc_dynamics.list_region) > 1: self.sys_safety[-1] += " & " + " & ".join(["(!"+cont_varname+str(i)+"')" for i in range(len(disc_dynamics.list_region)) if i != regID]) self.sys_safety[-1] += ")"
[docs] def sym2prop(self, props, verbose=0): """Replace the symbols of propositions in this spec with the actual propositions""" # Replace symbols for propositions on discrete variables with the actual # propositions if (props is not None): symfound = True while (symfound): symfound = False for propSymbol, prop in props.iteritems(): if propSymbol[-1] != "'": # To handle gr1c primed variables propSymbol += r"\b" if (verbose > 2): print '\t' + propSymbol + ' -> ' + prop if (isinstance(self.env_init, str)): if (len(re.findall(r'\b'+propSymbol, self.env_init)) > 0): self.env_init = re.sub(r'\b'+propSymbol, '('+prop+')', self.env_init) symfound = True else: for i in xrange(0, len(self.env_init)): if (len(re.findall(r'\b'+propSymbol, self.env_init[i])) > 0): self.env_init[i] = re.sub(r'\b'+propSymbol, '('+prop+')', \ self.env_init[i]) symfound = True if (isinstance(self.sys_init, str)): if (len(re.findall(r'\b'+propSymbol, self.sys_init)) > 0): self.sys_init = re.sub(r'\b'+propSymbol, '('+prop+')', self.sys_init) symfound = True else: for i in xrange(0, len(self.sys_init)): if (len(re.findall(r'\b'+propSymbol, self.sys_init[i])) > 0): self.sys_init[i] = re.sub(r'\b'+propSymbol, '('+prop+')', \ self.sys_init[i]) symfound = True if (isinstance(self.env_safety, str)): if (len(re.findall(r'\b'+propSymbol, self.env_safety)) > 0): self.env_safety = re.sub(r'\b'+propSymbol, '('+prop+')', self.env_safety) symfound = True else: for i in xrange(0, len(self.env_safety)): if (len(re.findall(r'\b'+propSymbol, self.env_safety[i])) > 0): self.env_safety[i] = re.sub(r'\b'+propSymbol, '('+prop+')', \ self.env_safety[i]) symfound = True if (isinstance(self.sys_safety, str)): if (len(re.findall(r'\b'+propSymbol, self.sys_safety)) > 0): self.sys_safety = re.sub(r'\b'+propSymbol, '('+prop+')', self.sys_safety) symfound = True else: for i in xrange(0, len(self.sys_safety)): if (len(re.findall(r'\b'+propSymbol, self.sys_safety[i])) > 0): self.sys_safety[i] = re.sub(r'\b'+propSymbol, '('+prop+')', \ self.sys_safety[i]) symfound = True if (isinstance(self.env_prog, str)): if (len(re.findall(r'\b'+propSymbol, self.env_prog)) > 0): self.env_prog = re.sub(r'\b'+propSymbol, '('+prop+')', self.env_prog) symfound = True else: for i in xrange(0, len(self.env_prog)): if (len(re.findall(r'\b'+propSymbol, self.env_prog[i])) > 0): self.env_prog[i] = re.sub(r'\b'+propSymbol, '('+prop+')', \ self.env_prog[i]) symfound = True if (isinstance(self.sys_prog, str)): if (len(re.findall(r'\b'+propSymbol, self.sys_prog)) > 0): self.sys_prog = re.sub(r'\b'+propSymbol, '('+prop+')', self.sys_prog) symfound = True else: for i in xrange(0, len(self.sys_prog)): if (len(re.findall(r'\b'+propSymbol, self.sys_prog[i])) > 0): self.sys_prog[i] = re.sub(r'\b'+propSymbol, '('+prop+')', \ self.sys_prog[i]) symfound = True
def toSMVSpec(self): trees = [] # NuSMV can only handle system states for s, ops in [(self.sys_init, []), (self.sys_safety, ['[]']), (self.sys_prog, ['[]', '<>'])]: if s: if isinstance(s, str): s = [s] subtrees = [] ops.reverse() # so we apply operators outwards for f in s: t = ltl_parse.parse(f) # assign appropriate temporal operators for op in ops: t = ltl_parse.ASTUnTempOp.new(t, op) subtrees.append(t) # & together expressions t = reduce(lambda x, y: ltl_parse.ASTAnd.new(x, y), subtrees) trees.append(t) # & together converted subformulae return reduce(lambda x, y: ltl_parse.ASTAnd.new(x, y), trees)
[docs] def toJTLVSpec(self): """Return a list of two strings [assumption, guarantee] corresponding to this GR[1] specification.""" spec = ['', ''] # env init if (isinstance(self.env_init, str)): if (len(self.env_init) > 0 and not self.env_init.isspace()): spec[0] += '-- valid initial env states\n' spec[0] += '\t' + self.env_init else: desc_added = False for env_init in self.env_init: if (len(env_init) > 0): if (len(spec[0]) > 0): spec[0] += ' & \n' if (not desc_added): spec[0] += '-- valid initial env states\n' desc_added = True spec[0] += '\t' + env_init # env safety if (isinstance(self.env_safety, str)): if (len(self.env_safety) > 0): if (len(spec[0]) > 0): spec[0] += ' & \n' spec[0] += '-- safety assumption on environment\n' spec[0] += '\t[](' + self.env_safety + ')' else: desc_added = False for env_safety in self.env_safety: if (len(env_safety) > 0): if (len(spec[0]) > 0): spec[0] += ' & \n' if (not desc_added): spec[0] += '-- safety assumption on environment\n' desc_added = True spec[0] += '\t[](' + env_safety + ')' # env progress if (isinstance(self.env_prog, str)): if (len(self.env_prog) > 0): if (len(spec[0]) > 0): spec[0] += ' & \n' spec[0] += '-- justice assumption on environment\n' spec[0] += '\t[]<>(' + self.env_prog + ')' else: desc_added = False for prog in self.env_prog: if (len(prog) > 0): if (len(spec[0]) > 0): spec[0] += ' & \n' if (not desc_added): spec[0] += '-- justice assumption on environment\n' desc_added = True spec[0] += '\t[]<>(' + prog + ')' # sys init if (isinstance(self.sys_init, str)): if (len(self.sys_init) > 0 and not self.sys_init.isspace()): spec[1] += '-- valid initial system states\n' spec[1] += '\t' + self.sys_init else: desc_added = False for sys_init in self.sys_init: if (len(sys_init) > 0): if (len(spec[1]) > 0): spec[1] += ' & \n' if (not desc_added): spec[1] += '-- valid initial system states\n' desc_added = True spec[1] += '\t' + sys_init # sys safety if (isinstance(self.sys_safety, str)): if (len(self.sys_safety) > 0): if (len(spec[1]) > 0): spec[1] += ' & ' spec[1] += '-- safety requirement on system\n' spec[1] +='\t[](' + self.sys_safety + ')' else: desc_added = False for sys_safety in self.sys_safety: if (len(sys_safety) > 0): if (len(spec[1]) > 0): spec[1] += ' & \n' if (not desc_added): spec[1] += '-- safety requirement on system\n' desc_added = True spec[1] += '\t[](' + sys_safety + ')' # sys progress if (isinstance(self.sys_prog, str)): if (len(self.sys_prog) > 0): if (len(spec[1]) > 0): spec[1] += ' & \n' spec[1] += '-- progress requirement on system\n' spec[1] += '\t[]<>(' + self.sys_prog + ')' else: desc_added = False for prog in self.sys_prog: if (len(prog) > 0): if (len(spec[1]) > 0): spec[1] += ' & \n' if (not desc_added): spec[1] += '-- progress requirement on system\n' desc_added = True spec[1] += '\t[]<>(' + prog + ')' return spec
[docs] def dumpgr1c(self): """Dump to gr1c specification string.""" output = "ENV: "+" ".join(self.env_vars)+";\n" output += "SYS: "+" ".join(self.sys_vars)+";\n\n" if isinstance(self.env_init, str): output += "ENVINIT: "+self.env_init+";\n" else: output += "ENVINIT: "+"\n& ".join(["("+s+")" for s in self.env_init])+";\n" if len(self.env_safety) == 0: output += "ENVTRANS:;\n" elif isinstance(self.env_safety, str): output += "ENVTRANS: []"+self.env_safety+";\n" else: output += "ENVTRANS: "+"\n& ".join(["[]("+s+")" for s in self.env_safety])+";\n" if len(self.env_prog) == 0: output += "ENVGOAL:;\n\n" elif isinstance(self.env_prog, str): output += "ENVGOAL: []<>"+self.env_prog+";\n\n" else: output += "ENVGOAL: "+"\n& ".join(["[]<>("+s+")" for s in self.env_prog])+";\n\n" if isinstance(self.sys_init, str): output += "SYSINIT: "+self.sys_init+";\n" else: output += "SYSINIT: "+"\n& ".join(["("+s+")" for s in self.sys_init])+";\n" if len(self.sys_safety) == 0: output += "SYSTRANS:;\n" elif isinstance(self.sys_safety, str): output += "SYSTRANS: []"+self.sys_safety+";\n" else: output += "SYSTRANS: "+"\n& ".join(["[]("+s+")" for s in self.sys_safety])+";\n" if len(self.sys_prog) == 0: output += "SYSGOAL:;\n" elif isinstance(self.sys_prog, str): output += "SYSGOAL: []<>"+self.sys_prog+";\n" else: output += "SYSGOAL: "+"\n& ".join(["[]<>("+s+")" for s in self.sys_prog])+";\n" return output