# Copyright (c) 2011, 2012 by California Institute of Technology
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the California Institute of Technology nor
# the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CALTECH
# OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
"""
Algorithms related to discretization containing both MATLAB interface
and Python discretization. Input calculation is available in python
only but should work also for a state space partition discretized in
MATLAB.
Classes:
- CtsSysDyn
- PwaSubsysDyn
- PwaSysDyn
Primary functions:
- discretize
- get_input
Helper functions:
- solveFeasable
- discretizeM
- discretizeToMatlab
- discretizeFromMatlab
- getInputHelper
- createLM
- get_max_extreme
"""
import sys, os, time, subprocess
from copy import deepcopy
import numpy as np
from scipy import io as sio
from cvxopt import matrix,solvers
import itertools
import polytope as pc
from prop2part import PropPreservingPartition, pwa_partition
from errorprint import printWarning, printError, printInfo
matfile_dir = os.path.join(os.path.abspath(os.path.dirname(sys.argv[0])), \
'tmpmat')
to_matfile = os.path.join(matfile_dir, 'dataToMatlab.mat')
from_matfile = os.path.join(matfile_dir, 'dataFromMatlab.mat')
donefile = os.path.join(matfile_dir, 'done.txt')
def _block_diag2(A,B):
"""Like block_diag() in scipy.linalg, but restricted to 2 inputs.
Old versions of the linear algebra package in SciPy (i.e.,
scipy.linalg) do not have a block_diag() function. Providing
_block_diag2() here until most folks are using sufficiently
up-to-date SciPy installations improves portability.
"""
if len(A.shape) == 1: # Cast 1d array into matrix
A = np.array([A])
if len(B.shape) == 1:
B = np.array([B])
C = np.zeros((A.shape[0]+B.shape[0], A.shape[1]+B.shape[1]))
C[:A.shape[0], :A.shape[1]] = A.copy()
C[A.shape[0]:, A.shape[1]:] = B.copy()
return C
[docs]class CtsSysDyn:
"""CtsSysDyn class for specifying the continuous dynamics:
s[t+1] = A*s[t] + B*u[t] + E*d[t] + K
u[t] \in Uset - polytope object
d[t] \in Wset - polytope object
A CtsSysDyn object contains the fields A, B, E, K, Uset and Wset
as defined above.
**Constructor**:
**CtsSysDyn** ([ `A` = [][, `B` = [][, `E` = [][, `K` = [][, `Uset` = [][, `Wset` = []]]]]]])
"""
def __init__(self, A=[], B=[], E=[], K=[], Uset=None, Wset=None):
if Uset == None:
print "Warning: Uset not given in CtsSysDyn()"
if (Uset != None) & (not isinstance(Uset, pc.Polytope)):
raise Exception("CtsSysDyn: `Uset` has to be a Polytope")
self.A = A
self.B = B
if len(K) == 0:
self.K = np.zeros([A.shape[1], 1])
else:
self.K = K.reshape(K.size,1)
if len(E) == 0:
self.E = np.zeros([A.shape[1], 1])
self.Wset = pc.Polytope()
else:
self.E = E
self.Wset = Wset
self.Uset = Uset
def __str__(self):
output = "A =\n"+str(self.A)
output += "\nB =\n"+str(self.B)
output += "\nE =\n"+str(self.E)
output += "\nK =\n"+str(self.K)
output += "\nUset =\n"+str(self.Uset)
output += "\nWset =\n"+str(self.Wset)
return output
[docs]class PwaSubsysDyn(CtsSysDyn):
"""PwaSubsysDyn class for specifying a subsystem of piecewise affine continuous dynamics.
s[t+1] = A_i*s[t] + B_i*u[t] + E_i*d[t] + K_i
u[t] \in Uset_i - polytope object
d[t] \in Wset_i - polytope object
for H_i*s[t]<=g_i - subdomain, type: polytope object
PwaSubsysDyn class inherits from CtsSysDyn with the additional field:
- `sub_domain`: domain with nonempty interior where these dynamics are active, type: polytope
"""
def __init__(self, A=[], B=[], E=[], K=[], Uset=None, Wset=None, sub_domain=None):
if sub_domain == None:
print "Warning: sub_domain is not given in PwaSubsysDyn()"
if (sub_domain != None) & (not isinstance(sub_domain, pc.Polytope)):
raise Exception("PwaSubsysDyn: `sub_domain` has to be a Polytope")
CtsSysDyn.__init__(self, A, B, E, K, Uset, Wset)
self.sub_domain = sub_domain
[docs]class PwaSysDyn:
"""PwaSysDyn class for specifying a piecewise affine system.
A PwaSysDyn object contains the fields:
- `list_subsys`: list of PwaSubsysDyn
- `domain`: domain over which piecewise affine system is defined, type: polytope
For the system to be well-defined the sub_domains of its subsystems should be
mutually exclusive (modulo intersections with empty interior) and cover the domain.
"""
def __init__(self, list_subsys=[], domain=None):
if domain == None:
print "Warning: domain not given in PwaSysDyn()"
if (domain != None) & (not isinstance(domain, pc.Polytope)):
raise Exception("PwaSysDyn: `domain` has to be a Polytope")
if len(list_subsys) > 0:
uncovered_dom = domain.copy()
n = list_subsys[0].A.shape[1] # State space dimension
m = list_subsys[0].B.shape[1] # Input space dimension
p = list_subsys[0].E.shape[1] # Disturbance space dimension
for subsys in list_subsys:
uncovered_dom = pc.mldivide(uncovered_dom, subsys.sub_domain)
if (n!=subsys.A.shape[1] or m!=subsys.B.shape[1] or p!=subsys.E.shape[1]):
raise Exception("PwaSysDyn: state, input, disturbance dimensions\
have to be the same for all subsystems")
if not pc.is_empty(uncovered_dom):
raise Exception("PwaSysDyn: subdomains have to cover the domain")
for x in itertools.combinations(list_subsys, 2):
if pc.is_fulldim(pc.intersect(x[0].sub_domain,x[1].sub_domain)):
raise Exception("PwaSysDyn: subdomains have to be mutually exclusive")
self.list_subsys = list_subsys
self.domain = domain
[docs]def discretize(part, ssys, N=10, min_cell_volume=0.1, closed_loop=True, \
use_mpt=False, conservative=True, max_num_poly=5, \
use_all_horizon=False, trans_length=1, remove_trans=False,
abs_tol=1e-7, verbose=0):
"""Refine the partition and establish transitions based on reachability
analysis.
Input:
- `part`: a PropPreservingPartition object
- `ssys`: a CtsSysDyn or PwaSysDyn object
- `N`: horizon length
- `min_cell_volume`: the minimum volume of cells in the resulting
partition.
- `closed_loop`: boolean indicating whether the `closed loop`
algorithm should be used. default True.
- `use_mpt`: if True, use MPT-based abstraction algorithm.
- `conservative`: if true, force sequence in reachability analysis
to stay inside starting cell. If false, safety
is ensured by keeping the sequence inside the
original proposition preserving cell which needs
to be convex. In order to use the value false,
ensure to have a convex initial partition or use
prop2partconvex to postprocess the proposition
preserving partition before calling discretize.
- `max_num_poly`: maximum number of polytopes in a region to use in
reachability analysis.
- `use_all_horizon`: in closed loop algorithm: if we should look
for reach- ability also in less than N steps.
- `trans_length`: the number of polytopes allowed to cross in a
transition. a value of 1 checks transitions
only between neighbors, a value of 2 checks
neighbors of neighbors and so on.
- `remove_trans`: if True, remove found transitions between
non-neighbors.
- `abs_tol`: maximum volume for an "empty" polytope
- `verbose`: level of verbosity
Output:
- A PropPreservingPartition object with transitions
"""
orig_list = []
min_cell_volume = (min_cell_volume/np.finfo(np.double).eps ) * np.finfo(np.double).eps
if isinstance(ssys,PwaSysDyn):
part = pwa_partition(ssys, part)
# Save original polytopes, require them to be convex
if not conservative:
for poly in part.list_region:
if len(poly) == 0:
orig_list.append(poly.copy())
elif len(poly) == 1:
orig_list.append(poly.list_poly[0].copy())
else:
raise Exception("solveFeasible: original list contains non-convex \
polytope regions")
orig = range(len(orig_list))
else:
orig_list = None
orig = 0
# Call MPT discretization
if use_mpt:
if isinstance(ssys,PwaSysDyn):
raise Exception("discretize: Piecewise affine system discretization in\
MPT is not supported")
else:
return discretizeM(part, ssys, N=N, auto=True, minCellVolume=min_cell_volume, \
maxNumIterations=5, useClosedLoopAlg=closed_loop, \
useAllHorizonLength=use_all_horizon, useLargeSset=False, \
timeout=-1, maxNumPoly=max_num_poly, verbose=verbose)
# Cheby radius of disturbance set (defined within the loop for pwa systems)
if isinstance(ssys,CtsSysDyn):
if len(ssys.E) > 0:
rd,xd = pc.cheby_ball(ssys.Wset)
else:
rd = 0.
# Initialize matrix for pairs to check
IJ = part.adj.copy()
if trans_length > 1:
k = 1
while k < trans_length:
IJ = np.dot(IJ, part.adj)
k += 1
IJ = (IJ > 0).astype(int)
# Initialize output
transitions = np.zeros([part.num_regions,part.num_regions], dtype = int)
sol = deepcopy(part.list_region)
adj = part.adj.copy()
subsys_list = deepcopy(part.list_subsys)
ss = ssys
# Do the abstraction
while np.sum(IJ) > 0:
ind = np.nonzero(IJ)
i = ind[1][0]
j = ind[0][0]
IJ[j,i] = 0
si = sol[i]
sj = sol[j]
if isinstance(ssys,PwaSysDyn):
ss = ssys.list_subsys[subsys_list[i]]
if len(ss.E) > 0:
rd,xd = pc.cheby_ball(ss.Wset)
else:
rd = 0.
if verbose > 1:
print "\n Working with states " + str(i) + " and " + str(j) \
+ " with lengths " + str(len(si)) + " and " + str(len(sj))
if isinstance(ssys,PwaSysDyn):
print "where subsystem " + str(subsys_list[i]) + " is active."
if conservative:
# Don't use trans_set
S0 = solveFeasable(si,sj,ss,N, closed_loop=closed_loop,
max_num_poly=max_num_poly, use_all_horizon=use_all_horizon)
else:
# Use original cell as trans_set
S0 = solveFeasable(si,sj,ss,N, closed_loop=closed_loop,\
trans_set=orig_list[orig[i]], use_all_horizon=use_all_horizon, \
max_num_poly=max_num_poly)
if verbose > 1:
print "Computed reachable set S0 with volume " + str(pc.volume(S0))
isect = pc.intersect(si, S0)
risect, xi = pc.cheby_ball(isect)
vol1 = pc.volume(isect)
diff = pc.mldivide(si, S0)
rdiff, xd = pc.cheby_ball(isect)
vol2 = pc.volume(diff)
# We don't want our partitions to be smaller than the disturbance set
# Could be a problem since cheby radius is calculated for smallest
# convex polytope, so if we have a region we might throw away a good
# cell.
if (vol1 > min_cell_volume) & (vol2 > min_cell_volume) & (rdiff > rd) & \
(risect > rd):
# Make sure new areas are Regions and add proposition lists
if len(isect) == 0:
isect = pc.Region([isect], si.list_prop)
else:
isect.list_prop = si.list_prop
if len(diff) == 0:
diff = pc.Region([diff], si.list_prop)
else:
diff.list_prop = si.list_prop
# Add new states
sol[i] = isect
difflist = pc.separate(diff) # Separate the difference
num_new = len(difflist)
for reg in difflist:
sol.append(reg)
if isinstance(ssys,PwaSysDyn):
subsys_list.append(subsys_list[i])
size = len(sol)
# Update transition matrix
transitions = np.hstack([transitions, np.zeros([size-num_new, num_new], dtype=int) ])
transitions = np.vstack([transitions, np.zeros([num_new, size], dtype=int) ])
transitions[i,:] = np.zeros(size)
for kk in range(num_new):
#transitions[:,size-1-kk] = transitions[:,i] # All sets reachable from start are reachable from both part's
transitions[i,size-1-kk] = 0 # except possibly the new part
transitions[j,size-1-kk] = 0
if i != j:
transitions[j,i] = 1 # sol[j] is reachable from intersection of sol[i] and S0..
# Update adjacency matrix
old_adj = np.nonzero(adj[i,:])[0]
adj[i,:] = np.zeros([size-num_new])
adj[:,i] = np.zeros([size-num_new])
adj = np.hstack([adj, np.zeros([size-num_new, num_new], dtype=int) ])
adj = np.vstack([adj, np.zeros([num_new, size], dtype=int) ])
for kk in range(num_new):
adj[i,size-1-kk] = 1
adj[size-1-kk,i] = 1
adj[size-1-kk,size-1-kk] = 1
if not conservative:
orig = np.hstack([orig, orig[i]])
adj[i,i] = 1
if verbose > 1:
output = "\n Adding states " + str(i) + " and "
for kk in range(num_new):
output += str(size-1-kk) + " and "
print output + "\n"
for k in np.setdiff1d(old_adj,[i,size-1]):
# Every "old" neighbor must be the neighbor of at least one of the new
if pc.is_adjacent(sol[i],sol[k]):
adj[i,k] = 1
adj[k,i] = 1
elif remove_trans & (trans_length == 1):
# Actively remove transitions between non-neighbors
transitions[i,k] = 0
transitions[k,i] = 0
for kk in range(num_new):
if pc.is_adjacent(sol[size-1-kk],sol[k]):
adj[size-1-kk,k] = 1
adj[k, size-1-kk] = 1
elif remove_trans & (trans_length == 1):
# Actively remove transitions between non-neighbors
transitions[size-1-kk,k] = 0
transitions[k,size-1-kk] = 0
# Update IJ matrix
IJ = np.hstack([IJ, np.zeros([size-num_new, num_new], dtype=int) ])
IJ = np.vstack([IJ, np.zeros([num_new, size], dtype=int) ])
adj_k = adj
if trans_length > 1:
k = 1
while k < trans_length:
adj_k = np.dot(adj_k, adj)
k += 1
adj_k = (adj_k > 0).astype(int)
horiz1 = adj_k[i,:] - transitions[i,:] > 0
verti1 = adj_k[:,i] - transitions[:,i] > 0
IJ[i,:] = horiz1.astype(int)
IJ[:,i] = verti1.astype(int)
for kk in range(num_new):
horiz2 = adj_k[size-1-kk,:] - transitions[size-1-kk,:] > 0
verti2 = adj_k[:,size-1-kk] - transitions[:,size-1-kk] > 0
IJ[size-1-kk,:] = horiz2.astype(int)
IJ[:,size-1-kk] = verti2.astype(int)
if verbose > 1:
print "\n Updated adj: \n" + str(adj)
print "\n Updated trans: \n" + str(transitions)
print "\n Updated IJ: \n" + str(IJ)
elif vol2 < abs_tol:
if verbose > 1:
print "Transition found"
transitions[j,i] = 1
else:
if verbose > 1:
print "No transition found, diff vol: " + str(vol2) + \
", intersect vol: " + str(vol1)
transitions[j,i] = 0
new_part = PropPreservingPartition(domain=part.domain, num_prop=part.num_prop, \
list_region=sol, num_regions=len(sol), adj=adj, \
trans=transitions, list_prop_symbol=part.list_prop_symbol, \
orig_list_region=orig_list, orig=orig, list_subsys = subsys_list)
return new_part
[docs]def discretize_overlap(part, ssys, N=10, min_cell_volume=0.1, closed_loop=False,\
conservative=False, max_num_poly=5, \
use_all_horizon=False, abs_tol=1e-7, verbose=0):
"""Refine the partition and establish transitions based on reachability
analysis.
Input:
- `part`: a PropPreservingPartition object
- `ssys`: a CtsSysDyn object
- `N`: horizon length
- `min_cell_volume`: the minimum volume of cells in the resulting
partition.
- `closed_loop`: boolean indicating whether the `closed loop`
algorithm should be used. default False.
- `conservative`: if true, force sequence in reachability analysis
to stay inside starting cell. If false, safety
is ensured by keeping the sequence inside the
original proposition preserving cell.
- `max_num_poly`: maximum number of polytopes in a region to use
in reachability analysis.
- `use_all_horizon`: in closed loop algorithm: if we should look
for reach- ability also in less than N steps.
- `abs_tol`: maximum volume for an "empty" polytope
Output:
- A PropPreservingPartition object with transitions
"""
min_cell_volume = (min_cell_volume/np.finfo(np.double).eps ) * np.finfo(np.double).eps
orig_list = []
for poly in part.list_region:
if len(poly) == 0:
orig_list.append(poly.copy())
elif len(poly) == 1:
orig_list.append(poly.list_poly[0].copy())
else:
raise Exception("solveFeasible: original list contains non-convex \
polytope regions")
orig = range(len(orig_list))
# Cheby radius of disturbance set
if len(ssys.E) > 0:
rd,xd = pc.cheby_ball(ssys.Wset)
else:
rd = 0.
# Initialize matrix for pairs to check
IJ = part.adj.copy()
if verbose > 1:
print "\n Starting IJ: \n" + str(IJ)
# Initialize output
transitions = np.zeros([part.num_regions,part.num_regions], dtype = int)
sol = deepcopy(part.list_region)
adj = part.adj.copy()
# List of how many "new" regions that have been created for each region
# and a list of original number of neighbors
num_new_reg = np.zeros(len(orig_list))
num_orig_neigh = np.sum(adj, axis=1).flatten() - 1
while np.sum(IJ) > 0:
ind = np.nonzero(IJ)
i = ind[0][0]
j = ind[1][0]
IJ[i,j] = 0
num_new_reg[i] += 1
print num_new_reg
si = sol[i]
sj = sol[j]
if verbose > 1:
print "\n Working with states " + str(i) + " and " + str(j)
if conservative:
S0 = solveFeasable(si,sj,ssys,N, closed_loop=closed_loop,
min_vol=min_cell_volume, max_num_poly=max_num_poly,\
use_all_horizon=use_all_horizon)
else:
S0 = solveFeasable(si,sj,ssys,N, closed_loop=closed_loop,\
min_vol=min_cell_volume, trans_set=orig_list[orig[i]], \
use_all_horizon=use_all_horizon, max_num_poly=max_num_poly)
if verbose > 1:
print "Computed reachable set S0 with volume " + str(pc.volume(S0))
isect = pc.intersect(si, S0)
risect, xi = pc.cheby_ball(isect)
vol1 = pc.volume(isect)
diff = pc.mldivide(si, S0)
rdiff, xd = pc.cheby_ball(diff)
# We don't want our partitions to be smaller than the disturbance set
# Could be a problem since cheby radius is calculated for smallest
# convex polytope, so if we have a region we might throw away a good
# cell.
if rdiff < abs_tol:
if verbose > 1:
print "Transition found"
transitions[i,j] = 1
elif (vol1 > min_cell_volume) & (risect > rd) & \
(num_new_reg[i] <= num_orig_neigh[i]+1):
# Make sure new cell is Region and add proposition lists
if len(isect) == 0:
isect = pc.Region([isect], si.list_prop)
else:
isect.list_prop = si.list_prop
# Add new state
sol.append(isect)
size = len(sol)
# Add transitions
transitions = np.hstack([transitions, np.zeros([size - 1, 1], dtype=int) ])
transitions = np.vstack([transitions, np.zeros([1, size], dtype=int) ])
transitions[size-1,:] = transitions[i,:] # All sets reachable from orig cell are reachable from both cells
transitions[size-1,j] = 1 # j is reachable from new cell
# Take care of adjacency
old_adj = np.nonzero(adj[i,:])[0]
adj = np.hstack([adj, np.zeros([size - 1, 1], dtype=int) ])
adj = np.vstack([adj, np.zeros([1, size], dtype=int) ])
adj[i,size-1] = 1
adj[size-1,i] = 1
adj[size-1,size-1] = 1
for k in np.setdiff1d(old_adj,[i,size-1]):
if pc.is_adjacent(sol[size-1],sol[k],overlap=True):
adj[size-1,k] = 1
adj[k, size-1] = 1
else:
# Actively remove (valid) transitions between non-neighbors
transitions[size-1,k] = 0
transitions[k,size-1] = 0
# Assign original proposition cell to new state and update counts
if not conservative:
orig = np.hstack([orig, orig[i]])
print num_new_reg
num_new_reg = np.hstack([num_new_reg, 0])
num_orig_neigh = np.hstack([num_orig_neigh, np.sum(adj[size-1,:])-1])
if verbose > 1:
print "\n Adding state" + str(size-1) + "\n"
# Just add adjacent cells for checking, unless transition already found
IJ = np.hstack([IJ, np.zeros([size - 1, 1], dtype=int) ])
IJ = np.vstack([IJ, np.zeros([1, size], dtype=int) ])
horiz2 = adj[size-1,:] - transitions[size-1,:] > 0
verti2 = adj[:,size-1] - transitions[:,size-1] > 0
IJ[size-1,:] = horiz2.astype(int)
IJ[:,size-1] = verti2.astype(int)
#if verbose > 1:
#print "\n Updated adj: \n" + str(adj)
#print "\n Updated trans: \n" + str(transitions)
#print "\n Updated IJ: \n" + str(IJ)
else:
if verbose > 1:
print "No transition found, intersect vol: " + str(vol1)
transitions[i,j] = 0
new_part = PropPreservingPartition(domain=part.domain, num_prop=part.num_prop,
list_region=sol, num_regions=len(sol), adj=np.array([]),
trans=transitions, list_prop_symbol=part.list_prop_symbol,
orig_list_region=orig_list, orig=orig)
return new_part
[docs]def solveFeasable(P1, P2, ssys, N, max_cell=10, closed_loop=True, \
use_all_horizon=False, trans_set=None, max_num_poly=5):
'''Computes the subset x0 of `P1' from which `P2' is reachable
in horizon `N', with respect to system dynamics `ssys'. The closed
loop algorithm solves for one step at a time, which keeps the dimension
of the polytopes down.
Input:
- `P1`: A Polytope or Region object
- `P2`: A Polytope or Region object
- `ssys`: A CtsSysDyn object
- `N`: The horizon length
- `closed_loop`: If true, take 1 step at the time. This keeps down
polytope dimension and handles disturbances
better. Default: True.
- `use_all_horizon`: Used for closed loop algorithm. If true,
allow reachability also in less than N steps.
- `trans_set`: If specified, force transitions to be in this
set. If empty, P1 is used
Output:
`x0`: A Polytope or Region object defining the set in P1 from which
P2 is reachable
'''
part1 = P1.copy() # Initial set
part2 = P2.copy() # Terminal set
if closed_loop:
if trans_set != None:
# The intermediate steps are allowed to be in trans_set,
# if it is defined
ttt = trans_set
else:
ttt = part1
temp_part = part2
for i in range(N,1,-1):
x0 = solveFeasable(ttt, temp_part, ssys, 1, closed_loop=False,
trans_set=trans_set)
if use_all_horizon:
temp_part = pc.union(x0, temp_part, check_convex=True)
else:
temp_part = x0
if not pc.is_fulldim(temp_part):
return pc.Polytope()
x0 = solveFeasable(part1, temp_part, ssys, 1, closed_loop=False,
trans_set=trans_set)
if use_all_horizon:
temp_part = pc.union(x0, temp_part, check_convex=True)
else:
temp_part = x0
return temp_part
if len(part1) > max_num_poly:
# Just use the max_num_poly largest volumes for reachability
vol_list = np.zeros(len(part1))
for i in range(len(part1)):
vol_list[i] = pc.volume(part1.list_poly[i])
ind = np.argsort(-vol_list)
temp = []
for i in ind[range(max_num_poly)]:
temp.append(part1.list_poly[i])
part1 = pc.Region(temp, [])
if len(part2) > max_num_poly:
# Just use the max_num_poly largest volumes for reachability
vol_list = np.zeros(len(part2))
for i in range(len(part2)):
vol_list[i] = pc.volume(part2.list_poly[i])
ind = np.argsort(-vol_list)
temp = []
for i in ind[range(max_num_poly)]:
temp.append(part2.list_poly[i])
part2 = pc.Region(temp, [])
if len(part1) > 0:
# Recursive union of sets
poly = solveFeasable(part1.list_poly[0], part2, ssys, N,\
trans_set=trans_set, closed_loop=closed_loop, \
use_all_horizon=use_all_horizon)
for i in range(1, len(part1)):
s0 = solveFeasable(part1.list_poly[i], part2, ssys, N,\
trans_set=trans_set, closed_loop=closed_loop, \
use_all_horizon=use_all_horizon)
poly = pc.union(poly, s0, check_convex=True)
return poly
if len(part2) > 0:
# Recursive union of sets
poly = solveFeasable(part1, part2.list_poly[0], ssys, N, \
trans_set=trans_set, closed_loop=closed_loop, \
use_all_horizon=use_all_horizon)
for i in range(1, len(part2)):
s0 = solveFeasable(part1, part2.list_poly[i], ssys, N, \
trans_set=trans_set, closed_loop=closed_loop, \
use_all_horizon=use_all_horizon)
poly = pc.union(poly, s0, check_convex=True)
return poly
if trans_set == None:
trans_set = part1
L,M = createLM(ssys, N, part1, trans_set, part2)
# Ready to make polytope
poly1 = pc.reduce(pc.Polytope(L,M))
# Project poly1 onto lower dim
n = np.shape(ssys.A)[1]
poly1 = pc.projection(poly1, range(1,n+1))
return pc.reduce(poly1)
[docs]def discretizeM(part, ssys, N = 10, auto=True, minCellVolume = 0.1, \
maxNumIterations = 5, useClosedLoopAlg = True, \
useAllHorizonLength = True, useLargeSset = False, \
timeout = -1, maxNumPoly = 5, verbose = 2):
"""Discretize the continuous state space using MATLAB implementation.
Input:
- `part`: a PropPreservingPartition object
- `ssys`: a CtsSysDyn object
- `N`: horizon length
- `auto`: a boolean that indicates whether to automatically run
the MATLAB implementation of discretize.
- `minCellVolume`: the minimum volume of cells in the resulting
partition.
- `maxNumIterations`: the maximum number of iterations
- `useClosedLoopAlg`: a boolean that indicates whether to use the
closed loop algorithm. For the difference
between the closed loop and the open loop
algorithm, see Borrelli, F. Constrained
Optimal Control of Linear and Hybrid
Systems, volume 290 of Lecture Notes in
Control and Information
Sciences. Springer. 2003.
- `useAllHorizonLength`: a boolean that indicates whether all the
horizon length up to probStruct.N can be
used. This option is relevant only when
the closed loop algorithm is used.
- `useLargeSset`: a boolean that indicates whether when solving
the reachability problem between subcells of the
original partition, the cell of the original
partition should be used for the safe set.
- `timeout`: timeout (in seconds) for polytope union operation.
If negative, the timeout won't be used. Note that
using timeout requires MATLAB parallel computing
toolbox.
- `maxNumPoly`: the maximum number of polytopes in a region used
in computing reachability.
- `verbose`: level of verbosity
"""
if (os.path.isfile(globals()["to_matfile"])):
os.remove(globals()["to_matfile"])
if (os.path.isfile(globals()["from_matfile"])):
os.remove(globals()["from_matfile"])
if (os.path.isfile(globals()["donefile"])):
os.remove(globals()["donefile"])
starttime = time.time()
discretizeToMatlab(part, ssys, N, minCellVolume, \
maxNumIterations, useClosedLoopAlg, \
useAllHorizonLength, useLargeSset, \
timeout, maxNumPoly, verbose)
if (auto):
try:
mpath = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'matlab')
mcommand = "addpath('" + mpath + "'); p = '" + matfile_dir + "';"
mcommand += "try, runDiscretizeMatlab; catch, disp(lasterr); quit; end;"
mcommand += "quit;"
cmd = subprocess.call( \
["matlab", "-nojvm", "-nosplash", "-r", mcommand])
auto = True
except:
printError("Cannot run matlab. Please make sure that MATLAB is in your PATH.")
auto = False
if (not os.path.isfile(globals()["donefile"]) or \
os.path.getmtime(globals()["donefile"]) <= \
os.path.getmtime(globals()["to_matfile"])):
printError("Discretization failed!")
auto = False
if (not auto):
printInfo("\nPlease run 'runDiscretizeMatlab' in the 'matlab' folder.\n")
print("Waiting for MATLAB output...")
while (not os.path.isfile(globals()["donefile"]) or \
os.path.getmtime(globals()["donefile"]) <= \
os.path.getmtime(globals()["to_matfile"])):
if (verbose > 0):
print("Waiting for MATLAB output...")
time.sleep(10)
dyn = discretizeFromMatlab(part)
return dyn
[docs]def discretizeToMatlab(part, ssys, N = 10, minCellVolume=0.1, \
maxNumIterations=5, useClosedLoopAlg=True, \
useAllHorizonLength=True, useLargeSset=False, \
timeout=-1, maxNumPoly=5, verbose=0):
"""Generate an input file for MATLAB implementation of discretize.
Input:
- `part`: a PropPreservingPartition object
- `ssys`: a CtsSysDyn object
- `N`: horizon length
- `minCellVolume`: the minimum volume of cells in the resulting partition
- `maxNumIterations`: the maximum number of iterations
- `useClosedLoopAlg`: a boolean that indicates whether to use the closed loop algorithm.
For the difference between the closed loop and the open loop algorithm,
see Borrelli, F. Constrained Optimal Control of Linear and Hybrid Systems,
volume 290 of Lecture Notes in Control and Information Sciences. Springer. 2003.
- `useAllHorizonLength`: a boolean that indicates whether all the horizon length up
to probStruct.N can be used. This option is relevant only when the closed
loop algorithm is used.
- `useLargeSset`: a boolean that indicates whether when solving the reachability
problem between subcells of the original partition, the cell of the
original partition should be used for the safe set.
- `timeout`: timeout (in seconds) for polytope union operation.
If negative, the timeout won't be used. Note that using timeout requires MATLAB
parallel computing toolbox.
- `maxNumPoly`: the maximum number of polytopes in a region used in computing reachability.
- `verbose`: level of verbosity
"""
data = {}
adj = deepcopy(part.adj)
for i in xrange(0, len(adj)):
adj[i][i] = 1
data['adj'] = adj
data['minCellVolume'] = minCellVolume
data['maxNumIterations'] = maxNumIterations
data['useClosedLoopAlg'] = useClosedLoopAlg
data['useAllHorizonLength'] = useAllHorizonLength
data['useLargeSset'] = useLargeSset
data['timeout'] = timeout
data['maxNumPoly'] = maxNumPoly
data['verbose'] = verbose
data['A'] = ssys.A
data['B'] = ssys.B
data['E'] = ssys.E
data['UsetA'] = ssys.Uset.A
data['Usetb'] = ssys.Uset.b
# data['Uset'] = ssys.Uset
if (isinstance(ssys.Wset, pc.Polytope)):
data['WsetA'] = ssys.Wset.A
data['Wsetb'] = ssys.Wset.b
else:
data['WsetA'] = []
data['Wsetb'] = []
# data['Wset'] = ssys.Wset
data['N'] = N
numpolyvec = []
den = []
for i1 in range(0,len(part.list_region)):
numpolyvec.append(len(part.list_region[i1].list_poly))
for i2 in range(0,len(part.list_region[i1].list_poly)):
pp = part.list_region[i1].list_poly[i2]
data['Reg'+str(i1+1)+'Poly'+str(i2+1)+'Ab'] = np.concatenate((pp.A,np.array([pp.b]).T),1)
data['numpolyvec'] = numpolyvec
matfile = globals()["to_matfile"]
if (not os.path.exists(os.path.abspath(os.path.dirname(matfile)))):
os.mkdir(os.path.abspath(os.path.dirname(matfile)))
sio.savemat(matfile, data)
print('MATLAB input saved to ' + matfile)
[docs]def discretizeFromMatlab(origPart):
"""Load the data from MATLAB discretize implementation.
Input:
- origPart: a PropPreservingPartition object
"""
matfile = globals()["from_matfile"]
if (os.path.getmtime(matfile) <= os.path.getmtime(globals()["to_matfile"])):
printWarning("The MATLAB output file is older than the MATLAB input file.")
cont = raw_input('Continue [c]?: ')
if (cont.lower() != 'c'):
return False
print('Loading MATLAB output from ' + matfile)
data = sio.loadmat(matfile)
trans = data['trans']
a1 = data['numNewCells']
numNewCells = np.zeros((a1.shape[0],1))
numNewCells[0:,0] = a1[:,0]
newCellVol = data['newCellVol']
num_cells = data['num_cells'][0][0]
a2 = data['numpoly']
numpoly = np.zeros(a2.shape)
numpoly[0:,0:] = a2[0:,0:]
regs = []
for i1 in range(0,num_cells):
for i2 in range(0,numNewCells[i1]):
polys = []
props = []
for i3 in range(0,int(numpoly[i1,i2])):
Ab = np.array(data['Cell'+str(i1)+'Reg'+str(i2)+'Poly'+str(i3)+'Ab'].tolist(), dtype=np.float64)
A = deepcopy(Ab[:,0:-1])
b = np.zeros((A.shape[0],1))
b[0:,0] = deepcopy(Ab[:,-1])
polys.append(pc.Polytope(A,b))
props = origPart.list_region[i1].list_prop
regs.append(pc.Region(polys,props))
domain = deepcopy(origPart.domain)
num_prop = deepcopy(origPart.num_prop)
num_regions = len(regs)
list_prop_symbol = deepcopy(origPart.list_prop_symbol)
newPartition = PropPreservingPartition(domain, num_prop, regs, num_regions, [], \
trans, list_prop_symbol)
return newPartition
[docs]def createLM(ssys, N, list_P, Pk=None, PN=None, disturbance_ind=None):
"""Compute the components of the polytope L [x(0)' u(0)' ... u(N-1)']' <= M
which stacks the following constraints
- x(t+1) = A x(t) + B u(t) + E d(t)
- [u(k); x(k)] \in ssys.Uset for all k
If list_P is a Polytope:
- x(0) \in list_P if list_P
- x(k) \in Pk for k= 1,2, .. N-1
- x(N) \in PN
If list_P is a list of polytopes
- x(k) \in list_P[k] for k= 0, 1 ... N
The returned polytope describes the intersection of the polytopes
for all possible Input:
- `ssys`: CtsSysDyn dynamics
- `N`: horizon length
- `list_P`: list of Polytopes or Polytope
- `Pk, PN`: Polytopes
- `disturbance_ind`: list indicating which k's that disturbance should
be taken into account. default is [1,2, ... N]
"""
if isinstance(list_P, pc.Polytope):
list = []
list.append(list_P)
for i in range(1,N):
list.append(Pk)
list.append(PN)
list_P = list
if disturbance_ind == None:
disturbance_ind = range(1,N+1)
A = ssys.A
B = ssys.B
E = ssys.E
D = ssys.Wset
K = ssys.K
PU = ssys.Uset
n = A.shape[1] # State space dimension
m = B.shape[1] # Input space dimension
p = E.shape[1] # Disturbance space dimension
if np.sum(np.abs(E)) > 0:
if not pc.is_fulldim(D):
E = np.zeros(K.shape)
list_len = np.zeros(len(list_P))
for i in range(len(list_P)):
list_len[i] = list_P[i].A.shape[0]
LUn = np.shape(PU.A)[0]
Lk = np.zeros([np.sum(list_len), n+N*m])
LU = np.zeros([LUn*N,n+N*m])
Mk = np.zeros([np.sum(list_len),1])
MU = np.tile(PU.b.reshape(PU.b.size,1), (N,1))
Gk = np.zeros([np.sum(list_len), p*N])
GU = np.zeros([LUn*N, p*N])
K_hat = np.tile(K, (N,1))
B_diag = B
E_diag = E
for i in range(N-1):
B_diag = _block_diag2(B_diag,B)
E_diag = _block_diag2(E_diag,E)
A_n = np.eye(n)
A_k = np.zeros([n, n*N])
sum_vert = 0
for i in range(N+1):
Li = list_P[i]
######### FOR L #########
AB_line = np.hstack([A_n, np.dot(A_k, B_diag)])
Lk[np.ix_(range(sum_vert, sum_vert + Li.A.shape[0]), range(0,Lk.shape[1])) ] = \
np.dot(Li.A, AB_line)
if i < N:
if PU.A.shape[1] == m:
LU[np.ix_(range(i*LUn, (i+1)*LUn), range(n + m*i, n + m*(i+1)))] = PU.A
elif PU.A.shape[1] == m+n:
uk_line = np.zeros([m, n + m*N])
uk_line[np.ix_(range(m), range(n+m*i, n+m*(i+1)))] = np.eye(m)
A_mult = np.vstack([uk_line, AB_line])
b_mult = np.zeros([m+n, 1])
b_mult[range(m, m+n), :] = np.dot(A_k, K_hat)
LU[np.ix_(range(i*LUn, (i+1)*LUn), range(n+m*N))] = np.dot(PU.A, A_mult)
MU[range(i*LUn, (i+1)*LUn), :] -= np.dot(PU.A, b_mult)
######### FOR M #########
Mk[range(sum_vert, sum_vert + Li.A.shape[0]), :] = Li.b.reshape(Li.b.size,1) - \
np.dot(np.dot(Li.A,A_k), K_hat)
######### FOR G #########
if i in disturbance_ind:
Gk[np.ix_(range(sum_vert, sum_vert + Li.A.shape[0]), range(Gk.shape[1]) )] = \
np.dot(np.dot(Li.A,A_k), E_diag)
if (PU.A.shape[1] == m+n) & (i < N):
A_k_E_diag = np.dot(A_k, E_diag)
d_mult = np.vstack([np.zeros([m, p*N]), A_k_E_diag])
GU[np.ix_(range(LUn*i, LUn*(i+1)), range(p*N))] = \
np.dot(PU.A, d_mult)
####### Iterate #########
if i < N:
sum_vert += Li.A.shape[0]
A_n = np.dot(A, A_n)
A_k = np.dot(A, A_k)
A_k[np.ix_(range(n), range(i*n, (i+1)*n))] = np.eye(n)
# Get disturbance sets
if np.sum(np.abs(Gk)) > 0:
G = np.vstack([Gk,GU])
D_hat = get_max_extreme(G,D,N)
else:
D_hat = np.zeros([np.sum(list_len) + LUn*N,1])
# Put together matrices L, M
L = np.vstack([Lk, LU])
M = np.vstack([Mk, MU]) - D_hat
return L,M
[docs]def get_max_extreme(G,D,N):
'''Calculate the array d_hat such that d_hat = max(G*DN_extreme),
where DN_extreme are the vertices of the set D^N.
This is used to describe the polytope L*x <= M - G*d_hat. Calculating d_hat
is equivalen to taking the intersection of the polytopes L*x <= M - G*d_i
for every possible d_i in the set of extreme points to D^N.
Input:
- `G`: The matrix to maximize with respect to
- `D`: Polytope describing the disturbance set
- `N`: Horizon length
Output:
- `d_hat`: Array describing the maximum possible effect from disturbance'''
D_extreme = pc.extreme(D)
nv = D_extreme.shape[0]
dim = D_extreme.shape[1]
DN_extreme = np.zeros([dim*N, nv**N])
for i in range(nv**N):
# Last N digits are indices we want!
ind = np.base_repr(i, base=nv, padding=N)
for j in range(N):
DN_extreme[range(j*dim,(j+1)*dim),i] = D_extreme[int(ind[-j-1]),:]
d_hat = np.amax(np.dot(G,DN_extreme), axis=1)
return d_hat.reshape(d_hat.size,1)
[docs]def is_seq_inside(x0, u_seq, ssys, P0, P1):
"""Checks if the plant remains inside P0 for time t = 1, ... N-1 and
that the plant reaches P1 for time t = N.
Used to test a computed input sequence. No disturbance is taken into
account.
Input:
- `x0`: initial point for execution
- `u_seq`: (N x m) array where row k is input for t = k
- `ssys`: CtsSysDyn dynamics
- `P0`: Polytope where we want x(k) to remain for k = 1, ... N-1
Output:
- `True` if x(k) \in P0 for k = 1, .. N-1 and x(N) \in P1. False otherwise
"""
N = u_seq.shape[0]
x = x0.reshape(x0.size,1)
A = ssys.A
B = ssys.B
if len(ssys.K) == 0:
K = np.zeros(x.shape)
else:
K = ssys.K
inside = True
for i in range(N-1):
u = u_seq[i,:].reshape(u_seq[i,:].size,1)
x = np.dot(A,x) + np.dot(B,u) + K
if not pc.is_inside(P0, x):
inside = False
un_1 = u_seq[N-1,:].reshape(u_seq[N-1,:].size,1)
xn = np.dot(A,x) + np.dot(B,un_1) + K
if not pc.is_inside(P1, xn):
inside = False
return inside
[docs]def get_cellID(x0, part):
"""Return an integer specifying in which discrete state the continuous state x0
belongs to.
Input:
- `x0`: initial continuous state
- `part`: PropPreservingPartition object specifying the state space partition
Output:
- cellID: int specifying the discrete state in `part` x0 belongs to, -1 if x0 does
not belong to any discrete state.
Note1: If there are overlapping partitions (i.e., x0 can belong to more than one
discrete state), this just returns the first ID"""
cellID = -1
for i in range(part.num_regions):
if pc.is_inside(part.list_region[i], x0):
cellID = i
break
return cellID