Source code for abstract.prop2partition

# Copyright (c) 2011-2014 by California Institute of Technology
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the California Institute of Technology nor
#    the names of its contributors may be used to endorse or promote
#    products derived from this software without specific prior
#    written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL CALTECH
# OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
"""Proposition-preserving partition module."""
import copy
import logging
import typing as _ty
import warnings

import numpy as np
import polytope as pc
import polytope.plot as _pplot
import scipy.sparse as sp

import tulip.graphics as _graphics
_mpl = _graphics._mpl
import tulip.transys as trs


__all__ = [
    'PropPreservingPartition',
    'PPP',
    'prop2part',
    'part2convex',
    'pwa_partition',
    'add_grid',
    'ppp2ts']


_hl = 40 * '-'
_logger = logging.getLogger(__name__)
Polytope = (
    pc.Polytope |
    pc.Region)


[docs] class PropPreservingPartition( pc.MetricPartition): """Partition class with following fields: Attributes: - `domain`: the domain we want to partition - `regions`: `Region`s of proposition-preserving partition - `adj`: sparse matrix showing which regions are adjacent. The order of `Region`s is the same as in the `list` `regions`. - type: `scipy` `lil` `sparse` - `prop_regions`: map from atomic proposition symbols to continuous subsets Relevant ======== `prop2part` """ def __init__( self, domain: Polytope | None=None, regions: list[Polytope] | None=None, adj: sp.lil_array | None=None, prop_regions: dict[..., Polytope] | None=None, check: bool=True): if regions is None: regions = list() if prop_regions is None: self.prop_regions = None else: try: # don't call it # use try because it should work # vs hasattr, which would look like normal selection prop_regions.keys except: raise TypeError( '`prop_regions` must be `dict`.' f'Got instead: {type(prop_regions)}') raise TypeError(msg) self.prop_regions = copy.deepcopy(prop_regions) n = len(regions) if hasattr(adj, 'shape'): m, k = adj.shape if m != k: raise ValueError('adj must be square') if m != n: raise ValueError( '`adj` size does not agree ' 'with number of regions') self.regions = regions[:] if check: for region in regions: if not region <= domain: raise ValueError( f'Partition: Region:\n\n{region}\n' 'is not subset of given domain:\n\t' f'{domain}') self.is_symbolic() self.domain = domain super().__init__(domain) self.adj = adj def reg2props( self, region_index: int ) -> set: return self.regions[region_index].props.copy() # TODO: iterator over pairs # TODO: use `networkx` graph to store partition def is_symbolic(self) -> None: """Check that the set of preserved predicates are bijectively mapped to the symbols. Symbols = Atomic Propositions """ if self.prop_regions is None: logging.warning( 'No continuous propositions defined.') return for region in self.regions: if region.props <= set(self.prop_regions): continue raise ValueError( 'Partitions: Region labeled with propositions:\n\t' f'{region.props}\n' 'not all of which are in the ' 'continuous atomic propositions:\n\t' f'{set(self.prop_regions)}') def preserves_predicates(self) -> bool: """Return `True` if each `Region` <= Predicates for the predicates in `prop_regions.values`, where `prop_regions` is a bijection to "continuous" propositions of the specification's alphabet. Note ==== 1. `prop_regions` in practice need not be injective. It doesnt hurt - though creates unnecessary redundancy. 2. The specification alphabet is fixed an user-defined. It should be distinguished from the auxiliary alphabet generated automatically during abstraction, which defines another partition with its own bijection to TS. """ all_props = set(self.prop_regions) for region in self.regions: # Propositions True in Region for prop in region.props: preimage = self.prop_regions[prop] if not region <= preimage: return False # Propositions False in Region for prop in all_props.difference(region.props): preimage = self.prop_regions[prop] if region.intersect(preimage).volume > pc.polytope.ABS_TOL: return False return True def __str__(self): """Get informal string representation.""" s = ( f'\n{_hl}\n' 'Proposition Preserving Partition:\n' + _hl + 2 * '\n' + f'Domain: {self.domain}\n') for j, region in enumerate(self.regions): s += f'Region: {j}\n' if self.prop_regions is not None: s += '\t Propositions: ' active_props = ' '.join(region.props) if active_props: s += f'{active_props}\n' else: s += '{}\n' s += str(region) if hasattr(self.adj, 'toarray'): s += 'Adjacency matrix:\n' s += f'{self.adj.toarray()}\n' return s def plot( self, trans: np.ndarray=None, ppp2trans: list | None=None, only_adjacent: bool=False, ax: _ty.Union[ '_mpl.axes.Axes', None] =None, plot_numbers: bool=True, color_seed: int | None=None ) -> '_mpl.axes.Axes': """For details see `polytope.plot.plot_partition`.""" return _pplot.plot_partition( self, trans, ppp2trans, only_adjacent, ax, plot_numbers, color_seed) def plot_props( self, ax: _ty.Union[ '_mpl.axes.Axes', None] =None, text_color: str='yellow' ) -> '_mpl.axes.Axes': """Plot labeled regions of continuous propositions.""" if ax is None: ax, fig = _graphics.newax() l, u = self.domain.bounding_box ax.set_xlim(l[0,0], u[0,0]) ax.set_ylim(l[1,0], u[1,0]) for prop, poly in self.prop_regions.items(): isect_poly = poly.intersect(self.domain) isect_poly.plot( ax, color='none', hatch='/') isect_poly.text( prop, ax, color=text_color) return ax
PPP = PropPreservingPartition
[docs] def prop2part( state_space: pc.Polytope, cont_props_dict: dict[pc.Polytope] ) -> PropPreservingPartition: """ Takes a domain `state_space` and a `list` of propositions `cont_props`, and returns a proposition-preserving partition of the state-space. Relevant ======== `PropPreservingPartition`, `polytope.Polytope` @param state_space: problem domain @param cont_props_dict: propositions @return: state-space quotient partition induced by propositions """ first_poly = list() # Initial Region's polytopes first_poly.append(state_space) regions = [pc.Region(first_poly)] for cur_prop in cont_props_dict: cur_prop_poly = cont_props_dict[cur_prop] num_reg = len(regions) prop_holds_reg = list() for i in range(num_reg): #i region counter region_now = regions[i].copy() #loop for prop holds prop_holds_reg.append(0) prop_now = regions[i].props.copy() dummy = region_now.intersect(cur_prop_poly) # does cur_prop hold in dummy ? if pc.is_fulldim(dummy): dum_prop = prop_now.copy() dum_prop.add(cur_prop) # is dummy a Polytope ? if len(dummy) == 0: regions[i] = pc.Region([dummy], dum_prop) else: # dummy is a Region dummy.props = dum_prop.copy() regions[i] = dummy.copy() prop_holds_reg[-1] = 1 else: #does not hold in the whole region # (-> no need for the 2nd loop) regions.append(region_now) continue #loop for prop does not hold regions.append(pc.Region([], props=prop_now) ) dummy = region_now.diff(cur_prop_poly) if pc.is_fulldim(dummy): dum_prop = prop_now.copy() # is dummy a Polytope ? if len(dummy) == 0: regions[-1] = pc.Region( [pc.reduce(dummy)], dum_prop) else: # dummy is a Region dummy.props = dum_prop.copy() regions[-1] = dummy.copy() else: regions.pop() count = 0 for hold_count in range(len(prop_holds_reg)): if prop_holds_reg[hold_count]==0: regions.pop(hold_count-count) count+=1 mypartition = PropPreservingPartition( domain = copy.deepcopy(state_space), regions = regions, prop_regions = copy.deepcopy(cont_props_dict)) mypartition.adj = pc.find_adjacent_regions(mypartition).copy() return mypartition
def part2convex( ppp: PropPreservingPartition ) -> tuple[ PropPreservingPartition, list]: """Refine partition so that cells be convex. Takes a proposition-preserving partition, and generates another proposition-preserving partition, such that each cell in the new partition is a convex polytope. @return: refinement into convex polytopes and map from new to old Regions """ cvxpart = PropPreservingPartition( domain=copy.deepcopy(ppp.domain), prop_regions=copy.deepcopy(ppp.prop_regions)) new2old = list() for i in range(len(ppp.regions)): simplified_reg = pc.union( ppp.regions[i], ppp.regions[i], check_convex=True) for j in range(len(simplified_reg)): region_now = pc.Region( [simplified_reg[j]], ppp.regions[i].props) cvxpart.regions.append(region_now) new2old += [i] cvxpart.adj = pc.find_adjacent_regions(cvxpart).copy() return (cvxpart, new2old) def pwa_partition( pwa_sys: 'PwaSysDyn', ppp: PropPreservingPartition, abs_tol: float=1e-5 ) -> tuple[ PropPreservingPartition, list, list]: """This function takes: - a piecewise affine system `pwa_sys` and - a proposition-preserving partition `ppp` whose domain is a subset of the domain of `pwa_sys` and returns a *refined* proposition preserving partition where in each region a unique subsystem of `pwa_sys` is active. Reference ========= Modified from Petter Nilsson's code implementing merge algorithm in: Nilsson et al. `Temporal Logic Control of Switched Affine Systems with an Application in Fuel Balancing`, ACC 2012. Relevant ======== `discretize` @return: new partition and associated maps: - new partition `new_ppp` - map of `new_ppp.regions` to `pwa_sys.list_subsys` - map of `new_ppp.regions` to `ppp.regions` """ if pc.is_fulldim(ppp.domain.diff(pwa_sys.domain)): raise Exception( 'piecewise-affine system ' 'is not defined everywhere in ' 'the state-space') # for each subsystem's domain, cut it into pieces # each piece is the intersection with # a unique Region in ppp.regions new_list = list() subsys_list = list() parents = list() for i, subsys in enumerate(pwa_sys.list_subsys): for j, region in enumerate(ppp.regions): isect = region.intersect(subsys.domain) if pc.is_fulldim(isect): rc, xc = pc.cheby_ball(isect) if rc < abs_tol: warnings.warn( 'One of the regions in ' 'the refined PPP is ' 'too small, this may cause ' 'numerical problems') # not Region yet, but Polytope ? if len(isect) == 0: isect = pc.Region([isect]) # label with AP isect.props = region.props.copy() # store new Region new_list.append(isect) # keep track of original Region in ppp.regions parents.append(j) # index of subsystem active within isect subsys_list.append(i) # compute spatial adjacency matrix n = len(new_list) adj = sp.lil_array((n, n), dtype=np.int8) for i, ri in enumerate(new_list): pi = parents[i] for j, rj in enumerate(new_list[0:i]): pj = parents[j] if (ppp.adj[pi, pj] == 1) or (pi == pj): if pc.is_adjacent(ri, rj): adj[i, j] = 1 adj[j, i] = 1 adj[i, i] = 1 new_ppp = PropPreservingPartition( domain = ppp.domain, regions = new_list, adj = adj, prop_regions = ppp.prop_regions) return (new_ppp, subsys_list, parents) def add_grid( ppp: PropPreservingPartition, grid_size: float | list[float] | None=None, num_grid_pnts: int | list[int] | None=None, abs_tol: float=1e-10 ) -> PropPreservingPartition: """Refine proposition-preserving partition using grids. This function takes a proposition-preserving partition `ppp`, and the size of the grid, or the number of grids, and returns a refined proposition-preserving partition with grids. Input: - `grid_size`: the size of the grid - `num_grid_pnts`: the number of grids for each dimension Output: - partition with grids Note: There could be numerical instabilities when the continuous propositions in `ppp` do not align well with the grid, resulting in very small regions. Performace significantly degrades without `glpk`. """ if grid_size != None and num_grid_pnts != None: raise Exception( 'Only one of ' 'the grid size or number of ' 'grid points parameters is ' 'allowed to be given.') if grid_size == None and num_grid_pnts == None: raise Exception( 'At least one of the ' 'grid size or number of ' 'grid points parameters ' 'must be given.') dim = len(ppp.domain.A[0]) domain_bb = ppp.domain.bounding_box size_list = list() if grid_size != None: if isinstance(grid_size, list): if len(grid_size) == dim: size_list = grid_size else: raise Exception( '`grid_size` is not ' 'given in a correct format.') elif isinstance(grid_size, float): for i in range(dim): size_list.append(grid_size) else: raise Exception( 'grid_size is not given in ' 'a correct format.') else: if isinstance(num_grid_pnts, list): if len(num_grid_pnts) == dim: for i in range(dim): if isinstance( num_grid_pnts[i], int): grid_size=( float(domain_bb[1][i]) - float(domain_bb[0][i]) ) / num_grid_pnts[i] size_list.append(grid_size) else: raise Exception( '`num_grid_pnts` is not ' 'given in a correct format.') else: raise Exception( '`num_grid_pnts` is not given ' 'in a correct format.') elif isinstance(num_grid_pnts, int): for i in range(dim): grid_size=( float(domain_bb[1][i]) - float(domain_bb[0][i]) ) / num_grid_pnts size_list.append(grid_size) else: raise Exception( '`num_grid_pnts` is not given ' 'in a correct format.') j = 0 list_grid = dict() while j < dim: list_grid[j] = compute_interval( float(domain_bb[0][j]), float(domain_bb[1][j]), size_list[j], abs_tol) if j > 0: if j == 1: re_list = list_grid[j - 1] re_list = product_interval( re_list, list_grid[j]) else: re_list = product_interval( re_list, list_grid[j]) j += 1 new_list = list() parent = list() for i in range(len(re_list)): temp_list = list() j = 0 while j < dim * 2: temp_list.append( [re_list[i][j], re_list[i][j + 1]]) j = j + 2 for j in range(len(ppp.regions)): tmp = pc.box2poly(temp_list) isect = tmp.intersect(ppp.regions[j], abs_tol) # if pc.is_fulldim(isect): rc, xc = pc.cheby_ball(isect) if rc > abs_tol / 2: if rc < abs_tol: print( 'Warning: ' 'One of the regions in ' 'the refined PPP is too small' ', this may cause numerical problems') if len(isect) == 0: isect = pc.Region([isect], []) isect.props = ppp.regions[j].props.copy() new_list.append(isect) parent.append(j) adj = sp.lil_array( (len(new_list), len(new_list)), dtype=np.int8) for i in range(len(new_list)): adj[i, i] = 1 for j in range(i + 1, len(new_list)): todo = ( (ppp.adj[parent[i], parent[j]] == 1 or parent[i] == parent[j]) and pc.is_adjacent(new_list[i], new_list[j])) if not todo: continue adj[i, j] = 1 adj[j, i] = 1 return PropPreservingPartition( domain=ppp.domain, regions=new_list, adj=adj, prop_regions=ppp.prop_regions) #### Helper functions #### def compute_interval( low_domain: float, high_domain: float, size: float, abs_tol: float=1e-7 ) -> list[ tuple[ float, float]]: """Compute intervals for each dimension.""" list_g = list() i = low_domain while True: if (i + size + abs_tol) >= high_domain: list_g.append([i, high_domain]) break else: list_g.append([i, i + size]) i = i + size return list_g def product_interval( list1: list[float], list2: list[float] ) -> list[float]: """Combine all intervals, for any two interval lists.""" new_list = list() for m in range(len(list1)): for n in range(len(list2)): new_list.append(list1[m] + list2[n]) return new_list def ppp2ts( part: PropPreservingPartition ) -> tuple[ trs.FTS, dict]: """Derive transition system from proposition preserving partition. @param part: labeled polytopic partition from which to derive the transition system @return: `(ts, state_map)` finite transition system labeled with propositions from the given partition, and map of polytope indices to transition system states. """ # generate transition system and add transitions ofts = trs.FTS() adj = part.adj # `sp.lil_array` n = adj.shape[0] ofts_states = range(n) ofts_states = trs.prepend_with(ofts_states, 's') ofts.states.add_from(ofts_states) ofts.transitions.add_adj(adj, ofts_states) # decorate TS with state labels atomic_propositions = set(part.prop_regions) ofts.atomic_propositions.add_from(atomic_propositions) for state, region in zip(ofts_states, part.regions): state_prop = region.props.copy() ofts.states.add(state, ap=state_prop) return (ofts, ofts_states)