Source code for spec.form

# Copyright (c) 2011-2015 by California Institute of Technology
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the California Institute of Technology nor
#    the names of its contributors may be used to endorse or promote
#    products derived from this software without specific prior
#    written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL CALTECH
# OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
"""Formulae constituting specifications.

what this module adds to the pure syntactic manipulations of
`transformation` is a variable table, with game semantics in particular.

Also, those modules are for manipulating expressions.
This module knows about program structure,
namely the sections of a specification file.
"""
import collections.abc as _abc
import copy
import logging
import pprint
import time
import types
import typing as _ty
import re

import tulip.spec.parser as parser
import tulip.spec.transformation as tx
import tulip.spec.translation as ts


__all__ = [
    'QInit',
    'LTL',
    'GRSpec',
    'replace_dependent_vars']


QInit = _ty.Literal[
    r'\A \A',
    r'\A \E',
    r'\E \A',
    r'\E \E']
_logger = logging.getLogger(__name__)


class LTL:
    """LTL formula (specification).

    Minimal class that describes an LTL formula in the canonical TuLiP
    syntax.  It contains three attributes:

      - `formula`: a `str` of the formula.  Syntax is only enforced
        if the user requests it, e.g., using the `check_form` method.

      - `input_variables`: a `dict` of variables (names given as
        strings) and their domains; each key is a variable name and
        its value (in the dictionary) is its domain.  See notes below.
        Semantically, these variables are considered to be inputs
        (i.e., uncontrolled, externally determined).

      - `output_variables`: similar to `input_variables`, but
        considered to be outputs, i.e., controlled, the strategy for
        setting of which we seek in formal synthesis.

    N.B., domains are specified in multiple datatypes.  The type is
    indicated below in parenthesis.  Recognized domains, along with
    examples, are:

      - boolean (`str`); this domain is specified by `"boolean"`;
      - finite_set (`set`); e.g., `{1, 3, 5}`;
      - range (`tuple` of length 2); e.g., `(0, 15)`.

    As with the `formula` attribute, type-checking is only performed
    if requested by the user.  E.g., any iterable can act as a
    finite_set.  However, a range domain must be a `tuple` of length
    2; otherwise it is ambiguous with finite_set.
    """

    def __init__(
            self,
            formula:
                str |
                None=None,
            input_variables:
                dict[str, ...] |
                None=None,
            output_variables:
                dict[str, ...] |
                None=None):
        """Instantiate an LTL object.

        Any non-None arguments are saved to the corresponding
        attribute by reference.
        """
        if formula is None:
            formula = ''
        if input_variables is None:
            input_variables = dict()
        if output_variables is None:
            output_variables = dict()
        self.formula = formula
        self.input_variables = input_variables
        self.output_variables = output_variables
        self.check_vars()

    def __repr__(self):
        return (
            f"LTL('{self.formula}', "
            f"input_variables={self.input_variables}, "
            f"output_variables={self.output_variables})")

    def __str__(self):
        return str(self.formula)

    def _domain_str(self, d) -> str:
        if d == 'boolean':
            return d
        elif isinstance(d, tuple) and len(d) == 2:
            return f'[{d[0]}, {d[1]}]'
        elif hasattr(d, '__iter__'):
            return '{' + ', '.join(map(str, d)) + '}'
        else:
            raise ValueError('Unrecognized variable domain type.')

    def dumps(
            self,
            timestamp:
                bool=False
            ) -> str:
        """Dump TuLiP LTL file string.

        @param timestamp:
            If True, then add comment to file with
            current time in UTC.
        """
        if timestamp:
            output = '# Generated at ' + time.strftime(
                '%Y-%m-%d %H:%M:%S UTC', time.gmtime()) + '\n'
        else:
            output = ''
        output += '0  # Version\n%%\n'
        if self.input_variables:
            output += 'INPUT:\n'
            for k, v in self.input_variables.items():
                output += f'{k} : {self._domain_str(v)};\n'
        if self.output_variables:
            output += '\nOUTPUT:\n'
            for k, v in self.output_variables.items():
                output += f'{k} : {self._domain_str(v)};\n'
        return f'{output}\n%%\n{self.formula}'

    def check_vars(self) -> None:
        """Raise Exception if variabe definitions are invalid.

        Checks:

          - env and sys have common vars
          - some var is also a possible value of some var (including itself)
            (of arbitrary finite data type)
        """
        common_vars = {x for x in self.input_variables
                       if x in self.output_variables}
        if common_vars:
            raise Exception(
                'Env and sys have variables in '
                f'common: {common_vars}')

    def check_form(
            self,
            check_undeclared_identifiers:
                bool=False
            ) -> None:
        """Verify formula syntax and type-check variable domains.

        Return True iff OK.

        UNDER DEVELOPMENT; function signature may change without
        notice.  Calling will result in NotImplementedError.
        """
        raise NotImplementedError

    @staticmethod
    def loads(
            s:
                str
            ) -> 'LTL':
        """Create LTL object from TuLiP LTL file string."""
        for line in s.splitlines():
            line = line.strip()  # Ignore leading and trailing whitespace
            comment_ind = line.find('#')
            if comment_ind > -1:
                line = line[:comment_ind]
            if line:  # Version number is the first nonblank line
                try:
                    version = int(line)
                except ValueError:
                    raise ValueError("Malformed version number.")
                if version != 0:
                    raise ValueError("Unrecognized version number: " +
                                     str(version))
                break
        try:
            s = re.sub(
                r' \# .* (\n | $) ',
                '',
                s,
                flags=re.VERBOSE)
                    # Strip comments
            preamble, declar, formula = s.split('%%\n')
            input_ind = declar.find('INPUT:')
            output_ind = declar.find('OUTPUT:')
            if output_ind == -1:
                output_ind = 0  # Default is OUTPUT
            if input_ind == -1:
                input_section = ''
                output_section = declar[output_ind:]
            elif input_ind < output_ind:
                input_section = declar[input_ind:output_ind]
                output_section = declar[output_ind:]
            else:
                output_section = declar[output_ind:input_ind]
                input_section = declar[input_ind:]
            input_section = input_section.replace('INPUT:', '')
            output_section = output_section.replace('OUTPUT:', '')
            variables = [dict(), dict()]
            sections = [input_section, output_section]
            for i in range(2):
                for var_declar in sections[i].split(';'):
                    if not var_declar.strip():
                        continue
                    name, domain = var_declar.split(':')
                    name = name.strip()
                    domain = domain.lstrip().rstrip(';')
                    if domain[0] == '[':  # Range-type domain
                        domain = domain.split(',')
                        variables[i][name] = (
                            int(domain[0][1:]),
                            int(domain[1][:domain[1].index(']')]))
                    elif domain[0] == '{':  # Finite set domain
                        domain.strip()
                        assert domain[-1] == '}'
                        domain = domain[1:-1]  # Remove opening, closing braces
                        variables[i][name] = list()
                        for elem in domain.split(','):
                            try:
                                variables[i][name].append(int(elem))
                            except ValueError:
                                variables[i][name].append(str(elem))
                        variables[i][name] = set(variables[i][name])
                    elif domain == 'boolean':
                        variables[i][name] = domain
                    else:
                        raise TypeError
        except (ValueError, TypeError):
            raise ValueError('Malformed TuLiP LTL specification string.')
        return LTL(
            formula=formula,
            input_variables=variables[0],
            output_variables=variables[1])

    @staticmethod
    def load(
            f:
                str |
                _ty.IO
            ) -> 'LTL':
        """Wrap `loads` for reading from files.

        @param f:
            If `str`, attempt to open a
            file named "f" read-only.
        """
        if isinstance(f, str):
            f = open(f, 'rU')
        return LTL.loads(f.read())


VariableDeclaration = (
    dict[str, ...] |
    _abc.Iterable[str])
Conjuncts = (
    str |
    _abc.Iterable[str])


[docs] class GRSpec( LTL): r"""GR(1) specification. The basic form is: ``` (env_init & []env_safety & []<>env_prog_1 & []<>env_prog_2 & ...) -> (sys_init & []sys_safety & []<>sys_prog_1 & []<>sys_prog_2 & ...) ``` Attributes: - `moore`: select whether a strategy can see primed environment variables. - `plus_one`: select causal implication between assumptions and guarantees. - `qinit`: select quantification of initial values for variables: `win` below describes the set of winning states. `internal_init` is the initial condition for the internal strategy variables. `Op == expr` means operator `Op` is defined as the expression `expr`. - `r'\A \A'`: `forall env_vars: forall sys_vars: env_init -> win`. `sys_init` must be empty or contain true. The strategy enumeration iterates through all assignments that satisfy `env_init & internal_init`. - `r'\A \E'`: `forall env_vars: exist sys_vars: form`, where: - `form == sys_init & (env_init -> win)` (`plus_one is True`) - `form == env_init -> (sys_init & win)` (`plus_one is False`) The strategy enumeration iterates through all assignments that satisfy `\E sys_vars: env_init`, and picks assignments that satisfy `form & internal_init`. - `r'\E \A'`: ``` exist sys_vars: forall env_vars: form ``` where: - `form == sys_init & (env_init -> win)` (`plus_one is True`) - `form == env_init -> (sys_init & win)` (`plus_one is False`) The strategy enumeration picks an assignment that satisfies: ``` internal_init & \A env_vars: form ``` and iterates through all assignments that satisfy `env_init`. - `r'\E \E'`: ``` exist env_vars: exist sys_vars: sys_init & win ``` `env_init` must be empty or contain true. The strategy enumeration picks an assignment that satisfies: ``` sys_init & win & internal_init ``` - `env_vars`: alias for `input_variables` of `LTL`, concerning variables that are determined by the environment. - `env_init`: `list` of `str` that specifies the assumption about the initial state of the environment. - `env_safety`: `list` of `str` that specifies the assumption about the evolution of the environment state. - `env_prog`: `list` of `str` that specifies the justice assumption on the environment. - `sys_vars`: alias for `output_variables` of `LTL`, concerning variables that are controlled by the system. - `sys_init`: `list` of `str` that specifies the requirement on the initial state of the system. - `sys_safety`: `list` of `str` that specifies the safety requirement. - `sys_prog`: `list` of `str` that specifies the progress requirement. An empty list for any formula (e.g., if `env_init = list()`) is marked as `True` in the specification. This corresponds to the constant Boolean function, which usually means that subformula has no effect (is non-restrictive) on the spec. Consult `GRSpec.__init__` concerning arguments at the time of instantiation. """ def __init__( self, env_vars: VariableDeclaration | None=None, sys_vars: VariableDeclaration | None=None, env_init: Conjuncts='', sys_init: Conjuncts='', env_safety: Conjuncts='', sys_safety: Conjuncts='', env_prog: Conjuncts='', sys_prog: Conjuncts='', moore: bool=True, plus_one: bool=True, qinit: QInit=r'\A \A', parser=parser): """Instantiate a GRSpec object. Instantiating GRSpec without arguments results in an empty formula. The default domain of a variable is "boolean". @param env_vars: If env_vars is a dictionary, then its keys should be variable names, and values are domains of the corresponding variable. Else, if env_vars is an iterable, then assume all environment variables are `boolean` (or "atomic propositions"). Cf. `GRSpec` for details. @param sys_vars: Mutatis mutandis, env_vars. @param env_init, env_safety, env_prog, sys_init, sys_safety, sys_prog: An empty string is converted to an empty list. A string is placed in a list. iterables are converted to lists. Cf. `GRSpec`. @param qinit: see class docstring """ self.parser = parser self._ast = dict() self._cache = { 'string': dict(), 'jtlv': dict(), 'gr1c': dict(), 'slugs': dict()} self._bool_int = dict() self._parts = { x + y for x in {'env_', 'sys_'} for y in {'init', 'safety', 'prog'}} self.moore = moore self.plus_one = plus_one self.qinit = qinit if env_vars is None: env_vars = dict() elif not isinstance(env_vars, dict): env_vars = dict( (v, 'boolean') for v in env_vars) if sys_vars is None: sys_vars = dict() elif not isinstance(sys_vars, dict): sys_vars = dict( (v, 'boolean') for v in sys_vars) self.env_vars = copy.deepcopy(env_vars) self.sys_vars = copy.deepcopy(sys_vars) def mapper( part: Conjuncts, name: str ) -> list[str]: match part: case str() if part: return [part] case _abc.Iterable(): return list(part) case _: raise TypeError( 'Expected `Iterable` ' f'as `{name}`, but ' 'instead got: ' f'{part}') parts = [ env_init, sys_init, env_safety, sys_safety, env_prog, sys_prog] (env_init, sys_init, env_safety, sys_safety, env_prog, sys_prog ) = map( mapper, parts, self._parts) self.env_init = env_init self.sys_init = sys_init self.env_safety = env_safety self.sys_safety = sys_safety self.env_prog = env_prog self.sys_prog = sys_prog LTL.__init__( self, formula=self.to_canon(), input_variables=self.env_vars, output_variables=self.sys_vars) def declare(self, *arg, **kw): """Declare flexible variables. Positional arguments are names of Boolean-valued variables. Keyword arguments are names of integer or string valued variables, declared by a pair of integers, or a sequence of strings respectively. By default declare as system variables. If `env=True` then declare as environment variables. To declare a variable named `env`, use the constructor or modify directly the attributes `env_vars`, `sys_vars`. If already declared, the given type hints should match existing ones. """ env = kw.pop('env', False) assert isinstance(env, bool), env # "env" not a var name d = dict() for k, v in kw.items(): is_int = len(v) == 2 and all( isinstance(q, int) for q in v) if is_int: d[k] = tuple(v) continue # duck check by appending '' d[k] = list(s + '' for s in v) d.update((v, 'boolean') for v in arg) # `LTL.*_variables` equal these (see `self.__init__`) target, other = self.sys_vars, self.env_vars if env: target, other = other, target # redeclarations must match for k, v in d.items(): assert k not in other, (k, other) if k in target: assert target[k] == v, (target[k], v) target.update(d) def __repr__(self): args = ',\n\n'.join([ f'env_vars={self.env_vars!r}', f'sys_vars={self.sys_vars!r}', f'env_init={self.env_init!r}', f'sys_init={self.sys_init!r}', f'env_safety={self.env_safety!r}', f'sys_safety={self.sys_safety!r}', f'env_prog={self.env_prog!r}', f'sys_prog={self.sys_prog!r}']) return f'{type(self).__name__}({args})' def __str__(self): return self.to_canon() def dumps( self, timestamp: bool=False ) -> str: self.formula = self.to_canon() return LTL.dumps(self, timestamp=timestamp) @staticmethod def loads( s: str ) -> 'GRSpec': """Create GRSpec object from TuLiP LTL file string. UNDER DEVELOPMENT; function signature may change without notice. Calling will result in NotImplementedError. """ raise NotImplementedError @staticmethod def load( f: str | _ty.IO ) -> 'GRSpec': """Wrap `loads` for reading from files. UNDER DEVELOPMENT; function signature may change without notice. Calling will result in NotImplementedError. """ raise NotImplementedError def pretty(self) -> str: """Return a string intended to be readable.""" output = 'ENVIRONMENT VARIABLES:\n' if self.env_vars: for k, v in self.env_vars.items(): output += f'\t{k}\t{v}\n' else: output += '\t(none)\n' output += '\nSYSTEM VARIABLES:\n' if self.sys_vars: for k, v in self.sys_vars.items(): output += f'\t{k}\t{v}\n' else: output += '\t(none)\n' output += '\nFORMULA:\n' output += 'ASSUMPTION:\n' if self.env_init: output += ( ' INITIAL\n\t ' + '\n\t& '.join( f'({f})' for f in self.env_init ) + '\n') if self.env_safety: output += ( ' SAFETY\n\t []' + '\n\t& []'.join( f'({f})' for f in self.env_safety ) + '\n') if self.env_prog: output += ( ' LIVENESS\n\t []<>' + '\n\t& []<>'.join( f'({f})' for f in self.env_prog ) + '\n') output += 'GUARANTEE:\n' if self.sys_init: output += ( ' INITIAL\n\t ' + '\n\t& '.join( f'({f})' for f in self.sys_init ) + '\n') if self.sys_safety: output += ( ' SAFETY\n\t []' + '\n\t& []'.join( f'({f})' for f in self.sys_safety ) + '\n') if self.sys_prog: output += ( ' LIVENESS\n\t []<>' + '\n\t& []<>'.join( f'({f})' for f in self.sys_prog ) + '\n') return output def check_syntax(self) -> None: """Raise `AssertionError` for misplaced primed variables.""" self._assert_no_primed( self.env_init, 'assumed initial condition') self._assert_no_primed( self.sys_init, 'guaranteed initial condition') self._assert_no_primed( self.env_prog, 'liveness assumption') self._assert_no_primed( self.env_prog, 'liveness guarantee') for f in self.env_safety: a = self.ast(f) primed = tx.collect_primed_vars(a) for var in primed: if var not in self.sys_vars: continue raise AssertionError( 'Syntax error: ' f'primed system variable "{var}"' f' found in env safety: {f}') def _assert_no_primed( self, formulae: _abc.Iterable[str], name: str ) -> None: """Raise `AssertionError` if primed vars in `formulae`.""" for f in formulae: a = self.ast(f) primed = tx.collect_primed_vars(a) if not primed: continue raise AssertionError( 'Syntax error: ' f'primed variables: {primed}' f' found in {name}: {f}') def copy(self) -> 'GRSpec': """Return a copy of `self`.""" r = GRSpec( env_vars=dict(self.env_vars), sys_vars=dict(self.sys_vars), env_init=copy.copy(self.env_init), env_safety=copy.copy(self.env_safety), env_prog=copy.copy(self.env_prog), sys_init=copy.copy(self.sys_init), sys_safety=copy.copy(self.sys_safety), sys_prog=copy.copy(self.sys_prog)) r.moore = self.moore r.plus_one = self.plus_one r.qinit = self.qinit return r def __or__( self, other: 'GRSpec' ) -> 'GRSpec': """Create union of two specifications.""" result = self.copy() if not isinstance(other, GRSpec): raise TypeError('type(other) must be GRSpec') assert self.moore == other.moore, ( self.moore, other.moore) assert self.plus_one == other.plus_one, ( self.plus_one, other.plus_one) assert self.qinit == other.qinit, ( self.qinit, other.qinit) # common vars have same types ? for varname in set(other.env_vars) & set(result.env_vars): if other.env_vars[varname] != result.env_vars[varname]: raise ValueError('Mismatched variable domains') for varname in set(other.sys_vars) & set(result.sys_vars): if other.sys_vars[varname] != result.sys_vars[varname]: raise ValueError('Mismatched variable domains') result.env_vars.update(other.env_vars) result.sys_vars.update(other.sys_vars) for x in self._parts: getattr(result, x).extend(getattr(other, x)) return result def to_canon(self) -> str: """Output formula in TuLiP LTL syntax. The format is described in the [Specifications section]( https://tulip-control.sourceforge.io/doc/specifications.html) of the TuLiP User's Guide. """ conj_cstr = lambda s: ' && ' if s else '' assumption = '' if self.env_init: assumption += _conj(self.env_init) if self.env_safety: assumption += conj_cstr(assumption) + _conj(self.env_safety, '[]') if self.env_prog: assumption += conj_cstr(assumption) + _conj(self.env_prog, '[]<>') guarantee = '' if self.sys_init: guarantee += conj_cstr(guarantee) + _conj(self.sys_init) if self.sys_safety: guarantee += conj_cstr(guarantee) + _conj(self.sys_safety, '[]') if self.sys_prog: guarantee += conj_cstr(guarantee) + _conj(self.sys_prog, '[]<>') # Put the parts together, simplifying in special cases if guarantee: if assumption: return f'({assumption}) -> ({guarantee})' else: return guarantee else: return 'True' def sub_values( self, var_values: dict[str, ...] ) -> dict: """Substitute given values for variables. Note that there are three ways to substitute values for variables: - syntactic using this function - no substitution by user code, instead flatten to python and use `eval` together with a `dict` defining the values of variables, as done in `eval_init`. For converting non-integer finite types to integer types, use `replace_finite_by_int`. @return: `dict` of ASTs after the substitutions, keyed by original clause (before substitution). """ _logger.info('substitute values for variables...') a = copy.deepcopy(self._ast) for formula, tree in a.items(): g = tx.Tree.from_recursive_ast(tree) tx.sub_values(g, var_values) a[formula] = g.to_recursive_ast() _logger.info('done with substitutions.\n') return a def compile_init( self, no_str: bool ) -> types.CodeType: """Compile python expression for initial conditions. The returned bytecode can be used with `eval` and a `dict` assigning values to variables. Its value is the implication ```tla env_init => sys_init ``` Use the corresponding python data types for the `dict` values: - `bool` for Boolean variables - `int` for integers - `str` for arbitrary finite types @param no_str: if True, then compile the formula where all string variables have been replaced by integers. Otherwise compile the original formula containing strings. @return: python expression compiled for `eval` """ self.str_to_int() init = { 'env': self.env_init, 'sys': self.sys_init} pyinit = dict() for side, clauses in init.items(): if no_str: clauses = [self._bool_int[x] for x in clauses] _logger.info(f'clauses to compile: {clauses}') c = [ts.translate_ast(self.ast(x), 'python').flatten() for x in clauses] _logger.info(f'after translation to python: {c}') s = _conj(c, op='and') if not s: s = 'True' pyinit[side] = s assumption = pyinit['env'] guarantee = pyinit['sys'] s = f'not ({assumption}) or ({guarantee})' return compile(s, '<string>', 'eval') def str_to_int(self) -> None: """Replace arbitrary finite vars with int vars. Returns spec itself if it contains only int vars. Otherwise it returns a copy of spec with all arbitrary finite vars replaced by int-valued vars. """ _logger.info('convert string variables to integers...') vars_dict = dict(self.env_vars) vars_dict.update(self.sys_vars) fvars = {v: d for v, d in vars_dict.items() if isinstance(d, list)} # replace symbols by ints for p in self._parts: for x in getattr(self, p): if self._bool_int.get(x) in self._ast: _logger.debug(f'{x} is in _bool_int cache') continue else: _logger.debug(f'{x} is not in _bool_int cache') # get AST a = self.ast(x) # create AST copy with int and bool vars only g = tx.Tree.from_recursive_ast(a) tx.sub_constants(g, fvars) b = g.to_recursive_ast() # formula of int/bool AST f = b.flatten() self._ast[f] = b # cache # remember map from clauses to int/bool clauses self._bool_int[x] = f _logger.info('done converting to integer variables.\n') def ast( self, x: str): """Return AST corresponding to formula x. If AST for formula `x` has already been computed earlier, then return cached result. """ if _logger.getEffectiveLevel() <= logging.DEBUG: _logger.debug('current cache of ASTs:\n' + pprint.pformat(self._ast) + 3 * '\n') _logger.debug(f'check if: {x}, is in cache.') if x in self._ast: _logger.debug(f'{x} is already in cache') else: _logger.info(f'AST cache does not contain:\n\t{x}' '\nNeed to parse.') self.parse() return self._ast[x] def parse(self) -> None: """Parse each clause and store it. The AST resulting from each clause is stored in the `dict` attribute `ast`. """ _logger.info('parsing ASTs to cache them...') vardoms = dict(self.env_vars) vardoms.update(self.sys_vars) # parse new clauses and cache the resulting ASTs for p in self._parts: s = getattr(self, p) for x in s: if x in self._ast: _logger.debug(f'{x} is already in cache') continue _logger.debug(f'parse: {x}') tree = self.parser.parse(x) g = tx.Tree.from_recursive_ast(tree) tx.check_for_undefined_identifiers(g, vardoms) self._ast[x] = tree # rm cached ASTs that correspond to deleted clauses self._collect_cache_garbage(self._ast) _logger.info('done parsing ASTs.\n') def _collect_cache_garbage( self, cache: dict ) -> None: _logger.info('collecting garbage from GRSpec cache...') # rm cached ASTs that correspond to deleted clauses s = set(cache) for p in self._parts: # emptied earlier ? if not s: break w = getattr(self, p) # exclude given formulas s.difference_update(w) # exclude int/bool-only forms of formulas s.difference_update({self._bool_int.get(x) for x in w}) for x in s: cache.pop(x) _logger.info(f'cleaned {len(s)} cached elements.\n')
def replace_dependent_vars( spec: GRSpec, bool2form: dict[str, str] ) -> None: _logger.debug( 'replacing dependent variables ' f'using the map:\n\t{bool2form}') vs = dict(spec.env_vars) vs.update(spec.sys_vars) _logger.debug(f'variables:\n\t{vs}') bool2subtree = dict() for boolvar, formula in bool2form.items(): _logger.debug(f'checking var: {boolvar}') if boolvar in vs: assert vs[boolvar] == 'boolean' _logger.debug( f'{boolvar} is indeed Boolean') else: _logger.debug( f'spec does not contain var: {boolvar}') tree = parser.parse(formula) bool2subtree[boolvar] = tx.Tree.from_recursive_ast(tree) for s in {'env_init', 'env_safety', 'env_prog', 'sys_init', 'sys_safety', 'sys_prog'}: part = getattr(spec, s) new = list() for clause in part: _logger.debug( f'replacing in clause:\n\t{clause}') tree = spec.ast(clause) g = tx.Tree.from_recursive_ast(tree) tx.sub_bool_with_subtree(g, bool2subtree) f = g.to_recursive_ast().flatten() new.append(f) _logger.debug( f'caluse tree after replacement:\n\t{f}') setattr(spec, s, new) def _conj( iterable: _abc.Iterable, unary: str='', op: str='&&' ) -> str: return f' {op} '.join( f'{unary}({s})' for s in iterable)