# Source code for jtlvint

# Copyright (c) 2012, 2013 by California Institute of Technology
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 
# 3. Neither the name of the California Institute of Technology nor
#    the names of its contributors may be used to endorse or promote
#    products derived from this software without specific prior
#    written permission.
# 
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL CALTECH
# OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
""" 
Interface to the JTLV implementation of GR(1) synthesis

JTLV: http://jtlv.ysaar.net/


based on code from:
    grgameint.py, jtlvint.py, rhtlp.py by Nok Wongpiromsarn 


@author: Vasu Raman
"""

import copy, itertools, os, re, subprocess, tempfile, textwrap
import warnings
from collections import OrderedDict

import transys
from spec import GRSpec

#for checking form of spec
import pyparsing
from pyparsing import *
from tulip.spec import parse

JTLV_PATH = os.path.abspath(os.path.dirname(__file__))
JTLV_EXE = 'jtlv_grgame.jar'


def check_realizable(spec, heap_size='-Xmx128m', priority_kind=-1, init_option=1, verbose=0):
    """Decide realizability of specification defined by given GRSpec object.

    ...for standalone use

    The temporary files created for JTLV are removed before returning,
    whether or not solving succeeded.

    Arguments are described in the documentation for L{solve_game}.

    Return True if realizable, False if not, or an error occurs.
    """
    fSMV, fLTL, fAUT = create_files(spec)
    try:
        return solve_game(spec, fSMV, fLTL, fAUT, heap_size,
                          priority_kind, init_option, verbose)
    finally:
        # Previously the unlink calls followed the return statement and
        # were unreachable, so every call leaked three temporary files.
        for fname in (fSMV, fLTL, fAUT):
            try:
                os.unlink(fname)
            except OSError:
                # Best-effort cleanup; do not mask the result or the
                # exception raised by solve_game.
                pass
    

def solve_game(spec, fSMV, fLTL, fAUT, heap_size='-Xmx128m', priority_kind=3, init_option=1, verbose=0):
    """Decide realizability of specification defined by given GRSpec object.

    Return True if realizable, False if not, or an error occurs.
    Automaton is extracted for file names by fAUT, unless priority_kind == -1


    @param spec: a GRSpec object.  (Not read by this function itself; the
        JTLV input files are expected to have been generated from it.)
    @param fSMV, fLTL, fAUT: file name for use with JTLV.  This
        enables synthesize() to access them later on.
    @param heap_size: a string that specifies java heap size.

    @param priority_kind: a string of length 3 or an integer that specifies
        the type of priority used in extracting the
        automaton. Possible values of C{priority_kind} are:

            - -1 - No Automaton
            - 3 - 'ZYX'
            - 7 - 'ZXY'
            - 11 - 'YZX'
            - 15 - 'YXZ'
            - 19 - 'XZY'
            - 23 - 'XYZ'

        Here X means that the controller tries to disqualify one of
        the environment assumptions, Y means that the controller tries
        to advance with a finite path to somewhere, and Z means that
        the controller tries to satisfy one of his guarantees.

    @param init_option: an integer that specifies how to handle the
        initial state of the system. Possible values of C{init_option}
        are

            - 0 - The system has to be able to handle all the possible initial system
              states specified on the guarantee side of the specification.
            - 1 (default) - The system can choose its initial state, in response to the initial
              environment state. For each initial environment state, the resulting
              automaton contains exactly one initial system state, starting from which
              the system can satisfy the specification.
            - 2 - The system can choose its initial state, in response to the initial
              environment state. For each initial environment state, the resulting
              automaton contain all the possible initial system states, starting from which
              the system can satisfy the specification.

        Any other value triggers a warning and falls back to 1.

    @param verbose: an integer that specifies the level of verbosity.
    """
    priority_kind = get_priority(priority_kind)

    # init_option: anything that is not an integer in [0, 2] is replaced
    # by the default.
    if not (isinstance(init_option, int) and 0 <= init_option <= 2):
        warnings.warn("Unknown init_option. Setting it to the default (1)")
        init_option = 1

    call_JTLV(heap_size, fSMV, fLTL, fAUT, priority_kind, init_option, verbose)

    # JTLV reports its verdict as a line of text in the output file.  If
    # neither verdict line is present the spec is treated as unrealizable.
    realizable = False
    with open(fAUT, 'r') as f:
        for line in f:
            if "Specification is realizable" in line:
                realizable = True
                break
            elif "Specification is unrealizable" in line:
                realizable = False
                break

    # Only report on synthesis when an automaton was requested.
    if priority_kind > 0:
        if realizable:
            print("\nAutomaton successfully synthesized.\n")
        else:
            print("\nERROR: Specification was unrealizable.\n")

    return realizable



def synthesize(spec, heap_size='-Xmx128m', priority_kind = 3, init_option = 1, verbose=0):
    """Synthesize a strategy satisfying the spec.

    Arguments are described in documentation for L{solve_game}.

    Return the strategy as built by L{load_file} if the specification
    is realizable.  Otherwise return the list of counterexamples
    extracted by L{get_counterexamples} (a list of dictionaries, which
    may be empty).

    The temporary files created for JTLV are removed before returning,
    also when an exception is raised along the way.
    """
    fSMV, fLTL, fAUT = create_files(spec)
    try:
        realizable = solve_game(spec, fSMV, fLTL, fAUT, heap_size,
                                priority_kind, init_option, verbose)

        if not realizable:
            return get_counterexamples(fAUT, verbose=verbose)

        # Build Automaton
        return load_file(fAUT, spec, verbose=verbose)
    finally:
        for fname in (fSMV, fLTL, fAUT):
            try:
                os.unlink(fname)
            except OSError:
                # Best-effort cleanup; do not mask the result or an
                # exception raised above.
                pass
def create_files(spec):
    """Create temporary files for read/write by JTLV.

    Return a tuple C{(fSMV, fLTL, fAUT)} of file names: the SMV module
    definitions, the LTL specification, and an (empty) file reserved
    for JTLV's output.  The files are not deleted automatically; the
    caller is responsible for removing them.
    """
    # Render both inputs before creating any file, so that a spec which is
    # rejected (generate_JTLV_LTL raises) leaves no stray temp file behind.
    smv_text = generate_JTLV_SMV(spec)
    ltl_text = generate_JTLV_LTL(spec)

    fSMV = tempfile.NamedTemporaryFile(delete=False, suffix="smv")
    fSMV.write(smv_text)
    fSMV.close()

    fLTL = tempfile.NamedTemporaryFile(delete=False, suffix="ltl")
    fLTL.write(ltl_text)
    fLTL.close()

    fAUT = tempfile.NamedTemporaryFile(delete=False)
    fAUT.close()
    return fSMV.name, fLTL.name, fAUT.name


# Priority names accepted by get_priority and their integer codes.
_PRIORITY_CODES = {
    'ZYX': 3,
    'ZXY': 7,
    'YZX': 11,
    'YXZ': 15,
    'XZY': 19,
    'XYZ': 23,
}


def get_priority(priority_kind):
    """Convert the priority_kind to the corresponding integer.

    Known names ('ZYX', 'ZXY', ...) map to their integer code.  Integers
    that are non-positive or one of the known codes are returned
    unchanged.  Anything else triggers a warning and yields 3 ('ZYX').
    """
    if isinstance(priority_kind, str):
        if priority_kind in _PRIORITY_CODES:
            return _PRIORITY_CODES[priority_kind]
    elif isinstance(priority_kind, int):
        if priority_kind <= 0 or priority_kind in _PRIORITY_CODES.values():
            return priority_kind

    warnings.warn("Unknown priority_kind. Setting it to the default (ZYX)")
    return 3


def call_JTLV(heap_size, fSMV, fLTL, fAUT, priority_kind, init_option, verbose):
    """Subprocess calls to JTLV.

    Runs C{java} with the jar named by JTLV_EXE (located in JTLV_PATH),
    or, if JTLV_EXE is empty, with the GRMain class on an explicit
    classpath.  The exit status of the subprocess is not inspected.
    """
    if verbose > 0:
        print('Calling jtlv with the following arguments:')
        print(' heap size: ' + heap_size)
        print(' jtlv path: ' + JTLV_PATH)
        print(' priority_kind: ' + str(priority_kind) + '\n')

    game_args = [fSMV, fLTL, fAUT, str(priority_kind), str(init_option)]

    if len(JTLV_EXE) > 0:
        jtlv_grgame = os.path.join(JTLV_PATH, JTLV_EXE)
        java_args = [heap_size, "-jar", jtlv_grgame] + game_args
    else:
        # For debugging purpose
        classpath = os.path.join(JTLV_PATH, "JTLV") + ":" + \
            os.path.join(JTLV_PATH, "JTLV", "jtlv-prompt1.4.1.jar")
        java_args = [heap_size, "-cp", classpath, "GRMain"] + game_args
        # Alternative kept for reference (captures JTLV's output):
        # cmd = subprocess.Popen(
        #     ["java", heap_size, "-cp", classpath, "GRMain", smv_file,
        #      ltl_file, aut_file, str(priority_kind), str(init_option)],
        #     stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        #     close_fds=True)

    if verbose > 1:
        print(' java ' + ' '.join(str(arg) for arg in java_args))

    subprocess.call(["java"] + java_args)


def generate_JTLV_SMV(spec, verbose=0):
    """Return the SMV module definitions needed by JTLV.

    It takes as input a GRSpec object.  N.B., assumes all variables
    are Boolean (i.e., atomic propositions).
    """
    pieces = []

    # Write the header
    pieces.append(textwrap.dedent("""
    MODULE main
        VAR
            e : env();
            s : sys();
    """))

    # Define env vars
    # NOTE: the line break after "-- inputs" matters, since "--" starts
    # a comment that runs to the end of the line.
    pieces.append(textwrap.dedent("""
    MODULE env -- inputs
        VAR
    """))
    for var in spec.env_vars.keys():
        pieces.append('\t\t' + var + ' : boolean;\n')

    # Define sys vars
    pieces.append(textwrap.dedent("""
    MODULE sys -- outputs
        VAR
    """))
    for var in spec.sys_vars.keys():
        pieces.append('\t\t' + var + ' : boolean;\n')

    return ''.join(pieces)


def generate_JTLV_LTL(spec, verbose=0):
    """Return the LTLSPEC for JTLV.

    It takes as input a GRSpec object.  N.B., assumes all variables
    are Boolean (i.e., atomic propositions).

    Raise Exception if L{check_gr1} rejects the specification.
    """
    specLTL = spec.to_jtlv()
    assumption = specLTL[0]
    guarantee = specLTL[1]

    env_vars = list(spec.env_vars.keys())
    sys_vars = list(spec.sys_vars.keys())

    if not check_gr1(assumption, guarantee, env_vars, sys_vars):
        raise Exception('Spec not in GR(1) format')

    def to_jtlv_syntax(formula):
        # Rewrite Boolean constants and operators in the spelling used
        # for the JTLV input file.
        formula = re.sub(r'\b' + 'True' + r'\b', 'TRUE', formula)
        formula = re.sub(r'\b' + 'False' + r'\b', 'FALSE', formula)
        formula = formula.replace('==', '=')
        formula = formula.replace('&&', '&')
        formula = formula.replace('||', '|')

        # Replace any environment variable var in spec with e.var and
        # replace any system variable var with s.var.  Names are escaped
        # because they may contain regex metacharacters such as '.'.
        for var in env_vars:
            formula = re.sub(r'\b' + re.escape(var) + r'\b', 'e.' + var,
                             formula)
        for var in sys_vars:
            formula = re.sub(r'\b' + re.escape(var) + r'\b', 's.' + var,
                             formula)
        return formula

    assumption = to_jtlv_syntax(assumption)
    guarantee = to_jtlv_syntax(guarantee)

    # An empty formula is emitted as TRUE.
    # Assumption
    ltl = 'LTLSPEC\n(\n'
    ltl += assumption if assumption else "TRUE"
    ltl += '\n);\n'

    # Guarantee
    ltl += '\nLTLSPEC\n(\n'
    ltl += guarantee if guarantee else "TRUE"
    ltl += '\n);'

    return ltl


def load_file(aut_file, spec, verbose=0):
    """Construct a Mealy Machine from an aut_file.

    N.B., assumes all variables are Boolean (i.e., atomic
    propositions).

    Input:

    - `aut_file`: the name of the text file containing the
      automaton, or an (open) file-like object.  A file opened here
      from a name is closed again before returning; a file-like
      object passed in is left open.

    - `spec`: a GROne spec.

    Return the transys.MealyMachine that was built.
    """
    env_vars = list(spec.env_vars.keys())  # Enforce Boolean var assumption
    sys_vars = list(spec.sys_vars.keys())

    if isinstance(aut_file, str):
        f = open(aut_file, 'r')
        closable = True
    else:
        f = aut_file  # Else, assume aut_file behaves as file object.
        closable = False

    try:
        # build Mealy Machine
        m = transys.MealyMachine()

        # input defs
        inputs = OrderedDict(zip(env_vars, itertools.repeat({0, 1})))
        m.add_inputs(inputs)

        # outputs def
        outputs = OrderedDict(zip(sys_vars, itertools.repeat({0, 1})))
        m.add_outputs(outputs)

        # state variables def
        state_vars = outputs
        m.add_state_vars(state_vars)

        varnames = sys_vars + env_vars

        # stateID -> (variable valuation, list of successor stateIDs)
        stateDict = {}
        for line in f:
            # parse states
            if line.find('State ') >= 0:
                stateID = int(re.search(r'State (\d+)', line).group(1))

                state = dict(re.findall(r'(\w+):(\w+)', line))
                for var, val in list(state.items()):
                    try:
                        state[var] = int(val)
                    except ValueError:
                        # Non-integer values are kept as strings.
                        pass
                    if varnames and var not in varnames:
                        print('Unknown variable ' + var)

                for var in varnames:
                    if var not in state:
                        print('Variable ' + var + ' not assigned')

            # parse transitions
            if line.find('successors') >= 0:
                transition = [int(t) for t in re.findall(r' (\d+)', line)]

                m.states.add(stateID)

                # mark initial states (states that do not appear in
                # previous transitions)
                seenSoFar = [t for (s, trans) in stateDict.values()
                             for t in trans]
                if stateID not in seenSoFar:
                    m.states.initial.add(stateID)

                stateDict[stateID] = (state, transition)
    finally:
        if closable:
            f.close()

    # add transitions with guards to the Mealy Machine
    for from_state, (_, transitions) in stateDict.items():
        for to_state in transitions:
            # The transition is labeled with the valuation of its target.
            guard = stateDict[to_state][0]
            try:
                m.transitions.add_labeled(
                    from_state, to_state, guard, check=False
                )
            except Exception as e:
                raise Exception('Failed to add transition:\n' + str(e))

    # NOTE: state labeling is currently disabled; kept for reference.
    # # label states
    # for to_state in m.states:
    #     predecessors = m.states.pre(to_state)
    #
    #     # any incoming edges ?
    #     if predecessors:
    #         # pick a predecessor
    #         from_state = predecessors[0]
    #         trans = m.transitions.find([from_state], [to_state] )
    #         (from_, to_, trans_label) = trans[0]
    #
    #         state_label = {k:trans_label[k] for k in m.state_vars}
    #         m.states.label(to_state, state_label)

    return m


def get_counterexamples(aut_file, verbose=0):
    """Return a list of dictionaries, each representing a counter example.

    Input:

    - `aut_file`: a string containing the name of the file containing
      the counter examples generated by JTLV.

    Every non-blank line that follows the line containing
    'The env player can win from states', up to the next blank line,
    yields one dictionary mapping variable names to integer values.
    """
    counter_examples = []
    in_listing = False
    with open(aut_file, 'r') as f:
        for line in f:
            if line.find('The env player can win from states') >= 0:
                in_listing = True
                continue
            if in_listing and (len(line) == 0 or line.isspace()):
                in_listing = False
            if in_listing:
                counter_ex = dict(
                    (var, int(val))
                    for var, val in re.findall(r'(\w+):([-+]?\d+)', line)
                )
                counter_examples.append(counter_ex)
                if verbose > 0:
                    print(counter_ex)
    return counter_examples


def remove_comments(spec):
    """Remove comment lines from string.

    Every line that contains '--' anywhere is dropped entirely.  Each
    remaining line is terminated with a newline in the result.
    """
    return ''.join(
        line + '\n' for line in spec.split('\n') if '--' not in line
    )


def check_gr1(assumption, guarantee, env_vars, sys_vars):
    """Check format of a GR(1) specification.

    @param assumption, guarantee: formulas as strings.
    @param env_vars, sys_vars: iterables of variable names.

    Return True if all checks pass; otherwise return False (the failing
    check prints a message).
    """
    assumption = remove_comments(assumption)
    guarantee = remove_comments(guarantee)

    env_vars = list(env_vars)
    sys_vars = list(sys_vars)

    # Check that dictionaries are in the correct format
    if not check_vars(env_vars):
        return False
    if not check_vars(sys_vars):
        return False

    # Check for parentheses mismatch
    if not check_parentheses(assumption):
        return False
    if not check_parentheses(guarantee):
        return False

    # Check that all non-special-characters mentioned are variable names
    # or possible values
    varnames = env_vars + sys_vars
    if not check_spec(assumption, varnames):
        return False
    if not check_spec(guarantee, varnames):
        return False

    # Literals cannot start with G, F or X unless quoted
    restricted_alphas = ''.join(c for c in alphas if c not in "GFX")
    # Quirk: allow literals of the form (G|F|X)[0-9_][A-Za-z0-9._]* so we
    # can have X0 etc.
    bool_keyword = CaselessKeyword("TRUE") | CaselessKeyword("FALSE")
    var = ~bool_keyword + (
        Word(restricted_alphas, alphanums + "._:")
        | Regex("[A-Za-z][0-9_][A-Za-z0-9._:]*")
        | QuotedString('"')
    ).setParseAction(parse.ASTVar)
    atom = var | bool_keyword.setParseAction(parse.ASTBool)
    number = var | Word(nums).setParseAction(parse.ASTNum)

    # arithmetic expression
    arith_expr = operatorPrecedence(
        number,
        [(oneOf("* /"), 2, opAssoc.LEFT, parse.ASTArithmetic),
         (oneOf("+ -"), 2, opAssoc.LEFT, parse.ASTArithmetic),
         ("mod", 2, opAssoc.LEFT, parse.ASTArithmetic)])

    # integer comparison expression
    comparison_expr = Group(
        arith_expr + oneOf("< <= > >= != = ==") + arith_expr
    ).setParseAction(parse.ASTComparator)
    proposition = comparison_expr | atom

    # Check that the syntax is GR(1). This uses pyparsing
    UnaryTemporalOps = ~bool_keyword + oneOf("next") + ~Word(nums + "_")
    next_ltl_expr = operatorPrecedence(
        proposition,
        [("'", 1, opAssoc.LEFT, parse.ASTUnTempOp),
         ("!", 1, opAssoc.RIGHT, parse.ASTNot),
         (UnaryTemporalOps, 1, opAssoc.RIGHT, parse.ASTUnTempOp),
         (oneOf("& &&"), 2, opAssoc.LEFT, parse.ASTAnd),
         (oneOf("| ||"), 2, opAssoc.LEFT, parse.ASTOr),
         (oneOf("xor ^"), 2, opAssoc.LEFT, parse.ASTXor),
         ("->", 2, opAssoc.RIGHT, parse.ASTImp),
         ("<->", 2, opAssoc.RIGHT, parse.ASTBiImp),
         (oneOf("= !="), 2, opAssoc.RIGHT, parse.ASTComparator),
         ])
    always_expr = pyparsing.Literal("[]") + next_ltl_expr
    always_eventually_expr = pyparsing.Literal("[]") + \
        pyparsing.Literal("<>") + next_ltl_expr
    gr1_expr = next_ltl_expr | always_expr | always_eventually_expr

    # Final Check
    GR1_expression = pyparsing.operatorPrecedence(
        gr1_expr, [("&", 2, pyparsing.opAssoc.RIGHT)])

    try:
        GR1_expression.parseString(assumption)
    except ParseException:
        print("Assumption is not in GR(1) format.")
        return False

    try:
        GR1_expression.parseString(guarantee)
    except ParseException:
        print("Guarantee is not in GR(1) format")
        return False

    return True


def check_parentheses(spec):
    """Check whether all the parentheses in a spec are closed.

    Only the numbers of "(" and ")" are compared; their order is not
    checked.

    Return False if there are errors and True when there are no errors.
    """
    surplus = spec.count("(") - spec.count(")")
    if surplus == 0:
        return True

    if surplus > 0:
        print("The spec is missing " + str(surplus) + " close-" +
              "parentheses or has " + str(surplus) + " too many " +
              "open-parentheses")
    else:
        # Report the magnitude; the old message printed a negative
        # count for the "too many" part.
        missing = -surplus
        print("The spec is missing " + str(missing) + " open-" +
              "parentheses or has " + str(missing) + " too many " +
              "close-parentheses")
    return False


def check_vars(varNames):
    """Complain if any variable name is an integer literal or not a string.

    Return True if every name is acceptable, False otherwise (after
    printing which name was rejected).
    """
    for item in varNames:
        # Check that the vars are strings
        if not isinstance(item, str):
            print("Prop " + str(item) + " is invalid")
            return False

        # Check that the keys are not numbers.  Only names that parse as
        # an integer are rejected, which is what the former
        # int()-then-float() sequence amounted to.
        try:
            int(item)
        except ValueError:
            continue
        print("Prop " + str(item) + " is invalid")
        return False
    return True


def check_spec(spec, varNames):
    """Verify that all non-operators in "spec" are in the list of vars.

    Operators, variable names (or the individual values of list/tuple
    entries of varNames) and the words "true"/"false" are deleted from
    the formula; the check passes if only whitespace is left.
    """
    # Delete all special characters
    special_characters = ["next(", "[]", "<>", "->", "&", "|", "!",
                          "(", ")", "\n", "<", ">", "<=", ">=", "<->",
                          "\t", "="]
    for word in special_characters:
        spec = spec.replace(word, "")

    # Now, delete all variable names and values as well.
    tokens = []
    for value in varNames:
        if isinstance(value, (list, tuple)):
            tokens.extend(str(individual_value) for individual_value in value)
        else:
            tokens.append(value)
    # Longest first: otherwise a name that is a prefix of another (e.g.
    # "park" and "parking") leaves a fragment behind and a valid formula
    # is rejected.
    for token in sorted(tokens, key=len, reverse=True):
        spec = spec.replace(token, "")

    # Remove all instances of "true" and "false"
    spec = spec.lower()
    spec = spec.replace("true", "")
    spec = spec.replace("false", "")

    # Make sure that the resulting string is empty
    return not spec.split()