Skip to content

Commit

Permalink
Introduced Actions and Events as subclasses of Action. Then I should …
Browse files Browse the repository at this point in the history
…have done the most of the work to get the modeler formulate the problem and solve it through PDDL+ planner with the PDDLWriter.
  • Loading branch information
Enrico Scala authored and Enrico Scala committed Sep 17, 2024
1 parent 9f10d35 commit bf6a322
Show file tree
Hide file tree
Showing 6 changed files with 333 additions and 3 deletions.
59 changes: 57 additions & 2 deletions unified_planning/io/pddl_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,10 +586,13 @@ def _write_domain(self, out: IO[str]):
out.write(")\n")

for a in self.problem.actions:
if isinstance(a, up.model.InstantaneousAction):
if isinstance(a, up.model.InstantaneousAction) or isinstance(a, up.model.Event):
if any(p.simplify().is_false() for p in a.preconditions):
continue
out.write(f" (:action {self._get_mangled_name(a)}")
if isinstance(a,up.model.Event):
out.write(f" (:event {self._get_mangled_name(a)}")
else:
out.write(f" (:action {self._get_mangled_name(a)}")
out.write(f"\n :parameters (")
for ap in a.parameters:
if ap.type.is_user_type():
Expand Down Expand Up @@ -628,6 +631,40 @@ def _write_domain(self, out: IO[str]):
)
out.write(")")
out.write(")\n")
elif isinstance(a, up.model.Process):
if any(p.simplify().is_false() for p in a.preconditions):
continue
out.write(f" (:process {self._get_mangled_name(a)}")
out.write(f"\n :parameters (")
for ap in a.parameters:
if ap.type.is_user_type():
out.write(
f" {self._get_mangled_name(ap)} - {self._get_mangled_name(ap.type)}"
)
else:
raise UPTypeError("PDDL supports only user type parameters")
out.write(")")
if len(a.preconditions) > 0:
precond_str = []
for p in (c.simplify() for c in a.preconditions):
if not p.is_true():
if p.is_and():
precond_str.extend(map(converter.convert, p.args))
else:
precond_str.append(converter.convert(p))
out.write(f'\n :precondition (and {" ".join(precond_str)})')
elif len(a.preconditions) == 0 and self.empty_preconditions:
out.write(f"\n :precondition ()")
if len(a.effects) > 0:
out.write("\n :effect (and")
for e in a.effects:
_write_derivative(
e,
out,
converter,
)
out.write(")")
out.write(")\n")
elif isinstance(a, DurativeAction):
if any(
c.simplify().is_false() for cl in a.conditions.values() for c in cl
Expand Down Expand Up @@ -1184,3 +1221,21 @@ def _write_effect(
out.write(")")
if effect.is_forall():
out.write(")")

def _write_derivative(
effect: Effect,
out: IO[str],
converter: ConverterToPDDLString,
):
simplified_cond = effect.condition.simplify()
# check for non-constant-bool-assignment
simplified_value = effect.value.simplify()
fluent = converter.convert(effect.fluent)
if effect.is_increase():
out.write(f" (increase {fluent} (* #t {converter.convert(simplified_value)} ))")
elif effect.is_decrease():
out.write(f" (decrease {fluent} (* #t {converter.convert(simplified_value)} ))")
else:
raise UPProblemDefinitionError(
"Derivative can only be expressed as increase, decrease in processes",
)
4 changes: 4 additions & 0 deletions unified_planning/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
InstantaneousAction,
DurativeAction,
SensingAction,
Process,
Event,
)
from unified_planning.model.effect import Effect, SimulatedEffect, EffectKind
from unified_planning.model.expression import (
Expand Down Expand Up @@ -82,6 +84,8 @@
__all__ = [
"Action",
"InstantaneousAction",
"Process",
"Event",
"DurativeAction",
"Effect",
"SimulatedEffect",
Expand Down
233 changes: 233 additions & 0 deletions unified_planning/model/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,3 +764,236 @@ def __repr__(self) -> str:
s.append(" ]\n")
s.append(" }")
return "".join(s)

"""
Below we have natural transitions. These are not controlled by the agent and would probably need a proper subclass. Natural transitions can be of two kinds:
Processes or Events.
Processes dictate how numeric variables evolve over time through the use of time-derivative functions
Events dictate the analogous of urgent transitions in timed automata theory
"""

class Process(Action):
"""This is the `Process` class, which implements the abstract `Process` class."""

def __init__(
self,
_name: str,
_parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None,
_env: Optional[Environment] = None,
**kwargs: "up.model.types.Type",
):
Action.__init__(self, _name, _parameters, _env, **kwargs)
self._preconditions: List["up.model.fnode.FNode"] = []
self._effects: List[up.model.effect.Effect] = []
self._simulated_effect: Optional[up.model.effect.SimulatedEffect] = None
# fluent assigned is the mapping of the fluent to it's value if it is an unconditional assignment
self._fluents_assigned: Dict[
"up.model.fnode.FNode", "up.model.fnode.FNode"
] = {}
# fluent_inc_dec is the set of the fluents that have an unconditional increase or decrease
self._fluents_inc_dec: Set["up.model.fnode.FNode"] = set()

def __repr__(self) -> str:
s = []
s.append(f"process {self.name}")
first = True
for p in self.parameters:
if first:
s.append("(")
first = False
else:
s.append(", ")
s.append(str(p))
if not first:
s.append(")")
s.append(" {\n")
s.append(" preconditions = [\n")
for c in self.preconditions:
s.append(f" {str(c)}\n")
s.append(" ]\n")
s.append(" effects = [\n")
for e in self.effects:
s.append(f" {str(e)}\n")
s.append(" ]\n")
s.append(" }")
return "".join(s)

def __eq__(self, oth: object) -> bool:
if isinstance(oth, Process):
cond = (
self._environment == oth._environment
and self._name == oth._name
and self._parameters == oth._parameters
)
return (
cond
and set(self._preconditions) == set(oth._preconditions)
and set(self._effects) == set(oth._effects)
and self._simulated_effect == oth._simulated_effect
)
else:
return False

def __hash__(self) -> int:
res = hash(self._name)
for ap in self._parameters.items():
res += hash(ap)
for p in self._preconditions:
res += hash(p)
for e in self._effects:
res += hash(e)
res += hash(self._simulated_effect)
return res

def clone(self):
new_params = OrderedDict(
(param_name, param.type) for param_name, param in self._parameters.items()
)
new_instantaneous_action = Process(
self._name, new_params, self._environment
)
new_instantaneous_action._preconditions = self._preconditions[:]
new_instantaneous_action._effects = [e.clone() for e in self._effects]
new_instantaneous_action._fluents_assigned = self._fluents_assigned.copy()
new_instantaneous_action._fluents_inc_dec = self._fluents_inc_dec.copy()
new_instantaneous_action._simulated_effect = self._simulated_effect
return new_instantaneous_action

@property
def preconditions(self) -> List["up.model.fnode.FNode"]:
"""Returns the `list` of the `Action` `preconditions`."""
return self._preconditions

def clear_preconditions(self):
"""Removes all the `Action preconditions`"""
self._preconditions = []

@property
def effects(self) -> List["up.model.effect.Effect"]:
"""Returns the `list` of the `Action effects`."""
return self._effects

def clear_effects(self):
"""Removes all the `Action's effects`."""
self._effects = []
self._fluents_assigned = {}
self._fluents_inc_dec = set()

def __str__(self) -> str:
"""Return a string representation of the `Process`."""
return f"Process(name={self._name}, parameters={self._parameters})"
def _add_effect_instance(self, effect: "up.model.effect.Effect"):
assert (
effect.environment == self._environment
), "effect does not have the same environment of the action"

self._effects.append(effect)

def add_precondition(
self,
precondition: Union[
"up.model.fnode.FNode",
"up.model.fluent.Fluent",
"up.model.parameter.Parameter",
bool,
],
):
"""
Adds the given expression to `action's preconditions`.
:param precondition: The expression that must be added to the `action's preconditions`.
"""
(precondition_exp,) = self._environment.expression_manager.auto_promote(
precondition
)
assert self._environment.type_checker.get_type(precondition_exp).is_bool_type()
if precondition_exp == self._environment.expression_manager.TRUE():
return
free_vars = self._environment.free_vars_oracle.get_free_variables(
precondition_exp
)
if len(free_vars) != 0:
raise UPUnboundedVariablesError(
f"The precondition {str(precondition_exp)} has unbounded variables:\n{str(free_vars)}"
)
if precondition_exp not in self._preconditions:
self._preconditions.append(precondition_exp)

def add_derivative(
self,
fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"],
value: "up.model.expression.Expression"
):
"""
Adds the given `derivative effect` to the `process's effects`.
:param fluent: The `fluent` objective of the time derivative definition.
:param value: The given `fluent` is incremented by the given `value`.
"""
(
fluent_exp,
value_exp,
condition_exp,
) = self._environment.expression_manager.auto_promote(
fluent,
value,
True,
)
if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot():
raise UPUsageError(
"fluent field of add_increase_effect must be a Fluent or a FluentExp or a Dot."
)
if not fluent_exp.type.is_compatible(value_exp.type):
raise UPTypeError(
f"Process effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}"
)
if not fluent_exp.type.is_int_type() and not fluent_exp.type.is_real_type():
raise UPTypeError("Increase effects can be created only on numeric types!")
self._add_effect_instance(
up.model.effect.Effect(
fluent_exp,
value_exp,
condition_exp,
kind=up.model.effect.EffectKind.INCREASE,
forall = tuple(),
)
)
def __repr__(self) -> str:
action_str = InstantaneousAction.__repr__(self)
return action_str.replace("action","process")

class Event(InstantaneousAction):
"""This class represents an event."""

def __init__(
self,
_name: str,
_parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None,
_env: Optional[Environment] = None,
**kwargs: "up.model.types.Type",
):
InstantaneousAction.__init__(self, _name, _parameters, _env, **kwargs)
self._observed_fluents: List["up.model.fnode.FNode"] = []

def __eq__(self, oth: object) -> bool:
if isinstance(oth, Event):
return super().__eq__(oth)
else:
return False

def __repr__(self) -> str:
action_str = InstantaneousAction.__repr__(self)
return action_str.replace("action","event")

def __hash__(self) -> int:
res = hash(self._name)
for ap in self._parameters.items():
res += hash(ap)
for p in self._preconditions:
res += hash(p)
for e in self._effects:
res += hash(e)
res += hash(self._simulated_effect)
return res


19 changes: 19 additions & 0 deletions unified_planning/model/mixins/actions_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,25 @@ def durative_actions(self) -> Iterator["up.model.action.DurativeAction"]:
for a in self._actions:
if isinstance(a, up.model.action.DurativeAction):
yield a

@property
def processes(self) -> Iterator["up.model.action.Process"]:
"""Returs all the sensing actions of the problem.
IMPORTANT NOTE: this property does some computation, so it should be called as
seldom as possible."""
for a in self._actions:
if isinstance(a, up.model.action.Process):
yield a
@property
def events(self) -> Iterator["up.model.action.Event"]:
"""Returs all the sensing actions of the problem.
IMPORTANT NOTE: this property does some computation, so it should be called as
seldom as possible."""
for a in self._actions:
if isinstance(a, up.model.action.Event):
yield a

@property
def conditional_actions(self) -> List["up.model.action.Action"]:
Expand Down
Loading

0 comments on commit bf6a322

Please sign in to comment.