#!/usr/bin/env python
#
# Copyright (c) 2011 by California Institute of Technology
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the California Institute of Technology nor
# the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CALTECH
# OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
"""
Several functions that help in working with tulipcon XML files.
This module includes a few functions for parsing YAML files, providing
an easier way to specify transition systems. See rsimple_example.yaml
in the "examples" directory for reference.
"""
import re
import xml.etree.ElementTree as ET
import numpy as np
import rhtlp
import discretize
import automaton
import prop2part
import polytope as pc
import jtlvint
import errorprint as ep
from spec import GRSpec
from StringIO import StringIO
try:
import yaml
except ImportError:
yaml = None # Thus calling a function that depends on PyYAML will
# lead to an exception.
#################
# Problem Types #
#################
INCOMPLETE_PROB = -1
NONE_PROB = 0
SYNTH_PROB = 1
RHTLP_PROB = 2
###############
# XML Globals #
###############
DEFAULT_NAMESPACE = "http://tulip-control.sourceforge.net/ns/0"
def readXMLfile(fname, verbose=0):
"""Read tulipcon XML string directly from a file.
Returns 3-tuple, as described in *loadXML* docstring. (This is a
convenience method, mostly only wrapping *loadXML*.)
"""
with open(fname, "r") as f:
x = f.read()
return loadXML(x, verbose=verbose)
[docs]def loadXML(x, verbose=0, namespace=DEFAULT_NAMESPACE):
"""Return ({SynthesisProb,PropPreservingPartition}, CtsSysDyn, Automaton).
The first element of the returned tuple depends on the problem
type detected. If type "none", then it is an instance of
PropPreservingPartition, e.g., as returned by the function
discretize in module discretize. If of type "synth" or "rhtlp",
then an instance of SynthesisProb or a child class is returned.
Any empty or missing items are set to None, or an exception is
raised if the missing item is required.
The argument x can also be an instance of
xml.etree.ElementTree._ElementInterface ; this is mainly for
internal use, e.g. by the function untagpolytope and some
load/dumpXML methods elsewhere.
To easily read and process this string from a file, instead call the method
*readXMLfile*
"""
if not isinstance(x, str) and not isinstance(x, ET._ElementInterface):
raise TypeError("tag to be parsed must be given as a string or ElementTree._ElementInterface.")
if isinstance(x, str):
elem = ET.fromstring(x)
else:
elem = x
if (namespace is None) or (len(namespace) == 0):
ns_prefix = ""
else:
ns_prefix = "{"+namespace+"}"
if elem.tag != ns_prefix+"tulipcon":
raise TypeError("root tag should be tulipcon.")
if ("version" not in elem.attrib.keys()):
raise ValueError("unversioned tulipcon XML string.")
if int(elem.attrib["version"]) != 0:
raise ValueError("unsupported tulipcon XML version: "+str(elem.attrib["version"]))
ptype_tag = elem.find(ns_prefix+"prob_type")
if ptype_tag is None:
ptype = INCOMPLETE_PROB
elif ptype_tag.text == "none":
ptype = NONE_PROB
elif ptype_tag.text == "synth":
ptype = SYNTH_PROB
elif ptype_tag.text == "rhtlp":
ptype = RHTLP_PROB
else:
raise ValueError("Unrecognized prob_type: "+str(ptype_tag.text))
# Build CtsSysDyn, or set to None
c_dyn = elem.find(ns_prefix+"c_dyn")
if c_dyn is None:
sys_dyn = None
else:
(tag_name, A) = untagmatrix(c_dyn.find(ns_prefix+"A"))
(tag_name, B) = untagmatrix(c_dyn.find(ns_prefix+"B"))
(tag_name, E) = untagmatrix(c_dyn.find(ns_prefix+"E"))
(tag_name, K) = untagmatrix(c_dyn.find(ns_prefix+"K"))
(tag_name, Uset) = untagpolytope(c_dyn.find(ns_prefix+"U_set"))
(tag_name, Wset) = untagpolytope(c_dyn.find(ns_prefix+"W_set"))
sys_dyn = discretize.CtsSysDyn(A, B, E, K, Uset, Wset)
# Extract LTL specification
s_elem = elem.find(ns_prefix+"spec")
if s_elem.find(ns_prefix+"env_init") is not None: # instance of GRSpec style
spec = GRSpec()
for spec_tag in ["env_init", "env_safety", "env_prog",
"sys_init", "sys_safety", "sys_prog"]:
if s_elem.find(ns_prefix+spec_tag) is None:
raise ValueError("invalid specification in tulipcon XML string.")
(tag_name, li) = untaglist(s_elem.find(ns_prefix+spec_tag),
cast_f=str, namespace=namespace)
li = [v.replace("<", "<") for v in li]
li = [v.replace(">", ">") for v in li]
li = [v.replace("&", "&") for v in li]
setattr(spec, spec_tag, li)
else: # assume, guarantee strings style
spec = ["", ""]
if s_elem.find(ns_prefix+"assume") is not None:
spec[0] = s_elem.find(ns_prefix+"assume").text
if s_elem.find(ns_prefix+"guarantee") is not None:
spec[1] = s_elem.find(ns_prefix+"guarantee").text
for k in [0, 1]: # Undo special character encoding
if spec[k] is None:
spec[k] = ""
else:
spec[k] = spec[k].replace("<", "<")
spec[k] = spec[k].replace(">", ">")
spec[k] = spec[k].replace("&", "&")
# Build Automaton
aut_elem = elem.find(ns_prefix+"aut")
if aut_elem is None \
or ((aut_elem.text is None) and len(aut_elem.getchildren()) == 0):
aut = None
else:
aut = automaton.Automaton()
if not aut.loadXML(aut_elem, namespace=DEFAULT_NAMESPACE):
ep.printError("failed to read Automaton from given tulipcon XML string.")
aut = None
# Discrete dynamics, if available
d_dyn = elem.find(ns_prefix+"d_dyn")
if d_dyn is None:
prob = None
else:
(tag_name, env_vars) = untagdict(elem.find(ns_prefix+"env_vars"))
(tag_name, sys_disc_vars) = untagdict(elem.find(ns_prefix+"sys_vars"))
if ((d_dyn.find(ns_prefix+"domain") is None)
and (d_dyn.find(ns_prefix+"trans") is None)
and (d_dyn.find(ns_prefix+"prop_symbols") is None)):
disc_dynamics = None
else:
(tag_name, domain) = untagpolytope(d_dyn.find(ns_prefix+"domain"))
(tag_name, trans) = untagmatrix(d_dyn.find(ns_prefix+"trans"), np_type=np.uint8)
(tag_name, prop_symbols) = untaglist(d_dyn.find(ns_prefix+"prop_symbols"), cast_f=str)
region_elem = d_dyn.find(ns_prefix+"regions")
list_region = []
if region_elem is not None:
region_items = region_elem.findall(ns_prefix+"region")
if region_items is not None and len(region_items) > 0:
for region_item in region_items:
(tag_name, R) = untagregion(region_item, cast_f_list=int,
np_type_P=np.float64)
list_region.append(R)
orig_map_elem = d_dyn.find(ns_prefix+"orig_map")
orig_part_elem = d_dyn.find(ns_prefix+"orig_partition")
if (orig_map_elem is None) or (orig_part_elem is None):
orig_list_region = None
orig = None
else:
(tag_name, orig) = untaglist(orig_map_elem, cast_f=int)
cell_items = orig_part_elem.findall(ns_prefix+"cell")
orig_list_region = []
if cell_items is not None and len(cell_items) > 0:
for cell_item in cell_items:
(tag_name, P) = untagpolytope(cell_item)
orig_list_region.append(P)
disc_dynamics = prop2part.PropPreservingPartition(domain=domain,
num_prop=len(prop_symbols),
list_region=list_region,
num_regions=len(list_region),
adj=0,
trans=trans,
list_prop_symbol=prop_symbols,
orig_list_region=orig_list_region,
orig=orig)
# Build appropriate ``problem'' instance
if ptype == SYNTH_PROB:
prob = jtlvint.generateJTLVInput(env_vars=env_vars,
sys_disc_vars=sys_disc_vars,
spec=spec,
disc_props={},
disc_dynamics=disc_dynamics,
smv_file="", spc_file="", verbose=2)
elif ptype == RHTLP_PROB:
if disc_dynamics is not None:
prob = rhtlp.RHTLPProb(shprobs=[], Phi="True", discretize=False,
env_vars=env_vars,
sys_disc_vars=sys_disc_vars,
disc_dynamics=disc_dynamics,
#cont_props=cont_props,
spec=spec)
else:
prob = rhtlp.RHTLPProb(shprobs=[], Phi="True", discretize=False,
env_vars=env_vars,
sys_disc_vars=sys_disc_vars,
#cont_props=cont_props,
spec=spec)
elif ptype == NONE_PROB:
prob = disc_dynamics
else: #ptype == INCOMPLETE_PROB
prob = None
return (prob, sys_dyn, aut)
[docs]def dumpXML(prob=None, spec=['',''], sys_dyn=None, aut=None,
disc_dynamics=None,
synthesize_aut=False, verbose=0, pretty=False):
"""Return tulipcon XML string.
prob is an instance of SynthesisProb or a child class, such as
RHTLPProb (both defined in rhtlp.py). sys_dyn is an instance of
CtsSysDyn (defined in discretize.py), or None. If None, then
continuous dynamics are considered empty (i.e., there are none).
disc_dynamics is an instance of PropPreservingPartition, such as
returned by the function discretize in module discretize. This
argument is supported to permit entire avoidance of SynthesisProb
or related classes.
If pretty is True, then use indentation and newlines to make the
resulting XML string more visually appealing.
aut is an instance of Automaton. If None (rather than an
instance), then an empty <aut> tag is written.
spec may be an instance of GRSpec or a list. If of GRSpec, then
it is formed as expected. If spec is a list, then first element
of "assume" string, and second element of "guarantee" string.
spec=None is also accepted, in which case the specification is
considered empty, but note that this could cause problems later
unless some content is introduced.
** synthesize_aut flag is IGNORED **
If synthesize_aut is True, then if prob.__realizable is not None,
use its value to determine whether a previously computed \*.aut
file should be read. Else (if prob.__realizable is None or
False), then try to compute automaton (saving to file
prob.__jtlvfile + '.aut', as usual). If synthesize_aut is False,
then just save an empty <aut></aut>.
** verbose flag is IGNORED (but that will change...) **
To easily save the result here to a file, instead call the method
*writeXMLfile*
"""
if prob is not None and not isinstance(prob, (rhtlp.SynthesisProb, rhtlp.RHTLPProb)):
raise TypeError("prob should be an instance (or child) of rhtlp.SynthesisProb.")
if sys_dyn is not None and not isinstance(sys_dyn, discretize.CtsSysDyn):
raise TypeError("sys_dyn should be an instance of discretizeM.CtsSysDyn or None.")
if disc_dynamics is not None and not isinstance(disc_dynamics, prop2part.PropPreservingPartition):
raise TypeError("disc_dynamics should be an instance of prop2part.PropPreservingPartition or None.")
if aut is not None and not isinstance(aut, automaton.Automaton):
raise TypeError("aut should be an instance of Automaton or None.")
if spec is not None and not isinstance(spec, (list, GRSpec)):
raise TypeError("spec should be an instance of list or GRSpec")
if pretty:
nl = "\n" # Newline
idt = " " # Indentation
else:
nl = ""
idt = ""
idt_level = 0
output = '<?xml version="1.0" encoding="UTF-8"?>'+nl
output += '<tulipcon xmlns="http://tulip-control.sourceforge.net/ns/0" version="0">'+nl
idt_level += 1
if sys_dyn is not None:
output += idt*idt_level+'<prob_type>'
if isinstance(prob, rhtlp.RHTLPProb): # Beware of order and inheritance
output += 'rhtlp'
elif isinstance(prob, rhtlp.SynthesisProb):
output += 'synth'
else: # prob is None
output += 'none'
output += '</prob_type>'+nl
output += idt*idt_level+'<c_dyn>'+nl
idt_level += 1
output += idt*idt_level+tagmatrix("A",sys_dyn.A)+nl
output += idt*idt_level+tagmatrix("B",sys_dyn.B)+nl
output += idt*idt_level+tagmatrix("E",sys_dyn.E)+nl
output += idt*idt_level+tagmatrix("K",sys_dyn.K)+nl
# Need facility for setting sample period; maybe as new attribute in CtsSysDyn class?
output += idt*idt_level+'<sample_period>1</sample_period>'+nl
output += idt*idt_level+tagpolytope("U_set", sys_dyn.Uset)+nl
output += idt*idt_level+tagpolytope("W_set", sys_dyn.Wset)+nl
idt_level -= 1
output += idt*idt_level+'</c_dyn>'+nl
if prob is not None:
output += tagdict("env_vars", prob.getEnvVars(), pretty=pretty, idt_level=idt_level)
output += tagdict("sys_vars", prob.getSysVars(), pretty=pretty, idt_level=idt_level)
else:
output += tagdict("env_vars", dict([(v,"boolean") for v in spec.env_vars]), pretty=pretty, idt_level=idt_level)
output += tagdict("sys_vars", dict([(v,"boolean") for v in spec.sys_vars]), pretty=pretty, idt_level=idt_level)
if spec is None:
output += idt*idt_level+'<spec><assume></assume><guarantee></guarantee></spec>'+nl
elif isinstance(spec, GRSpec):
# Copy out copies of GRSpec attributes
spec_dict = {"env_init": spec.env_init, "env_safety": spec.env_safety,
"env_prog": spec.env_prog,
"sys_init": spec.sys_init, "sys_safety": spec.sys_safety,
"sys_prog": spec.sys_prog}
# Map special XML characters to safe forms
for k in spec_dict.keys():
if isinstance(spec_dict[k], str):
spec_dict[k] = spec_dict[k].replace("<", "<")
spec_dict[k] = spec_dict[k].replace(">", ">")
spec_dict[k] = spec_dict[k].replace("&", "&")
else:
spec_dict[k] = [v.replace("<", "<") for v in spec_dict[k]]
spec_dict[k] = [v.replace(">", ">") for v in spec_dict[k]]
spec_dict[k] = [v.replace("&", "&") for v in spec_dict[k]]
# Finally, dump XML tags
output += idt*idt_level+'<spec>'+nl
for (k, v) in spec_dict.items():
if isinstance(v, str):
v = [v,]
output += idt*(idt_level+1)+ taglist(k, v) +nl
output += idt*idt_level+'</spec>'+nl
else:
for k in [0,1]:
spec[k] = spec[k].replace("<", "<")
spec[k] = spec[k].replace(">", ">")
spec[k] = spec[k].replace("&", "&")
output += idt*idt_level+'<spec><assume>'+spec[0]+'</assume>'+nl
output += idt*(idt_level+1)+'<guarantee>'+spec[1]+'</guarantee></spec>'+nl
if (disc_dynamics is None) and (prob is not None):
disc_dynamics = prob.getDiscretizedDynamics()
if disc_dynamics is not None:
output += idt*idt_level+'<d_dyn>'+nl
idt_level += 1
output += idt*idt_level+tagpolytope("domain", disc_dynamics.domain)+nl
output += idt*idt_level+tagmatrix("trans", disc_dynamics.trans)+nl
output += idt*idt_level+taglist("prop_symbols",
disc_dynamics.list_prop_symbol)+nl
output += idt*idt_level+'<regions>'+nl
idt_level += 1
if disc_dynamics.list_region is not None and len(disc_dynamics.list_region) > 0:
for R in disc_dynamics.list_region:
output += tagregion(R, pretty=pretty, idt_level=idt_level)
idt_level -= 1
output += idt*idt_level+'</regions>'+nl
if disc_dynamics.orig_list_region is not None:
output += idt*idt_level+taglist("orig_map", disc_dynamics.orig)+nl
output += idt*idt_level+'<orig_partition>'+nl
idt_level += 1
for P in disc_dynamics.orig_list_region:
output += idt*idt_level+tagpolytope("cell", P)+nl
idt_level -= 1
output += idt*idt_level+'</orig_partition>'+nl
idt_level -= 1
output += idt*idt_level+'</d_dyn>'+nl
if aut is None:
output += idt*idt_level+'<aut></aut>'+nl
else:
output += aut.dumpXML(pretty=pretty, idt_level=idt_level)+nl
output += idt_level*idt+'<extra></extra>'+nl
idt_level -= 1
assert idt_level == 0
output += '</tulipcon>'+nl
return output
def writeXMLfile(fname, prob=None, spec=['',''], sys_dyn=None, aut=None,
disc_dynamics=None,
synthesize_aut=False, verbose=0, pretty=False):
"""Write tulipcon XML string directly to a file.
Returns nothing. (This is a convenience method, mostly only
wrapping *dumpXML*.)
"""
with open(fname, "w") as f:
f.write(dumpXML(prob=prob, spec=spec, sys_dyn=sys_dyn, aut=aut,
disc_dynamics=disc_dynamics,
synthesize_aut=synthesize_aut, verbose=verbose,
pretty=pretty))
return
def loadXMLtrans(x, namespace=DEFAULT_NAMESPACE):
"""Read only the continuous transition system from a tulipcon XML string.
I.e., the continuous dynamics (A, B, etc.), and the
proposition-preserving partition, along with its reachability
data.
Return (sys_dyn, disc_dynamics, horizon).
Raise exception if critical error.
"""
if not isinstance(x, str) and not isinstance(x, ET._ElementInterface):
raise TypeError("tag to be parsed must be given as a string or ElementTree._ElementInterface.")
if isinstance(x, str):
elem = ET.fromstring(x)
else:
elem = x
if (namespace is None) or (len(namespace) == 0):
ns_prefix = ""
else:
ns_prefix = "{"+namespace+"}"
if elem.tag != ns_prefix+"tulipcon":
raise TypeError("root tag should be tulipcon.")
if ("version" not in elem.attrib.keys()):
raise ValueError("unversioned tulipcon XML string.")
if int(elem.attrib["version"]) != 0:
raise ValueError("unsupported tulipcon XML version: "+str(elem.attrib["version"]))
# Build CtsSysDyn, or set to None
c_dyn = elem.find(ns_prefix+"c_dyn")
if c_dyn is None:
sys_dyn = None
else:
(tag_name, A) = untagmatrix(c_dyn.find(ns_prefix+"A"))
(tag_name, B) = untagmatrix(c_dyn.find(ns_prefix+"B"))
(tag_name, E) = untagmatrix(c_dyn.find(ns_prefix+"E"))
(tag_name, Uset) = untagpolytope(c_dyn.find(ns_prefix+"U_set"))
(tag_name, Wset) = untagpolytope(c_dyn.find(ns_prefix+"W_set"))
sys_dyn = discretize.CtsSysDyn(A, B, E, [], Uset, Wset)
# Discrete dynamics, if available
d_dyn = elem.find(ns_prefix+"d_dyn")
if d_dyn is None:
horizon = None
disc_dynamics = None
else:
if not d_dyn.attrib.has_key("horizon"):
raise ValueError("missing horizon length used for reachability computation.")
horizon = int(d_dyn.attrib["horizon"])
if (d_dyn.find(ns_prefix+"domain") is None) \
and (d_dyn.find(ns_prefix+"trans") is None) \
and (d_dyn.find(ns_prefix+"prop_symbols") is None):
disc_dynamics = None
else:
(tag_name, domain) = untagpolytope(d_dyn.find(ns_prefix+"domain"))
(tag_name, trans) = untagmatrix(d_dyn.find(ns_prefix+"trans"), np_type=np.uint8)
(tag_name, prop_symbols) = untaglist(d_dyn.find(ns_prefix+"prop_symbols"), cast_f=str)
region_elem = d_dyn.find(ns_prefix+"regions")
list_region = []
if region_elem is not None:
region_items = d_dyn.find(ns_prefix+"regions").findall(ns_prefix+"item")
if region_items is not None and len(region_items) > 0:
for region_item in region_items:
(tag_name, R) = untagregion(region_item, cast_f_list=int,
np_type_P=np.float64)
list_region.append(R)
disc_dynamics = prop2part.PropPreservingPartition(domain=domain,
num_prop=len(prop_symbols),
list_region=list_region,
num_regions=len(list_region),
adj=0,
trans=trans,
list_prop_symbol=prop_symbols)
return (sys_dyn, disc_dynamics, horizon)
def dumpXMLtrans(sys_dyn, disc_dynamics, horizon, extra="",
pretty=False):
"""Return tulipcon XML containing only a continuous transition system.
The argument extra (as a string) is copied verbatim into the
<extra> element.
The "pretty" flag has the same meaning as elsewhere (e.g., see
docstring for dumpXML function).
"""
if not isinstance(sys_dyn, discretize.CtsSysDyn):
raise TypeError("sys_dyn must be an instance of discretizeM.CtsSysDyn")
if not isinstance(disc_dynamics, prop2part.PropPreservingPartition):
raise TypeError("disc_dynamics must be an instance of prop2part.PropPreservingPartition")
if pretty:
nl = "\n" # Newline
idt = " " # Indentation
else:
nl = ""
idt = ""
idt_level = 0
output = '<?xml version="1.0" encoding="UTF-8"?>'+nl
output += '<tulipcon xmlns="http://tulip-control.sourceforge.net/ns/0" version="0">'+nl
idt_level += 1
output += idt*idt_level+'<c_dyn>'+nl
idt_level += 1
output += idt*idt_level+tagmatrix("A",sys_dyn.A)+nl
output += idt*idt_level+tagmatrix("B",sys_dyn.B)+nl
output += idt*idt_level+tagmatrix("E",sys_dyn.E)+nl
# Need facility for setting sample period; maybe as new attribute in CtsSysDyn class?
output += idt*idt_level+'<sample_period>1</sample_period>'+nl
output += idt*idt_level+tagpolytope("U_set", sys_dyn.Uset)+nl
output += idt*idt_level+tagpolytope("W_set", sys_dyn.Wset)+nl
idt_level -= 1
output += idt*idt_level+'</c_dyn>'+nl
output += idt*idt_level+'<d_dyn horizon="'+str(horizon)+'">'+nl
idt_level += 1
output += idt*idt_level+tagpolytope("domain", disc_dynamics.domain)+nl
output += idt*idt_level+tagmatrix("trans", disc_dynamics.trans)+nl
output += idt*idt_level+taglist("prop_symbols",
disc_dynamics.list_prop_symbol)+nl
output += idt*idt_level+'<regions>'+nl
idt_level += 1
if disc_dynamics.list_region is not None and len(disc_dynamics.list_region) > 0:
for R in disc_dynamics.list_region:
output += tagregion(R, pretty=pretty, idt_level=idt_level)
idt_level -= 1
output += idt*idt_level+'</regions>'+nl
if disc_dynamics.orig_list_region is not None:
output += idt*idt_level+taglist("orig_map", disc_dynamics.orig)+nl
output += idt*idt_level+'<orig_partition>'+nl
idt_level += 1
for P in disc_dynamics.orig_list_region:
output += idt*idt_level+tagpolytope("cell", P)+nl
idt_level -= 1
output += idt*idt_level+'</orig_partition>'+nl
idt_level -= 1
output += idt*idt_level+'</d_dyn>'+nl
output += idt_level*idt+'<extra>'+extra+'</extra>'+nl
idt_level -= 1
assert idt_level == 0
output += '</tulipcon>'+nl
return output
def untaglist(x, cast_f=float,
namespace=DEFAULT_NAMESPACE):
"""Extract list from given tulipcon XML tag (string).
Use function cast_f for type-casting extracting element strings.
The default is float, but another common case is cast_f=int (for
"integer"). If cast_f is set to None, then items are left as
extracted, i.e. as strings.
The argument x can also be an instance of
xml.etree.ElementTree._ElementInterface ; this is mainly for
internal use, e.g. by the function untagpolytope and some
load/dumpXML methods elsewhere.
Return result as 2-tuple, containing name of the tag (as a string)
and the list obtained from it.
"""
if not isinstance(x, str) and not isinstance(x, ET._ElementInterface):
raise TypeError("tag to be parsed must be given as a string or ElementTree._ElementInterface.")
if isinstance(x, str):
elem = ET.fromstring(x)
else:
elem = x
if (namespace is None) or (len(namespace) == 0):
ns_prefix = ""
else:
ns_prefix = "{"+namespace+"}"
# Extract list
if cast_f is None:
cast_f = str
litems = elem.findall(ns_prefix+'litem')
if len(litems) > 0:
li = [cast_f(k.attrib['value']) for k in litems]
elif elem.text is None:
li = []
else:
li = [cast_f(k) for k in elem.text.split()]
return (elem.tag, li)
def untagdict(x, cast_f_keys=None, cast_f_values=None,
namespace=DEFAULT_NAMESPACE):
"""Extract list from given tulipcon XML tag (string).
Use functions cast_f_keys and cast_f_values for type-casting
extracting key and value strings, respectively, or None. The
default is None, which means the extracted keys (resp., values)
are left untouched (as strings), but another common case is
cast_f_values=int (for "integer") or cast_f_values=float (for
"floating-point numbers"), while leaving cast_f_keys=None to
indicate dictionary keys are strings.
The argument x can also be an instance of
xml.etree.ElementTree._ElementInterface ; this is mainly for
internal use, e.g. by the function untagpolytope and some
load/dumpXML methods elsewhere.
Return result as 2-tuple, containing name of the tag (as a string)
and the dictionary obtained from it.
"""
if not isinstance(x, str) and not isinstance(x, ET._ElementInterface):
raise TypeError("tag to be parsed must be given as a string or ElementTree._ElementInterface.")
if isinstance(x, str):
elem = ET.fromstring(x)
else:
elem = x
if (namespace is None) or (len(namespace) == 0):
ns_prefix = ""
else:
ns_prefix = "{"+namespace+"}"
# Extract dictionary
items_li = elem.findall(ns_prefix+'item')
if cast_f_keys is None:
cast_f_keys = str
if cast_f_values is None:
cast_f_values = str
di = dict()
for item in items_li:
# N.B., we will overwrite duplicate keys without warning!
di[cast_f_keys(item.attrib['key'])] = cast_f_values(item.attrib['value'])
return (elem.tag, di)
def untagmatrix(x, np_type=np.float64):
"""Extract matrix from given tulipcon XML tag (string).
np_type is the NumPy type into which to cast read string. The
default is the most common, 64-bit floating point.
The argument x can also be an instance of
xml.etree.ElementTree._ElementInterface ; this is mainly for
internal use, e.g. by the function untagpolytope and some
load/dumpXML methods elsewhere.
Return result as 2-tuple, containing name of the tag (as a string)
and the NumPy ndarray obtained from it.
"""
if not isinstance(x, str) and not isinstance(x, ET._ElementInterface):
raise TypeError("tag to be parsed must be given as a string or ElementTree._ElementInterface.")
if isinstance(x, str):
elem = ET.fromstring(x)
else:
elem = x # N.B., just passing the reference.
if elem.attrib['type'] != "matrix":
raise ValueError("tag should indicate type of ``matrix''.")
if elem.text is None:
x_mat = np.array([]) # Handle special empty case.
else:
num_rows = int(elem.attrib['r'])
num_cols = int(elem.attrib['c'])
x_mat = np.array([k for k in elem.text.split()], dtype=np_type)
x_mat = x_mat.reshape(num_rows, num_cols)
return (elem.tag, x_mat)
def untagpolytope(x, np_type=np.float64, namespace=DEFAULT_NAMESPACE):
"""Extract polytope from given tuilpcon XML tag (string).
Essentially calls untagmatrix and gathers results into Polytope
instance.
The argument x can also be an instance of
xml.etree.ElementTree._ElementInterface ; this is mainly for
internal use, e.g. by the function untagpolytope and some
load/dumpXML methods elsewhere.
Return result as 2-tuple, containing name of the tag (as a string)
and the Polytope (as defined in tulip.polytope_computations).
"""
if not isinstance(x, str) and not isinstance(x, ET._ElementInterface):
raise TypeError("tag to be parsed must be given as a string or ElementTree._ElementInterface.")
if isinstance(x, str):
elem = ET.fromstring(x)
else:
elem = x
if elem.attrib['type'] != "polytope":
raise ValueError("tag should indicate type of ``polytope''.")
if (namespace is None) or (len(namespace) == 0):
ns_prefix = ""
else:
ns_prefix = "{"+DEFAULT_NAMESPACE+"}"
h_tag = elem.find(ns_prefix+'H')
k_tag = elem.find(ns_prefix+'K')
(H_out_name, H_out) = untagmatrix(h_tag, np_type=np_type)
(K_out_name, K_out) = untagmatrix(k_tag, np_type=np_type)
return (elem.tag, pc.Polytope(H_out, K_out, normalize=False))
def untagregion(x, cast_f_list=str, np_type_P=np.float64,
namespace=DEFAULT_NAMESPACE):
"""Extract region from given tulipcon XML tag (string).
The argument x can also be an instance of
xml.etree.ElementTree._ElementInterface ; this is mainly for
internal use, e.g. by the function untagpolytope and some
load/dumpXML methods elsewhere.
Type-casting is handled as described by the untaglist and
untagpolytope functions, which are passed the references
cast_f_list and np_type_P, respectively.
Return the result as 2-tuple, containing name of the tag (as a string)
and the Region (as defined in tulip.polytope_computations).
"""
if not isinstance(x, str) and not isinstance(x, ET._ElementInterface):
raise TypeError("tag to be parsed must be given as a string or ElementTree._ElementInterface.")
if (namespace is None) or (len(namespace) == 0):
ns_prefix = ""
else:
ns_prefix = "{"+DEFAULT_NAMESPACE+"}"
if isinstance(x, str):
elem = ET.fromstring(x)
else:
elem = x
if elem.tag != ns_prefix+"region":
raise ValueError("tag must be an instance of ``region''.")
(tag_name, list_prop) = untaglist(elem.find(ns_prefix+"list_prop"), cast_f=cast_f_list)
if list_prop is None:
list_prop = []
poly_tags = elem.findall(ns_prefix+"reg_item")
list_poly = []
if poly_tags is not None and len(poly_tags) > 0:
for P_elem in poly_tags:
(tag_name, P) = untagpolytope(P_elem, np_type=np_type_P,
namespace=namespace)
list_poly.append(P)
return (elem.tag, pc.Region(list_poly=list_poly, list_prop=list_prop))
def tagdict(name, di, pretty=False, idt_level=0):
"""Create tag that basically stores a dictionary object.
If pretty is True, then use indentation and newlines to make the
resulting XML string more visually appealing. idt_level is the
base indentation level on which to create automaton string. This
level is only relevant if pretty=True.
N.B., all keys and values are treated as strings (and frequently
wrapped in str() to force this behavior).
Return the resulting string. On failure, raises an appropriate
exception, or returns False.
"""
if not isinstance(name, str):
raise TypeError("tag name must be a string.")
if pretty:
nl = "\n" # Newline
idt = " " # Indentation
else:
nl = ""
idt = ""
output = idt*idt_level+'<'+name+'>'+nl
for (k, v) in di.items():
output += idt*(idt_level+1)+'<item key="' + str(k) \
+ '" value="' + str(v) + '" />'+nl
output += idt*idt_level+'</'+name+'>'+nl
return output
def tagpolytope(name, P):
"""Create tag of type "Polytope", with given name.
Polytope is as defined in tulip.polytope_computations module.
Return the resulting string. On failure, raises an appropriate
exception, or returns False.
"""
if not isinstance(name, str):
raise TypeError("tag name must be a string.")
# Handle nil polytope case
if P is None or P == []:
P = pc.Polytope(np.array([]), np.array([]), normalize=False)
output = '<'+name+' type="polytope">'
output += tagmatrix("H", P.A)
output += tagmatrix("K", P.b)
output += '</'+name+'>'
return output
def taglist(name, li, no_litem=False):
"""Create tag that basically stores a list object.
N.B., all list elements are treated as strings (and wrapped in
str() to force this behavior).
By default, if an element in given list is of type string, then
each element is placed in its own <litem> tag. This allows
elements to be arbitrary (printable) strings in an XML file. To
disable this, invoke taglist with no_litem=True.
Return the resulting string. On failure, raises an appropriate
exception, or returns False.
"""
if not isinstance(name, str):
raise TypeError("tag name must be a string.")
output = '<'+name+'>'
if li is not None:
has_str = False
for k in li:
if isinstance(k, str):
has_str = True
break
if has_str and not no_litem:
for k in li:
output += '<litem value="'+str(k)+'" />'
else:
output += ' '.join([str(k) for k in li])
output += '</'+name+'>'
return output
def tagregion(R, pretty=False, idt_level=0):
"""Create tag of type "Region."
Region is as defined in tulip.polytope_computations module.
If pretty is True, then use indentation and newlines to make the
resulting XML string more visually appealing. idt_level is the
base indentation level on which to create automaton string. This
level is only relevant if pretty=True.
Return the resulting string. On failure, raises an appropriate
exception, or returns False.
"""
if pretty:
nl = "\n" # Newline
idt = " " # Indentation
else:
nl = ""
idt = ""
# Handle nil Region case
if R is None or R == []:
R = pc.Region(list_poly=[], list_prop=[])
output = idt*idt_level+'<region>'+nl
idt_level += 1
output += idt*idt_level+taglist("list_prop", R.list_prop)+nl
if R.list_poly is not None and len(R.list_poly) > 0:
for P in R.list_poly:
output += idt*idt_level+tagpolytope("reg_item", P)+nl
output += idt*(idt_level-1)+'</region>'+nl
return output
def tagmatrix(name, A):
"""Create tag of type "matrix", with given name.
Given matrix, A, should be an ndarray. If it is a vector, rather
than a matrix (i.e. if len(A.shape) = 1), then it is regarded as a
column vector, i.e., we set r="m" c="1", where A has m
elements.
<name type="matrix" r="n" c="m">...</name>
Return the resulting string. On failure, raises an appropriate
exception, or returns False.
"""
if not isinstance(name, str):
raise TypeError("tag name must be a string.")
# Handle empty matrix case.
if A is None or A == []:
A = np.array([])
if len(A.shape) == 1 and A.shape[0] == 0: # Empty matrix?
output = '<'+name+' type="matrix" r="0" c="0"></'+name+'>'
elif len(A.shape) == 1: # Column vector?
output = '<'+name+' type="matrix" r="'+str(A.shape[0]) \
+'" c="1">'
for i in range(A.shape[0]-1):
output += str(A[i]) + ' '
output += str(A[-1]) + '</'+name+'>'
else: # Otherwise, treat as matrix
output = '<'+name+' type="matrix" r="'+str(A.shape[0]) \
+'" c="'+str(A.shape[1])+'">'
for i in range(A.shape[0]):
for j in range(A.shape[1]):
if i == A.shape[0]-1 and j == A.shape[1]-1:
break # ...since last element is not followed by a comma.
output += str(A[i][j]) + ' '
output += str(A[-1][-1]) + '</'+name+'>'
return output
def yaml_polytope(x):
"""Given dictionary from YAML data file, return polytope."""
if x.has_key("V"):
tmp_V = np.loadtxt(StringIO(x["V"]))
return pc.qhull(tmp_V)
else:
tmp_H = np.loadtxt(StringIO(x["H"]))
tmp_K = np.loadtxt(StringIO(x["K"]))
if len(tmp_K.shape) == 1:
tmp_K = np.reshape(tmp_K, (tmp_K.shape[0], 1))
return pc.Polytope(tmp_H, tmp_K)
def readYAMLfile(fname, verbose=0):
"""Wrap loadYAML function."""
with open(fname, "r") as f:
x = f.read()
return loadYAML(x, verbose=verbose)
[docs]def loadYAML(x, verbose=0):
"""Read transition system specified using YAML in given string.
Return (sys_dyn, initial_partition, N), where:
- sys_dyn is the system dynamics (instance of CtsSysDyn),
- cont_prop is the continuous propositions
of the space (instance of PropPreservingPartition), and
- N is the horizon length (default is 10).
To easily read and process this string from a file, instead call
the method *readYAMLfile*.
Raise exception if critical error.
"""
if yaml is None:
raise ImportError("PyYAML package not found.\nTo read/write YAML, you will need to install PyYAML; see http://pyyaml.org/")
dumped_data = yaml.load(x)
# Sanity check
if (("A" not in dumped_data.keys())
or ("B" not in dumped_data.keys())
or ("U" not in dumped_data.keys())
or ("H" not in dumped_data["U"].keys())
or ("K" not in dumped_data["U"].keys())
or ("cont_prop" not in dumped_data.keys())
or ("X" not in dumped_data.keys())
or ("assumption" not in dumped_data.keys())
or ("guarantee" not in dumped_data.keys())):
raise ValueError("Missing required data.")
# Parse dynamics related strings
A = np.loadtxt(StringIO(dumped_data["A"]))
B = np.loadtxt(StringIO(dumped_data["B"]))
U = yaml_polytope(dumped_data["U"])
X = yaml_polytope(dumped_data["X"])
if dumped_data.has_key("E"):
E = np.loadtxt(StringIO(dumped_data["E"]))
else:
E = [] # No disturbances indicated by empty list.
if dumped_data.has_key("W"):
W = yaml_polytope(dumped_data["W"])
else:
W = [] # No disturbances
if dumped_data.has_key("horizon"):
N = dumped_data["horizon"]
else:
N = 10 # Default to 10, as in function discretize.discretize
env_vars = dict()
if dumped_data.has_key("env_vars"):
env_vars = dumped_data["env_vars"]
sys_disc_vars = dict()
if dumped_data.has_key("sys_disc_vars"):
sys_disc_vars = dumped_data["sys_disc_vars"]
# Parse initial partition
cont_prop = dict()
for (k, v) in dumped_data["cont_prop"].items():
cont_prop[k] = yaml_polytope(v)
# Spec
assumption = dumped_data["assumption"]
guarantee = dumped_data["guarantee"]
if verbose > 0:
print "A =\n", A
print "B =\n", B
print "E =\n", E
print "X =", X
print "U =", U
print "W =", W
print "horizon (N) =", N
for (k, v) in cont_prop.items():
print k+" =\n", v
# Build transition system
sys_dyn = discretize.CtsSysDyn(A, B, E, [], U, W)
initial_partition = prop2part.prop2part2(X, cont_prop)
return (sys_dyn, initial_partition, N, assumption, guarantee, env_vars, sys_disc_vars)