# Copyright (c) 2012-2016 by California Institute of Technology
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the California Institute of Technology nor
# the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CALTECH
# OR THE CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
"""Routines for working with gridworlds.
Note (24 June 2012): Several pieces of source code are taken or
derived from btsynth; see <http://scottman.net/2012/btsynth>
"""
import copy
import itertools as _itr
import random
import time
import typing as _ty
import numpy as np
try:
import matplotlib as _mpl
import matplotlib.animation
import matplotlib.cm as mpl_cm
import matplotlib.patches
import matplotlib.pyplot as plt
except ImportError:
_mpl = None
import tulip.abstract as _abs
import tulip.spec.form as _form
import tulip._utils as _utl
# inline:
#
# import polytope
# import tulip.abstract.prop2part
# Names exported by a star-import of this module.
__all__ = [
    'GridWorld',
    'random_world',
    'narrow_passage',
    'unoccupied',
    'add_trolls',
    'extract_coord',
    'animate_paths']
# Type aliases used in the annotations below:
# `IJ` annotates (row, column) pairs of cell indices,
# `XY` annotates pairs of floats (side lengths and
# offsets in `GridWorld.dump_ppartition`).
IJ = tuple[int, int]
XY = tuple[float, float]
# `_mpl` is `None` when importing matplotlib failed,
# so at runtime the class is looked up via `_utl.get_type`
# instead of by direct attribute access.
# NOTE(review): presumably `get_type` tolerates
# `_mpl is None`; confirm in `tulip._utils`.
if _ty.TYPE_CHECKING:
    FuncAnimation = _mpl.animation.FuncAnimation
else:
    FuncAnimation = _utl.get_type(
        _mpl, 'animation.FuncAnimation')
class GridWorld:
"""4-connected grids with primitives.
Obstacles and goals are examples of
primitives.
"""
def __init__(
self,
gw_desc:
str |
None=None,
prefix:
str="Y"):
"""Load gridworld from `gw_desc`.
If `gw_desc is None`,
make empty instance.
@param gw_desc:
Gridworld description, or
None to create an empty instance.
@param prefix:
Prefix for naming
gridworld cell variables.
"""
if gw_desc is not None:
gw = self.loads(gw_desc)
self.W = gw.W
self.init_list = gw.init_list
self.goal_list = gw.goal_list
else:
self.W = None
self.init_list = list()
self.goal_list = list()
self.prefix = prefix
self.offset = (0, 0)
def __eq__(
        self,
        other: 'GridWorld'):
    """Test for equality.

    Two gridworlds are equal when both are empty, or when
    their grids have the same shape and the same value in
    every cell, and their `goal_list` and `init_list` are
    equal as lists (so the order of cells matters).

    Does not compare prefixes of cell variable names.

    @param other:
        Gridworld to compare with.
    @return:
        `True` if equal, `False` otherwise.
    """
    if self.W is None or other.W is None:
        # Equal only when both grids are undefined.
        return self.W is None and other.W is None
    # `array_equal` is `False` when the shapes differ,
    # or when at least one cell differs. The earlier test
    # `np.all(self.W != other.W)` detected a difference
    # only when *every* cell differed.
    if not np.array_equal(self.W, other.W):
        return False
    return (
        self.goal_list == other.goal_list and
        self.init_list == other.init_list)
def __ne__(
        self,
        other: 'GridWorld'):
    """Test for inequality, as the negation of `__eq__`."""
    are_equal = self.__eq__(other)
    return not are_equal
def __str__(self):
    """Return the text drawn by `pretty` with the grid shown."""
    text = self.pretty(show_grid=True)
    return text
def __getitem__(
        self,
        key: IJ,
        use_next: bool=False,
        nonbool: bool=True):
    """Return variable name corresponding to this cell.

    Supports negative wrapping, e.g., if Y is an instance of
    GridWorld, then Y[-1,-1] will return the variable name of the
    cell in the bottom-right corner, Y[0,-1] the name of the
    top-right corner cell, etc. As usual in Python, you can only
    wrap around once.

    The attribute `offset` is added to the (wrapped)
    row and column before they are written in the name.

    @param key:
        (row, column) pair.
    @param use_next:
        Use the primed (i.e., state at next time step)
        form of the variable.
    @param nonbool:
        If True, then use variables with integer domains.
    @raise ValueError:
        if the gridworld is empty, or
        `key` is malformed or out of bounds.
    """
    if self.W is None:
        raise ValueError(
            "Gridworld is empty; "
            "no names available.")
    shape = self.W.shape
    if len(key) != len(shape):
        raise ValueError(
            "malformed gridworld key.")
    in_bounds = (
        -shape[0] <= key[0] < shape[0] and
        -shape[1] <= key[1] < shape[1])
    if not in_bounds:
        raise ValueError(
            "gridworld key is out of bounds.")
    # wrap negative indices (at most once)
    row = key[0] + shape[0] if key[0] < 0 else key[0]
    col = key[1] + shape[1] if key[1] < 0 else key[1]
    row += self.offset[0]
    col += self.offset[1]
    if not nonbool:
        # one Boolean variable per cell
        cell = f'{self.prefix}_{row}_{col}'
        return f'X {cell}' if use_next else cell
    if use_next:
        return (
            f'((X ({self.prefix}_r = {row})) && '
            f'(X ({self.prefix}_c = {col})))')
    return (
        f'(({self.prefix}_r = {row}) && '
        f'({self.prefix}_c = {col}))')
def __copy__(self):
    """Return an independent copy of this gridworld.

    The grid array and the lists of initial and goal cells
    are duplicated directly. Copying via `dumps` and `loads`
    was lossy: it reordered both lists to row-major order,
    dropped any goal that shares a cell with an initial
    position, and raised `ValueError` for an empty gridworld.

    @return:
        New `GridWorld` with the same prefix, offset,
        grid, initial cells, and goal cells. Modifying the
        copy's grid or lists does not affect the original.
    """
    duplicate = GridWorld(prefix=self.prefix)
    if self.W is not None:
        duplicate.W = self.W.copy()
    # new list objects, so that appending to
    # one world does not change the other
    duplicate.init_list = list(self.init_list)
    duplicate.goal_list = list(self.goal_list)
    duplicate.offset = self.offset
    return duplicate
def copy(self) -> 'GridWorld':
    """Return a copy of this gridworld.

    Convenience wrapper that delegates to `__copy__`,
    and returns whatever that method returns.
    """
    duplicate = self.__copy__()
    return duplicate
def state(
        self,
        key: IJ,
        offset: IJ=(0, 0),
        nonbool: bool=True
        ) -> dict[str, int]:
    """Return dictionary form of state with keys of variable names.

    Supports negative indices for key, e.g., as in __getitem__.

    The offset argument is motivated by the use-case of multiple
    agents whose moves are governed by separate "gridworlds" but
    who interact in a shared space; with an offset, we can make
    "sub-gridworlds" and enforce rules like mutual exclusion.

    @param key:
        (row, column) pair of the cell that is occupied.
    @param offset:
        Pair added to the row and column indices
        (the attribute `self.offset` is not used here).
    @param nonbool:
        If True, then use variables with integer domains:
        the result maps `prefix_r` and `prefix_c` to the row
        and column. Otherwise, the result maps one variable
        per cell to 0, except for the cell `key`, mapped to 1.
    @raise ValueError:
        if the gridworld is empty, or
        `key` is malformed or out of bounds.
    """
    if self.W is None:
        raise ValueError(
            "Gridworld is empty; "
            "no cells exist.")
    shape = self.W.shape
    if len(key) != len(shape):
        raise ValueError(
            "malformed gridworld key.")
    in_bounds = (
        -shape[0] <= key[0] < shape[0] and
        -shape[1] <= key[1] < shape[1])
    if not in_bounds:
        raise ValueError(
            "gridworld key is out of bounds.")
    # wrap negative indices (at most once)
    row = key[0] + shape[0] if key[0] < 0 else key[0]
    col = key[1] + shape[1] if key[1] < 0 else key[1]
    if nonbool:
        return {
            f'{self.prefix}_r': row + offset[0],
            f'{self.prefix}_c': col + offset[1]}

    def cell_name(i, j):
        return f'{self.prefix}_{i + offset[0]}_{j + offset[1]}'

    valuation = {
        cell_name(i, j): 0
        for i in range(shape[0])
        for j in range(shape[1])}
    valuation[cell_name(row, col)] = 1
    return valuation
def is_empty(
        self,
        coord: IJ,
        extend: bool=False
        ) -> bool:
    """Is cell at coord empty?

    @param coord:
        (row, column) pair; supports negative indices
        (when `extend` is False, indexing is delegated to
        the grid array).
    @param extend:
        If True, then do not wrap indices and treat any
        cell outside the grid as being occupied.
    @raise ValueError:
        if the gridworld is empty, or `coord` is malformed.
    """
    if self.W is None:
        raise ValueError(
            "Gridworld is empty; "
            "no cells exist.")
    shape = self.W.shape
    if len(coord) != len(shape):
        raise ValueError(
            "malformed gridworld coord.")
    row, col = coord[0], coord[1]
    if extend:
        inside = (
            0 <= row <= shape[0] - 1 and
            0 <= col <= shape[1] - 1)
        if not inside:
            return False
    return bool(self.W[row][col] == 0)
def mark_occupied(
        self,
        coord: IJ):
    """Mark cell at coord as statically (permanently) occupied.

    @param coord:
        (row, column) pair of the cell; its grid value
        is set to 1.
    @raise ValueError:
        if the gridworld is empty.
    """
    if self.W is None:
        raise ValueError(
            "Gridworld is empty; no cells exist.")
    row, col = coord[0], coord[1]
    self.W[row][col] = 1
def mark_empty(
        self,
        coord: IJ):
    """Mark cell at coord as empty.

    @param coord:
        (row, column) pair of the cell; its grid value
        is set to 0.
    @raise ValueError:
        if the gridworld is empty.
    """
    if self.W is None:
        raise ValueError(
            "Gridworld is empty; no cells exist.")
    row, col = coord[0], coord[1]
    self.W[row][col] = 0
def is_reachable(
        self,
        start: IJ,
        stop: IJ):
    """Decide whether there is a path from start cell to stop.

    Assume the gridworld is 4-connected.

    @param start:
        (row, column) pair; supports negative indices.
    @param stop:
        same as start argument.
    @return:
        True if there is a path, False otherwise
        (in particular, False if `start` or `stop`
        is an occupied cell).
    @raise ValueError:
        if the gridworld is empty, or a coordinate
        is malformed or out of bounds.
    """
    if self.W is None:
        raise ValueError(
            "Gridworld is empty; "
            "no names available.")
    n_dims = len(self.W.shape)
    n_rows, n_cols = self.W.shape[0], self.W.shape[1]

    def normalize(coord, label):
        """Validate `coord` and wrap negative indices."""
        if len(coord) != n_dims:
            raise ValueError(
                f"malformed gridworld {label} coordinate.")
        in_bounds = (
            -n_rows <= coord[0] < n_rows and
            -n_cols <= coord[1] < n_cols)
        if not in_bounds:
            raise ValueError(
                f"gridworld {label} coordinate "
                "is out of bounds.")
        row = coord[0] + n_rows if coord[0] < 0 else coord[0]
        col = coord[1] + n_cols if coord[1] < 0 else coord[1]
        # A tuple, so that cells given as lists
        # compare equal to the cells visited below.
        return (row, col)

    start = normalize(start, "start")
    stop = normalize(stop, "stop")
    # Quick sanity check
    if not (self.is_empty(start) and self.is_empty(stop)):
        return False
    # Depth-first search. `seen` holds every cell that
    # was ever pushed, so that membership tests take
    # constant time (instead of scanning lists).
    frontier = [start]
    seen = {start}
    while frontier:
        row, col = frontier.pop()
        if (row, col) == stop:
            return True
        for d_row, d_col in ((1, 0), (-1, 0), (0, 1), (0, -1)):
            cell = (row + d_row, col + d_col)
            inside = (
                0 <= cell[0] < n_rows and
                0 <= cell[1] < n_cols)
            if not inside or cell in seen:
                continue
            if not self.is_empty(cell):
                continue
            seen.add(cell)
            frontier.append(cell)
    return False
def plot(
        self,
        font_pt: int=18,
        show_grid: bool=False,
        grid_width: int=2,
        troll_list: list | None=None,
        axes=None
        ) -> None:
    """Draw figure depicting this gridworld.

    Figure legend (symbolic form in parenthesis):
      - "I" ('m+') : possible initial position;
      - "G" ('r*') : goal;
      - "E" ('gx') : goal of a troll; its extent is indicated by gray cells

    If matplotlib could not be imported, a message is
    printed and nothing is drawn.

    @param font_pt:
        size (in points) for rendering text in the
        figure. If 0, then use symbols instead (see legend above).
    @param show_grid:
        If True, then draw lines between cells.
    @param grid_width:
        line width of the grid lines.
    @param troll_list:
        ...same as the argument with the same name
        given to `add_trolls`. Here it is iterated as
        pairs `(center, radius)`.
    @param axes:
        Instance of matplotlib.axes._subplots.AxesSubplot
        on which to draw, or None, in which case a new figure
        is created.
    @raise ValueError:
        if the gridworld is empty, or
        a troll center is outside the gridworld.
    """
    if troll_list is None:
        troll_list = list()
    if _mpl is None:
        print(
            'matplotlib not available, '
            'so skipping GridWorld.plot()')
        return
    if self.W is None:
        raise ValueError(
            "Gridworld does not exist.")
    n_rows, n_cols = self.W.shape
    # Invert, so that empty cells are drawn
    # white and occupied cells black.
    image = np.ones(shape=self.W.shape) - self.W
    if axes is None:
        plt.figure()
        ax = plt.subplot(111)
    else:
        ax = axes
    ax.imshow(
        image, cmap=mpl_cm.gray,
        aspect="equal", interpolation="nearest",
        vmin=0., vmax=1.)
    # Read the limits from the axes that were drawn on.
    # `plt.axis()` would query pyplot's *current* axes,
    # which need not be the given `axes`.
    xmin, xmax, ymin, ymax = ax.axis()
    limits = [xmin, xmax, ymin, ymax]
    x_steps = np.linspace(xmin, xmax, n_cols + 1)
    y_steps = np.linspace(ymin, ymax, n_rows + 1)
    if show_grid:
        for k in x_steps:
            ax.plot([k, k], [ymin, ymax], 'k-', linewidth=grid_width)
        for k in y_steps:
            ax.plot([xmin, xmax], [k, k], 'k-', linewidth=grid_width)
        ax.axis(limits)

    def annotate(cell, text, marker):
        """Label `cell` with text, or a marker if `font_pt` is 0."""
        if font_pt > 0:
            ax.text(cell[1], cell[0], text, size=font_pt)
        else:
            ax.plot(cell[1], cell[0], marker)

    for cell in self.init_list:
        annotate(cell, "I", 'm+')
    for cell in self.goal_list:
        annotate(cell, "G", 'r*')
    for center, radius in troll_list:
        # validate before drawing anything for this troll
        outside = (
            center[0] >= n_rows or
            center[0] < 0 or
            center[1] >= n_cols or
            center[1] < 0)
        if outside:
            raise ValueError("troll center is outside of gridworld")
        annotate(center, "E", 'gx')
        # extent of the troll, clipped to the grid
        top = max(0, center[0] - radius)
        left = max(0, center[1] - radius)
        bottom = min(n_rows, center[0] + radius + 1)
        right = min(n_cols, center[1] + radius + 1)
        for i in range(top, bottom):
            for j in range(left, right):
                if self.W[i][j] != 0:
                    continue
                # `y_steps` runs from the bottom edge of the
                # image to the top edge, hence the reversed index
                ax.add_patch(
                    matplotlib.patches.Rectangle(
                        (x_steps[j], y_steps[n_rows - i]),
                        1, 1, color=(.8, .8, .8)))
    ax.axis(limits)
def pretty(
self,
show_grid:
bool=False,
line_prefix:
str="",
path:
list |
None=None,
goal_order:
bool=False
) -> str:
"""Return pretty-for-printing string.
@param show_grid:
If True, then grid the pretty world and show
row and column labels along the outer edges.
@param line_prefix:
prefix each line with this string.
"""
if path is None:
path = list()
def compress(p):
return [
p[n]
for n in
range(len(p) - 1)
if p[n] != p[n + 1]]
# See comments in code for
# the method loads regarding values in W
if self.W is None:
return ""
# LEGEND:
# * - wall
# (as used in original world
# matrix definition);
# G - goal location;
# I - possible initial location.
out_str = line_prefix
def direct(c1, c2):
y1, x1 = c1
y2, x2 = c2
if x1 > x2:
return "<"
elif x1 < x2:
return ">"
elif y1 > y2:
return "^"
elif y1 < y2:
return "v"
else:
# c1 == c2
return "."
if show_grid:
out_str += " " + "".join(
str(k).rjust(2)
for k in
range(self.W.shape[1])) + "\n"
else:
out_str += "-" * (self.W.shape[1] + 2) + "\n"
# if path:
# path = compress(path)
for i in range(self.W.shape[0]):
out_str += line_prefix
if show_grid:
out_str += " " + "-" * (self.W.shape[1] * 2 + 1) + "\n"
out_str += line_prefix
out_str += str(i).rjust(2)
else:
out_str += "|"
for j in range(self.W.shape[1]):
if show_grid:
out_str += "|"
if self.W[i][j] == 0:
if (i, j) in self.init_list:
out_str += "I"
elif (i, j) in self.goal_list:
if goal_order:
out_str += str(self.goal_list.index((i, j)))
else:
out_str += "G"
elif (i, j) in path:
indices = (n for n, c in enumerate(
path) if c == (i, j))
for x in indices:
d = direct((i, j), path[(x + 1) % len(path)])
if d != ".":
break
out_str += d
else:
out_str += " "
elif self.W[i][j] == 1:
out_str += "*"
else:
raise ValueError(
"Unrecognized internal world W encoding.")
out_str += "|\n"
out_str += line_prefix
if show_grid:
out_str += " " + "-" * (self.W.shape[1] * 2 + 1) + "\n"
else:
out_str += "-" * (self.W.shape[1] + 2) + "\n"
return out_str
def size(self) -> tuple[int, int]:
    """Return size of gridworld as a tuple in row-major order.

    An empty gridworld has size `(0, 0)`.
    """
    return (0, 0) if self.W is None else self.W.shape
@classmethod
def loads(
        cls,
        gw_desc: str
        ) -> 'GridWorld':
    """Reincarnate using given gridworld description string.

    In a gridworld description, blank lines and lines whose
    first non-blank character is # are ignored (regarded as
    comments), until the size line has been read. The first
    non-blank and non-comment line must give the grid size as
    two integers separated by whitespace, with the first being
    the number of rows and the second the number of columns.

    Each line after the size line is used to construct a row of
    the gridworld. These are read in order with maximum number of
    lines being the number of rows in the gridworld. A row
    definition is whitespace-sensitive up to the number of columns
    (any characters beyond the column count are ignored, so in
    particular trailing whitespace is allowed) and can include the
    following symbols:

      - ' ' : an empty cell,
      - '*' : a statically occupied cell,
      - 'I' : possible initial cell,
      - 'G' : goal cell (must be visited infinitely often).

    If the end of file is reached before all rows have been
    constructed, then the remaining rows are assumed to be empty.
    After all rows have been constructed, the remainder of the
    file is ignored.

    @param gw_desc:
        String containing a gridworld description.
    @return:
        New instance, created by calling `cls()`.
    @raise ValueError:
        if no size line is found, the size line does not
        start with two integers, or a row contains
        an unrecognized symbol.
    """
    ###################################################
    # Internal format notes:
    #
    # W is a matrix of integers with
    # the same shape as the
    # gridworld. Each element has
    # value indicating properties of
    # the corresponding cell,
    # according to the following key.
    #
    # 0 - empty,
    # 1 - statically (permanently) occupied.
    ###################################################
    W = None
    init_list = list()
    goal_list = list()
    row_index = 0
    for line in gw_desc.splitlines():
        if W is None:
            # Still looking for the gridworld size.
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            fields = stripped.split()
            try:
                n_rows, n_cols = int(fields[0]), int(fields[1])
            except (IndexError, ValueError) as error:
                # e.g., only one number on the size line
                raise ValueError(
                    "malformed gridworld description."
                    ) from error
            W = np.zeros((n_rows, n_cols), dtype=np.int32)
            continue
        # Size has been read, so this line defines a row.
        if row_index >= W.shape[0]:
            break
        for j, symbol in enumerate(line[:W.shape[1]]):
            if symbol == "*":
                W[row_index][j] = 1
            elif symbol == "I":
                init_list.append((row_index, j))
            elif symbol == "G":
                goal_list.append((row_index, j))
            elif symbol != " ":
                raise ValueError(
                    'unrecognized row symbol "'
                    f'{symbol}".')
        row_index += 1
    if W is None:
        raise ValueError(
            "malformed gridworld description.")
    # Arrived here without errors,
    # so actually reincarnate
    gw = cls()
    gw.W = W
    gw.init_list = init_list
    gw.goal_list = goal_list
    return gw
@classmethod
def load(
        cls,
        gw_file: str
        ) -> 'GridWorld':
    """Read description from given file.

    Merely a convenience wrapper for the `loads` method:
    the file is read as text, and its contents are
    passed to `cls.loads`.

    @param gw_file:
        path of the file to read.
    """
    with open(gw_file, "r") as stream:
        description = stream.read()
    return cls.loads(description)
def dumps(
        self,
        line_prefix: str=""
        ) -> str:
    """Dump gridworld description string.

    The first line gives the shape of the grid, and each
    later line describes one row, using `*` for occupied
    cells, `I` for initial cells, `G` for goal cells, and a
    blank otherwise. A cell that is both initial and goal
    is written as `I`.

    @param line_prefix:
        prefix each line with this string.
    @raise ValueError:
        if the gridworld is empty, or
        a grid value is neither 0 nor 1.
    """
    if self.W is None:
        raise ValueError(
            "Gridworld does not exist.")
    n_rows, n_cols = self.W.shape[0], self.W.shape[1]
    lines = [line_prefix + " ".join(map(str, self.W.shape))]
    for i in range(n_rows):
        symbols = list()
        for j in range(n_cols):
            value = self.W[i][j]
            if value == 1:
                symbols.append("*")
                continue
            if value != 0:
                raise ValueError(
                    "Unrecognized internal world W encoding.")
            if (i, j) in self.init_list:
                symbols.append("I")
            elif (i, j) in self.goal_list:
                symbols.append("G")
            else:
                symbols.append(" ")
        lines.append(line_prefix + "".join(symbols))
    return "\n".join(lines) + "\n"
def dump_subworld(
        self,
        size: IJ,
        offset: IJ=(0, 0),
        prefix: str="Y",
        extend: bool=False
        ) -> 'GridWorld':
    """Generate new GridWorld instance from part of current one.

    Does not perform automatic truncation (to make desired
    subworld fit); instead a ValueError exception is raised.
    However, the "extend" argument can be used to achieve
    something similar.

    Possible initial positions and goals are not included in the
    returned GridWorld instance.

    @param size:
        (height, width)
    @param offset:
        (row, column) of this gridworld that becomes
        the cell (0, 0) of the subworld.
    @param prefix:
        String to be used as prefix for naming
        subgridworld cell variables.
    @param extend:
        If True, then any size and offset is permitted,
        where any positions outside the actual gridworld are
        assumed to be occupied.
    @raise ValueError:
        if the gridworld is empty, if `size` or `offset` is
        malformed, or (when `extend` is False) if the
        subworld does not fit inside this gridworld.
    """
    if self.W is None:
        raise ValueError("Gridworld does not exist.")
    n_dims = len(self.W.shape)
    if len(size) != n_dims or len(offset) != n_dims:
        raise ValueError("malformed size or offset.")
    n_rows, n_cols = self.W.shape[0], self.W.shape[1]
    top, left = offset[0], offset[1]
    height, width = size[0], size[1]
    sub = GridWorld(prefix=prefix)
    if not extend:
        offset_outside = (
            top < 0 or
            top >= n_rows or
            left < 0 or
            left >= n_cols)
        if offset_outside:
            raise ValueError("offset is out of bounds.")
        does_not_fit = (
            height < 1 or
            width < 1 or
            top + height > n_rows or
            left + width > n_cols)
        if does_not_fit:
            raise ValueError(
                "unworkable subworld size, given offset.")
        sub.W = self.W[
            top:(top + height),
            left:(left + width)].copy()
        return sub
    # Start with all cells occupied, using the same dtype
    # as this gridworld (`np.ones(size)` alone gives floats).
    sub.W = np.ones(size, dtype=self.W.dtype)
    # Rows and columns of this gridworld that are inside the
    # requested window. The window may lie entirely outside
    # the grid, in which case nothing is copied. (Clamping
    # the offsets into the grid, as done previously, copied
    # cells to wrong positions in that case.)
    row_lo = max(top, 0)
    row_hi = min(top + height, n_rows)
    col_lo = max(left, 0)
    col_hi = min(left + width, n_cols)
    if row_lo < row_hi and col_lo < col_hi:
        sub.W[
            (row_lo - top):(row_hi - top),
            (col_lo - left):(col_hi - left)] = self.W[
                row_lo:row_hi,
                col_lo:col_hi]
    return sub
def dump_ppartition(
        self,
        side_lengths: XY=(1.0, 1.0),
        offset: XY=(0.0, 0.0),
        nonbool: bool=True
        ) -> _abs.PropPreservingPartition:
    """Return proposition-preserving partition from this gridworld.

    Adjacency of cells is as returned by prop2partition.prop2part().

    Each cell is mapped to an axis-aligned box, described by
    the inequalities `A x <= b`, with row 0 of the grid
    placed at the top of the domain.

    @param side_lengths:
        pair (W, H) giving width and height of
        each cell, assumed to be the same across the grid.
    @param offset:
        2-dimensional coordinate declaring where the
        bottom-left corner of the gridworld should be placed
        in the continuous space; default places it at the origin.
    @param nonbool:
        If True, then cells are named as by `__getitem__`,
        otherwise as `prefix_R_C`.
    @raise ImportError:
        if `polytope` or `tulip.abstract`
        cannot be imported.
    @raise ValueError:
        if the gridworld is empty.
    """
    try:
        from polytope import Polytope
    except ImportError as error:
        raise ImportError(
            'GridWorld.dump_ppartition() requires '
            'the Python package polytope.'
            ) from error
    try:
        from tulip.abstract import prop2partition
    except ImportError as error:
        raise ImportError(
            '`GridWorld.dump_ppartition()` requires '
            'the module `tulip.abstract`.'
            ) from error
    if self.W is None:
        raise ValueError(
            "Gridworld does not exist.")
    n_rows, n_cols = self.W.shape[0], self.W.shape[1]

    def box(bounds):
        """Return polytope with bounds `[-ymin, ymax, -xmin, xmax]`."""
        normals = np.array([
            [0, -1],
            [0, 1],
            [-1, 0],
            [1, 0]],
            dtype=np.float64)
        return Polytope(
            A=normals,
            b=np.array(bounds, dtype=np.float64))

    domain = box([
        - offset[1],
        offset[1] + n_rows * side_lengths[1],
        - offset[0],
        offset[0] + n_cols * side_lengths[0]])
    cells = dict()
    for i, j in _itr.product(range(n_rows), range(n_cols)):
        if nonbool:
            cell_var = self.__getitem__((i, j))
        else:
            cell_var = f'{self.prefix}_{i}_{j}'
        cells[cell_var] = box([
            -offset[1] - (n_rows - i - 1) * side_lengths[1],
            offset[1] + (n_rows - i) * side_lengths[1],
            -offset[0] - j * side_lengths[0],
            offset[0] + (j + 1) * side_lengths[0]])
    return prop2partition.prop2part(domain, cells)
def spec(
        self,
        offset: IJ=(0, 0),
        controlled_dyn: bool=True,
        nonbool: bool=True
        ) -> _form.GRSpec:
    """Return GRSpec instance describing this gridworld.

    The offset argument is motivated by the use-case of multiple
    agents whose moves are governed by separate "gridworlds" but
    who interact in a shared space; with an offset, we can make
    "sub-gridworlds" and enforce rules like mutual exclusion.

    If nonbool is False, then variables are named according to
    prefix_R_C, where prefix is given (attribute of this GridWorld
    object), R is the row, and C is the column of the cell
    (0-indexed). If nonbool is True (default), cells are
    identified with subformulae of the form:

    ((prefix_r = R) & (prefix_c = C))

    `GridWorld.__getitem__` and `extract_coord` provide
    reference implementations.

    The attribute `self.offset` is changed while the
    specification is generated, and restored before
    returning, also when an exception is raised.

    @param offset:
        index offset to apply when generating the
        specification; e.g., given prefix of "Y",
        offset=(2,1) would cause the variable for the cell at
        (0,3) to be named Y_2_4. Used only when
        `nonbool` is False.
    @param controlled_dyn:
        whether to treat this gridworld as
        describing controlled ("system") or uncontrolled
        ("environment") variables.
    @param nonbool:
        If True, then use variables with integer domains.
    @raise ValueError:
        if the gridworld is empty.
    """
    if self.W is None:
        raise ValueError(
            "Gridworld does not exist.")
    n_rows, n_cols = self.W.shape[0], self.W.shape[1]
    # all cells, in row-major order
    cells = list(_itr.product(range(n_rows), range(n_cols)))

    def name(coord, use_next=False):
        """Return formula that identifies the cell `coord`."""
        return self.__getitem__(
            coord,
            use_next=use_next,
            nonbool=nonbool)

    orig_offset = copy.copy(self.offset)
    self.offset = (0, 0) if nonbool else offset
    try:
        spec_trans = list()
        # Safety, transitions:
        # from an empty cell, stay or
        # move to an empty 4-neighbor.
        for i, j in cells:
            if self.W[i][j] == 1:
                # Cannot start from
                # an occupied cell.
                continue
            successors = [name((i, j), use_next=True)]
            neighbors = (
                (i - 1, j),
                (i, j - 1),
                (i + 1, j),
                (i, j + 1))
            for row, col in neighbors:
                inside = (
                    0 <= row < n_rows and
                    0 <= col < n_cols)
                if inside and self.W[row][col] == 0:
                    successors.append(
                        name((row, col), use_next=True))
            spec_trans.append(
                name((i, j)) + " -> (" +
                " || ".join(successors) + ")")
        # Safety, static
        for i, j in cells:
            if self.W[i][j] == 1:
                spec_trans.append(
                    "!(" + name((i, j), use_next=True) + ")")
        # Safety, mutex;
        # only needed when using
        # boolean variables for cells
        if not nonbool:
            free = [
                cell for cell in cells
                if self.W[cell[0]][cell[1]] != 1]
            disj = list()
            for outer in free:
                conj = [name(outer, use_next=True)]
                conj.extend(
                    "(!" + name(inner, use_next=True) + ")"
                    for inner in free
                    if inner != outer)
                disj.append("(" + " && ".join(conj) + ")")
            spec_trans.append("\n|| ".join(disj))
        if nonbool:
            sys_vars = {
                f"{self.prefix}_r": (0, n_rows - 1),
                f"{self.prefix}_c": (0, n_cols - 1)}
            initspec = [name(loc) for loc in self.init_list]
        else:
            cell_vars = [name(cell) for cell in cells]
            sys_vars = set(cell_vars)
            initspec = list()
            for loc in self.init_list:
                here = name(loc)
                mutex = [here]
                # Iterate the list (not the set), so that
                # the formula is the same across runs.
                mutex.extend(
                    f"!{other}"
                    for other in cell_vars
                    if other != here)
                initspec.append("(" + " && ".join(mutex) + ")")
        init_str = " || ".join(initspec)
        spec_goal = [name(loc) for loc in self.goal_list]
    finally:
        self.offset = orig_offset
    if controlled_dyn:
        return _form.GRSpec(
            sys_vars=sys_vars,
            sys_init=init_str,
            sys_safety=spec_trans,
            sys_prog=spec_goal)
    return _form.GRSpec(
        env_vars=sys_vars,
        env_init=init_str,
        env_safety=spec_trans,
        env_prog=spec_goal)
def scale(
        self,
        xf: int=1,
        yf: int=1
        ) -> 'GridWorld':
    """Return a scaled copy of this gridworld.

    The new gridworld is equivalent to this one, but scaled by
    the integer factors (xf, yf). In the new world, obstacles are
    increased in size but initials and goals change their position
    only. If this world is of size (h, w) then the returned world
    will be of size (h*yf, w*xf).

    @param xf:
        scaling factor for columns
    @param yf:
        scaling factor for rows
    """
    height, width = self.W.shape[0], self.W.shape[1]
    shape_scaled = (height * yf, width * xf)
    scaled = np.zeros(shape_scaled, dtype=np.int32)
    goals = list()
    inits = list()
    scaled_cells = _itr.product(
        range(shape_scaled[0]),
        range(shape_scaled[1]))
    for row, col in scaled_cells:
        # cell of this world that covers (row, col), and
        # the position of (row, col) within the scaled cell
        src_row, in_row = divmod(row, yf)
        src_col, in_col = divmod(col, xf)
        source = (src_row, src_col)
        if self.W[src_row, src_col] == 1:
            scaled[row, col] = 1
        if (in_row, in_col) != (0, 0):
            continue
        # top-left corner of a scaled cell
        if source in self.goal_list:
            goals.append((row, col))
        if source in self.init_list:
            inits.append((row, col))
    scale_gw = GridWorld(prefix=self.prefix)
    scale_gw.W = scaled
    scale_gw.goal_list = goals
    scale_gw.init_list = inits
    return scale_gw
def random_world(
        size,
        wall_density: float=.2,
        num_init: int=1,
        num_goals: int=2,
        prefix: str="Y",
        ensure_feasible: bool=False,
        timeout: int | None=None,
        num_trolls: int=0
        ) -> _ty.Union[
            'GridWorld',
            tuple[
                'GridWorld',
                list],
            None]:
    """Generate random gridworld of given size.

    While an instance of GridWorld is returned, other views of the
    result are possible; e.g., to obtain a description string, use
    `GridWorld.dumps`.

    @param size:
        a pair, indicating number of rows and columns.
    @param wall_density:
        the ratio of walls to total number of cells.
    @param num_init:
        number of possible initial positions.
    @param num_goals:
        number of positions to be visited infinitely often.
    @param prefix:
        string to be used as prefix for naming gridworld
        cell variables.
    @param num_trolls:
        number of random trolls to generate, each
        occupies an area of radius 1.  If nonzero, then a list
        specifying the trolls will also be returned.
    @param ensure_feasible:
        guarantee that all goals and initial
        positions are mutually reachable, assuming a 4-connected
        grid. This method may not be complete, i.e., may fail to
        return a feasible random gridworld with the given
        parameters.  Note that "feasibility" does not account for
        nondeterminism (in particular, nonzero num_trolls
        argument has no effect.)
    @param timeout:
        if ensure_feasible, then quit if no correct random
        world is found before timeout seconds.  If timeout is
        None (default), then do not impose time constraints.
    @return:
        the gridworld, or a pair of the gridworld and the list
        of trolls (as pairs `(cell, 1)`) if `num_trolls > 0`;
        is `None` if timeout occurs.
    """
    if ensure_feasible and timeout is not None:
        st = time.time()
    n_cols = size[1]
    num_cells = size[0] * n_cols
    goal_list = list()
    init_list = list()
    troll_list = list()
    # the grid is flat until the end; cells are
    # identified by their row-major index
    W = np.zeros(num_cells, dtype=np.int32)
    num_blocks = int(np.round(wall_density * num_cells))

    def free_indices():
        """Return empty cells that are not yet designated."""
        empty = np.array(range(num_cells))[W == 0]
        return [
            k for k in empty
            if (
                k not in goal_list and
                k not in init_list and
                k not in troll_list)]

    def draw(candidates):
        """Return random position within `candidates`."""
        return np.random.randint(low=0, high=len(candidates))

    def to_cell(index):
        """Return (row, column) of flat `index`."""
        return (index // n_cols, index % n_cols)

    # Goals are chosen first, then initial positions, then
    # trolls; lists that are filled later are still empty,
    # so excluding them has no effect on earlier choices.
    requests = (
        (goal_list, num_goals),
        (init_list, num_init),
        (troll_list, num_trolls))
    for chosen, count in requests:
        for _ in range(count):
            candidates = free_indices()
            chosen.append(candidates[draw(candidates)])
    placed = 0
    while placed < num_blocks:
        # Add blocks (or "wall cells")
        candidates = free_indices()
        new_block = candidates[draw(candidates)]
        W[new_block] = 1
        placed += 1
        if not ensure_feasible:
            continue
        if timeout is not None and time.time() - st > timeout:
            return None
        # If feasibility must be guaranteed,
        # then check whether
        # the newly unreachable cell
        # is permissible.
        world = GridWorld(prefix=prefix)
        world.W = W.reshape(size)
        waypoints = [to_cell(k) for k in init_list]
        waypoints.extend(to_cell(k) for k in goal_list)
        # each waypoint must reach the next (cyclically)
        is_feasible = all(
            world.is_reachable(
                waypoints[n],
                waypoints[(n + 1) % len(waypoints)])
            for n in range(len(waypoints)))
        if not is_feasible:
            # undo the placement, and try again
            W[new_block] = 0
            placed -= 1
    # Reshape the gridworld to final form;
    # build and return the result.
    world = GridWorld(prefix=prefix)
    world.W = W.reshape(size)
    world.goal_list = [to_cell(k) for k in goal_list]
    world.init_list = [to_cell(k) for k in init_list]
    if num_trolls > 0:
        return world, [(to_cell(k), 1) for k in troll_list]
    return world
def narrow_passage(
        size,
        passage_width: int=1,
        num_init: int=1,
        num_goals: int=2,
        passage_length: float=0.4,
        ptop: int | None=None,
        prefix: str="Y"
        ) -> 'GridWorld':
    """Generate a narrow-passage world.

    This is a world containing two zones (initial, final)
    with a tube connecting them. Initial positions are
    sampled from the left zone, and goals from the right zone.

    @param size:
        a pair, indicating number of rows and columns.
    @param passage_width:
        the width of the connecting passage in cells.
    @param passage_length:
        the length of the passage as a proportion of the
        width of the world.
    @param num_init:
        number of possible initial positions.
    @param num_goals:
        number of positions to be visited infinitely often.
    @param ptop:
        row number of top of passage, default (None) is random
    @param prefix:
        string to be used as prefix for naming gridworld
        cell variables.
    @raise ValueError:
        if a dimension is smaller than 3, or if a zone has
        fewer cells than requested initial positions or goals.
    """
    n_rows, n_cols = size
    if n_rows < 3 or n_cols < 3:
        raise ValueError(
            "Gridworld too small: "
            "minimum dimension 3")
    Z = unoccupied(size, prefix)
    # Zone width is 30% of
    # world width by default
    zone_width = int(max(
        1, ((1.0 - passage_length) / 2.0) * n_cols))
    # first column to the right of the left zone
    izone = zone_width
    # first column of the right zone
    gzone = n_cols - zone_width
    # Compare with the number of cells in each zone. The
    # right zone spans columns `gzone` to `n_cols - 1`,
    # so its width is `n_cols - gzone`, not `gzone`.
    too_many_init_or_goals = (
        izone * n_rows < num_init or
        (n_cols - gzone) * n_rows < num_goals)
    if too_many_init_or_goals:
        raise ValueError(
            "Too many initials/goals for grid size")
    if ptop is None:
        # `randint` excludes the upper bound, and the largest
        # top row that fits is `n_rows - passage_width`
        ptop = np.random.randint(
            0, n_rows - passage_width + 1)
    passage = range(ptop, ptop + passage_width)
    for y in range(n_rows):
        if y in passage:
            continue
        # wall between the zones, outside the passage
        for x in range(izone, gzone):
            Z.W[y][x] = 1
    left_cells = [
        (y, x)
        for y in range(n_rows)
        for x in range(izone)]
    Z.init_list = random.sample(left_cells, num_init)
    right_cells = [
        (y, x)
        for y in range(n_rows)
        for x in range(gzone, n_cols)]
    Z.goal_list = random.sample(right_cells, num_goals)
    return Z
def unoccupied(
        size:
            IJ,
        prefix:
            str="Y"
        ) -> GridWorld:
    """Generate entirely unoccupied gridworld of given size.

    @param size:
        number of rows and columns.
    @param prefix:
        prefix for naming gridworld
        cell variables.
    @return:
        a `GridWorld` built from the description string
        `"<rows> <columns>"`.
    @raise TypeError:
        if `size` has fewer than two elements.
    """
    if len(size) < 2:
        raise TypeError("invalid gridworld size.")
    # Forward the caller's prefix
    # (it used to be overridden by a hard-coded "Y").
    return GridWorld(
        f'{size[0]} {size[1]}',
        prefix=prefix)
def add_trolls(
        Y:
            GridWorld,
        troll_list:
            list,
        prefix:
            str="X",
        start_anywhere:
            bool=False,
        nonbool:
            bool=True,
        get_moves_lists:
            bool=True
        ) -> (
            _form.GRSpec |
            tuple[_form.GRSpec, list]):
    """Create GR(1) specification with troll-like obstacles.

    Each troll gets a name built from the given prefix and its
    index in `troll_list`. Mutual exclusion is imposed only
    between the controlled "Y gridworld" position and each troll,
    not between trolls.

    @param Y:
        The controlled gridworld, describing in particular
        static obstacles that must be respected by the trolls.
    @param troll_list:
        List of pairs `(center, radius)`: the center position,
        to which the troll must always eventually return, and
        the radius defining the extent of the trollspace.
        The radius is measured using infinity-norm.
    @param prefix:
        Prefix of troll names; troll number `k` is named
        `<prefix>_<k>`.
    @param start_anywhere:
        If True, then initial troll position can be
        anywhere in its trollspace. Else (default), the troll is
        assumed to begin each game at its center position.
    @param nonbool:
        If True, then use variables with integer domains.
    @param get_moves_lists:
        Consult returned value description below.
    @return:
        If get_moves_lists is True, then returns (spec, moves_N)
        where spec is the specification incorporating all of the
        trolls, and moves_N is a list with one element per troll;
        each element is the list of possible states of the
        corresponding troll (dynamic obstacle), where a "state"
        is what the `state` method of the troll's subworld
        returns. If get_moves_lists is False, then only spec
        is returned, and moves_N is not computed.
    @raise ValueError:
        if the center of a troll is outside of the gridworld.
    """
    n_rows, n_cols = Y.size()
    # pairs `(offset, subworld)`, one per troll
    trolls = []
    if get_moves_lists:
        moves_N = []
    for troll_id, (center, radius) in enumerate(troll_list):
        center_inside = (
            0 <= center[0] < n_rows and
            0 <= center[1] < n_cols)
        if not center_inside:
            raise ValueError(
                "troll center is outside of gridworld")
        c_row, c_col = center[0], center[1]
        # Bounding box of the trollspace, clipped at
        # the top-left boundary of the gridworld ...
        top = max(0, c_row - radius)
        left = max(0, c_col - radius)
        t_offset = (top, left)
        height = c_row - top + radius + 1
        width = c_col - left + radius + 1
        # ... and at the bottom-right boundary.
        if top + height >= n_rows:
            height = n_rows - top
        if left + width >= n_cols:
            width = n_cols - left
        t_size = (height, width)
        troll_world = Y.dump_subworld(
            t_size,
            offset=t_offset,
            prefix=f'{prefix}_{troll_id}')
        trolls.append((t_offset, troll_world))
        # center, in coordinates of the subworld
        home = (c_row - top, c_col - left)
        troll_world.goal_list = [home]
        if start_anywhere:
            extent = troll_world.size()
            troll_world.init_list = []
            troll_world.init_list.extend(
                cell
                for cell in _itr.product(
                    range(extent[0]), range(extent[1]))
                if troll_world.is_empty(cell))
        else:
            troll_world.init_list = [home]
        if get_moves_lists:
            moves_N.append([
                troll_world.state(
                    (i, j), offset=t_offset, nonbool=nonbool)
                for i in range(t_size[0])
                for j in range(t_size[1])])
    spec = Y.spec(controlled_dyn=True, nonbool=nonbool)
    for (top, left), troll_world in trolls:
        spec |= troll_world.spec(
            offset=(-top, -left),
            controlled_dyn=False,
            nonbool=nonbool)
    # Mutual exclusion: for each cell of Y, and each troll
    # whose trollspace contains that cell, forbid Y and
    # the troll from being there at the same time.
    all_cells = _itr.product(
        range(Y.size()[0]), range(Y.size()[1]))
    for i, j in all_cells:
        for (top, left), troll_world in trolls:
            extent = troll_world.size()
            covered = (
                top <= i < top + extent[0] and
                left <= j < left + extent[1])
            if not covered:
                continue
            if nonbool:
                Xivar = (
                    f'((X {troll_world.prefix}_r = {i - top})'
                    f' & (X {troll_world.prefix}_c = {j - left}))')
            else:
                Xivar = f'X {troll_world.prefix}_{i}_{j}'
            y_cell = Y.__getitem__(
                (i, j), nonbool=nonbool, use_next=True)
            spec.sys_safety.append(
                "!(" + y_cell + f" && {Xivar})")
    if get_moves_lists:
        return (spec, moves_N)
    return spec
def extract_coord(
var_name:
str
) -> (
None |
tuple[
str,
int,
int]):
"""Assuming prefix_R_C format, return `(prefix, row, column)`.
The "nowhere" coordinate has form prefix_n_n. To indicate this,
(-1, -1) is returned as the row, column position.
If error, return None or throw exception.
"""
if not isinstance(var_name, str):
raise TypeError(
"extract_coord: invalid argument type; "
"must be string.")
name_frags = var_name.split("_")
if len(name_frags) < 3:
return None
try:
if name_frags[-1] == "n" and name_frags[-2] == "n":
# Special "nowhere" case
prefix = "_".join(name_frags[:-2])
return (prefix, -1, -1)
col = int(name_frags[-1])
row = int(name_frags[-2])
except ValueError:
return None
prefix = "_".join(name_frags[:-2])
return (prefix, row, col)
def animate_paths(
        Z:
            GridWorld,
        paths:
            list[list],
        jitter:
            float=0.0,
        save_prefix:
            str |
            None=None
        ) -> (
            None |
            FuncAnimation):
    """Animate a list of paths simultaneously in world Z.

    The animation is with `matplotlib`. If `matplotlib` is not
    available, then a message is printed and nothing else is done.

    The number of frames is the length of the first path.

    @param Z:
        Gridworld for which paths were generated.
    @param paths:
        List of paths to animate (one per robot). Each path
        is a list of pairs of gridworld cells, i.e., of the form
        [(r0, c0), (r1, c1), ...], where the first position is
        row r0 and column c0, etc.
    @param jitter:
        Random jitter added to each coordinate value in
        animation. Makes the robot's path more visible by
        avoiding overlap.
    @param save_prefix:
        If not None (and not empty), do not show an animation
        but produce a series of images "<save_prefix>nnn.png"
        which can be compiled into an animated GIF. The frame
        number is zero-padded to the width of the last
        frame number.
    @return:
        the animation object if `save_prefix` is not given,
        otherwise `None`.
    """
    if _mpl is None:
        print('matplotlib not available, '
              'so skipping gridworld.animate_paths()')
        return
    num_frames = len(paths[0])
    last = num_frames - 1
    numberic_str_width = len(str(last))
    colors = 'rgbcmyk'
    fig = plt.figure()
    ax = fig.add_subplot(111)

    def init():
        """Draw the gridworld itself, below the paths."""
        Z.plot(
            font_pt=min(288 // Z.W.shape[1], 48),
            show_grid=True,
            axes=ax)

    def update_line(num, dlist, lines):
        """Show frame `num`: positions and trails up to `num`."""
        for (p, t), d in zip(lines, dlist):
            t.set_data(d[..., :num + 1])
            # A slice (not an index) keeps x and y as
            # length-1 sequences, as `set_data` expects.
            p.set_data(d[..., num:num + 1])
        if save_prefix:
            num_str = str(num).zfill(
                numberic_str_width)
            fig.savefig(
                f'{save_prefix}{num_str}.png')
        return lines,

    data = []
    lines = []
    for n, path in enumerate(paths):
        # rows of `arr`: x (column) and y (row) coordinates
        arr = np.array([[x, y] for y, x in path]).transpose()
        arr = np.add(arr, jitter * (np.random.rand(*arr.shape) - 0.5))
        data.append(arr)
        # Reuse colors cyclically when there are
        # more paths than available colors.
        color = colors[n % len(colors)]
        marker, = ax.plot(
            [], [], 'o',
            color=color,
            markersize=10.0,
            zorder=2)
        trail, = ax.plot(
            [], [], '-',
            color=color,
            zorder=1)
        lines.append((marker, trail))
    if not save_prefix:
        anim = FuncAnimation(
            fig, update_line, num_frames,
            init_func=init,
            fargs=(data, lines),
            interval=500)
        return anim
    # No `FuncAnimation` in this mode, so nothing else calls
    # `init`: draw the world here, so that it is in the images.
    init()
    zeros = '0' * numberic_str_width
    print(
        f'Writing {save_prefix}{zeros}.png - '
        f'{save_prefix}{last}.png')
    for n in range(num_frames):
        update_line(n, data, lines)
    return None