# Source code for transys.machines

# Copyright (c) 2013-2015 by California Institute of Technology
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the California Institute of Technology nor
#    the names of its contributors may be used to endorse or promote
#    products derived from this software without specific prior
#    written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL CALTECH
# OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
"""Finite State Machines Module"""
import collections.abc as _abc
import copy
import pprint as _pp
import random
import sys
import typing as _ty

import tulip.transys.export.machine2scxml as _scxml
import tulip.transys.labeled_graphs as _graphs


# Public interface of this module:
# the names bound by `from <module> import *`.
__all__ = [
    'create_machine_ports',
    'MooreMachine',
    'MealyMachine',
    'guided_run',
    'random_run',
    'interactive_run',
    'moore2mealy',
    'mealy2moore']


# horizontal rule (40 dashes) that frames the text
# returned by the `__str__` methods of the machines
_hl = 40 * '-'
# port type of a pure signal:
# the only values are 'present' and 'absent'
pure = {'present', 'absent'}


def is_valuation(
        ports:
            dict[str, ...],
        valuations:
            dict[str, ...]
        ) -> None:
    """Raise `TypeError` unless `valuations` fits the types in `ports`.

    @param ports:
        maps each port name to its type.
        A type is either a callable that takes a value and
        returns a truth value, or a container of allowed values.
    @param valuations:
        maps each port name to the value assigned to that port.
        A `KeyError` is raised if a port of `ports` is absent.
    @raise TypeError:
        if the value of some port is rejected by the port's type
    """
    for port_name in ports:
        domain = ports[port_name]
        value = valuations[port_name]
        # a callable type describes set membership functionally
        member = domain(value) if callable(domain) else value in domain
        if member:
            continue
        raise TypeError(
            'Not a valuation.')

def create_machine_ports(
        spc_vars:
            dict
        ) -> dict:
    """Create proper port domains of valuations, given port types.

    @param spc_vars:
        port names and types inside `tulip`.
        Each type is one of:

        - the string `'boolean'`,
          which yields the domain `{0, 1}`
        - a `tuple` `(start, end)` of integers,
          which yields the integers from `start` to `end`,
          both ends included
        - a `list` of values (for example strings),
          for an arbitrary finite type,
          which yields the set of its elements

        These are as originally defined by the user or synth.
    @return:
        maps each port name to the `set` of
        values that the port can take
    @raise ValueError:
        if a port type has none of the forms listed above
    """
    ports = dict()
    for env_var, var_type in spc_vars.items():
        if var_type == 'boolean':
            domain = {0, 1}
        elif isinstance(var_type, tuple):
            # integer domain
            start, end = var_type
            domain = set(range(start, end + 1))
        elif isinstance(var_type, list):
            # arbitrary finite domain
            # defined by list var_type
            domain = set(var_type)
        else:
            # Without this branch `domain` would be either unbound
            # or left over from the previous port, so the port
            # would silently get a wrong domain.
            raise ValueError(
                f'unknown type {var_type!r} '
                f'for port {env_var!r}, expected: '
                '"boolean", a tuple (start, end), '
                'or a list of values.')
        ports[env_var] = domain
    return ports


class Transducer(
        _graphs.LabeledDiGraph):
    r"""Sequential Transducer, i.e., a letter-to-letter function.

    Inputs
    ======
    `P = {p1, p2,...}` is the set of input ports.
    An input port p takes values in a set `Vp`.
    The set `Vp` is called the "type" of input port `p`.
    A "valuation" is an assignment of values to
    the input ports in `P`.

    We call "inputs" the set of pairs:

    ```
    {(p_i, Vp_i), ...}
    ```

    of input ports p_i and their corresponding types `Vp_i`.

    A guard is a predicate (bool-valued) used as
    sub-label for a transition. A guard is defined by a set
    and evaluated using set membership.
    So given an input port value `p=x`, then if:

    ```
    x \in guard_set
    ```

    then the guard is `True`, otherwise it is `False`.

    The "inputs" are defined by a `dict`:

    ```python
    {'p1': explicit, 'p2': check, 'p3': None, ...}
    ```

    where:

    - `explicit`:
      is an iterable representation of Vp,
      possible only for discrete Vp.
      If 'p1' is explicitly typed, then guards are
      evaluated directly:

      ```
      input_port_value == guard_value ?
      ```

    - `check`:
      is a class with methods:

      - `__contains__(x)`:
        check if guard value given to input port `'p1'`
        is in the set of possible values Vp.

      - `__call__(guard_set, input_port_value)`:
        check if `input_port_value \in guard_set`
        This allows symbolic type definitions.

        For example, `input_port_value` might be
        assigned `int` values, but the `guard_set` be
        defined by a symbolic expression as
        the string: `'x<=5'`.

        Then the user is responsible for providing
        the appropriate method to the Mealy Machine,
        using the custom `check` class described here.

        Note that we could provide a rudimentary library
        for the basic types of checks, e.g., for
        the above simple symbolic case, where using
        function `eval()` is sufficient.

    - `None`:
      signifies that no type is currently defined for
      this input port, so input type checking and guard
      evaluation are disabled.

      This can be used to skip type definitions when
      they are not needed by the user.

      However, since Machines are in general the output
      of synthesis, it follows that they are constructed
      by code, so the benefits of typedefs will be
      considerable compared to the required coding effort.

    Guards annotate transitions:

        Guards: States x States ---> Input_Predicates

    Outputs
    =======
    Similarly defined to inputs, but:

    - for Mealy Machines they annotate transitions
    - for Moore Machines they annotate states

    State Variables
    ===============
    Similarly defined to inputs, they annotate states,
    for both Mealy and Moore machines:

        States ---> State_Variables

    Update Function
    ===============
    The transition relation:

    - for Mealy Machines:

      ```
      States x Input_Valuations ---> Output_Valuations x States
      ```

      Note that in the range Output_Valuations are ordered
      before States to emphasize that an output_valuation is
      produced during the transition, NOT at the next state.

      The data structure representation of the update
      function is by storage of the Guards function and
      definition of Guard evaluation for each input port
      via the mapping discussed above.

    - for Moore Machines:

      ```
      States x Input_Valuations ---> States
      States ---> Output_valuations
      ```

    Note
    ====
    A transducer may operate on either finite or
    infinite words, i.e., it is not equipped with
    interpretation semantics on the words,
    so it does not "care" about word length.
    It continues as long as its input is fed with letters.

    For Machines, each state label consists of
    (possibly multiple) sublabels, each of which is
    either a variable, or, only for Moore machines,
    may be an output.

    See Also
    ========
    FSM, MealyMachine, MooreMachine
    """

    def __init__(self):
        """Create a transducer without ports or state variables."""
        # the entries of these mappings are meant to refer to
        # entries of the `_*_label_def` mappings below
        self.state_vars = {}
        self.inputs = {}
        self.outputs = {}
        # labeling of states
        self._state_label_def = {}
        self._state_dot_label_format = {
            'type?label': ':',
            'separator': r'\\n'}
        # labeling of edges
        self._transition_label_def = {}
        self._transition_dot_label_format = {
            'type?label': ':',
            'separator': r'\\n'}
        self._transition_dot_mask = {}
        self._state_dot_mask = {}
        self.default_export_fname = 'fsm'
        _graphs.LabeledDiGraph.__init__(self)
        # NOTE(review): the attributes below are assigned after
        # the initializer of the base class has run, presumably
        # to override what it sets -- confirm against
        # `LabeledDiGraph.__init__`.
        self.dot_node_shape = {'normal': 'ellipse'}
        self.default_export_fname = 'fsm'

    def add_inputs(
            self,
            new_inputs:
                dict,
            masks:
                dict |
                None=None
            ) -> None:
        """Declare input ports, which label the transitions.

        @param new_inputs:
            pairs `{port_name : port_type}`, where:
            - port_name: `str`
            - port_type: `Iterable` | check class
        @param masks:
            custom mask functions `{port_name : mask_function}`.
            Each `mask_function` takes the current value of
            the sublabel and returns:
            - `True`, if the sublabel should be shown
            - `False`, otherwise (to hide it)
            Names that are not in `new_inputs` are ignored.
        """
        for name, domain in new_inputs.items():
            # type of the sublabel of transitions
            self._transition_label_def[name] = domain
            # register as input port
            self.inputs[name] = domain
            # how the sublabel is printed
            self._transition_dot_label_format[name] = str(name)
            if masks is not None and name in masks:
                self._transition_dot_mask[name] = masks[name]

    def add_state_vars(
            self,
            new_state_vars:
                dict
            ) -> None:
        """Declare state variables, which label the states.

        @param new_state_vars:
            pairs `{var_name : var_type}`
        """
        for name, domain in new_state_vars.items():
            # type of the sublabel of states
            self._state_label_def[name] = domain
            # register as state variable
            self.state_vars[name] = self._state_label_def[name]
            # how the sublabel is printed
            self._state_dot_label_format[name] = str(name)
class MooreMachine(
        Transducer):
    """Moore machine.

    A Moore machine implements the discrete dynamics:

    ```
    x[k + 1] = f(x[k], u[k])
    y[k] = g(x[k])
    ```

    where:

    - `k`: discrete time = sequence index
    - `x`: state = valuation of state variables
    - `X`: set of states = `S`
    - `u`: inputs = valuation of input ports
    - `y`: output actions = valuation of output ports
    - `f`: `X-> 2^X`, transition function
    - `g`: `X-> Out`, output function

    Observe that the output depends only on the state.

    Note
    ====
    valuation: assignment of values to each port

    Reference
    =========
    [M56](
        https://tulip-control.sourceforge.io/doc/bibliography.html#m56)
    """

    def __init__(self):
        """Instantiate a Moore state machine."""
        Transducer.__init__(self)
        self.dot_node_shape = dict(
            normal='ellipse')
        self.default_export_fname = 'moore'

    def __str__(self):
        """Get informal string representation."""
        s = (
            f'{_hl}\nMoore Machine: {self.name}\n{_hl}\n' +
            'State Variables:\n\t(name : type)\n' +
            _print_ports(self.state_vars) +
            'Input Ports:\n\t(name : type)\n' +
            _print_ports(self.inputs) +
            'Output Ports:\n\t(name : type)\n' +
            _print_ports(self.outputs) +
            'States & State Var Values: (state : outputs : vars)\n')
        for state, label_dict in self.states(data=True):
            s += f'\t{state} :\n'
            # split into vars and outputs
            var_values = {
                k: v
                for k, v in label_dict.items()
                if k in self.state_vars}
            output_values = {
                k: v
                for k, v in label_dict.items()
                if k in self.outputs}
            # NOTE(review): the header above announces
            # "outputs : vars", but the variables are
            # printed first. Left unchanged, to not alter
            # the text that callers may rely on.
            s += (
                f'{_print_label(var_values)} : '
                f'{_print_label(output_values)}')
        s += (
            'Initial States:\n' +
            _pp.pformat(self.states.initial, indent=3) +
            2 * '\n')
        s += 'Transitions & Labels: (from --> to : label)\n'
        for from_state, to_state, label_dict in self.transitions(data=True):
            s += (
                f'\t{from_state} ---> '
                f'{to_state} :\n' +
                _print_label(label_dict))
        s += f'{_hl}\n'
        return s

    def add_outputs(
            self,
            new_outputs:
                dict,
            masks:
                dict |
                None=None
            ) -> None:
        """Add new outputs, which label the states.

        @param new_outputs:
            pairs `{port_name : port_type}`, where:
            - port_name: `str`
            - port_type: `Iterable` | check class
        @param masks:
            - keys are port names (see arg: `new_outputs`)
            - values are custom mask functions,
              stored for the state sublabels.
        """
        for port_name, port_type in new_outputs.items():
            # append
            self._state_label_def[port_name] = port_type
            # inform outputs
            self.outputs[port_name] = port_type
            # printing format
            self._state_dot_label_format[port_name] = (
                f'/{port_name}')
            if masks is None:
                continue
            if port_name in masks:
                mask_func = masks[port_name]
                self._state_dot_mask[port_name] = mask_func


class MealyMachine(
        Transducer):
    """Mealy machine.

    Examples
    ========
    Traffic Light: Fig. 3.14, p.72 [LS11](
        https://tulip-control.sourceforge.io/doc/bibliography.html#ls11)

    ```python
    m = MealyMachine()
    pure_signal = {'present', 'absent'}
    m.add_inputs({'tick': pure_signal})
    m.add_outputs({'go': pure_signal, 'stop': pure_signal})
    m.states.add_from(['red', 'green', 'yellow'])
    m.states.initial.add('red')
    ```

    For brevity:

    ```python
    p = 'present'
    a = 'absent'
    ```

    The labels of transitions can be defined with `dict()`:

    ```python
    label = dict(
        tick=p,
        go=p,
        stop=a)
    m.transitions.add('red', 'green', **label)
    label = dict(
        tick=p,
        go=a,
        stop=p)
    m.transitions.add('green', 'yellow', **label)
    label = dict(
        tick=p,
        go=a,
        stop=p)
    m.transitions.add('yellow', 'red', **label)
    ```

    This avoids any ordering issues, i.e., changing the
    order of the sublabels does not matter:

    ```python
    label = dict(
        go=p,
        tick=p,
        stop=a)
    m.transitions.add('red', 'green', **label)
    ```

    Theory
    ======
    A Mealy machine implements the discrete dynamics:

    ```
    x[k + 1] = f(x[k], u[k])
    y[k] = g(x[k], u[k])
    ```

    where:

    - `k`: discrete time = sequence index
    - `x`: state = valuation of state variables
    - `X`: set of states = S
    - `u`: inputs = valuation of input ports
    - `y`: output actions = valuation of output ports
    - `f`: `X -> 2^X`, transition function
    - `g`: `X -> Out`, output function

    Observe that the output is defined when
    a reaction occurs to an input.

    Note
    ====
    valuation: assignment of values to each port

    Reference
    =========
    [M55](
        https://tulip-control.sourceforge.io/doc/bibliography.html#m55)
    """

    def __init__(self):
        """Instantiate a Mealy state machine."""
        Transducer.__init__(self)
        self.dot_node_shape = dict(
            normal='ellipse')
        self.default_export_fname = 'mealy'

    def __str__(self):
        """Get informal string representation."""
        s = (
            f'{_hl}\nMealy Machine: {self.name}\n{_hl}\n' +
            'State Variables:\n\t(name : type)\n' +
            _print_ports(self.state_vars))
        s += 'States & State Var Values:\n'
        for state, label_dict in self.states(data=True):
            s += (
                f'\t{state} :\n'
                f'{_print_label(label_dict)}')
        s += (
            'Initial States:\n' +
            _pp.pformat(self.states.initial, indent=3) +
            2 * '\n' +
            'Input Ports:\n\t(name : type)\n' +
            _print_ports(self.inputs) +
            'Output Ports:\n\t(name : type)\n' +
            _print_ports(self.outputs) +
            'Transitions & Labels: (from --> to : label)\n')
        for from_state, to_state, label_dict in self.transitions(data=True):
            s += (
                f'\t{from_state} ---> '
                f'{to_state} :\n' +
                _print_label(label_dict))
        s += f'{_hl}\n'
        return s

    def _save(
            self,
            path:
                str,
            fileformat:
                _ty.Literal['scxml']
            ) -> bool:
        """Export options available only for Mealy machines.

        @param path:
            name of the file to write
        @param fileformat:
            only `'scxml'` is handled here
        @return:
            `True` if the file was written,
            `False` if `fileformat` is not `'scxml'`
        """
        if fileformat != 'scxml':
            return False
        s = _scxml.mealy2scxml(self)
        # dump to file
        with open(path, 'w') as f:
            f.write(s)
        return True

    def add_outputs(
            self,
            new_outputs:
                dict,
            masks:
                dict |
                None=None
            ) -> None:
        """Add new outputs, which label the transitions.

        @param new_outputs:
            dict of pairs {port_name : port_type}
            where:
            - port_name: str
            - port_type: Iterable | check class
        @param masks:
            - keys are port_names (see arg: new_outputs)
            - values are custom mask functions,
              for each sublabel based on its
              current value.
              Each such function returns:
              - True, if the sublabel should be shown
              - False, otherwise (to hide it)
        """
        for port_name, port_type in new_outputs.items():
            # append
            self._transition_label_def[port_name] = port_type
            # inform outputs
            self.outputs[port_name] = (
                self._transition_label_def[port_name])
            # printing format
            self._transition_dot_label_format[port_name] = (
                f'/{port_name}')
            if masks is None:
                continue
            if port_name in masks:
                mask_func = masks[port_name]
                self._transition_dot_mask[port_name] = mask_func

    def reaction(
            self,
            from_state,
            inputs:
                dict[str, ...],
            lazy:
                bool=False
            ) -> tuple[
                _ty.Any,
                dict[str, ...]]:
        """Return next state and output, when reacting to given inputs.

        The machine must be deterministic.
        (for each state and input at most a single
        transition enabled, this notion does not
        coincide with output-determinism)

        Not exactly a wrapper of `Transitions.find`,
        because it matches only that part of an edge label
        that corresponds to the inputs.

        @param from_state:
            transition starts from this state.
            element of `self.states`
        @param inputs:
            `dict` that assigns a valid value
            to each input port.
            {'port_name': port_value, ...}
        @param lazy:
            Lazy evaluation of inputs?
            If lazy=True, then allow an incomplete
            specification of input if there is
            precisely one enabled transition.
        @return:
            next state and output values.
            `(next_state, outputs)`
            where `outputs`:
            `{'port_name': port_value, ...}`
        @raise ValueError:
            if no transition, or more than one transition,
            is enabled from `from_state` by `inputs`
        """
        if lazy:
            restricted_inputs = set(self.inputs).intersection(inputs.keys())
        else:
            restricted_inputs = self.inputs
        # match only inputs (explicit valuations, not symbolic)
        enabled_trans = [
            (i, j, d)
            for i, j, d in self.edges([from_state], data=True)
            if project_dict(d, restricted_inputs) == inputs]
        if not enabled_trans:
            # collect a few admissible inputs,
            # to make the error message helpful
            some_possibilities = list()
            for _, _, d in self.edges([from_state], data=True):
                # The number of possible inputs to suggest here is
                # arbitrary. Consider making it a function parameter.
                if len(some_possibilities) >= 5:
                    break
                possible_inputs = project_dict(d, restricted_inputs)
                if possible_inputs not in some_possibilities:
                    some_possibilities.append(possible_inputs)
        # must be deterministic
        try:
            (_, next_state, attr_dict), = enabled_trans
        except ValueError as error:
            if not enabled_trans:
                if not some_possibilities:
                    raise ValueError(
                        f'state {from_state} is a dead-end. '
                        'There are no possible inputs from '
                        'it.'
                        ) from error
                else:
                    raise ValueError(
                        'not a valid input, '
                        'some possible inputs include: '
                        f'{some_possibilities}'
                        ) from error
            else:
                raise ValueError(
                    'must be input-deterministic, '
                    'found enabled transitions: '
                    f'{enabled_trans}'
                    ) from error
        outputs = project_dict(attr_dict, self.outputs)
        return (next_state, outputs)

    def reactionpart(
            self,
            from_state,
            inputs:
                dict[str, ...]
            ) -> tuple[
                _ty.Any,
                dict[str, ...]]:
        """Wraps `reaction()` with `lazy=True`."""
        return self.reaction(
            from_state, inputs,
            lazy=True)

    def run(
            self,
            from_state:
                _ty.Optional=None,
            input_sequences:
                dict[..., list] |
                None=None):
        """Guided or interactive run.

        @param from_state:
            state to start from, passed on to
            `interactive_run` or `guided_run`
        @param input_sequences:
            if `None`, then call `interactive_run`,
            otherwise call `guided_run`.
        @return:
            output of `guided_run`, otherwise `None`.
        """
        if input_sequences is None:
            interactive_run(
                self,
                from_state=from_state)
        else:
            return guided_run(
                self,
                from_state=from_state,
                input_sequences=input_sequences)


def guided_run(
        mealy:
            MealyMachine,
        from_state:
            _ty.Optional=None,
        input_sequences:
            dict[..., list] |
            None=None
        ) -> tuple[
            list,
            dict[..., list]]:
    """Run deterministic machine reacting to given inputs.

    @param mealy:
        input-deterministic Mealy machine
    @param from_state:
        start simulation at this state.
        If `None`, then use an initial state
        of `mealy` (the first one that is iterated).
    @param input_sequences:
        one sequence of values for each input port.
        Each sequence must be a `list`, and all
        sequences must have the same length.
        The value `None` is not supported by this function.
    @return:
        sequence of states and sequence of output valuations
        `(states, output_sequences)`
        where:
        - `states` not containing `from_state`
    @raise ValueError:
        if an input port has no sequence,
        or the sequences differ in length
    @raise TypeError:
        if a sequence is not a `list`
    """
    seqs = input_sequences  # abbreviation
    missing_ports = set(mealy.inputs).difference(seqs)
    if missing_ports:
        raise ValueError(
            'missing input port(s): '
            f'{missing_ports}')
    # dict of lists ?
    non_lists = {
        k: v
        for k, v in seqs.items()
        if not isinstance(v, list)}
    if non_lists:
        raise TypeError(
            'Values must be lists, '
            f'for: {non_lists}')
    # uniform list len ?
    cardinalities = set(map(
        len, seqs.values()))
    if len(cardinalities) > 1:
        raise ValueError(
            'All input sequences must be of equal length.')
    # note: initial sys state
    # non-determinism not checked
    # initial sys edge non-determinism
    # checked instead (more restrictive)
    if from_state is None:
        state = next(iter(mealy.states.initial))
    else:
        state = from_state
    n = len(next(iter(seqs.values())))
    states_seq = list()
    output_seqs = {
        k: list()
        for k in mealy.outputs}
    for i in range(n):
        inputs = {
            k: v[i]
            for k, v in seqs.items()}
        state, outputs = mealy.reaction(state, inputs)
        states_seq.append(state)
        for k in output_seqs:
            output_seqs[k].append(
                outputs[k])
    return (
        states_seq,
        output_seqs)


def random_run(
        mealy:
            MealyMachine,
        from_state:
            _ty.Optional=None,
        N:
            int=10
        ) -> tuple[
            list,
            dict[str, list]]:
    """Return run from given state for N random inputs.

    Inputs selected randomly in a way that
    does not block the machine
    So they are not arbitrarily random.
    If the machine is a valid synthesis solution,
    then all safe environment inputs can be generated this way.

    Randomly generated inputs may violate
    liveness assumption on environment.

    @param mealy:
        input-deterministic Mealy machine
    @param from_state:
        start at this state. If `None`, then use an
        initial state of `mealy`.
    @param N:
        number of reactions (inputs)
    @return:
        same as `guided_run`
    """
    if from_state is None:
        state = next(iter(mealy.states.initial))
    else:
        state = from_state
    states_seq = list()
    output_seqs = {
        k: list()
        for k in mealy.outputs}
    for _ in range(N):
        trans = mealy.transitions.find([state])
        # choose next transition
        selected_trans = random.choice(list(trans))
        _, new_state, attr_dict = selected_trans
        # extend execution trace
        states_seq.append(new_state)
        # extend output traces
        outputs = project_dict(
            attr_dict, mealy.outputs)
        for k in output_seqs:
            output_seqs[k].append(outputs[k])
        # updates
        old_state = state
        state = new_state
        # printing
        inputs = project_dict(
            attr_dict, mealy.inputs)
        print(
            f'move from\n\t state: {old_state}'
            f'\n\t with input: {inputs}'
            f'\n\t to state: {new_state}'
            '\n\t reacting by producing '
            f'output: {outputs}')
    return (
        states_seq,
        output_seqs)


def interactive_run(
        mealy:
            MealyMachine,
        from_state:
            _ty.Optional=None
        ) -> None:
    """Run input-deterministic Mealy machine using user input.

    The run stops when the current state has no
    outgoing transitions, or when the user gives
    an empty selection.

    @param mealy:
        input-deterministic Mealy machine
    @param from_state:
        start at this state. If `None`, then use an
        initial state of `mealy`.
    """
    if from_state is None:
        state = next(iter(mealy.states.initial))
    else:
        state = from_state
    while True:
        print(
            f'\n Current state: {state}')
        selected_trans = _interactive_select(mealy, state)
        if selected_trans is None:
            break
        # advance to the target of the selected transition,
        # otherwise the run would never leave `from_state`
        _, state, _ = selected_trans


def _interactive_run_step(
        mealy:
            MealyMachine,
        state
        ) -> (
            bool |
            None):
    """Ask the user for one transition from `state`.

    @return:
        `True` if a transition was selected,
        `None` if the simulation is to stop
    """
    if _interactive_select(mealy, state) is None:
        return None
    return True


def _interactive_select(
        mealy:
            MealyMachine,
        state
        ) -> (
            tuple |
            None):
    """Return the transition from `state` that the user selects.

    Repeats the question until the selection
    is recognized, then prints the reaction.

    @return:
        the selected transition `(from_state, to_state, label)`,
        or `None` if `state` has no outgoing transitions,
        or the user asked to stop
    """
    if state is None:
        raise Exception('Current state is None')
    # note: the spaghettiness of
    # previous version was caused
    # by interactive simulation allowing
    # both output-non-determinism and
    # implementing spawning (which
    # makes sense only for generators,
    # *not* for transducers)
    trans = mealy.transitions.find([state])
    if not trans:
        print('Stop: no outgoing transitions.')
        return None
    while True:
        try:
            selected_trans = _select_transition(mealy, trans)
        except (ValueError, IndexError):
            # not an integer, or not the index of a transition:
            # ask again, instead of continuing with
            # an undefined selection
            print(
                'Selection not recognized. '
                'Please try again.')
            continue
        break
    if selected_trans is None:
        return None
    _, to_state, attr_dict = selected_trans
    inputs = project_dict(
        attr_dict, mealy.inputs)
    outputs = project_dict(
        attr_dict, mealy.outputs)
    print(
        f'Moving from state: {state}'
        f', to state: {to_state}\n'
        f'given inputs: {inputs}\n'
        f'reacting with outputs: {outputs}')
    return selected_trans


def _select_transition(
        mealy:
            MealyMachine,
        trans:
            list[tuple]
        ) -> (
            tuple |
            None):
    """Print `trans` and read the user's choice from `sys.stdin`.

    @param trans:
        transitions `(from_state, to_state, label)`
        to choose from
    @return:
        the selected element of `trans`,
        or `None` if an empty line was read
    @raise ValueError:
        if the line read is not an integer
    @raise IndexError:
        if the integer is not an index of `trans`
    """
    msg = 'Found more than 1 outgoing transitions:' + 2 * '\n'
    for i, t in enumerate(trans):
        from_state, to_state, attr_dict = t
        inputs = project_dict(
            attr_dict, mealy.inputs)
        outputs = project_dict(
            attr_dict, mealy.outputs)
        msg += (
            f'\t{i} : '
            f'{from_state} ---> {to_state}\n'
            f'\t inputs: {inputs}'
            f'\t outputs: {outputs}\n\n')
    msg += (
        '\n'
        'Select from the available transitions above\n'
        'by giving its integer,\n'
        'Press "Enter" to stop the simulation:\n'
        '\t int = ')
    print(msg)
    id_selected = sys.stdin.readline().rstrip('\r\n')
    if not id_selected:
        return None
    return trans[int(id_selected)]


def moore2mealy(
        moore:
            MooreMachine
        ) -> MealyMachine:
    """Convert Moore machine to equivalent Mealy machine.

    The outputs that label a state of `moore` are
    copied to each transition that leaves that state.

    Reference
    =========
    [LS11](
        https://tulip-control.sourceforge.io/doc/bibliography.html#ls11)

    @raise TypeError:
        if `moore` is not a `MooreMachine`
    """
    if not isinstance(moore, MooreMachine):
        raise TypeError(
            'moore must be a MooreMachine')
    mealy = MealyMachine()
    # cp inputs
    for port_name, port_type in moore.inputs.items():
        mask_func = moore._transition_dot_mask.get(port_name)
        if mask_func is None:
            masks = None
        else:
            masks = {port_name: mask_func}
        mealy.add_inputs({port_name: port_type}, masks=masks)
    # cp outputs
    for port_name, port_type in moore.outputs.items():
        mask_func = moore._state_dot_mask.get(port_name)
        if mask_func is None:
            masks = None
        else:
            masks = {port_name: mask_func}
        mealy.add_outputs({port_name: port_type}, masks=masks)
    # cp states
    mealy.states.add_from(moore.states())
    mealy.states.initial.add_from(moore.states.initial)
    # cp transitions
    for si in moore:
        output_values = {
            k: v
            for k, v in moore.states[si].items()
            if k in moore.outputs}
        output_values = copy.deepcopy(output_values)
        for si_, sj, attr_dict in moore.transitions.find(si):
            # note that we don't filter only input ports,
            # so other edge annotation is preserved
            attr_dict = copy.deepcopy(attr_dict)
            attr_dict.update(output_values)
            mealy.transitions.add(si, sj, attr_dict)
    return mealy


def mealy2moore(
        mealy:
            MealyMachine
        ) -> MooreMachine:
    """Convert Mealy machine to almost equivalent Moore machine.

    A Mealy machine cannot be transformed to
    an equivalent Moore machine.
    It can be converted to a Moore machine with
    an arbitrary initial output, which outputs
    the Mealy output at its next reaction.

    Reference
    =========
    [LS11](
        https://tulip-control.sourceforge.io/doc/bibliography.html#ls11)

    @raise TypeError:
        if `mealy` is not a `MealyMachine`
    """
    # TODO: check for when Mealy is exactly convertible to Moore
    if not isinstance(mealy, MealyMachine):
        raise TypeError(
            'mealy must be a MealyMachine')
    moore = MooreMachine()
    # cp inputs
    for port_name, port_type in mealy.inputs.items():
        mask_func = mealy._transition_dot_mask.get(port_name)
        if mask_func is None:
            masks = None
        else:
            masks = {port_name: mask_func}
        moore.add_inputs({port_name: port_type}, masks=masks)
    # cp outputs
    for port_name, port_type in mealy.outputs.items():
        mask_func = mealy._transition_dot_mask.get(port_name)
        if mask_func is None:
            masks = None
        else:
            masks = {port_name: mask_func}
        moore.add_outputs({port_name: port_type}, masks=masks)
    # initial state with arbitrary label
    out = {
        k: list(v)[0]
        for k, v in mealy.outputs.items()}
    s0 = list(mealy.states.initial)[0]
    # create maps between Moore and Mealy states
    moore2mealy_states = dict()  # {qj: si} (function)
    mealy2moore_states = dict()  # {si: {qj, qk, ...}} (relation)
    new_s0 = _create_state_str(
        s0, out, moore,
        moore2mealy_states,
        mealy2moore_states)
    moore.states.add(new_s0, out)
    moore.states.initial.add(new_s0)
    # cp transitions and create appropriate states
    Q = set()  # Moore states still to be expanded
    S = set()  # Moore states seen so far
    Q.add(new_s0)
    S.add(new_s0)
    while Q:
        new_si = Q.pop()
        si = moore2mealy_states[new_si]
        for si_, sj, attr_dict in mealy.transitions.find(si):
            in_values, out_values = _split_io(attr_dict, mealy)
            new_sj = _create_state_str(
                sj, out_values, moore,
                moore2mealy_states,
                mealy2moore_states)
            moore.transitions.add(new_si, new_sj, in_values)
            if new_sj not in S:
                Q.add(new_sj)
                S.add(new_sj)
    return moore


def _print_ports(
        port_dict:
            dict
        ) -> str:
    """Return text with one line `name : type` per port."""
    def print_port(name_type):
        port_name, port_type = name_type
        return (
            f'\t{port_name} : '
            f'{_pp.pformat(port_type)}\n')
    text = ''.join(map(
        print_port, port_dict.items()))
    return f'{text}\n'


def _print_label(
        label_dict:
            dict
        ) -> str:
    """Return text with one line `name : value` per sublabel."""
    def print_label(name_value):
        name, value = name_value
        return f'\t\t{name} : {value}\n'
    text = ''.join(map(
        print_label, label_dict.items()))
    return f'{text}\n'


def _create_state_str(
        mealy_state,
        output,
        moore:
            MooreMachine,
        moore2mealy_states:
            dict,
        mealy2moore_states:
            dict
        ) -> str:
    """Used to create Moore states when converting Mealy -> Moore.

    Return the Moore state that corresponds to `mealy_state`
    and is labeled with `output`. If there is no such state,
    then add it to `moore`, and record it in both maps.
    """
    for s in mealy2moore_states.setdefault(mealy_state, set()):
        # check output values
        if moore.states[s] == output:
            return s
    # create new
    s = f's{len(moore)}'
    moore.states.add(s, output)
    moore2mealy_states[s] = mealy_state
    mealy2moore_states[mealy_state].add(s)
    return s


def _split_io(
        attr_dict:
            dict,
        machine:
            Transducer
        ) -> tuple[
            dict[str, ...],
            dict[str, ...]]:
    """Split into inputs and outputs."""
    input_values = {
        k: v
        for k, v in attr_dict.items()
        if k in machine.inputs}
    output_values = {
        k: v
        for k, v in attr_dict.items()
        if k in machine.outputs}
    return input_values, output_values


def project_dict(
        x:
            _abc.Mapping,
        y:
            _abc.Container
        ) -> dict:
    """Return the items of `x` whose key is in `y`."""
    return {
        k: x[k]
        for k in x
        if k in y}


def trim_dict(
        x:
            _abc.Mapping,
        y:
            _abc.Container
        ) -> dict:
    """Return the items of `x` whose key is not in `y`."""
    return {
        k: x[k]
        for k in x
        if k not in y}


def strip_ports(
        mealy:
            MealyMachine,
        names:
            _abc.Iterable[str]
        ) -> MealyMachine:
    """Remove ports in `names`.

    For example, to remove the atomic propositions
    labeling the transition system `ts` used
    (so they are dependent variables), call it as:

    ```python
    strip_ports(mealy, ts.atomic_propositions)
    ```

    @param names:
        names of the ports to remove. Membership is
        tested with `in`, several times, so this should be
        a container (not a one-shot iterator).
    @return:
        new Mealy machine, with the same nodes, initial states,
        and edges as `mealy`, without the ports in `names`
    """
    new = MealyMachine()

    def trim(ports):
        return trim_dict(
            ports, names)

    new.add_inputs(trim(
        mealy.inputs))
    new.add_outputs(trim(
        mealy.outputs))
    new.add_nodes_from(mealy)
    new.states.initial.add_from(
        mealy.states.initial)
    edges = mealy.edges(data=True)
    for u, v, d in edges:
        d = trim_dict(d, names)
        new.add_edge(u, v, **d)
    return new