# Copyright (c) 2011-2016 by California Institute of Technology
# Copyright (c) 2016 by The Regents of the University of Michigan
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder(s) nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDERS OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
# IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
"""Algorithms related to discretization of continuous dynamics.
See Also
========
`find_controller`
"""
import copy
import logging
import multiprocessing as mp
import os
import pprint
import typing as _ty
import warnings
import numpy as np
import polytope as pc
import polytope.plot as _pplt
import scipy.sparse as sp
import tulip.abstract.feasible as _fsb
import tulip.abstract.plot as _aplt
import tulip.abstract.prop2partition as _p2p
import tulip.graphics as _graphics
plt = _graphics._plt
_mpl = _graphics._mpl
import tulip.hybrid as _hyb
import tulip.transys as trs
__all__ = [
'AbstractSwitched',
'AbstractPwa',
'discretize',
'discretize_switched',
'multiproc_discretize',
'multiproc_discretize_switched']
_logger = logging.getLogger(__name__)
debug = False
Polytope = (
pc.Polytope |
pc.Region)
SystemDynamics = (
_hyb.LtiSysDyn |
_hyb.PwaSysDyn)
PPP = _p2p.PropPreservingPartition
FTS = trs.FiniteTransitionSystem
class AbstractSwitched:
"""Abstraction of `SwitchedSysDyn`.
This class stores also mode-specific and
common information.
Attributes:
- `ppp`: merged partition, if any
Preserves both propositions and dynamics
- `ts`: common `TS`, if any
- `ppp2ts`: map from `ppp.regions` to `ts.states`
- `modes`: `dict` of `{mode: AbstractPwa}`
- `ppp2modes`: map from `ppp.regions` to
`modes[mode].ppp.regions` of the form:
```
mode: list
```
where `list` has same indices as `ppp.regions` and
elements in each `list` are indices of regions in
each `modes[mode].ppp.regions`.
- type: `dict`
Each partition corresponds to some mode.
(for switched systems)
In each mode, a `PwaSysDyn` is active.
"""
def __init__(
self,
ppp:
PPP |
None=None,
ts:
FTS |
None=None,
ppp2ts=None,
modes:
dict |
None=None,
ppp2modes:
dict[..., list] |
None=None):
if modes is None:
modes = dict()
self.ppp = ppp
self.ts = ts
self.ppp2ts = ppp2ts
self.modes = modes
self.ppp2modes = ppp2modes
def __str__(self):
s = (
'Abstraction of switched system\n'
f'common PPP:\n{self.ppp}'
f'common ts:\n{self.ts}')
items = [s]
for mode, ab in self.modes.items():
items.append(
f'mode: {mode}'
f', with abstraction:\n{ab}')
return ''.join(items)
def ppp2pwa(
self,
mode,
i:
int
) -> tuple[
int,
Polytope]:
"""Return original `Region` containing `Region` `i` in `mode`.
@param mode:
key of `modes`
@param i:
Region index in common partition `ppp.regions`
@return:
tuple `(j, region)` of:
- index `j` of `Region` and
- `Region` object
in `modes[mode].ppp.regions`
"""
region_idx = self.ppp2modes[mode][i]
ab = self.modes[mode]
return ab.ppp2pwa(region_idx)
def ppp2sys(
self,
mode,
i:
int
) -> tuple[
int,
_hyb.PwaSysDyn]:
"""Return index of active PWA subsystem in `mode`,
@param mode:
key of `modes`
@param i:
Region index in common partition `ppp.regions`.
@return:
tuple `(j, subsystem)` of:
- index `j` of PWA `subsystem`
- `LtiSysDyn` object `subsystem`
"""
region_idx = self.ppp2modes[mode][i]
ab = self.modes[mode]
return ab.ppp2sys(region_idx)
def plot(
self,
show_ts:
bool=False,
only_adjacent:
bool=False
) -> list['_mpl.axes.Axes']:
"""Plot mode partitions and merged partition, if one exists.
For details read `AbstractPwa.plot`.
"""
axs = list()
color_seed = 0
# merged partition exists ?
if self.ppp is not None:
for mode in self.modes:
env_mode, sys_mode = mode
edge_label = dict(
env_actions=env_mode,
sys_actions=sys_mode)
ax = _plot_abstraction(
self,
show_ts=False,
only_adjacent=False,
color_seed=color_seed)
_aplt.plot_ts_on_partition(
self.ppp, self.ts, self.ppp2ts,
edge_label, only_adjacent, ax)
axs.append(ax)
# plot mode partitions
for mode, ab in self.modes.items():
ax = ab.plot(
show_ts, only_adjacent, color_seed)
ax.set_title(
f'Abstraction for mode: {mode}')
axs.append(ax)
#if isinstance(self.ts, dict):
# for ts in self.ts:
# ax = ts.plot()
# axs.append(ax)
return axs
class AbstractPwa:
"""Discrete abstraction of PWA dynamics, with attributes:
- `ppp`: Partition into Regions.
Each Region corresponds to
a discrete state of the abstraction
- type: `PropPreservingPartition`
- `ts`: Finite transition system abstracting
the continuous system.
Each state corresponds to a Region in `ppp.regions`.
It can be fed into discrete synthesis algorithms.
- type: `FTS`
- `ppp2ts`: bijection between `ppp.regions` and `ts.states`.
Has common indices with `ppp.regions`.
Elements are states in `ts.states`.
(usually each state is a `str`)
- type: `list` of states
- `pwa`: system dynamics
- type: `PwaSysDyn`
- `pwa_ppp`: partition preserving both:
- propositions and
- domains of PWA subsystems
Used for non-conservative planning.
If just `LtiSysDyn`, then the only difference
of `pwa_ppp` from `orig_ppp` is convexification.
- type: `PropPreservingPartition`
- `orig_ppp`: partition preserving only propositions
i.e., agnostic of dynamics
- type: `PropPreservingPartition`
- `disc_params`: parameters used in discretization that
should be passed to the controller refinement
to ensure consistency
- type: dict
If any of the above is not given,
then it is initialized to `None`.
Notes
=====
1. There could be some redundancy in `ppp` and `ofts`,
in that they are both decorated with propositions.
This might be useful to keep each of
them as functional units on their own
(possible to change later).
2. The 'Pwa' in `AbstractPwa` includes `LtiSysDyn`
as a special case.
"""
def __init__(
self,
ppp=None,
ts=None,
ppp2ts=None,
pwa=None,
pwa_ppp=None,
ppp2pwa=None,
ppp2sys=None,
orig_ppp=None,
ppp2orig=None,
disc_params=None):
if disc_params is None:
disc_params = dict()
self.ppp = ppp
self.ts = ts
self.ppp2ts = ppp2ts
# pwa
self.pwa = pwa
self.pwa_ppp = pwa_ppp
self._ppp2pwa = ppp2pwa
self._ppp2sys = ppp2sys
# mapping
self.orig_ppp = orig_ppp
self._ppp2orig = ppp2orig
# original_regions -> pwa_ppp
# ppp2orig -> ppp2pwa_ppp
# ppp2pwa -> ppp2pwa_sys
self.disc_params = disc_params
def __str__(self):
return (
f'{self.ppp}{self.ts}' +
30 * '-' + '\n'
'Map PPP Regions ---> TS states:\n'
f'{self._ppp2other_str(self.ppp2ts)}\n'
'Map PPP Regions ---> PWA PPP Regions:\n'
f'{self._ppp2other_str(self._ppp2pwa)}\n'
'Map PPP Regions ---> PWA Subsystems:\n'
f'{self._ppp2other_str(self._ppp2sys)}\n'
'Map PPP Regions ---> Original PPP Regions:\n'
f'{self._ppp2other_str(self._ppp2orig)}\n'
'Discretization Options:\n\t'
f'{pprint.pformat(self.disc_params)}\n')
def ts2ppp(self, state):
region_index = self.ppp2ts.index(state)
region = self.ppp[region_index]
return (region_index, region)
def ppp2trans(
self,
region_index:
int
) -> tuple[
Polytope,
_hyb.LtiSysDyn]:
"""Return the transition set constraint and active subsystem,
for non-conservative planning.
"""
reg_idx, pwa_region = self.ppp2pwa(region_index)
sys_idx, sys = self.ppp2sys(region_index)
return pwa_region, sys
def ppp2pwa(
self,
region_index:
int
) -> tuple[
int,
pc.Region]:
"""Return dynamics and predicate-preserving region
and its index for PWA subsystem active in given region.
The returned region is the `trans_set` used for
non-conservative planning.
@param region_index:
index in `ppp.regions`.
"""
j = self._ppp2pwa[region_index]
pwa_region = self.pwa_ppp[j]
return (j, pwa_region)
def ppp2sys(
self,
region_index:
int
) -> tuple[
int,
pc.Region]:
"""Return index and PWA subsystem active in indexed region.
Semantics: `j`-th sub-system is active in `i`-th Region,
where `j = ppp2pwa[i]`
@param region_index:
index in `ppp.regions`.
"""
# LtiSysDyn ?
if self._ppp2sys is None:
return (0, self.pwa)
subsystem_idx = self._ppp2sys[region_index]
subsystem = self.pwa.list_subsys[subsystem_idx]
return (subsystem_idx, subsystem)
def ppp2orig(
self,
region_index:
int
) -> tuple[
int,
pc.Region]:
"""Return index and region of original partition.
The original partition is without any dynamics,
not even the PWA domains, only the polytopic predicates.
@param region_index:
index in `ppp.regions`.
"""
j = self._ppp2orig[region_index]
orig_region = self.orig_ppp[j]
return (j, orig_region)
def _ppp2other_str(
self,
ppp2other:
list
) -> str:
if ppp2other is None:
return ''
c = list()
for i, other in enumerate(ppp2other):
c.append(f'\t\t{i} -> {other}\n')
return ''.join(c)
def _debug_str_(self) -> str:
return (
f'{self.ppp}'
f'{self.ts}'
'(PWA + Prop)-Preserving Partition'
f'{self.pwa_ppp}'
'Original Prop-Preserving Partition'
f'{self.orig_ppp}')
def plot(
self,
show_ts:
bool=False,
only_adjacent:
bool=False,
color_seed:
int |
None=None
) -> '_mpl.axes.Axes':
"""Plot partition and optionally feasible transitions.
@param show_ts:
plot feasible transitions on partition
@param only_adjacent:
plot feasible transitions only
between adjacent regions. This reduces clutter,
but if `horizon > 1` and not all horizon used,
then some transitions could be hidden.
"""
ax = _plot_abstraction(
self, show_ts, only_adjacent, color_seed)
return ax
def verify_transitions(self) -> None:
_logger.info('verifying transitions...')
for from_state, to_state in self.ts.transitions():
i, from_region = self.ts2ppp(from_state)
j, to_region = self.ts2ppp(to_state)
trans_set, sys = self.ppp2trans(i)
params = {'N', 'close_loop', 'use_all_horizon'}
disc_params = {
k: v
for k, v in self.disc_params.items()
if k in params}
s0 = _fsb.solve_feasible(
from_region, to_region, sys,
trans_set=trans_set,
**disc_params)
msg = f'{i} ---> {j}'
if not from_region <= s0:
_logger.error(
f'incorrect transition: {msg}')
isect = from_region.intersect(s0)
ratio = isect.volume /from_region.volume
_logger.error(
f'intersection volume: {ratio} %')
else:
_logger.info(
f'correct transition: {msg}')
def _plot_abstraction(
ab:
AbstractPwa,
show_ts:
bool,
only_adjacent:
bool,
color_seed:
int
) -> _ty.Union[
'_mpl.axes.Axes',
None]:
if ab.ppp is None or ab.ts is None:
warnings.warn('Either ppp or ts is None.')
return
if show_ts:
ts = ab.ts
ppp2ts = ab.ppp2ts
else:
ts = None
ppp2ts = None
ax = ab.ppp.plot(
ts,
ppp2ts,
only_adjacent=only_adjacent,
color_seed=color_seed)
# ax = self.ts.plot()
return ax
[docs]
def discretize(
part:
PPP,
ssys:
SystemDynamics,
N:
int=10,
min_cell_volume:
float=0.1,
closed_loop:
bool=True,
conservative:
bool=False,
max_num_poly:
int=5,
use_all_horizon:
bool=False,
trans_length:
int=1,
remove_trans:
bool=False,
abs_tol:
float=1e-7,
plotit:
bool=False,
save_img:
bool=False,
cont_props:
list[pc.Polytope] |
None=None,
plot_every:
int=1,
simu_type:
_ty.Literal[
'bi',
'dual']
='bi'
) -> AbstractPwa:
"""Refine the partition via bisimulation or dual-simulation.
Refines the partition via either:
- bisimulation, or
- dual-simulation
algorithms, and establish transitions
based on reachability analysis.
Reference
=========
[NOTM12](
https://tulip-control.sourceforge.io/doc/bibliography.html#notm12)
Relevant
========
`prop2partition.pwa_partition`,
`prop2partition.part2convex`
@param N:
horizon length
@param min_cell_volume:
the minimum volume of
cells in the resulting partition.
@param closed_loop:
boolean indicating whether
the `closed loop` algorithm should be used.
(default is `True`)
@param conservative:
- `True`: force sequence in reachability analysis
to stay inside starting cell
- `False`: safety is ensured by keeping the
sequence inside a convexified version of
the original proposition-preserving cell.
@param max_num_poly:
maximum number of
polytopes in a region to use in
reachability analysis.
@param use_all_horizon:
in closed-loop algorithm:
if we should look for reachability also
in less than `N` steps.
@param trans_length:
the number of polytopes
allowed to cross in a transition.
A value of `1` checks transitions
only between neighbors, a value of `2` checks
neighbors of neighbors and so on.
@param remove_trans:
if `True`, then remove
the found transitions between non-neighbors.
@param abs_tol:
maximum volume for an "empty" polytope
@param plotit:
plot partitioning as it evolves
@param save_img:
save snapshots of partitioning
to PDF files,
requires `plotit=True`
@param cont_props:
continuous propositions to plot
@param simu_type:
- `'bi'` (default): use bisimulation partition
- `'dual'`: use dual-simulation partition
"""
match simu_type:
case 'bi':
_discretize = _discretize_bi
case 'dual':
_discretize = _discretize_dual
case _:
raise ValueError(
'Unknown simulation '
f'type: "{simu_type}"')
return _discretize(
part, ssys, N, min_cell_volume,
closed_loop, conservative,
max_num_poly, use_all_horizon,
trans_length, remove_trans,
abs_tol,
plotit, save_img, cont_props,
plot_every)
def _discretize_bi(
part:
PPP,
ssys:
SystemDynamics,
N:
int=10,
min_cell_volume:
float=0.1,
closed_loop:
bool=True,
conservative:
bool=False,
max_num_poly:
int=5,
use_all_horizon:
bool=False,
trans_length:
int=1,
remove_trans:
bool=False,
abs_tol:
float=1e-7,
plotit:
bool=False,
save_img:
bool=False,
cont_props:
list[pc.Polytope] |
None=None,
plot_every:
int=1
) -> AbstractPwa:
"""Refine partition, based on reachability analysis.
Refines the partition, and establishes transitions
based on reachability analysis. Use bi-simulation algorithm.
Reference
=========
1. [NOTM12](
https://tulip-control.sourceforge.io/doc/bibliography.html#notm12)
2. Wagenmaker, A. J.; Ozay, N.
"A Bisimulation-like Algorithm for Abstracting Control Systems."
54th Annual Allerton Conference on CCC 2016
Relevant
========
`prop2partition.pwa_partition`,
`prop2partition.part2convex`
@param N:
horizon length
@param min_cell_volume:
the minimum volume of
cells in the resulting partition.
@param closed_loop:
boolean indicating whether
the `closed loop` algorithm should be used.
(default is `True`)
@param conservative:
- `True`: force sequence in reachability
analysis to stay inside the starting cell
- `False` (default): safety is ensured by keeping
the sequence inside a convexified version of
the original proposition-preserving cell.
@param max_num_poly:
maximum number of polytopes
in a region to use in reachability analysis.
@param use_all_horizon:
in closed-loop algorithm:
if we should look for reachability also
in less than `N` steps.
@param trans_length:
the number of polytopes allowed
to cross in a transition.
A value of `1` checks transitions
only between neighbors, a value of `2` checks
neighbors of neighbors and so on.
@param remove_trans:
if `True`, then remove
the found transitions between non-neighbors.
@param abs_tol:
maximum volume for an "empty" polytope
@param plotit:
plot partitioning as it evolves
@param save_img:
save snapshots of partitioning
to PDF files,
`requires plotit=True`
@param cont_props:
continuous propositions to plot
"""
start_time = os.times()[0]
orig_ppp = part
min_cell_volume = (
min_cell_volume /
np.finfo(np.double).eps *
np.finfo(np.double).eps)
ispwa = isinstance(ssys, _hyb.PwaSysDyn)
islti = isinstance(ssys, _hyb.LtiSysDyn)
if ispwa:
part, ppp2pwa, part2orig = _p2p.pwa_partition(ssys, part)
else:
part2orig = range(len(part))
# Save original polytopes, require them to be convex
if conservative:
orig_list = None
orig = [0]
else:
part, new2old = _p2p.part2convex(part) # convexify
part2orig = [part2orig[i] for i in new2old]
# map new regions to pwa subsystems
if ispwa:
ppp2pwa = [ppp2pwa[i] for i in new2old]
remove_trans = False
# already allowed in nonconservative
orig_list = list()
for poly in part:
if len(poly) == 0:
orig_list.append(poly.copy())
elif len(poly) == 1:
orig_list.append(poly[0].copy())
else:
raise Exception(
'problem in convexification')
orig = list(range(len(orig_list)))
# Cheby radius of disturbance set
# (defined within the loop for pwa systems)
if islti:
if len(ssys.E) > 0:
rd = ssys.Wset.chebR
else:
rd = 0.
# Initialize matrix for pairs to check
IJ = part.adj.copy().toarray()
_logger.debug(
f'\n Starting IJ: \n{IJ}')
# next line omitted in discretize_overlap
IJ = reachable_within(
trans_length, IJ, part.adj.toarray())
# Initialize output
num_regions = len(part)
transitions = np.zeros(
[num_regions, num_regions],
dtype = int)
sol = copy.deepcopy(part.regions)
adj = part.adj.copy().toarray()
# next 2 lines omitted in discretize_overlap
if ispwa:
subsys_list = list(ppp2pwa)
else:
subsys_list = None
ss = ssys
# init graphics
if plotit:
_graphics._assert_pyplot()
plt.ion()
fig, (ax1, ax2) = plt.subplots(1, 2)
ax1.axis('scaled')
ax2.axis('scaled')
file_extension = 'pdf'
else:
plt = None
iter_count = 0
# List of how many "new" regions
# have been created for each region
# and a `list` of original number of neighbors
# num_new_reg = np.zeros(len(orig_list))
# num_orig_neigh = np.sum(adj, axis=1).flatten() - 1
progress = list()
# Do the abstraction
while np.sum(IJ) > 0:
ind = np.nonzero(IJ)
# `i`, `j` swapped in discretize_overlap
i = ind[1][0]
j = ind[0][0]
IJ[j, i] = 0
si = sol[i]
sj = sol[j]
si_tmp = copy.deepcopy(si)
sj_tmp = copy.deepcopy(sj)
if ispwa:
ss = ssys.list_subsys[subsys_list[i]]
if len(ss.E) > 0:
rd, xd = pc.cheby_ball(ss.Wset)
else:
rd = 0.
if conservative:
# Don't use trans_set
trans_set = None
else:
# Use original cell as trans_set
trans_set = orig_list[orig[i]]
S0 = _fsb.solve_feasible(
si, sj, ss, N, closed_loop,
use_all_horizon, trans_set,
max_num_poly)
_logger.info(
f'\n Working with partition cells: {i}, {j}')
msg = (
f'\t{i} (#polytopes = {len(si)}), and:\n'
f'\t{j} (#polytopes = {len(sj)})\n')
if ispwa:
msg += (
'\t with active subsystem: '
f'{subsys_list[i]}\n')
msg += (
'\t Computed reachable set S0 with volume: '
f'{S0.volume}\n')
_logger.debug(msg)
# _logger.debug(r'si \cap s0')
isect = si.intersect(S0)
vol1 = isect.volume
risect, xi = pc.cheby_ball(isect)
# _logger.debug(r'si \ s0')
diff = si.diff(S0)
vol2 = diff.volume
rdiff, xd = pc.cheby_ball(diff)
# if pc.is_fulldim(pc.Region([isect]).intersect(diff)):
# logging.getLogger('tulip.polytope').setLevel(logging.DEBUG)
# diff = pc.mldivide(si, S0, save=True)
#
# ax = S0.plot()
# ax.axis([0.0, 1.0, 0.0, 2.0])
# ax.figure.savefig('./img/s0.pdf')
#
# ax = si.plot()
# ax.axis([0.0, 1.0, 0.0, 2.0])
# ax.figure.savefig('./img/si.pdf')
#
# ax = isect.plot()
# ax.axis([0.0, 1.0, 0.0, 2.0])
# ax.figure.savefig('./img/isect.pdf')
#
# ax = diff.plot()
# ax.axis([0.0, 1.0, 0.0, 2.0])
# ax.figure.savefig('./img/diff.pdf')
#
# ax = isect.intersect(diff).plot()
# ax.axis([0.0, 1.0, 0.0, 2.0])
# ax.figure.savefig('./img/diff_cap_isect.pdf')
#
# _logger.error(r'Intersection \cap Difference != \emptyset')
#
# assert(False)
if vol1 <= min_cell_volume:
_logger.warning(
'\t too small: si \\cap Pre(sj), '
'so discard intersection')
if vol1 <= min_cell_volume and isect:
_logger.warning(
'\t discarded non-empty intersection: '
'consider reducing min_cell_volume')
if vol2 <= min_cell_volume:
_logger.warning(
'\t too small: si \\ Pre(sj), so not reached it')
# We don't want our partitions to
# be smaller than the disturbance set
# Could be a problem since cheby
# radius is calculated for smallest
# convex polytope, so if we have
# a region we might throw away a good cell.
if (
vol1 > min_cell_volume and
risect > rd and
vol2 > min_cell_volume and
rdiff > rd):
# Make sure new areas are Regions
# and add proposition lists
if len(isect) == 0:
isect = pc.Region([isect], si.props)
else:
isect.props = si.props.copy()
if len(diff) == 0:
diff = pc.Region([diff], si.props)
else:
diff.props = si.props.copy()
# replace si by intersection (single state)
isect_list = pc.separate(isect)
sol[i] = isect_list[0]
# cut difference into connected pieces
difflist = pc.separate(diff)
difflist += isect_list[1:]
# n_isect = len(isect_list) - 1
num_new = len(difflist)
# add each piece, as a new state
for region in difflist:
sol.append(region)
# keep track of PWA subsystems map to new states
if ispwa:
subsys_list.append(subsys_list[i])
n_cells = len(sol)
new_idx = range(
n_cells - 1,
n_cells - num_new - 1,
-1)
#
# Update transition matrix
transitions = np.pad(
transitions,
(0, num_new),
'constant')
transitions[i, :] = np.zeros(n_cells)
for r in new_idx:
#transitions[:, r] = transitions[:, i]
# All sets reachable from star
# are reachable from both part's
# except possibly the new part
transitions[i, r] = 0
transitions[j, r] = 0
# `sol[j]` is reachable from
# intersection of `sol[i]` and `S0`
if i != j:
transitions[j, i] = 1
# `sol[j]` is reachable from
# each piece of `S0 \cap sol[i]`
# for k in range(n_cells - n_isect - 2, n_cells):
# transitions[j, k] = 1
#
# Update adjacency matrix
old_adj = np.nonzero(adj[i, :])[0]
# reset new adjacencies
adj[i, :] = np.zeros([n_cells - num_new])
adj[:, i] = np.zeros([n_cells - num_new])
adj[i, i] = 1
adj = np.pad(
adj,
(0, num_new),
'constant')
for r in new_idx:
adj[i, r] = 1
adj[r, i] = 1
adj[r, r] = 1
if not conservative:
orig = np.hstack([orig, orig[i]])
# adjacency between pieces of `isect` and `diff`
for r in new_idx:
for k in new_idx:
if r is k:
continue
if pc.is_adjacent(sol[r], sol[k]):
adj[r, k] = 1
adj[k, r] = 1
msg = ''
if _logger.getEffectiveLevel() <= logging.DEBUG:
msg += f'\t\n Adding states {i} and '
for r in new_idx:
msg += f'{r} and '
msg += '\n'
_logger.debug(msg)
for k in np.setdiff1d(old_adj, [i,n_cells - 1]):
# Every "old" neighbor must be the neighbor
# of at least one of the new
if pc.is_adjacent(sol[i], sol[k]):
adj[i, k] = 1
adj[k, i] = 1
elif remove_trans and (trans_length == 1):
# Actively remove transitions
# between non-neighbors
transitions[i, k] = 0
transitions[k, i] = 0
for r in new_idx:
if pc.is_adjacent(sol[r], sol[k]):
adj[r, k] = 1
adj[k, r] = 1
elif remove_trans and (trans_length == 1):
# Actively remove transitions
# between non-neighbors
transitions[r, k] = 0
transitions[k, r] = 0
#
# Update IJ matrix
IJ = np.pad(
IJ,
(0, num_new),
'constant')
adj_k = reachable_within(trans_length, adj, adj)
sym_adj_change(IJ, adj_k, transitions, i)
for r in new_idx:
sym_adj_change(IJ, adj_k, transitions, r)
if _logger.getEffectiveLevel() <= logging.DEBUG:
_logger.debug(
f'\n\n Updated adj: \n{adj}'
f'\n\n Updated trans: \n{transitions}'
f'\n\n Updated IJ: \n{IJ}')
_logger.info(f'Divided region: {i}\n')
elif vol2 < abs_tol:
_logger.info(f'Found: {i} ---> {j}\n')
transitions[j, i] = 1
else:
if _logger.level <= logging.DEBUG:
_logger.debug(
f'\t Unreachable: {i} --X--> {j}\n'
f'\t\t diff vol: {vol2}\n'
f'\t\t intersect vol: {vol1}\n')
else:
_logger.info('\t unreachable\n')
transitions[j, i] = 0
# check to avoid overlapping Regions
if debug:
tmp_part = PPP(
domain=part.domain,
regions=sol,
adj=sp.lil_array(adj),
prop_regions=part.prop_regions)
assert(tmp_part.is_partition())
n_cells = len(sol)
progress_ratio = 1 - float(np.sum(IJ)) / n_cells**2
progress.append(progress_ratio)
msg = (
f'\t total # polytopes: {n_cells}\n'
f'\t progress ratio: {progress_ratio}\n')
_logger.info(msg)
iter_count += 1
# no plotting ?
if not plotit:
continue
if plt is None or _pplt.plot_partition is None:
continue
if iter_count % plot_every != 0:
continue
tmp_part = PPP(
domain=part.domain,
regions=sol, adj=sp.lil_array(adj),
prop_regions=part.prop_regions)
# plot pair under reachability check
ax2.clear()
si_tmp.plot(ax=ax2, color='green')
sj_tmp.plot(
ax2, color='red', hatch='o', alpha=0.5)
_pplt.plot_transition_arrow(si_tmp, sj_tmp, ax2)
S0.plot(
ax2, color='none', hatch='/', alpha=0.3)
fig.canvas.draw()
# plot partition
ax1.clear()
_pplt.plot_partition(
tmp_part,
transitions.T,
ax=ax1,
color_seed=23)
# plot dynamics
ssys.plot(ax1, show_domain=False)
# plot hatched continuous propositions
part.plot_props(ax1)
fig.canvas.draw()
# scale view based on domain,
# not only the current polytopes si, sj
l,u = part.domain.bounding_box
ax2.set_xlim(l[0, 0], u[0, 0])
ax2.set_ylim(l[1, 0], u[1, 0])
if save_img:
fname = (
f'movie{str(iter_count).zfill(3)}'
f'.{file_extension}')
fig.savefig(fname, dpi=250)
plt.pause(1)
new_part = PPP(
domain=part.domain,
regions=sol, adj=sp.lil_array(adj),
prop_regions=part.prop_regions)
# check completeness of adjacency matrix
if debug:
tmp_part = copy.deepcopy(new_part)
tmp_part.compute_adj()
# Generate transition system and add transitions
ofts = trs.FTS()
adj = sp.lil_array(transitions.T)
n = adj.shape[0]
ofts_states = list(range(n))
ofts.states.add_from(ofts_states)
ofts.transitions.add_adj(adj, ofts_states)
# Decorate TS with state labels
atomic_propositions = set(part.prop_regions)
ofts.atomic_propositions.add_from(atomic_propositions)
for state, region in zip(ofts_states, sol):
state_prop = region.props.copy()
ofts.states.add(state, ap=state_prop)
param = dict(
N=N,
trans_length=trans_length,
closed_loop=closed_loop,
conservative=conservative,
use_all_horizon=use_all_horizon,
min_cell_volume=min_cell_volume,
max_num_poly=max_num_poly)
ppp2orig = [part2orig[x] for x in orig]
end_time = os.times()[0]
time = end_time - start_time
msg = f'Total abstraction time: {time}[sec]'
print(msg)
_logger.info(msg)
if save_img and plt is not None:
fig, ax = plt.subplots(1, 1)
plt.plot(progress)
ax.set_xlabel('iteration')
ax.set_ylabel('progress ratio')
ax.figure.savefig('progress.pdf')
return AbstractPwa(
ppp=new_part,
ts=ofts,
ppp2ts=ofts_states,
pwa=ssys,
pwa_ppp=part,
ppp2pwa=orig,
ppp2sys=subsys_list,
orig_ppp=orig_ppp,
ppp2orig=ppp2orig,
disc_params=param)
def _discretize_dual(
part:
PPP,
ssys:
SystemDynamics,
N:
int=10,
min_cell_volume:
float=0.1,
closed_loop:
bool=True,
conservative:
bool=False,
max_num_poly:
int=5,
use_all_horizon:
bool=False,
trans_length:
int=1,
remove_trans:
bool=False,
abs_tol:
float=1e-7,
plotit:
bool=False,
save_img:
bool=False,
cont_props:
list[pc.Polytope] |
None=None,
plot_every:
int=1
) -> AbstractPwa:
"""Refine partition, based on reachability analysis.
Refines the partition, and establishes transitions
based on reachability analysis.
Uses dual-simulation algorithm.
Reference
=========
1. [NOTM12](
https://tulip-control.sourceforge.io/doc/bibliography.html#notm12)
2. Wagenmaker, A. J.; Ozay, N.
"A Bisimulation-like Algorithm for Abstracting Control Systems."
54th Annual Allerton Conference on CCC 2016
Relevant
========
`prop2partition.pwa_partition`,
`prop2partition.part2convex`
@param N:
horizon length
@param min_cell_volume:
the minimum volume of
ells in the resulting partition.
@param closed_loop:
`bool` indicating whether
the `closed loop` algorithm should be used.
(default is `True`)
@param conservative:
- `True`: force sequence in reachability analysis
to stay inside starting cell.
- `False`: safety is ensured by keeping the
sequence inside a convexified version of
the original proposition-preserving cell.
@param max_num_poly:
maximum number of polytopes
in a region to use in reachability analysis.
@param use_all_horizon:
in closed-loop algorithm:
if we should look for reachability also in
less than `N` steps.
@param trans_length:
the number of polytopes
allowed to cross in a transition.
- `1`: check transitions only between neighbors,
- `2`: check neighbors of neighbors and so on.
@param remove_trans:
if `True`, then remove
found transitions between non-neighbors.
@param abs_tol:
maximum volume for an "empty" polytope
@param plotit:
plot partitioning as it evolves
@param save_img:
save snapshots of partitioning
to PDF files,
requires `plotit=True`
@param cont_props:
continuous propositions to plot
"""
start_time = os.times()[0]
orig_ppp = part
min_cell_volume = (
min_cell_volume / np.finfo(np.double).eps
* np.finfo(np.double).eps)
ispwa = isinstance(ssys, _hyb.PwaSysDyn)
islti = isinstance(ssys, _hyb.LtiSysDyn)
if ispwa:
part, ppp2pwa, part2orig = _p2p.pwa_partition(ssys, part)
else:
part2orig = range(len(part))
# Save original polytopes, require them to be convex
if conservative:
orig_list = None
orig = [0]
else:
part, new2old = _p2p.part2convex(part) # convexify
part2orig = [part2orig[i] for i in new2old]
# map new regions to pwa subsystems
if ispwa:
ppp2pwa = [ppp2pwa[i] for i in new2old]
remove_trans = False
# already allowed in nonconservative
orig_list = list()
for poly in part:
if len(poly) == 0:
orig_list.append(poly.copy())
elif len(poly) == 1:
orig_list.append(poly[0].copy())
else:
raise Exception(
'problem in convexification')
orig = list(range(len(orig_list)))
# Cheby radius of disturbance set
# (defined within the loop for pwa systems)
if islti:
if len(ssys.E) > 0:
rd = ssys.Wset.chebR
else:
rd = 0.
# Initialize matrix for pairs to check
IJ = part.adj.copy().toarray()
_logger.debug(f'\n Starting IJ: \n{IJ}')
# next line omitted in discretize_overlap
IJ = reachable_within(
trans_length, IJ, part.adj.toarray())
# Initialize output
num_regions = len(part)
transitions = np.zeros(
[num_regions, num_regions],
dtype = int)
sol = copy.deepcopy(part.regions)
adj = part.adj.copy().toarray()
# next 2 lines omitted in `discretize_overlap`
if ispwa:
subsys_list = list(ppp2pwa)
else:
subsys_list = None
ss = ssys
# init graphics
if plotit:
_graphics._assert_pyplot()
plt.ion()
fig, (ax1, ax2) = plt.subplots(1, 2)
ax1.axis('scaled')
ax2.axis('scaled')
file_extension = 'pdf'
else:
plt = None
iter_count = 0
# List of how many "new" regions
# have been created for each region
# and a `list` of original number of neighbors
# num_new_reg = np.zeros(len(orig_list))
# num_orig_neigh = np.sum(adj, axis=1).flatten() - 1
progress = list()
# Do the abstraction
while np.sum(IJ) > 0:
ind = np.nonzero(IJ)
# `i`, `j` swapped in `discretize_overlap`
i = ind[1][0]
j = ind[0][0]
IJ[j, i] = 0
si = sol[i]
sj = sol[j]
si_tmp = copy.deepcopy(si)
sj_tmp = copy.deepcopy(sj)
# num_new_reg[i] += 1
# print(num_new_reg)
if ispwa:
ss = ssys.list_subsys[subsys_list[i]]
if len(ss.E) > 0:
rd, xd = pc.cheby_ball(ss.Wset)
else:
rd = 0.
if conservative:
# Don't use trans_set
trans_set = None
else:
# Use original cell as trans_set
trans_set = orig_list[orig[i]]
S0 = _fsb.solve_feasible(
si, sj, ss, N, closed_loop,
use_all_horizon, trans_set, max_num_poly)
_logger.info(
f'\n Working with partition cells: {i}, {j}')
msg = (
f'\t{i} (#polytopes = {len(si)}), and:\n'
f'\t{j} (#polytopes = {len(sj)})\n')
if ispwa:
msg += (
'\t with active subsystem: '
f'{subsys_list[i]}\n')
msg += (
'\t Computed reachable set S0 with volume: '
f'{S0.volume}\n')
_logger.debug(msg)
# _logger.debug(r'si \cap s0')
isect = si.intersect(S0)
vol1 = isect.volume
risect, xi = pc.cheby_ball(isect)
# _logger.debug(r'si \ s0')
rsi, xd = pc.cheby_ball(si)
vol2 = si.volume - vol1
# not accurate.
# need to check polytope class
if vol1 <= min_cell_volume:
_logger.warning(
'\t too small: si \\cap Pre(sj), '
'so discard intersection')
if vol1 <= min_cell_volume and isect:
_logger.warning(
'\t discarded non-empty intersection: '
'consider reducing min_cell_volume')
if vol2 <= min_cell_volume:
_logger.warning(
'\t too small: si \\ Pre(sj), '
'so not reached it')
# indicate if S0 has exists in sol
check_isect = False
# We don't want our partitions to be
# smaller than the disturbance set
# Could be a problem since cheby radius
# is calculated for smallest
# convex polytope, so if we have a region
# we might throw away a good cell.
if (
vol1 > min_cell_volume and
risect > rd and
vol2 > min_cell_volume and
rsi > rd):
# check if the intersection has
# existed in current partitions
for idx in range(len(sol)):
if(sol[idx] == isect):
_logger.info(
f'Found: {idx} ---> {j} '
'intersection exists.\n')
transitions[j, idx] = 1
check_isect = True
if not check_isect:
# Make sure new areas are Regions
# and add proposition lists
if len(isect) == 0:
isect = pc.Region([isect], si.props)
else:
isect.props = si.props.copy()
# add intersection in sol
isect_list = pc.separate(isect)
sol.append(isect_list[0])
n_cells = len(sol)
new_idx = n_cells-1
#
# Update adjacency matrix
old_adj = np.nonzero(adj[i, :])[0]
adj = np.pad(adj, (0, 1), 'constant')
# cell i and new_idx are adjacent
adj[i, new_idx] = 1
adj[new_idx, i] = 1
adj[new_idx, new_idx] = 1
if not conservative:
orig = np.hstack([orig, orig[i]])
if _logger.getEffectiveLevel() <= logging.DEBUG:
_logger.debug(
f'\t\n Adding states {new_idx}\n')
for k in np.setdiff1d(old_adj, [i,n_cells-1]):
# Every "old" neighbor must be the neighbor
# of at least one of the new
if pc.is_adjacent(sol[new_idx], sol[k]):
adj[new_idx, k] = 1
adj[k, new_idx] = 1
elif remove_trans and (trans_length == 1):
# Actively remove transitions
# between non-neighbors
transitions[new_idx, k] = 0
transitions[k, new_idx] = 0
# Update transition matrix
transitions = np.pad(
transitions,
(0, 1),
'constant')
adj_k = reachable_within(trans_length, adj, adj)
# transitions `i` ---> `k` for `k` is
# neighbor of `new_idx` should be
# kept by `new_idx`
transitions[:, new_idx
] = np.multiply(
transitions[:, i],
adj_k[:, i])
# if `j` and `new_idx` are neighbors,
# then add `new_idx` ---> `j`
if adj_k[j, new_idx] != 0:
transitions[j, new_idx] = 1
#
# Update IJ matrix
IJ = np.pad(IJ, (0, 1), 'constant')
sym_adj_change(IJ, adj_k, transitions, i)
sym_adj_change(IJ, adj_k, transitions, new_idx)
if _logger.getEffectiveLevel() <= logging.DEBUG:
_logger.debug(
f'\n\n Updated adj: \n{adj}'
f'\n\n Updated trans: \n{transitions}'
f'\n\n Updated IJ: \n{IJ}')
_logger.info(f'Divided region: {i}\n')
elif vol2 < abs_tol:
_logger.info(f'Found: {i} ---> {j}\n')
transitions[j, i] = 1
else:
if _logger.level <= logging.DEBUG:
_logger.debug(
f'\t Unreachable: {i} --X--> {j}\n'
f'\t\t diff vol: {vol2}\n'
f'\t\t intersect vol: {vol1}\n')
else:
_logger.info('\t unreachable\n')
transitions[j, i] = 0
# check to avoid overlapping Regions
if debug:
tmp_part = PPP(
domain=part.domain,
regions=sol, adj=sp.lil_array(adj),
prop_regions=part.prop_regions)
assert(tmp_part.is_partition())
n_cells = len(sol)
progress_ratio = 1 - float(np.sum(IJ)) / n_cells**2
progress.append(progress_ratio)
_logger.info(
f'\t total # polytopes: {n_cells}\n'
f'\t progress ratio: {progress_ratio}\n')
iter_count += 1
# needs to be removed later
# if(iter_count>=700):
# break
# no plotting ?
if not plotit:
continue
if plt is None or _pplt.plot_partition is None:
continue
if iter_count % plot_every != 0:
continue
tmp_part = PPP(
domain=part.domain,
regions=sol, adj=sp.lil_array(adj),
prop_regions=part.prop_regions)
# plot pair under reachability check
ax2.clear()
si_tmp.plot(ax=ax2, color='green')
sj_tmp.plot(
ax2, color='red', hatch='o', alpha=0.5)
_pplt.plot_transition_arrow(si_tmp, sj_tmp, ax2)
S0.plot(
ax2, color='none', hatch='/', alpha=0.3)
fig.canvas.draw()
# plot partition
ax1.clear()
_pplt.plot_partition(
tmp_part,
transitions.T,
ax=ax1,
color_seed=23)
# plot dynamics
ssys.plot(ax1, show_domain=False)
# plot hatched continuous propositions
part.plot_props(ax1)
fig.canvas.draw()
# scale view based on domain,
# not only the current polytopes si, sj
l,u = part.domain.bounding_box
ax2.set_xlim(l[0, 0], u[0, 0])
ax2.set_ylim(l[1, 0], u[1, 0])
if save_img:
fname = (
f'movie{str(iter_count).zfill(3)}'
f'.{file_extension}')
fig.savefig(fname, dpi=250)
plt.pause(1)
new_part = PPP(
domain=part.domain,
regions=sol, adj=sp.lil_array(adj),
prop_regions=part.prop_regions)
# check completeness of adjacency matrix
if debug:
tmp_part = copy.deepcopy(new_part)
tmp_part.compute_adj()
# Generate transition system and add transitions
ofts = trs.FTS()
adj = sp.lil_array(transitions.T)
n = adj.shape[0]
ofts_states = list(range(n))
ofts.states.add_from(ofts_states)
ofts.transitions.add_adj(adj, ofts_states)
# Decorate TS with state labels
atomic_propositions = set(part.prop_regions)
ofts.atomic_propositions.add_from(atomic_propositions)
for state, region in zip(ofts_states, sol):
state_prop = region.props.copy()
ofts.states.add(state, ap=state_prop)
param = dict(
N=N,
trans_length=trans_length,
closed_loop=closed_loop,
conservative=conservative,
use_all_horizon=use_all_horizon,
min_cell_volume=min_cell_volume,
max_num_poly=max_num_poly)
ppp2orig = [part2orig[x] for x in orig]
end_time = os.times()[0]
dt = end_time - start_time
msg = f'Total abstraction time: {dt} [sec]'
print(msg)
_logger.info(msg)
if save_img and plt is not None:
fig, ax = plt.subplots(1, 1)
plt.plot(progress)
ax.set_xlabel('iteration')
ax.set_ylabel('progress ratio')
ax.figure.savefig('progress.pdf')
return AbstractPwa(
ppp=new_part,
ts=ofts,
ppp2ts=ofts_states,
pwa=ssys,
pwa_ppp=part,
ppp2pwa=orig,
ppp2sys=subsys_list,
orig_ppp=orig_ppp,
ppp2orig=ppp2orig,
disc_params=param)
def reachable_within(
trans_length:
int,
adj_k:
np.ndarray,
adj:
np.ndarray
) -> np.ndarray:
"""Find cells reachable within trans_length hops."""
if trans_length <= 1:
return adj_k
k = 1
while k < trans_length:
adj_k = (np.dot(adj_k, adj) != 0).astype(int)
k += 1
adj_k = (adj_k > 0).astype(int)
return adj_k
def sym_adj_change(
IJ:
np.ndarray,
adj_k:
np.ndarray,
transitions:
np.ndarray,
i:
int
) -> None:
horizontal = adj_k[i, :] - transitions[i, :] > 0
vertical = adj_k[:, i] - transitions[:, i] > 0
IJ[i, :] = horizontal.astype(int)
IJ[:, i] = vertical.astype(int)
# DEFUNCT until further notice
def discretize_overlap(
closed_loop:
bool=False,
conservative:
bool=False
) -> PPP:
"""default False.
UNDER DEVELOPMENT; function signature may change without notice.
Calling will result in NotImplementedError.
"""
raise NotImplementedError
#
# if rdiff < abs_tol:
# _logger.info("Transition found")
# transitions[i,j] = 1
#
# elif ((vol1 > min_cell_volume) & (risect > rd) &
# (num_new_reg[i] <= num_orig_neigh[i]+1)):
#
# # Make sure new cell is Region and add proposition lists
# if len(isect) == 0:
# isect = pc.Region([isect], si.props)
# else:
# isect.props = si.props.copy()
#
# # Add new state
# sol.append(isect)
# size = len(sol)
#
# # Add transitions
# transitions = np.hstack([transitions, np.zeros([size - 1, 1],
# dtype=int) ])
# transitions = np.vstack([transitions, np.zeros([1, size],
# dtype=int) ])
#
# # All sets reachable from orig cell are reachable from both cells
# transitions[size-1,:] = transitions[i,:]
# transitions[size-1,j] = 1 # j is reachable from new cell
#
# # Take care of adjacency
# old_adj = np.nonzero(adj[i,:])[0]
#
# adj = np.hstack([adj, np.zeros([size - 1, 1], dtype=int) ])
# adj = np.vstack([adj, np.zeros([1, size], dtype=int) ])
# adj[i,size-1] = 1
# adj[size-1,i] = 1
# adj[size-1,size-1] = 1
#
# for k in np.setdiff1d(old_adj,[i,size-1]):
# if pc.is_adjacent(sol[size-1],sol[k],overlap=True):
# adj[size-1,k] = 1
# adj[k, size-1] = 1
# else:
# # Actively remove (valid) transitions between non-neighbors
# transitions[size-1,k] = 0
# transitions[k,size-1] = 0
#
# # Assign original proposition cell to new state and update counts
# if not conservative:
# orig = np.hstack([orig, orig[i]])
# print(num_new_reg)
# num_new_reg = np.hstack([num_new_reg, 0])
# num_orig_neigh = np.hstack([num_orig_neigh, np.sum(adj[size-1,:])-1])
#
# _logger.info(f"\n Adding state {size - 1}\n")
#
# # Just add adjacent cells for checking,
# # unless transition already found
# IJ = np.hstack([IJ, np.zeros([size - 1, 1], dtype=int) ])
# IJ = np.vstack([IJ, np.zeros([1, size], dtype=int) ])
# horiz2 = adj[size-1,:] - transitions[size-1,:] > 0
# verti2 = adj[:,size-1] - transitions[:,size-1] > 0
# IJ[size-1,:] = horiz2.astype(int)
# IJ[:,size-1] = verti2.astype(int)
# else:
# _logger.info(f"No transition found, intersect vol: {vol1}")
# transitions[i,j] = 0
#
# new_part = PPP(
# domain=part.domain,
# regions=sol, adj=np.array([]),
# trans=transitions, prop_regions=part.prop_regions,
# original_regions=orig_list, orig=orig)
# return new_part
def multiproc_discretize(
q:
mp.Queue,
mode:
str,
ppp:
PPP,
cont_dyn:
SystemDynamics,
disc_params:
dict
) -> None:
global _logger
_logger = mp.log_to_stderr()
name = mp.current_process().name
print(
f'Abstracting mode: {mode}, on: {name}')
absys = discretize(ppp, cont_dyn, **disc_params)
q.put((mode, absys))
print(f'Worker: {name} finished.')
def multiproc_get_transitions(
q:
mp.Queue,
absys,
mode,
ssys:
SystemDynamics,
params:
dict
) -> None:
global _logger
_logger = mp.log_to_stderr()
name = mp.current_process().name
print(
'Merged transitions for '
f'mode: {mode}, on: {name}')
trans = get_transitions(
absys, mode, ssys, **params)
q.put((mode, trans))
print(f'Worker: {name} finished.')
def multiproc_discretize_switched(
ppp:
PPP,
hybrid_sys:
_hyb.SwitchedSysDyn,
disc_params:
dict |
None=None,
plot:
bool=False,
show_ts:
bool=False,
only_adjacent:
bool=True
) -> AbstractSwitched:
"""Parallel implementation of `discretize_switched`.
Uses the multiprocessing package.
"""
_logger.info('parallel `discretize_switched` started')
modes = list(hybrid_sys.modes)
mode_nums = hybrid_sys.disc_domain_size
q = mp.Queue()
mode_args = dict()
for mode in modes:
cont_dyn = hybrid_sys.dynamics[mode]
mode_args[mode] = (
q, mode, ppp,
cont_dyn, disc_params[mode])
jobs = [
mp.Process(
target=multiproc_discretize,
args=args)
for args in mode_args.values()]
for job in jobs:
job.start()
# flush before join:
# <http://stackoverflow.com/questions/19071529/>
abstractions = dict()
for job in jobs:
mode, absys = q.get()
abstractions[mode] = absys
for job in jobs:
job.join()
# merge their domains
merged_abstr, ap_labeling = merge_partitions(abstractions)
n = len(merged_abstr.ppp)
_logger.info(f'Merged partition has: {n}, states')
# find feasible transitions over merged partition
for mode in modes:
cont_dyn = hybrid_sys.dynamics[mode]
params = disc_params[mode]
mode_args[mode] = (
q, merged_abstr, mode, cont_dyn, params)
jobs = [
mp.Process(
target=multiproc_get_transitions,
args=args)
for args in mode_args.values()]
for job in jobs:
job.start()
trans = dict()
for job in jobs:
mode, t = q.get()
trans[mode] = t
# merge the abstractions, creating a common TS
merge_abstractions(
merged_abstr, trans,
abstractions, modes, mode_nums)
if plot:
plot_mode_partitions(
merged_abstr, show_ts, only_adjacent)
return merged_abstr
def discretize_switched(
ppp:
PPP,
hybrid_sys:
_hyb.SwitchedSysDyn,
disc_params:
dict[..., dict] |
None=None,
plot:
bool=False,
show_ts:
bool=False,
only_adjacent:
bool=True
) -> AbstractSwitched:
"""Abstract switched dynamics over given partition.
@param hybrid_sys:
dynamics of switching modes
@param disc_params:
discretization parameters
passed to `discretize` for each mode.
See `discretize` for details.
(`dict` keyed by mode)
@param plot:
save partition images
@param show_ts, only_adjacent:
options for
`AbstractPwa.plot`.
@return:
abstracted dynamics,
some attributes are `dict` keyed by mode
"""
if disc_params is None:
disc_params = dict(N=1, trans_length=1)
_logger.info('discretizing hybrid system')
modes = list(hybrid_sys.modes)
mode_nums = hybrid_sys.disc_domain_size
# discretize each abstraction separately
abstractions = dict()
for mode in modes:
_logger.debug(30 * '-' + '\n')
_logger.info(f'Abstracting mode: {mode}')
cont_dyn = hybrid_sys.dynamics[mode]
absys = discretize(
ppp, cont_dyn,
**disc_params[mode])
_logger.debug(
f'Mode Abstraction:\n{absys}\n')
abstractions[mode] = absys
# merge their domains
merged_abstr, ap_labeling = merge_partitions(abstractions)
n = len(merged_abstr.ppp)
_logger.info(f'Merged partition has: {n}, states')
# find feasible transitions over merged partition
trans = dict()
for mode in modes:
cont_dyn = hybrid_sys.dynamics[mode]
params = disc_params[mode]
trans[mode] = get_transitions(
merged_abstr, mode, cont_dyn,
N=params['N'],
trans_length=params['trans_length'])
# merge the abstractions, creating a common TS
merge_abstractions(
merged_abstr, trans,
abstractions, modes, mode_nums)
if plot:
plot_mode_partitions(
merged_abstr, show_ts, only_adjacent)
return merged_abstr
def plot_mode_partitions(
swab:
AbstractSwitched,
show_ts:
bool,
only_adjacent:
bool
) -> None:
"""Save each mode's partition and final merged partition."""
axs = swab.plot(show_ts, only_adjacent)
if not axs:
_logger.error('failed to plot the partitions.')
return
n = len(swab.modes)
assert len(axs) == 2*n
# annotate
for ax in axs:
plot_annot(ax)
# save mode partitions
for ax, mode in zip(axs[:n], swab.modes):
fname = f'merged_{mode}.pdf'
ax.figure.savefig(fname)
# save merged partition
for ax, mode in zip(axs[n:], swab.modes):
fname = f'part_{mode}.pdf'
ax.figure.savefig(fname)
def plot_annot(
ax:
'_mpl.axes.Axes'
) -> None:
fontsize = 5
for tick in ax.xaxis.get_major_ticks():
tick.label1.set_fontsize(fontsize)
for tick in ax.yaxis.get_major_ticks():
tick.label1.set_fontsize(fontsize)
ax.set_xlabel('$v_1$', fontsize=fontsize + 6)
ax.set_ylabel('$v_2$', fontsize=fontsize + 6)
def merge_abstractions(
merged_abstr:
AbstractSwitched,
trans:
dict[..., AbstractPwa],
abstr,
modes,
mode_nums:
list[int]
) -> None:
"""Construct merged transitions."""
# TODO: check equality of atomic proposition sets
aps = abstr[modes[0]].ts.atomic_propositions
_logger.info(f'APs: {aps}')
sys_ts = trs.FTS()
# create stats
n = len(merged_abstr.ppp)
states = range(n)
sys_ts.states.add_from(states)
sys_ts.atomic_propositions.add_from(aps)
# copy AP labels from regions to discrete states
ppp2ts = states
for i, state in enumerate(ppp2ts):
props = merged_abstr.ppp[i].props
sys_ts.states[state]['ap'] = props
# create mode actions
env_actions, sys_actions = zip(*modes)
env_actions = list(map(str, env_actions))
sys_actions = list(map(str, sys_actions))
# no env actions ?
if mode_nums[0] == 0:
actions_per_mode = {
(e, s): {'sys_actions': str(s)}
for e, s in modes}
sys_ts.sys_actions.add_from(sys_actions)
elif mode_nums[1] == 0:
# no sys actions
actions_per_mode = {
(e, s): {'env_actions': str(e)}
for e, s in modes}
sys_ts.env_actions.add_from(env_actions)
else:
actions_per_mode = {
(e, s): dict(
env_actions=str(e),
sys_actions=str(s))
for e, s in modes}
env_actions, sys_actions = zip(*modes)
sys_ts.env_actions.add_from(env_actions)
sys_ts.sys_actions.add_from(sys_actions)
for mode in modes:
env_sys_actions = actions_per_mode[mode]
adj = trans[mode]
sys_ts.transitions.add_adj(
adj = adj,
adj2states = states,
**env_sys_actions)
merged_abstr.ts = sys_ts
merged_abstr.ppp2ts = ppp2ts
def get_transitions(
abstract_sys:
AbstractSwitched,
mode,
ssys,
N:
int=10,
closed_loop:
bool=True,
trans_length:
int=1
) -> sp.lil_array:
"""Find which transitions are feasible in given mode.
Used for the candidate transitions of the merged partition.
"""
_logger.info(
'checking which transitions remain '
'feasible after merging')
part = abstract_sys.ppp
# Initialize matrix for pairs to check
IJ = part.adj.copy()
if trans_length > 1:
k = 1
while k < trans_length:
IJ = np.dot(IJ, part.adj)
k += 1
IJ = (IJ > 0).astype(int)
# Initialize output
n = len(part)
transitions = sp.lil_array(
(n, n),
dtype=int)
# Do the abstraction
n_checked = 0
n_found = 0
while np.sum(IJ) > 0:
n_checked += 1
ind = np.nonzero(IJ)
i = ind[1][0]
j = ind[0][0]
IJ[j,i] = 0
_logger.debug(f'checking transition: {i} -> {j}')
si = part[i]
sj = part[j]
# Use original cell as trans_set
trans_set = abstract_sys.ppp2pwa(mode, i)[1]
active_subsystem = abstract_sys.ppp2sys(mode, i)[1]
trans_feasible = _fsb.is_feasible(
si, sj, active_subsystem, N,
closed_loop = closed_loop,
trans_set = trans_set)
if trans_feasible:
transitions[i, j] = 1
msg = '\t Feasible transition.'
n_found += 1
else:
transitions[i, j] = 0
msg = '\t Not feasible transition.'
_logger.debug(msg)
_logger.info(f'Checked: {n_checked}')
_logger.info(f'Found: {n_found}')
assert n_checked != 0, 'would divide '
_logger.info(
f'Survived merging: {float(n_found) / n_checked} % ')
return transitions
def multiproc_merge_partitions(abstractions):
"""LOGTIME in #processors parallel merging.
Assuming sufficient number of processors.
UNDER DEVELOPMENT; function signature may
change without notice.
Calling will result in `NotImplementedError`.
"""
raise NotImplementedError
def merge_partitions(
abstractions:
dict[..., AbstractPwa]
) -> tuple[
AbstractSwitched,
dict]:
"""Merge multiple abstractions.
@param abstractions:
keyed by mode
@return:
`(merged_abstraction, ap_labeling)`
"""
if not abstractions:
warnings.warn(
'Abstractions empty, '
'nothing to merge.')
return
# consistency check
for ab1 in abstractions.values():
for ab2 in abstractions.values():
p1 = ab1.ppp
p2 = ab2.ppp
if p1.prop_regions != p2.prop_regions:
raise Exception(
'merge: partitions have '
'different sets of '
'continuous propositions')
if (
not (p1.domain.A == p2.domain.A).all() or
not (p1.domain.b == p2.domain.b).all()):
raise Exception(
'merge: partitions have '
'different domains')
# check equality of original PPP partitions
if ab1.orig_ppp == ab2.orig_ppp:
_logger.info(
'original partitions happen to be equal')
init_mode = list(abstractions.keys())[0]
all_modes = set(abstractions)
remaining_modes = all_modes.difference(set([init_mode]))
print(f'init mode: {init_mode}')
print(f'all modes: {all_modes}')
print(f'remaining modes: {remaining_modes}')
# initialize iteration data
prev_modes = [init_mode]
# Create a list of merged-together regions
ab0 = abstractions[init_mode]
regions = list(ab0.ppp)
parents = {init_mode: list(range(len(regions)))}
ap_labeling = {
i: reg.props
for i, reg in enumerate(regions)}
for cur_mode in remaining_modes:
ab2 = abstractions[cur_mode]
r = merge_partition_pair(
regions, ab2, cur_mode, prev_modes,
parents, ap_labeling)
regions, parents, ap_labeling = r
prev_modes.append(cur_mode)
new_list = regions
# build adjacency based on spatial adjacencies of
# component abstractions.
# which justifies the assumed symmetry of part1.adj, part2.adj
# Basically, if two regions are either 1) part of the same region in one of
# the abstractions or 2) adjacent in one of the abstractions, then the two
# regions are adjacent in the switched dynamics.
n_reg = len(new_list)
adj = np.zeros([n_reg, n_reg], dtype=int)
for i, reg_i in enumerate(new_list):
for j, reg_j in enumerate(new_list[0:i]):
touching = False
for mode in abstractions:
pi = parents[mode][i]
pj = parents[mode][j]
part = abstractions[mode].ppp
if part.adj[pi, pj] == 1 or pi == pj:
touching = True
break
if not touching:
continue
if pc.is_adjacent(reg_i, reg_j):
adj[i, j] = 1
adj[j, i] = 1
adj[i, i] = 1
ppp = PPP(
domain=ab0.ppp.domain,
regions=new_list,
prop_regions=ab0.ppp.prop_regions,
adj=adj)
abstraction = AbstractSwitched(
ppp=ppp,
modes=abstractions,
ppp2modes=parents)
return (abstraction, ap_labeling)
def merge_partition_pair(
old_regions:
list[pc.Region],
ab2:
AbstractPwa,
cur_mode:
tuple,
prev_modes:
list[tuple],
old_parents:
dict,
old_ap_labeling:
dict[tuple, set]
) -> tuple[
list,
dict,
dict]:
"""Merge an Abstraction with the current partition iterate.
@param old_regions:
A `list` of `Region` that is from either:
1. The ppp of the first (initial) `AbstractPwa` to be merged.
2. A list of already-merged regions
@param ab2:
Abstracted piecewise affine dynamics to be merged into the
@param cur_mode:
mode to be merged
@param prev_modes:
list of modes that have already been merged together
@param old_parents:
dict of modes that have already been merged to dict of
indices of new regions to indices of regions.
A `dict` that maps each mode to
a `list` of region indices in the list
`old_regions` or
a `dict` that maps region indices to
regions in the original ppp for that mode
@param old_ap_labeling:
dict of states of already-merged modes to sets of
propositions for each state
@return: the following:
- `new_list`, list of new regions
- `parents`, same as input param `old_parents`, except that it
includes the mode that was just merged and for list of regions in
return value `new_list`
- `ap_labeling`, same as input param `old_ap_labeling`, except that it
includes the mode that was just merged.
"""
_logger.info('merging partitions')
part2 = ab2.ppp
modes = prev_modes + [cur_mode]
new_list = list()
parents = {mode:dict() for mode in modes}
ap_labeling = dict()
for i in range(len(old_regions)):
for j in range(len(part2)):
isect = pc.intersect(
old_regions[i],
part2[j])
rc, xc = pc.cheby_ball(isect)
# no intersection ?
if rc < 1e-5:
continue
_logger.info(
f'merging region: A{i}, with: B{j}')
# if Polytope, make it Region
if len(isect) == 0:
isect = pc.Region([isect])
# label the Region with propositions
isect.props = old_regions[i].props.copy()
new_list.append(isect)
idx = new_list.index(isect)
# keep track of parents
for mode in prev_modes:
parents[mode][idx] = old_parents[mode][i]
parents[cur_mode][idx] = j
# union of AP labels from parent states
ap_label_1 = old_ap_labeling[i]
ap_label_2 = ab2.ts.states[j]['ap']
_logger.debug(f'AP label 1: {ap_label_1}')
_logger.debug(f'AP label 2: {ap_label_2}')
# original partitions may be
# different if `_p2p.pwa_partition` used
# but must originate from same
# initial partition,
# i.e., have same continuous propositions,
# checked above
#
# so no two intersecting regions can
# have different AP labels,
# checked here
if ap_label_1 != ap_label_2:
raise Exception(
'Inconsistent AP labels between '
'intersecting regions of \n'
'partitions of switched system.')
ap_labeling[idx] = ap_label_1
return new_list, parents, ap_labeling