Source code for synth

# Copyright (c) 2012-2015 by California Institute of Technology
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the California Institute of Technology nor
#    the names of its contributors may be used to endorse or promote
#    products derived from this software without specific prior
#    written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL CALTECH
# OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
"""Interface to library of synthesis tools, e.g., `gr1c`, `omega`."""
import collections.abc as _abc
import copy
import logging
import pprint
import typing as _ty
import warnings

import networkx as _nx

import tulip.interfaces.gr1c as gr1c
import tulip.interfaces.gr1py as gr1py
import tulip.interfaces.omega as omega_int
try:
    import tulip.interfaces.slugs as slugs
except ImportError:
    slugs = None
import tulip.spec as _spec
import tulip.transys as transys


__all__ = [
    'mutex',
    'exactly_one',
    'sys_to_spec',
    'env_to_spec',
    'build_dependent_var_table',
    'synthesize_many',
    'synthesize',
    'is_realizable',
    'strategy2mealy',
    'mask_outputs',
    'determinize_machine_init']


_logger = logging.getLogger(__name__)
_hl = '\n' + 60 * '-'


Formulas = list[str]
Solver = _ty.Literal[
    'gr1c',
    'gr1py',
    'omega',
    'slugs',
    ]
maybe_mealy = (
    transys.MealyMachine |
    None)
maybe_fts = (
    transys.FTS |
    None)


def _pstr(s) -> str:
    return f'({s})'


def _disj(set0) -> str:
    return ' || '.join(map(
        _pstr, set0))


def _conj(set0) -> str:
    nonempty = filter(None, set0)
    return ' && '.join(map(
        _pstr, nonempty))


def _conj_intersection(
        set0,
        set1,
        parenth:
            bool=True
        ) -> str:
    conjuncts = filter(
        set1.__contains__,
        set0)
    if parenth:
        conjuncts = map(
            _pstr, conjuncts)
    return ' && '.join(conjuncts)


def _conj_neg(
        set0,
        parenth:
            bool=True
        ) -> str:
    if parenth:
        return ' && '.join([
            f'!({x})'
            for x in set0])
    else:
        return ' && '.join([
            f'!{x}'
            for x in set0])


def _conj_neg_diff(
        set0,
        set1,
        parenth:
            bool=True
        ) -> str:
    items = filter(
        lambda x:
            x not in set1,
        set0)
    if parenth:
        items = map(_pstr, items)
    return ' && '.join(
        f'!{x}'
        for x in items)


def mutex(iterable) -> list[str]:
    """Mutual exclusion for all time."""
    iterable = list(filter(None, iterable))
    if not iterable or len(iterable) <= 1:
        return list()
    return [_conj([
        f'!({x}) || ({_conj_neg_diff(iterable, [x])})'
        for x in iterable])]


def exactly_one(iterable) -> list[str]:
    """N-ary xor.

    Contrast with pure mutual exclusion.
    """
    if len(iterable) <= 1:
        return list(map(
            _pstr, iterable))
    def conjoin(x):
        return _conj_neg_diff(iterable, [x])
    def formula(x):
        return f'({x}) && {conjoin(x)}'
    disjunction = _disj(map(formula, iterable))
    return [f'({disjunction})']


def _conj_action(
        actions_dict:
            dict,
        action_type:
            _abc.Hashable,
        nxt:
            bool=False,
        ids:
            dict |
            None=None
        ) -> str:
    """Return conjunct if `action_type` in `actions_dict`.

    @param actions_dict:
        `dict` with pairs `action_type_name : action_value`
    @param action_type:
        key to look for in `actions_dict`
        (here typically a str)
    @param nxt:
        prepend or not with the next operator
    @param ids:
        map `action_value` -> value used in solver input,
        for example, for gr1c
    @return:
        - conjunct (includes `&&` operator) if:

            - `action_type` in `actions_dict`, and
            - `action_value` is not
              the empty string (modeling "no constrain")

          includes next operator (`X`) if `nxt = True`.
        - empty string otherwise
    """
    if action_type not in actions_dict:
        return ''
    action = actions_dict[action_type]
    if ids is not None:
        action = ids[action]
    if action == '':
        return ''
    if nxt:
        return f' X{_pstr(action)}'
    return _pstr(action)


def _conj_actions(
        actions_dict:
            dict,
        solver_expr:
            dict |
            None=None,
        nxt:
            bool=False
        ) -> str:
    """Conjunction of multiple action types.

    Includes solver expression substitution.
    See also `_conj_action`.
    """
    _logger.debug(
        f'conjunction of actions: {actions_dict}')
    _logger.debug(
        'mapping to solver equivalents: '
        f'{solver_expr}')
    if not actions_dict:
        _logger.debug('actions_dict empty, returning empty string\n')
        return ''
    if solver_expr is not None:
        actions = [
            solver_expr[
                type_name][
                action_value]
            for type_name, action_value in
                actions_dict.items()]
    else:
        actions = actions_dict
    _logger.debug(
        f'after substitution: {actions}')
    conjuncted_actions = _conj(actions)
    _logger.debug(
        'conjuncted actions: '
        f'{conjuncted_actions}\n')
    if nxt:
        return f' X{_pstr(conjuncted_actions)}'
    return _pstr(conjuncted_actions)

# duplicate states are impossible,
# because each networkx vertex is unique
# non-contiguous integers for states work too,
# though a less efficient representation.


def iter2var(
        states:
            _abc.Iterable[int | str],
        variables:
            dict,
        statevar:
            str,
        bool_states:
            bool,
        must:
            _ty.Literal[
                'mutex',
                'xor',
                None]
        ) -> tuple[
            dict,
            list[str] | None]:
    """Represent finite domain in GR(1).

    An integer or string variable can be used,
    or multiple Boolean variables.

    If the possible values are:

      - mutually exclusive (use_mutex == True)
      - bool actions have not been requested (bool_actions == False)

    then an integer or string variable represents the variable in GR(1).

    If all values are integers, then an integer is used.
    If all values are strings, then a string variable is used.
    Otherwise an exception is raised, unless Booleans have been requested.

    If the values are not mutually exclusive,
    then only Boolean variables can represent them.

    Suppose N possible values are defined.
    The int variable is allowed to take N+1 values.
    The additional value corresponds to all, e.g., actions, being False.

    If FTS values are integers,
    then the additional action is an int value.

    If FTS values are strings (e.g., 'park', 'wait'),
    then the additional action is 'none'.
    They are treated by `spec` as an arbitrary finite domain.

    An option `min_one` is internally available,
    in order to allow only N values of the variable.
    This requires that the variable takes at least one value each time.

    Combined with a mutex constraint, it yields an n-ary xor constraint.

    @param states:
        values of domain.
    @param variables:
        to be augmented with
        integer or
        string variable or
        Boolean variables.
    @param statevar:
        name to use for integer or string valued variable.
    @param bool_states:
        if True, then use bool variables.
        Otherwise use integer or string valued variable.
    @return:
        `tuple` of:
        - mapping from values to GR(1) actions.
          If Booleans are used, then GR(1) are the same.
          Otherwise, they map to e.g. 'act = "wait"' or 'act = 3'
        - constraints to be added to `trans` and/or `init` in GR(1)
    """
    if not states:
        _logger.debug('empty container, so empty dict for solver expr')
        return dict(), None
    _logger.debug(
        f'mapping domain: {states}\n'
        '\tto expression understood by '
        'a GR(1) solver.')
    assert must in {'mutex', 'xor', None}
    # options for modeling actions
    if must in {'mutex', 'xor'}:
        use_mutex = True
    else:
        use_mutex = False
    if must == 'xor':
        min_one = True
    else:
        min_one = False
    # no mutex -> cannot use int variable
    if not use_mutex:
        _logger.debug(
            'not using mutex: '
            'Booleans must model actions')
        bool_states = True
    _logger.debug(
        'options for modeling actions:\n\t'
        f'mutex: {use_mutex}\n'
        f'\tmin_one: {min_one}')
    all_str = all(
        isinstance(x, str)
        for x in states)
    if bool_states:
        _logger.debug(
            'states modeled as Boolean variables')
        if not all_str:
            raise TypeError(
                'If Boolean, all states must be strings.')
        state_ids = {x: x for x in states}
        variables.update({
            s: 'boolean'
            for s in states})
        # single action ?
        if len(mutex(state_ids.values())) == 0:
            return state_ids, None
        # handle multiple actions
        if use_mutex and not min_one:
            constraint = mutex(state_ids.values())[0]
        elif use_mutex and min_one:
            constraint = exactly_one(state_ids.values())[0]
        elif min_one:
            raise Exception('min_one requires mutex')
        else:
            constraint = 'True'
        constraint = [constraint]
    else:
        _logger.debug('states not modeled as Booleans')
        if statevar in variables:
            raise ValueError(
                f'state variable: {statevar}'
                f' already exists in: {variables}')
        all_int = all(isinstance(x, int) for x in states)
        if all_int:
            _logger.debug('all states are integers')
            # extra value modeling all False ?
            if min_one:
                n = max(states)
            else:
                n = max(states) + 1
            f = lambda x: f'{statevar} = {x}'
            domain = (min(states), n)
            _logger.debug(
                f'created solver variable: {statevar}'
                f'\n\t with domain: {domain}')
        elif all_str:
            _logger.debug('all states are strings')
            assert use_mutex
            f = lambda x: f'{statevar} = "{x}"'
            domain = list(states)
            if not min_one:
                domain += [f'{statevar}none']
                _logger.debug(
                    'domain has been extended, because all actions\n\t'
                    'could be False (constraint: min_one = False).')
        else:
            raise TypeError(
                'Integer and string states must not be mixed.')
        state_ids = {
            x: f(x)
            for x in states}
        variables[statevar] = domain
        constraint = None
    tabs = 2 * '\t'
    _logger.debug(
        f'for tulip variable: {statevar}\n'
        'the map from [tulip action values] ---> '
        '[solver expressions] is:\n'
        f'{tabs}{state_ids}')
    return state_ids, constraint


def _add_actions(
        constraint:
            list[str],
        init:
            list,
        trans:
            list):
    if constraint is None:
        return
    trans += [f'X ({constraint[0]})']
    init += constraint


def _fts2spec(
        fts:
            transys.FiniteTransitionSystem,
        ignore_initial:
            bool,
        statevar:
            str,
        actionvar:
            str |
            None=None,
        bool_states:
            bool=False,
        bool_actions:
            bool=False
        ) -> tuple:
    """Convert closed FTS to GR(1) representation."""
    raise Exception('deprecated')
    assert isinstance(fts, transys.FiniteTransitionSystem)
    aps = fts.aps
    states = fts.states
    actions = fts.actions
    sys_init = list()
    sys_trans = list()
    sys_vars = {
        ap: 'boolean'
        for ap in aps}
    action_ids, constraint = iter2var(
        actions, sys_vars,
        actionvar, bool_actions,
        fts.actions_must)
    _add_actions(constraint, sys_init, sys_trans)
    state_ids, constraint = iter2var(
        states, sys_vars,
        statevar, bool_states,
        must='xor')
    if constraint is not None:
        sys_trans += constraint
    sys_init += _sys_init_from_ts(
        states, state_ids, aps, ignore_initial)
    sys_trans += _sys_trans_from_ts(
        states, state_ids, fts.transitions,
        action_ids=action_ids)
    tmp_init, tmp_trans = _ap_trans_from_ts(
        states, state_ids, aps)
    sys_init += tmp_init
    sys_trans += tmp_trans
    return (
        sys_vars,
        sys_init,
        sys_trans)


def sys_to_spec(
        ofts:
            transys.FiniteTransitionSystem,
        ignore_initial:
            bool,
        statevar:
            str,
        bool_states:
            bool=False,
        bool_actions=False
        ) -> _spec.GRSpec:
    """Convert transition system to GR(1) fragment of LTL.

    The attribute `FTS.owner` defines who controls the system,
    as described next. It can take values `'env'` or `'sys'`.

    The following are represented by
    variables controlled by `ofts.owner`:

      - the current state
      - the atomic propositions annotating states
      - the system actions annotating edges

    The following are represented by
    variables controlled by the other player:

      - the environment actions annotating edges

    Multiple types of environment and system actions can be defined.
    Make sure that, depending on the player,
    `'env'` or `'sys'` are part of the action type names,
    so that `synth.synthesize` can recognize them.

    Caution
    =======
    There are aspects of `FTS` that
    need to be separately specified in a logic formula.

    An example are the initial conditions constraining the values
    of environment and system actions.

    See also
    ========
    `sys_trans_from_ts}, `env_open_fts2spec`,
    `create_actions`, `create_states`

    @param ignore_initial:
        Do not include initial state info from TS.
        Enable this to mask absence of FTS initial states.
        Useful when initial states are specified in another way,
        e.g., directly augmenting the spec part.
    @param state_var:
        name to be used for the integer or string
        variable that equals the current transition system state.
    @param bool_states:
        deprecated as inefficient

        if `True`, then use one Boolean variable
        to represent each state (one-hot encoding).
        Otherwise use a single integer variable,
        different values of which correspond to states of
        `ofts` (binary encoding).
    @param bool_actions:
        Similar to `bool_states`.
        For each type of system actions,
        and each type of environment actions:

          - if `True`, then for each possible value of that action type,
            use a different Boolean variable to represent it.

          - Otherwise use a single integer variable,
            that ranges over the possible action values.
    @return:
        logic formula in GR(1) form representing `ofts`.
    """
    if not isinstance(ofts, transys.FiniteTransitionSystem):
        raise TypeError(
            'ofts must be FTS, '
            f'got instead: {type(ofts)}')
    assert ofts.owner == 'sys'
    aps = ofts.aps
    states = ofts.states
    trans = ofts.transitions
    assert not set(aps).intersection(states)
    # init
    sys_init = list()
    sys_trans = list()
    env_init = list()
    env_trans = list()
    sys_vars = {
        ap: 'boolean'
        for ap in aps}
    env_vars = dict()
    actions = ofts.actions
    sys_action_ids = dict()
    env_action_ids = dict()
    for action_type, codomain in actions.items():
        if set(codomain).intersection(aps):
            raise AssertionError(
                codomain, aps)
        if set(codomain).intersection(states):
            raise AssertionError(
                codomain, states)
        if action_type in states:
            raise AssertionError(
                action_type)
        if action_type in aps:
            raise AssertionError(
                action_type)
        msg = f'action_type:\n\t{action_type}\n'
        msg += f'with codomain:\n\t{codomain}'
        _logger.debug(msg)
        if 'sys' in action_type:
            _logger.debug('Found sys action')
            action_ids, constraint = iter2var(
                codomain, sys_vars,
                action_type, bool_actions,
                ofts.sys_actions_must)
            _add_actions(constraint, sys_init, sys_trans)
            _logger.debug(
                'Updating sys_action_ids with:\n'
                f'\t{action_ids}')
            sys_action_ids[action_type] = action_ids
        elif 'env' in action_type:
            _logger.debug('Found env action')
            action_ids, constraint = iter2var(
                codomain, env_vars,
                action_type, bool_actions,
                ofts.env_actions_must)
            _add_actions(constraint, env_init, env_trans)
            _logger.debug(
                'Updating env_action_ids with:\n'
                f'\t{action_ids}')
            env_action_ids[action_type] = action_ids
    state_ids, constraint = iter2var(
        states, sys_vars,
        statevar, bool_states,
        must='xor')
    if constraint is not None:
        sys_trans += constraint
    sys_init += _sys_init_from_ts(
        states, state_ids, aps, ignore_initial)
    sys_trans += _sys_trans_from_ts(
        states, state_ids, trans,
        sys_action_ids=sys_action_ids,
        env_action_ids=env_action_ids)
    tmp_init, tmp_trans = _ap_trans_from_ts(
        states, state_ids, aps)
    sys_init += tmp_init
    sys_trans += tmp_trans
    env_trans += _env_trans_from_sys_ts(
        states, state_ids, trans, env_action_ids)
    return _spec.GRSpec(
        sys_vars=sys_vars,
        env_vars=env_vars,
        env_init=env_init,
        sys_init=sys_init,
        env_safety=env_trans,
        sys_safety=sys_trans)


def env_to_spec(
        ofts:
            transys.FiniteTransitionSystem,
        ignore_initial:
            bool,
        statevar:
            str,
        bool_states:
            bool=False,
        bool_actions:
            bool=False
        ) -> _spec.GRSpec:
    """Convert env transition system to GR(1) representation.

    The following are represented by environment variables:

      - the current state
      - the atomic propositions annotating states
      - the environment actions annotating edges

    The following are represented by system variables:

      - the system actions annotating edges

    Multiple types of environment and system actions can be defined.

    For more details see `sys_to_spec`.

    See also
    ========
    `sys_open_fts2spec`
    """
    if not isinstance(ofts, transys.FiniteTransitionSystem):
        raise TypeError(
            'ofts must be FTS, '
            f'got instead: {type(ofts)}')
    assert ofts.owner == 'env'
    aps = ofts.aps
    states = ofts.states
    trans = ofts.transitions
    assert not set(aps).intersection(states)
    # init
    sys_init = list()
    sys_trans = list()
    env_init = list()
    env_trans = list()
    # since APs are tied to env states,
    # let them be env variables
    env_vars = {ap: 'boolean' for ap in aps}
    sys_vars = dict()
    actions = ofts.actions
    sys_action_ids = dict()
    env_action_ids = dict()
    for action_type, codomain in actions.items():
        if set(codomain).intersection(aps):
            raise AssertionError(
                codomain, aps)
        if set(codomain).intersection(states):
            raise AssertionError(
                codomain, states)
        if action_type in states:
            raise AssertionError(
                action_type)
        if action_type in aps:
            raise AssertionError(
                action_type)
        if 'sys' in action_type:
            action_ids, constraint = iter2var(
                codomain, sys_vars,
                action_type, bool_actions,
                ofts.sys_actions_must)
            _add_actions(
                constraint,
                sys_init,
                sys_trans)
            sys_action_ids[action_type] = action_ids
        elif 'env' in action_type:
            action_ids, constraint = iter2var(
                codomain, env_vars,
                action_type, bool_actions,
                ofts.env_actions_must)
            _add_actions(
                constraint,
                env_init,
                env_trans)
            env_action_ids[action_type] = action_ids
    # some duplication here,
    # because we don't know
    # whether the user will provide
    # a system TS as well
    # and whether that TS will contain
    # all the system actions
    # defined in the environment TS
    state_ids, constraint = iter2var(
        states, env_vars,
        statevar, bool_states,
        must='xor')
    if constraint is not None:
        env_trans += constraint
    env_init += _sys_init_from_ts(
        states, state_ids, aps, ignore_initial)
    env_trans += _env_trans_from_env_ts(
        states, state_ids, trans,
        env_action_ids=env_action_ids,
        sys_action_ids=sys_action_ids)
    tmp_init, tmp_trans = _ap_trans_from_ts(
        states, state_ids, aps)
    env_init += tmp_init
    env_trans += tmp_trans
    return _spec.GRSpec(
        sys_vars=sys_vars,
        env_vars=env_vars,
        env_init=env_init,
        sys_init=sys_init,
        env_safety=env_trans,
        sys_safety=sys_trans)


def _sys_init_from_ts(
        states,
        state_ids,
        aps,
        ignore_initial:
            bool=False
        ) -> Formulas:
    """Initial state, including enforcement of exactly one."""
    init = list()
    # skip ?
    if ignore_initial:
        return init
    if not states.initial:
        msg = (
            'FTS has no initial states.\n'
            'Enforcing this renders False the GR(1):\n'
            ' - guarantee if this is a system TS,\n'
            '   so the spec becomes trivially False.\n'
            ' - assumption if this is an environment TS,\n'
            '   so the spec becomes trivially True.')
        raise Exception(msg)
        init.append('False')
        return init
    init.append(
        _disj(map(
            state_ids.__getitem__,
            states.initial)))
    return init


def _sys_trans_from_ts(
        states,
        state_ids,
        trans,
        action_ids=None,
        sys_action_ids=None,
        env_action_ids=None
        ) -> Formulas:
    """Convert transition relation to GR(1) sys_safety.

    The transition relation may be closed or open,
    i.e., depend only on system, or also on environment actions.

    No mutexes enforced by this function among:

        - sys states
        - env actions

    An edge attribute 'previous' can be optionally set to
    an iterable of edge attribute keys.
    The actions with those action_types those keys
    will not be prepended by the next operator.

    This enables defining both current and next actions, e.g.,

    some_action && X(some_other_action)

    About label type checking: in principle everything should work the
    same if the base class LabeledDiGraph was replaced by MultiDiGraph,
    so that users can play around with their own bare graphs,
    when they don't need the label typing overhead.

    @param trans:
        `Transitions` as from the transitions
        attribute of `FTS`.
    @param action_ids:
        same as `sys-action_ids`
        Caution: to be removed in a future release
    @param sys_action_ids:
        dict of dicts
        outer dict keyed by action_type
        each inner dict keyed by action_value
        each inner dict value is the
        solver expression for that action value

        for example an action type with an
        arbitrary finite discrete codomain can be modeled either:

          - as Boolean variables, so each possible action value
            becomes a different Boolean variable with the same
            name, thus `sys_action_ids[action_type]` will be
            the identity map on `action_values` for that `action_type`.

          - as integer variables, so each possible action value
            becomes a different expression in the solver (e.g. gr1c)
            input format. Then `sys_action_ids[action_type]` maps
            `action_value` -> solver expression of the form:

            `action_type = i`

            where `i` corresponds to that particular  `action_type`.
    @param env_action_ids:
        same as `sys-action_ids`
    """
    _logger.debug('modeling sys transitions in logic')
    sys_trans = list()
    # Transitions
    for from_state in states:
        from_state_id = state_ids[from_state]
        precond = _pstr(from_state_id)
        cur_trans = trans.find([from_state])
        msg = (
            f'from state: {from_state}'
            ', the available transitions are:\n'
            f'\t{cur_trans}')
        _logger.debug(msg)
        # no successor states ?
        if not cur_trans:
            _logger.debug(
                f'state: {from_state} is deadend !')
            sys_trans.append(
                f'{precond} -> X(False)')
            continue
        cur_str = list()
        for from_state, to_state, label in cur_trans:
            to_state_id = state_ids[to_state]
            postcond = [
                f'X{_pstr(to_state_id)}']
            _logger.debug(
                f'label = {label}')
            if 'previous' in label:
                previous = label['previous']
            else:
                previous = set()
            _logger.debug(
                f'previous = {previous}')
            env_actions = {
                k: v
                for k, v in
                    label.items()
                if 'env' in k}
            prev_env_act = {
                k: v
                for k, v in
                    env_actions.items()
                if k in previous}
            next_env_act = {
                k: v
                for k, v in
                    env_actions.items()
                if k not in previous}
            postcond += [_conj_actions(prev_env_act, env_action_ids,
                                       nxt=False)]
            postcond += [_conj_actions(next_env_act, env_action_ids,
                                       nxt=True)]
            sys_actions = {
                k: v
                for k, v in
                    label.items()
                if 'sys' in k}
            prev_sys_act = {
                k: v
                for k, v in
                    sys_actions.items()
                if k in previous}
            next_sys_act = {
                k: v
                for k, v in
                    sys_actions.items()
                if k not in previous}
            postcond += [
                _conj_actions(
                    prev_sys_act, sys_action_ids,
                    nxt=False)]
            postcond += [
                _conj_actions(
                    next_sys_act, sys_action_ids,
                    nxt=True)]
            # if system FTS given
            # in case 'actions in label,
            # then action_ids is a dict,
            # not a dict of dicts,
            # because certainly this came
            # from an FTS, not an OpenFTS
            if 'actions' in previous:
                postcond += [
                    _conj_action(
                        label, 'actions',
                        ids=action_ids,
                        nxt=False)]
            else:
                postcond += [
                    _conj_action(
                        label, 'actions',
                        ids=action_ids,
                        nxt=True)]
            cur_str += [_conj(postcond)]
            msg = (
                f'guard to state: {to_state}'
                f', with state_id: {to_state_id}'
                f', has post-conditions: {postcond}')
            _logger.debug(msg)
        sys_trans += [
            f'{precond} -> ({_disj(cur_str)})']
    return sys_trans


def _env_trans_from_sys_ts(
        states,
        state_ids,
        trans,
        env_action_ids
        ) -> Formulas:
    """Convert environment actions to GR(1) env_safety.

    This constrains the actions available next to the environment
    based on the system FTS.

    Purpose is to prevent env from blocking sys by purely
    picking a combination of actions for which sys has no outgoing
    transition from that state.

    Might become optional in the future,
    depending on the desired way of defining env behavior.

    @param env_action_ids:
        dict of dicts,
        see `sys_trans_from_ts`.
    """
    env_trans = list()
    # this probably useless for multiple action types
    if not env_action_ids:
        return env_trans
    for from_state in states:
        from_state_id = state_ids[from_state]
        precond = _pstr(from_state_id)
        cur_trans = trans.find([from_state])
        # no successor states ?
        if not cur_trans:
            # nothing modeled for env,
            # since sys has X(False) anyway
            # for action_type, codomain_map in env_action_ids.items():
            # env_trans += [f'{precond} -> X({s})']
            continue
        # collect possible next env actions
        next_env_action_combs = set()
        for from_state, to_state, label in cur_trans:
            env_actions = {
                k: v
                for k, v in
                    label.items()
                if 'env' in k}
            if not env_actions:
                continue
            _logger.debug(
                f'env_actions: {env_actions}')
            _logger.debug(
                f'env_action_ids: {env_action_ids}')
            env_action_comb = _conj_actions(
                env_actions, env_action_ids)
            _logger.debug(
                f'env_action_comb: {env_action_comb}')
            next_env_action_combs.add(env_action_comb)
        next_env_actions = _disj(next_env_action_combs)
        _logger.debug(
            f'next_env_actions: {next_env_actions}')
        # no next env actions ?
        if not next_env_actions:
            continue
        env_trans += [
            f'{precond} -> X({next_env_actions})']
    return env_trans


def _env_trans_from_env_ts(
        states,
        state_ids,
        trans,
        action_ids=None,
        env_action_ids=None,
        sys_action_ids=None
        ) -> Formulas:
    r"""Convert environment TS transitions to GR(1) representation.

    This contributes to the \rho_e(X, Y, X') part of the spec,
    i.e., constrains the next environment state variables' valuation
    depending on the previous environment state variables valuation
    and the previous system action (system output).
    """
    env_trans = list()
    for from_state in states:
        from_state_id = state_ids[from_state]
        precond = _pstr(from_state_id)
        cur_trans = trans.find([from_state])
        # no successor states ?
        if not cur_trans:
            env_trans += [f'{precond} -> X(False)']
            msg = (
                'Environment dead-end found.\n'
                'If sys can force env to dead-end,\n'
                'then GR(1) assumption becomes False,\n'
                'and spec trivially True.')
            warnings.warn(msg)
            continue
        cur_list = list()
        found_free = False
            # any environment transition
        # not conditioned on the previous system output ?
        for from_state, to_state, label in cur_trans:
            to_state_id = state_ids[to_state]
            postcond = [f'X{_pstr(to_state_id)}']
            env_actions = {
                k: v
                for k, v in
                    label.items()
                if 'env' in k}
            postcond += [
                _conj_actions(
                    env_actions,
                    env_action_ids,
                    nxt=True)]
            # remember: this is
            # an environment FTS,
            # so no next for sys
            sys_actions = {
                k: v
                for k, v in
                    label.items()
                if 'sys' in k}
            postcond += [
                _conj_actions(
                    sys_actions,
                    sys_action_ids)]
            postcond += [
                _conj_action(
                    label, 'actions',
                    nxt=True,
                    ids=action_ids)]
            # todo: test this clause
            if not sys_actions:
                found_free = True
            cur_list += [_conj(postcond)]
        # can sys block env by
        # setting all previous sys outputs to False ?
        # then env assumption becomes False,
        # so the spec trivially True: avoid this
        if not found_free and sys_action_ids:
            msg = (
                'no free env outgoing transition found\n'
                'instead will take disjunction with negated sys actions')
            _logger.debug(msg)
            for action_type, codomain in sys_action_ids.items():
                conj = _conj_neg(codomain.values())
                cur_list += [conj]
                msg = (
                    f'for action_type: {action_type}\n'
                    f'with codomain: {codomain}\n'
                    f'the negated conjunction is: {conj}')
                _logger.debug(msg)
        env_trans += [
            f'{_pstr(precond)} -> '
            f'({_disj(cur_list)})']
    return env_trans


def _ap_trans_from_ts(
        states,
        state_ids,
        aps) -> tuple[
            Formulas,
            Formulas]:
    """Require atomic propositions to follow states according to label."""
    init = list()
    trans = list()
    # no AP labels ?
    if not aps:
        return (init, trans)
    # initial labeling
    for state in states:
        state_id = state_ids[state]
        label = states[state]
        ap_str = _sprint_aps(label, aps)
        if not ap_str:
            continue
        init += [
            f'!({_pstr(state_id)}) || ({ap_str})']
    # transitions of labels
    for state in states:
        label = states[state]
        state_id = state_ids[state]
        tmp = _sprint_aps(label, aps)
        if not tmp:
            continue
        trans += [
            f'X(({state_id}) -> ({tmp}))']
    return (init, trans)


def _sprint_aps(label, aps) -> str:
    if 'ap' in label:
        tmp0 = _conj_intersection(
            aps, label['ap'],
            parenth=False)
    else:
        tmp0 = ''
    if 'ap' in label:
        tmp1 = _conj_neg_diff(
            aps, label['ap'],
            parenth=False)
    else:
        tmp1 = _conj_neg(
            aps,
            parenth=False)
    if tmp0 and tmp1:
        tmp = f'{tmp0} && {tmp1}'
    else:
        tmp = f'{tmp0}{tmp1}'
    return tmp


def build_dependent_var_table(
        fts:
            transys.FTS,
        statevar:
            str
        ) -> dict[str, str]:
    """Return substitution rules for dependent variables.

    The dependent variables in a transition system are the
    atomic propositions that are used to label states.

    They are "dependent" because their values are completely
    determined by knowledge of the current state in the
    transition system.

    The returned substitutions can be used

    @param statevar:
        name of variable used for the current state
        For example if it is 'loc', then the states
        `'s0', 's1'` are mapped to:

        ```python
        {'s0': '(loc = "s0")',
         's1': '(loc = "s1")'}
        ```
    @return:
        `{'p': '((loc = "s1") | (loc = "s2") | ...)', ...}`
        where:

          - `'p'` is a proposition in
            `fts.atomic_propositions`
          - the states "s1", "s2" are labeled with `'p'`
          - `loc` is the string variable used for
            the state of `fts`.
    """
    state_ids, __ = iter2var(
        fts.states,
        variables=dict(),
        statevar=statevar,
        bool_states=False,
        must='xor')
    ap2states = map_ap_to_states(fts)
    return {k: _disj(state_ids[x] for x in v)
            for k, v in ap2states.items()}


def map_ap_to_states(
        fts:
            transys.FTS
        ) -> dict[str, set]:
    """For each proposition find the states labeled with it.

    @return:
        `{'p': s, ...}` where
        `'p'` a proposition and
        `s` a set of states in `fts`.
    """
    table = {
        p: set()
        for p in fts.atomic_propositions}
    for u in fts:
        for p in fts.nodes[u]['ap']:
            table[p].add(u)
    return table


def synthesize_many(
        specs:
            _spec.GRSpec,
        ts:
            dict[
                str,
                transys.FiniteTransitionSystem] |
            None=None,
        ignore_init:
            set |
            None=None,
        solver:
            Solver='omega'
        ) -> maybe_mealy:
    """Synthesize from logic specs and multiple transition systems.

    The transition systems are composed synchronously, i.e.,
    they all have to take a transition at each time step.
    The synchronous composition is obtained by taking the
    conjunction of the formulas describing each transition system.

    The states of each transition system can be either:

      - all integers, or
      - all strings

    In either case the transition system state will be
    represented in logic with a single variable,
    that ranges over a finite set of
    integers or strings, respectively.

    The keys of `ts` are used to name each state variable.
    So the logic formula for `ts['name']` will be `'name'`.

    Who controls this state variable is determined from
    the attribute `FTS.owner` that can take the values:

      - `'env'`
      - `'sys'`

    For example:

    ```python
    ts.states.add_from(range(4))
    ts['door'].owner = 'env'
    ```

    will result in a logic formula with
    an integer variable `'door'`
    controlled by the environment and
    taking values over `{0, 1, 2, 3}`.

    The example:

    ```python
    ts.states.add_from(['a', 'b', 'c'])
    ts['door'].owner = 'sys'
    ```

    will instead result in a string variable `'door'`
    controlled by the system and taking
    values over `{'a', 'b', 'c'}`.

    @param ignore_init:
        `set` of keys from `ts`
    @param solver:
        See function `synthesize` for
        available options.
    """
    assert isinstance(ts, dict), ts
    for name, t in ts.items():
        if not isinstance(t, transys.FiniteTransitionSystem):
            raise AssertionError(t)
        ignore = name in ignore_init
        statevar = name
        match t.owner:
            case 'sys':
                sys_spec = sys_to_spec(t, ignore, statevar)
                _copy_options_from_ts(sys_spec, t, specs)
                specs |= sys_spec
            case 'env':
                env_spec = env_to_spec(t, ignore, statevar)
                _copy_options_from_ts(env_spec, t, specs)
                specs |= env_spec
    return _synthesize(
        specs, solver,
        rm_deadends=True)


[docs] def synthesize( specs: _spec.GRSpec, env: maybe_fts=None, sys: maybe_fts=None, ignore_env_init: bool=False, ignore_sys_init: bool=False, rm_deadends: bool=True, solver: Solver='omega' ) -> maybe_mealy: """Call synthesis tool `solver` on the specification. There are three attributes of `specs` that define what kind of controller you are looking for: 1. `moore`: What information the controller knows when deciding the next values of controlled variables: - Moore: can read current state, but not next environment variable values, or - Mealy: can read current state and next environment variable values. 2. `qinit`: Quantification of initial variable values: Whether all states that satisfy a predicate should be winning, or the initial values of some (or all) the variables is subject to the synthesizer's choice. 3. `plus_one`: The form of assume-guarantee specification, i.e., how the system guarantees relate to assumptions about the environment. For more details about these attributes, see `GRSpec`. The states of the transition system can be either: - all integers, or - all strings For more details of how the transition system is represented in logic look at `synthesize_many`. Beware! ======= This function provides a generic interface to a variety of routines. Being under active development, the types of arguments supported and types of objects returned may change without notice. @param env: A transition system describing the environment: - states controlled by environment - input: sys_actions - output: env_actions - initial states constrain the environment This constrains the transitions available to the environment, given the outputs from the system. @param sys: A transition system describing the system: - states controlled by the system - input: env_actions - output: sys_actions - initial states constrain the system @param ignore_env_init: Ignore any initial state information contained in env. @param ignore_sys_init: Ignore any initial state information contained in sys. @param rm_deadends: return a strategy that contains no terminal states. @param solver: Magic string that declares what tool to invoke, what method to use, etc. Currently recognized forms: For GR(1) synthesis: - `"gr1c"`: use gr1c via `interfaces.gr1c`. written in C using CUDD, symbolic - `"gr1py"`: use gr1py via `interfaces.gr1py`. Python, enumerative - `"omega"`: use omega via `interfaces.omega`. Python using `dd` or Cython using CUDD, symbolic - `"slugs"`: use slugs via `interfaces.slugs`. C++ using CUDD, symbolic @return: If spec is realizable, then return a Mealy machine implementing the strategy. Otherwise return None. """ specs = _spec_plus_sys( specs, env, sys, ignore_env_init, ignore_sys_init) return _synthesize(specs, solver, rm_deadends)
def _synthesize( specs: _spec.GRSpec, solver: Solver, rm_deadends: bool ) -> maybe_mealy: """Return `MealyMachine` that implements `specs`.""" match solver: case 'gr1c': strategy = gr1c.synthesize(specs) case 'slugs': if slugs is None: raise ValueError( 'Import of slugs interface failed. ' 'Please verify installation of "slugs".') strategy = slugs.synthesize(specs) case 'gr1py': strategy = gr1py.synthesize(specs) case 'omega': strategy = omega_int.synthesize_enumerated_streett(specs) case _: options = {'gr1c', 'gr1py', 'omega', 'slugs'} raise ValueError( f'Unknown solver: "{solver}". ' f'Available options are: {options}') return _trim_strategy( strategy, specs, rm_deadends=rm_deadends) def _trim_strategy( strategy: maybe_mealy, specs: _spec.GRSpec, rm_deadends: bool ) -> maybe_mealy: """Return `MealyMachine` without deadends, or `None`. If `strategy is None`, then return `None`. @param rm_deadends: if `True`, then remove deadends from the Mealy machine """ # While the return values of # the solver interfaces vary, # we expect here that strategy is # either None to indicate unrealizable or # a networkx.DiGraph ready to be # passed to strategy2mealy(). if strategy is None: return None ctrl = strategy2mealy(strategy, specs) _logger.debug( 'Mealy machine has: ' f'n = {len(ctrl.states)} states.') if rm_deadends: ctrl.remove_deadends() return ctrl def is_realizable( specs: _spec.GRSpec, env: maybe_fts=None, sys: maybe_fts=None, ignore_env_init: bool=False, ignore_sys_init: bool=False, solver: Solver='omega' ) -> bool: """Check realizability. For details see `synthesize`. """ specs = _spec_plus_sys( specs, env, sys, ignore_env_init, ignore_sys_init) match solver: case 'gr1c': r = gr1c.check_realizable(specs) case 'slugs': if slugs is None: raise ValueError( 'Import of slugs interface failed. ' 'Please verify installation of "slugs".') r = slugs.check_realizable(specs) case 'gr1py': r = gr1py.check_realizable(specs) case 'omega': r = omega_int.is_realizable(specs) case _: raise ValueError( 'Undefined synthesis solver: ' f'{solver = }' 'Available options are "gr1c", ' '"slugs", and "gr1py"') if r: _logger.debug('is realizable') else: _logger.debug('is not realizable') return r def _spec_plus_sys( specs: _spec.GRSpec, env: maybe_fts, sys: maybe_fts, ignore_env_init: bool, ignore_sys_init: bool ) -> _spec.GRSpec: if sys is not None: if hasattr(sys, 'state_varname'): statevar = sys.state_varname else: _logger.info( 'sys.state_varname undefined. ' 'Will use the default ' 'variable name: "loc".') statevar = 'loc' sys_formula = sys_to_spec( sys, ignore_sys_init, statevar=statevar) _copy_options_from_ts( sys_formula, sys, specs) if specs.qinit == r'\A \A': sys_formula.env_init.extend( sys_formula.sys_init) sys_formula.sys_init = list() specs = specs | sys_formula pp_formula = sys_formula.pretty() _logger.debug( 'sys TS:\n' f'{pp_formula}{_hl}') if env is not None: if hasattr(env, 'state_varname'): statevar = env.state_varname else: _logger.info( 'env.state_varname undefined. ' 'Will use the default ' 'variable name: "eloc".') statevar = 'eloc' env_formula = env_to_spec( env, ignore_env_init, statevar=statevar) if specs.qinit == r'\A \A': env_formula.env_init.extend( env_formula.sys_init) env_formula.sys_init = list() _copy_options_from_ts(env_formula, env, specs) specs = specs | env_formula _logger.debug( 'env TS:\n' f'{env_formula.pretty()}{_hl}') _logger.info( 'Overall Spec:\n' f'{specs.pretty()}{_hl}') return specs def _copy_options_from_ts( ts_spec, ts, specs ) -> None: """Copy `moore, qinit, plus_one` from `ts`, if set. Otherwise copy the values of those attributes from `specs`. """ if hasattr(ts, 'moore'): cp = ts else: cp = specs ts_spec.moore = cp.moore ts_spec.plus_one = cp.plus_one ts_spec.qinit = cp.qinit def strategy2mealy( A: _nx.DiGraph, spec: _spec.GRSpec ) -> transys.MealyMachine: """Convert strategy to Mealy transducer. Note that the strategy is a deterministic game graph, but the input `A` is given as the contraction of this game graph. @param A: strategy """ if not A: raise AssertionError( 'graph `A` has no nodes, ' f'{A = }') _logger.info( 'converting strategy (compact) ' 'to Mealy machine') env_vars = spec.env_vars sys_vars = spec.sys_vars mach = transys.MealyMachine() inputs = transys.machines.create_machine_ports(env_vars) mach.add_inputs(inputs) outputs = transys.machines.create_machine_ports(sys_vars) mach.add_outputs(outputs) str_vars = { k: v for k, v in env_vars.items() if isinstance(v, list)} str_vars.update({ k: v for k, v in sys_vars.items() if isinstance(v, list)}) mach.states.add_from(A) all_vars = dict(env_vars) all_vars.update(sys_vars) u = next(iter(A)) strategy_vars = A.nodes[u]['state'].keys() assert set(all_vars).issubset(strategy_vars) # transitions labeled with I/O for u in A: for v in A.successors(u): d = A.nodes[v]['state'] d = { k: v for k, v in d.items() if k in all_vars} d = _int2str(d, str_vars) mach.transitions.add( u, v, attr_dict=None, check=False, **d) _logger.info( f'node: {v}, state: {d}') # special initial state, for first reaction initial_state = 'Sinit' mach.states.add(initial_state) mach.states.initial.add(initial_state) # fix an ordering for keys keys = list(all_vars) if hasattr(A, 'initial_nodes'): _init_edges_using_initial_nodes( A, mach, keys, all_vars, str_vars, initial_state) else: _init_edges_using_compile_init( spec, A, mach, keys, all_vars, str_vars, initial_state) n = len(A) m = len(mach) if m != n + 1: raise AssertionError( n, m) if not mach.successors('Sinit'): newlines = 2 * '\n' nodes = pprint.pformat( A.nodes(data=True)) raise AssertionError( 'The machine obtained from the strategy ' 'does not have any initial states !\n' 'The strategy is:\n' f'vertices:{nodes}{newlines}' f'edges:\n{A.edges()}{newlines}' f'and the machine:\n{mach}{newlines}' 'and the specification is:\n' f'{spec.pretty()}{newlines}') return mach def _init_edges_using_initial_nodes( A: _nx.DiGraph, mach: transys.MealyMachine, keys, all_vars, str_vars: dict, initial_state ) -> None: assert A.initial_nodes init_valuations = set() for u in A.initial_nodes: d = A.nodes[u]['state'] vals = tuple(d[k] for k in keys) # already an initial valuation ? if vals in init_valuations: continue init_valuations.add(vals) d = { k: v for k, v in d.items() if k in all_vars} d = _int2str(d, str_vars) mach.transitions.add( initial_state, u, attr_dict=None, **d) def _init_edges_using_compile_init( spec: _spec.GRSpec, A: _nx.DiGraph, mach: transys.MealyMachine, keys, all_vars, str_vars: dict, initial_state ) -> None: init_valuations = set() # to store tuples of # dict values for fast search isinit = spec.compile_init(no_str=True) # Mealy reaction to initial env input init_valuations = set() tmp = dict() for u, d in A.nodes(data=True): var_values = d['state'] vals = tuple(var_values[k] for k in keys) # already an initial valuation ? if vals in init_valuations: continue # add edge: Sinit -> u ? tmp.update(var_values) if eval(isinit, tmp): var_values = { k: v for k, v in var_values.items() if k in all_vars} label = _int2str(var_values, str_vars) mach.transitions.add( initial_state, u, attr_dict=None, check=False, **label) # remember variable values to avoid # spurious non-determinism wrt # the machine's memory # # in other words, # "state" omits the strategy's memory # hidden (existentially quantified) # so multiple nodes can be # labeled with the same state # # non-uniqueness here would be # equivalent to multiple choices for # initializing the hidden memory. init_valuations.add(vals) _logger.debug( f'found initial state: {u}') _logger.debug( f'machine vertex: {u}, ' f'has var values: {var_values}') def _int2str( label: dict, str_vars: dict ) -> dict: """Replace integers with string values for string variables. @param label: mapping from variable names, to integer (as strings) @param str_vars: mapping that defines those variables that should be converted from integer to string variables. Each variable is mapped to a list of strings that comprise its range. This list defines how integer values correspond to string literals for that variable. """ label = dict(label) label.update({ k: str_vars[k][int(v)] for k, v in label.items() if k in str_vars}) return label def mask_outputs( machine: transys.MealyMachine ) -> None: """Erase outputs from each edge where they are zero.""" for _, _, d in machine.edges(data=True): for k in d: pop = ( k in machine.outputs and d[k] == 0) if pop: d.pop(k) def determinize_machine_init( mach: transys.MealyMachine, init_out_values: dict | None=None ) -> transys.MealyMachine: """Return a determinized copy of `mach` with given initial outputs. The transducers produced by synthesis can have multiple initial output valuations as possible reactions to a given input valuation. Possible reasons for this are: 1. the system does not have full control over its initial state. For example the option "ALL_INIT" of `gr1c`. 2. the strategy returned by the solver has multiple vertices that satisfy the initial conditions. Case 1 ====== Requires an initial condition to be specified for each run of the transducer, because the transducer does not have full freedom to pick the initial output values. Note that solver options like "ALL_INIT" assume that the system has no control initially. Any output valuation that satisfies the initial conditions can occur. However, this may be too restrictive. The system may have control over the initial values of some outputs, but not others. For the outputs it can initially control, the non-determinism resulting from synthesis is redundancy and can be removed arbitrarily, as in Case 2. Case 2 ====== The function `strategy2mealy` returns a transducer that for each initial input valuation, for each initial output valuation, reacts with a unique transition. But this can yield multile reactions to a single input, even for solver options like "ALL_ENV_EXIST_SYS_INIT" for `gr1c`. The reason is that there can be multiple strategy vertices that satisfy the initial conditions, but the solver included them not because they are needed as initial reactions, but to be visited later by the strategy. These redundant initial reactions can be removed, and because the system has full control over their values, they can be removed in an arbitrary manner, keeping only a single reaction, for each input valuation. Algorithm ========= Returns a deterministic transducer. This means that at each transducer vertex, for each input valuation, there is only a single reaction (output valuation) available. The non-determinism is resolved for the initial reaction by ensuring the outputs given in `init_out_values` take those values. The remaining outputs are determinized arbitrarily. See also ======== `synthesize`, `strategy2mealy` @param mach: possibly non-deterministic transducer, as produced, for example, by `synthesize`. @param init_out_values: mapping from output ports that the system cannot control initially, to the initial values they take in this instance of the game. """ mach = copy.deepcopy(mach) if init_out_values is None: init_out_values = dict() '''determinize given outputs (uncontrolled)''' # restrict attention to # given output ports only given_ports = tuple(filter( init_out_values.__contains__, mach.outputs)) rm_edges = set() edges = mach.edges( ['Sinit'], data=True, keys=True) for i, j, key, d in edges: for k in given_ports: if d[k] != init_out_values[k]: rm_edges.add((i, j, key)) break mach.remove_edges_from(rm_edges) # # determinize arbitrarily any # remnant non-determinism # # input valuations already seen # tuples of values used for # efficiency (have __hash__) possible_inputs = set() # fix a key order inputs = tuple(mach.inputs) rm_edges = set() edges = mach.edges( ['Sinit'], data=True, keys=True) for i, j, key, d in edges: in_values = tuple(d[k] for k in inputs) # newly encountered input valuation ? if in_values not in possible_inputs: possible_inputs.add(in_values) continue else: edge = (i, j, key) rm_edges.add(edge) mach.remove_edges_from(rm_edges) return mach