Skip to content

Commit

Permalink
RL doc string (#465)
Browse files Browse the repository at this point in the history
* First rough draft

* Minors

* Reformat

* Lint

* Resolve PR comments

* Rl type specific env getter (#466)

* 1. type-sensitive env variable getter; 2. updated READMEs for examples

* fixed bugs

* fixed bugs

* bug fixes

* lint

Co-authored-by: ysqyang <v-yangqi@microsoft.com>
Co-authored-by: yaqiu <v-yaqiu@microsoft.com>

* Example bug fix

* Optimize parser.py

* Resolve PR comments

* Rl config doc (#467)

* 1. type-sensitive env variable getter; 2. updated READMEs for examples

* added detailed doc

* lint

* wording refined

* resolved some PR comments

* resolved more PR comments

* typo fix

Co-authored-by: ysqyang <v-yangqi@microsoft.com>

Co-authored-by: ysqyang <ysqyang@gmail.com>
Co-authored-by: ysqyang <v-yangqi@microsoft.com>
Co-authored-by: yaqiu <v-yaqiu@microsoft.com>
  • Loading branch information
4 people authored Feb 24, 2022
1 parent f77d052 commit 101baeb
Show file tree
Hide file tree
Showing 54 changed files with 1,391 additions and 730 deletions.
15 changes: 8 additions & 7 deletions examples/rl/README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
# Reinforcement Learning (RL) Examples

This folder contains scenarios that employ reinforcement learning. MARO's RL toolkit makes it possible to use a common workflow on different scenarios, so long as the necessary scenario-related components are provided. The workflow consists of Python scripts for running the necessary components in single-threaded and distributed modes under ``workflows``. General scenario-independent settings can be found in ``config.yml``. The scenario can be chosen by setting the ``scenario`` field in this file.
This folder contains scenarios that employ reinforcement learning. MARO's RL toolkit provides scenario-agnostic workflows to run a variety of scenarios in single-thread, multi-process or distributed modes.

## How to Run

Scripts to run the common workflow in docker containers are in ``scripts/docker``. Start by choosing "single", "sync" or "async" for the ``mode`` field in ``config.yml`` to run a scenario in single-threaded, synchronous and asynchronous modes, respectively. Go to this folder and execute ``bash run.sh`` to launch the program and Docker Compose will take care of starting the necessary containers. Note that the script will build the docker image first if it has not already been built by running ``bash build.sh``. When the program is finished, be sure to run ``bash kill.sh`` to clean up the containers and remove the network.
The ``main.py`` script can be used to run the scenarios under ``examples/rl`` or any user-defined scenario that provides the necessary components (see the section below for details) . To choose a scenario, edit ``SCENARIO_PATH`` in the script to point to the desired scenario folder. You may also edit the rest of the config variables to your own preference. Note that this script runs in single-thread mode only.
To run a scenario in multi-process mode on a local machine, you will need to use the CLI tool (which requires MARO [installation from the source](https://github.com/microsoft/maro#install-maro-from-pypi)). Start by creating a configuration file (.yml) that follows the template ``maro/maro/rl/workflows/config/template.yml`` to specify the scenario-independent settings. Then use the command ``maro local run [-c] path/to/your/config`` to run in containerized (with ``-c``) or non-containerized (without ``-c``) environments.

## Create Your Own Scenarios

The workflow scripts make it easy to create your own scenarios by only supplying the necessary ingredients without worrying about putting them together. It is necessary to create an ``__init__.py`` under your scenario folder (so that it can be treated as a package) and expose all ingredients in it. The ingredients include:
You can create your own scenarios by supplying the necessary ingredients without worrying about putting them together in a workflow. It is necessary to create an ``__init__.py`` under your scenario folder (so that it can be treated as a package) and expose all ingredients in it. The ingredients include:
* Definitions of policies and agent-to-policy mappings. These definitions should be provided as a dictionary named ``policy_creator`` that maps a name to a function that takes the name and returns a policy instance with that name. The agent-to-policy mapping should be provided as a dictionary named ``agent2policy``.
* Definitions of training algorithms. These definitions should be provided as a dictionary named ``trainer_creator`` that maps a name to a function that takes the name and returns a trainer instance with that name.
* Definitions of state, action and reward shaping logic pertinent to your simulator and policies.
These definitions should be encapsulated in ``get_env_sampler``, which is a function that takes no parameters and returns an environment sampler;
* Definitions of policies and agent-to-policy mappings. These definitions should be provided as a dictionary named ``policy_func_index`` that maps the name of each policy to a function that creates a policy instance with that name (the policy name should be the function's only parameter). The agent-to-policy mapping should be provided as a dictionary named ``agent2policy``.

It is possible to have customized routines invoked at the end of a roll-out episode or episode segment. These routines usually involve processing or rendering information collected during roll-out. To do this, first implement the ``post_step`` method in your environment sampler class and populate the ``tracker`` member with whatever information you wish to track during roll-out. Then create two functions, ``post_collect`` and ``post_evaluate``, to process the information contained in each ``tracker`` and expose them in the scenario folder's ``__init__.py``. These functions are used as callbacks in the main learning loop and executed at the end of each training or evaluation episode. See ``cim/callbacks.py`` for a simple example of how to create these functions.
These definitions should be encapsulated in ``env_sampler_creator``, which is a function that takes ``policy_creator`` and returns an environment sampler;
It is possible to have customized routines invoked at the end of a roll-out episode or episode segment. These routines usually involve processing and / or rendering information collected during roll-out. To do this, first implement the ``post_step`` method in your environment sampler class to record whatever information you wish to keep track of during roll-out. Then create functions named ``post_collect`` and ``post_evaluate`` to process the information and expose them in the scenario folder's ``__init__.py``. These functions are used as callbacks in the main learning loop and executed at the end of each training or evaluation episode. See ``cim/callbacks.py`` for a simple example of how to create these functions.
7 changes: 4 additions & 3 deletions examples/rl/cim/README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Container Inventory Management

This example demonstrates the use of MARO's RL toolkit to optimize container inventory management. The scenario consists of a set of ports, each acting as a learning agent, and vessels that transfer empty containers among them. Each port must decide 1) whether to load or discharge containers when a vessel arrives and 2) how many containers to be loaded or discharged. The objective is to minimize the overall container shortage over a certain period of time. In this folder you can find:
* ``config.py``, which contains environment and policy configurations for the scenario;
* ``config.py``, which contains general configurations for the scenario;
* ``algorithms``, which contains configurations for the Actor-Critic, DQN and discrete-MADDPG algorithms, including network configurations;
* ``env_sampler.py``, which defines state, action and reward shaping in the ``CIMEnvSampler`` class;
* ``policies.py``, which defines the Q-net for DQN and the network components for Actor-Critic;
* ``policy_trainer.py``, which contains a registry for the policies and algorithms defined in ``algorithms``;
* ``callbacks.py``, which defines routines to be invoked at the end of training or evaluation episodes.

The scripts for running the learning workflows can be found under ``examples/rl/workflows``. See ``README`` under ``examples/rl`` for details about the general applicability of these scripts. We recommend that you follow this example to write your own scenarios.
See ``README.md`` under ``examples/rl`` for details about running the single-threaded learning workflow. We recommend that you follow this example to write your own scenarios.
4 changes: 2 additions & 2 deletions examples/rl/cim/algorithms/ac.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def get_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]:
loss.backward()
return {name: param.grad for name, param in self.named_parameters()}

def apply_gradients(self, grad: dict) -> None:
def apply_gradients(self, grad: Dict[str, torch.Tensor]) -> None:
for name, param in self.named_parameters():
param.grad = grad[name]
self._optim.step()
Expand Down Expand Up @@ -90,7 +90,7 @@ def get_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]:
loss.backward()
return {name: param.grad for name, param in self.named_parameters()}

def apply_gradients(self, grad: dict) -> None:
def apply_gradients(self, grad: Dict[str, torch.Tensor]) -> None:
for name, param in self.named_parameters():
param.grad = grad[name]
self._optim.step()
Expand Down
2 changes: 1 addition & 1 deletion examples/rl/cim/algorithms/dqn.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def get_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]:
loss.backward()
return {name: param.grad for name, param in self.named_parameters()}

def apply_gradients(self, grad: dict) -> None:
def apply_gradients(self, grad: Dict[str, torch.Tensor]) -> None:
for name, param in self.named_parameters():
param.grad = grad[name]
self._optim.step()
Expand Down
4 changes: 2 additions & 2 deletions examples/rl/cim/algorithms/maddpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def get_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]:
loss.backward()
return {name: param.grad for name, param in self.named_parameters()}

def apply_gradients(self, grad: dict) -> None:
def apply_gradients(self, grad: Dict[str, torch.Tensor]) -> None:
for name, param in self.named_parameters():
param.grad = grad[name]
self._optim.step()
Expand Down Expand Up @@ -92,7 +92,7 @@ def get_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]:
loss.backward()
return {name: param.grad for name, param in self.named_parameters()}

def apply_gradients(self, grad: dict) -> None:
def apply_gradients(self, grad: Dict[str, torch.Tensor]) -> None:
for name, param in self.named_parameters():
param.grad = grad[name]
self._optim.step()
Expand Down
24 changes: 12 additions & 12 deletions examples/rl/cim/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@
# Licensed under the MIT license.


def post_collect(trackers: list, ep: int, segment: int) -> None:
def post_collect(info_list: list, ep: int, segment: int) -> None:
# print the env metric from each rollout worker
for tracker in trackers:
print(f"env summary (episode {ep}, segment {segment}): {tracker['env_metric']}")
for info in info_list:
print(f"env summary (episode {ep}, segment {segment}): {info['env_metric']}")

# print the average env metric
if len(trackers) > 1:
metric_keys, num_trackers = trackers[0]["env_metric"].keys(), len(trackers)
avg_metric = {key: sum(tr["env_metric"][key] for tr in trackers) / num_trackers for key in metric_keys}
if len(info_list) > 1:
metric_keys, num_envs = info_list[0]["env_metric"].keys(), len(info_list)
avg_metric = {key: sum(info["env_metric"][key] for info in info_list) / num_envs for key in metric_keys}
print(f"average env summary (episode {ep}, segment {segment}): {avg_metric}")


def post_evaluate(trackers: list, ep: int) -> None:
def post_evaluate(info_list: list, ep: int) -> None:
# print the env metric from each rollout worker
for tracker in trackers:
print(f"env summary (episode {ep}): {tracker['env_metric']}")
for info in info_list:
print(f"env summary (episode {ep}): {info['env_metric']}")

# print the average env metric
if len(trackers) > 1:
metric_keys, num_trackers = trackers[0]["env_metric"].keys(), len(trackers)
avg_metric = {key: sum(tr["env_metric"][key] for tr in trackers) / num_trackers for key in metric_keys}
if len(info_list) > 1:
metric_keys, num_envs = info_list[0]["env_metric"].keys(), len(info_list)
avg_metric = {key: sum(info["env_metric"][key] for info in info_list) / num_envs for key in metric_keys}
print(f"average env summary (episode {ep}): {avg_metric}")
5 changes: 3 additions & 2 deletions examples/rl/cim/env_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _get_reward(self, env_action_dict: Dict[Any, object], event: DecisionEvent,
return {agent_id: reward for agent_id, reward in zip(ports, rewards)}

def _post_step(self, cache_element: CacheElement, reward: Dict[Any, float]) -> None:
self._tracker["env_metric"] = self._env.metrics
self._info["env_metric"] = self._env.metrics


agent2policy = {agent: f"{algorithm}_{agent}.policy" for agent in Env(**env_conf).agent_idx_list}
Expand All @@ -85,5 +85,6 @@ def env_sampler_creator(policy_creator: Dict[str, Callable[[str], RLPolicy]]) ->
return CIMEnvSampler(
get_env=lambda: Env(**env_conf),
policy_creator=policy_creator,
agent2policy=agent2policy
agent2policy=agent2policy,
device="cpu",
)
4 changes: 2 additions & 2 deletions examples/rl/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
from maro.rl.workflows.scenario import Scenario
from maro.utils import Logger


# config variables
SCENARIO_PATH = "cim"
NUM_EPISODES = 50
NUM_STEPS = -1
NUM_STEPS = None
CHECKPOINT_PATH = os.path.join(os.getcwd(), "checkpoints")
CHECKPOINT_INTERVAL = 5
EVAL_SCHEDULE = [10, 20, 30, 40, 50]
Expand Down
9 changes: 5 additions & 4 deletions examples/rl/vm_scheduling/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

A virtual machine (VM) scheduler is a cloud computing service component responsible for providing compute resources to satisfy user demands. A good resource allocation policy should aim to optimize several metrics at the same time, such as user wait time, profit, energy consumption and physical machine (PM) overload. Many commercial cloud providers use rule-based policies. Alternatively, the policy can also be optimized using reinforcement learning (RL) techniques, which involves simulating with historical data. This example demonstrates how DQN and Actor-Critic algorithms can be applied to this scenario. In this folder, you can find:

* ``config.py``, which contains environment and policy configurations.
* ``config.py``, which contains general configurations for the scenario;
* ``algorithms``, which contains configurations for the Actor-Critic, DQN algorithms, including network configurations;
* ``env_sampler.py``, which defines state, action and reward shaping in the ``VMEnvSampler`` class;
* ``policies.py``, which defines the Q-net for DQN and the network components for Actor-Critic.
* ``callbacks.py``, which contains routines to be invoked at the end of a training or evaluation episode.
* ``policy_trainer.py``, which contains a registry for the policies and algorithms defined in ``algorithms``;
* ``callbacks.py``, which defines routines to be invoked at the end of training or evaluation episodes.

The scripts to run the learning workflows can be found under ``examples/rl/workflows``. See ``README`` under ``examples/rl`` for details about the general applicability of these scripts. We recommend that you follow this example to write your own scenarios.
See ``README.md`` under ``examples/rl`` for details about running the single-threaded learning workflow. We recommend that you follow this example to write your own scenarios.


# Some Comments About the Results
Expand Down
4 changes: 2 additions & 2 deletions examples/rl/vm_scheduling/algorithms/ac.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def get_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]:
loss.backward()
return {name: param.grad for name, param in self.named_parameters()}

def apply_gradients(self, grad: dict) -> None:
def apply_gradients(self, grad: Dict[str, torch.Tensor]) -> None:
for name, param in self.named_parameters():
param.grad = grad[name]
self._optim.step()
Expand Down Expand Up @@ -97,7 +97,7 @@ def get_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]:
loss.backward()
return {name: param.grad for name, param in self.named_parameters()}

def apply_gradients(self, grad: dict) -> None:
def apply_gradients(self, grad: Dict[str, torch.Tensor]) -> None:
for name, param in self.named_parameters():
param.grad = grad[name]
self._optim.step()
Expand Down
2 changes: 1 addition & 1 deletion examples/rl/vm_scheduling/algorithms/dqn.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def get_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]:
loss.backward()
return {name: param.grad for name, param in self.named_parameters()}

def apply_gradients(self, grad: dict) -> None:
def apply_gradients(self, grad: Dict[str, torch.Tensor]) -> None:
for name, param in self.named_parameters():
param.grad = grad[name]
self._optim.step()
Expand Down
30 changes: 15 additions & 15 deletions examples/rl/vm_scheduling/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,32 @@
makedirs(plt_path, exist_ok=True)


def post_collect(trackers, ep, segment):
def post_collect(info_list, ep, segment):
# print the env metric from each rollout worker
for tracker in trackers:
print(f"env summary (episode {ep}, segment {segment}): {tracker['env_metric']}")
for info in info_list:
print(f"env summary (episode {ep}, segment {segment}): {info['env_metric']}")

# print the average env metric
if len(trackers) > 1:
metric_keys, num_trackers = trackers[0]["env_metric"].keys(), len(trackers)
avg_metric = {key: sum(tr["env_metric"][key] for tr in trackers) / num_trackers for key in metric_keys}
if len(info_list) > 1:
metric_keys, num_envs = info_list[0]["env_metric"].keys(), len(info_list)
avg_metric = {key: sum(tr["env_metric"][key] for tr in info_list) / num_envs for key in metric_keys}
print(f"average env metric (episode {ep}, segment {segment}): {avg_metric}")


def post_evaluate(trackers, ep):
def post_evaluate(info_list, ep):
# print the env metric from each rollout worker
for tracker in trackers:
print(f"env summary (evaluation episode {ep}): {tracker['env_metric']}")
for info in info_list:
print(f"env summary (evaluation episode {ep}): {info['env_metric']}")

# print the average env metric
if len(trackers) > 1:
metric_keys, num_trackers = trackers[0]["env_metric"].keys(), len(trackers)
avg_metric = {key: sum(tr["env_metric"][key] for tr in trackers) / num_trackers for key in metric_keys}
if len(info_list) > 1:
metric_keys, num_envs = info_list[0]["env_metric"].keys(), len(info_list)
avg_metric = {key: sum(tr["env_metric"][key] for tr in info_list) / num_envs for key in metric_keys}
print(f"average env metric (evaluation episode {ep}): {avg_metric}")

for tracker in trackers:
core_requirement = tracker["actions_by_core_requirement"]
action_sequence = tracker["action_sequence"]
for info in info_list:
core_requirement = info["actions_by_core_requirement"]
action_sequence = info["action_sequence"]
# plot action sequence
fig = plt.figure(figsize=(40, 32))
ax = fig.add_subplot(1, 1, 1)
Expand Down
Loading

0 comments on commit 101baeb

Please sign in to comment.