Training A2C with Vector Envs and Domain Randomization#

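Notice

If you encounter a RuntimeError like the one quoted below, raised from multiprocessing/spawn.py, wrap everything from the creation of the vector environments (for example the gym.vector.AsyncVectorEnv call) to the end of the script in an if __name__ == '__main__': block.

“An attempt has been made to start a new process before the current process has finished its bootstrapping phase.”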


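Introduction#

In this tutorial, you'll learn how to use vectorized environments to train an Advantage Actor-Critic agent. We are going to use A2C, which is the synchronous version of the A3C algorithm [1].

Vectorized environments [3] can help to achieve quicker and more robust training by allowing multiple instances of the same environment to run in parallel (on multiple CPUs). This can significantly reduce the variance and thus speed up training.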

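We will implement an Advantage Actor-Critic from scratch to see how batched states are fed into the networks to get a vector of actions (one action per environment), and how the losses for actor and critic are calculated on minibatches of transitions. Each minibatch contains the transitions of one sampling phase: n_steps_per_update steps are executed in n_envs environments in parallel (multiply the two to get the number of transitions in a minibatch). After each sampling phase, the losses are calculated and one gradient step is executed. To calculate the advantages, we are going to use the Generalized Advantage Estimation (GAE) method [2], which balances the tradeoff between variance and bias of the advantage estimates.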
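As a quick sanity check of the minibatch size described above, the sketch below plugs in the values chosen later in the Setup section of this tutorial. The names `transitions_per_update` and `total_transitions` are illustrative only and do not appear in the tutorial code.

```python
# Values taken from the Setup section of this tutorial.
n_envs = 10
n_steps_per_update = 128
n_updates = 1000

# One sampling phase yields one minibatch:
# every environment contributes n_steps_per_update transitions.
transitions_per_update = n_envs * n_steps_per_update

# One gradient step is taken per sampling phase, so the whole
# training run sees this many transitions in total.
total_transitions = transitions_per_update * n_updates

print(transitions_per_update)  # 1280
print(total_transitions)  # 1280000
```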

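The A2C agent class is initialized with the number of features of the input state, the number of actions the agent can take, the learning rates, and the number of environments that run in parallel to collect experiences. The actor and critic networks are defined and their respective optimizers are initialized. The forward pass of the networks takes in a batched vector of states and returns a tensor with the state values and a tensor with the action logits. The select_action method returns a tuple of the chosen actions, the log-probs of those actions, and the state values for each environment. In addition, it returns the entropy of the policy distribution, which is later subtracted from the loss (with a weighting factor ent_coef) to encourage exploration.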
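To make the roles of the logits, log-probs and entropy more concrete, here is a minimal dependency-free sketch of what sampling from a categorical policy involves for a batch of environments. It is not the tutorial's implementation (which uses torch.distributions.Categorical); the helper names `softmax` and `select_action_sketch` are hypothetical.

```python
import math
import random


def softmax(logits):
    """Convert one row of logits into probabilities (numerically stable)."""
    m = max(logits)
    exps = [math.exp(l - m) for l in logits]
    total = sum(exps)
    return [e / total for e in exps]


def select_action_sketch(batch_logits, rng):
    """Sample one action per environment; return actions, log-probs and entropies."""
    actions, log_probs, entropies = [], [], []
    for logits in batch_logits:  # one row of logits per environment
        probs = softmax(logits)
        action = rng.choices(range(len(probs)), weights=probs, k=1)[0]
        actions.append(action)
        log_probs.append(math.log(probs[action]))
        entropies.append(-sum(p * math.log(p) for p in probs if p > 0.0))
    return actions, log_probs, entropies


rng = random.Random(0)
# Two environments with four discrete actions each:
# the first policy is uniform, the second strongly prefers action 0.
batch_logits = [[0.0, 0.0, 0.0, 0.0], [5.0, 0.0, 0.0, 0.0]]
actions, log_probs, entropies = select_action_sketch(batch_logits, rng)
```

The uniform row has the maximum possible entropy, ln(4) ≈ 1.386, while the peaked row has a much lower one. Rewarding higher entropy therefore pushes the policy away from committing to a single action too early.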

The get_losses function calculates the losses for the actor and critic networks (using GAE), which are then updated using the update_parameters function.


# Author: Till Zemann
# License: MIT License

from __future__ import annotations

import os

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from torch import optim
from tqdm import tqdm

import gymnasium as gym

Advantage Actor-Critic (A2C)#

The Actor-Critic combines elements of value-based and policy-based methods. In A2C, the agent has two separate neural networks: a critic network that estimates the state-value function, and an actor network that outputs logits for a categorical probability distribution over all actions. The critic network is trained to minimize the mean squared error between the predicted state values and the actual returns received by the agent (this is equivalent to minimizing the squared advantages, because the advantage of an action is the difference between the return and the state value: A(s,a) = Q(s,a) - V(s)). The actor network is trained to maximize the expected return by selecting actions that have high expected values according to the critic network.
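Written out for a minibatch of $N$ transitions, the quantities in the paragraph above can be sketched as follows. The notation is chosen here for illustration and is meant to match the get_losses method of this tutorial: $\hat{A}_i$ is the estimated advantage (treated as a constant in the actor loss), $\pi_\theta$ is the actor's policy, $\bar{H}$ is the mean policy entropy, and $c_{\text{ent}}$ corresponds to ent_coef.

```latex
\begin{aligned}
A(s, a) &= Q(s, a) - V(s) \\
L_{\text{critic}} &= \frac{1}{N} \sum_{i=1}^{N} \hat{A}_i^{\,2} \\
L_{\text{actor}} &= -\frac{1}{N} \sum_{i=1}^{N} \hat{A}_i \, \log \pi_\theta(a_i \mid s_i) \;-\; c_{\text{ent}} \, \bar{H}
\end{aligned}
```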

The focus of this tutorial will not be on the details of A2C itself. Instead, the tutorial will focus on how to use vectorized environments and domain randomization to accelerate the training process for A2C (and other reinforcement learning algorithms).


class A2C(nn.Module):
    """
    (Synchronous) Advantage Actor-Critic agent class

    Args:
        n_features: The number of features of the input state.
        n_actions: The number of actions the agent can take.
        device: The device to run the computations on (running on a GPU might be quicker for larger Neural Nets,
                for this code CPU is totally fine).
        critic_lr: The learning rate for the critic network (should usually be larger than the actor_lr).
        actor_lr: The learning rate for the actor network.
        n_envs: The number of environments that run in parallel (on multiple CPUs) to collect experiences.
    """

    def __init__(
        self,
        n_features: int,
        n_actions: int,
        device: torch.device,
        critic_lr: float,
        actor_lr: float,
        n_envs: int,
    ) -> None:
        """Initializes the actor and critic networks and their respective optimizers."""
        super().__init__()
        self.device = device
        self.n_envs = n_envs

        critic_layers = [
            nn.Linear(n_features, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU(),
            nn.Linear(32, 1),  # estimate V(s)
        ]

        actor_layers = [
            nn.Linear(n_features, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU(),
            nn.Linear(
                32, n_actions
            ),  # estimate action logits (will be fed into a softmax later)
        ]

        # define actor and critic networks
        self.critic = nn.Sequential(*critic_layers).to(self.device)
        self.actor = nn.Sequential(*actor_layers).to(self.device)

        # define optimizers for actor and critic
        self.critic_optim = optim.RMSprop(self.critic.parameters(), lr=critic_lr)
        self.actor_optim = optim.RMSprop(self.actor.parameters(), lr=actor_lr)

    def forward(self, x: np.ndarray) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass of the networks.

        Args:
            x: A batched vector of states.

        Returns:
            state_values: A tensor with the state values, with shape [n_envs, 1].
            action_logits_vec: A tensor with the action logits, with shape [n_envs, n_actions].
        """
        x = torch.Tensor(x).to(self.device)
        state_values = self.critic(x)  # shape: [n_envs, 1]
        action_logits_vec = self.actor(x)  # shape: [n_envs, n_actions]
        return (state_values, action_logits_vec)

    def select_action(
        self, x: np.ndarray
    ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Returns a tuple of the chosen actions, their log-probs, the state values and the policy entropy.

        Args:
            x: A batched vector of states.

        Returns:
            actions: A tensor with the sampled actions, with shape [n_envs].
            action_log_probs: A tensor with the log-probs of the actions, with shape [n_envs].
            state_values: A tensor with the state values, with shape [n_envs, 1].
            entropy: A tensor with the entropy of the policy distribution, with shape [n_envs].
        """
        state_values, action_logits = self.forward(x)
        action_pd = torch.distributions.Categorical(
            logits=action_logits
        )  # implicitly uses softmax
        actions = action_pd.sample()
        action_log_probs = action_pd.log_prob(actions)
        entropy = action_pd.entropy()
        return (actions, action_log_probs, state_values, entropy)

    def get_losses(
        self,
        rewards: torch.Tensor,
        action_log_probs: torch.Tensor,
        value_preds: torch.Tensor,
        entropy: torch.Tensor,
        masks: torch.Tensor,
        gamma: float,
        lam: float,
        ent_coef: float,
        device: torch.device,
    ) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Computes the loss of a minibatch (transitions collected in one sampling phase) for actor and critic
        using Generalized Advantage Estimation (GAE) to compute the advantages (https://arxiv.org/abs/1506.02438).

        Args:
            rewards: A tensor with the rewards for each time step in the sampling phase, with shape [n_steps_per_update, n_envs].
            action_log_probs: A tensor with the log-probs of the actions taken at each time step in the sampling phase, with shape [n_steps_per_update, n_envs].
            value_preds: A tensor with the state value predictions for each time step in the sampling phase, with shape [n_steps_per_update, n_envs].
            entropy: A tensor with the entropy of the policy distribution (shape [n_envs] as passed by the training loop in this tutorial).
            masks: A tensor with the masks for each time step in the sampling phase, with shape [n_steps_per_update, n_envs].
            gamma: The discount factor.
            lam: The GAE hyperparameter. (lam=1 corresponds to Monte-Carlo sampling with high variance and no bias,
                                          and lam=0 corresponds to normal TD-Learning that has a low variance but is biased
                                          because the estimates are generated by a Neural Net).
            ent_coef: The coefficient of the entropy bonus in the actor loss.
            device: The device to run the computations on (e.g. CPU or GPU).

        Returns:
            critic_loss: The critic loss for the minibatch.
            actor_loss: The actor loss for the minibatch.
        """
        T = len(rewards)
        advantages = torch.zeros(T, self.n_envs, device=device)

        # compute the advantages using GAE
        # (the last step has no successor value in the buffer, so its advantage stays 0)
        gae = 0.0
        for t in reversed(range(T - 1)):
            td_error = (
                rewards[t] + gamma * masks[t] * value_preds[t + 1] - value_preds[t]
            )
            gae = td_error + gamma * lam * masks[t] * gae
            advantages[t] = gae

        # calculate the loss of the minibatch for actor and critic
        critic_loss = advantages.pow(2).mean()

        # give a bonus for higher entropy to encourage exploration
        actor_loss = (
            -(advantages.detach() * action_log_probs).mean() - ent_coef * entropy.mean()
        )
        return (critic_loss, actor_loss)

    def update_parameters(
        self, critic_loss: torch.Tensor, actor_loss: torch.Tensor
    ) -> None:
        """
        Updates the parameters of the actor and critic networks.

        Args:
            critic_loss: The critic loss.
            actor_loss: The actor loss.
        """
        self.critic_optim.zero_grad()
        critic_loss.backward()
        self.critic_optim.step()

        self.actor_optim.zero_grad()
        actor_loss.backward()
        self.actor_optim.step()
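To see what the GAE loop in get_losses computes, the following sketch mirrors it in plain Python for a single environment with made-up rewards and value predictions. The function name `gae_advantages` and all numbers are hypothetical; only the recursion itself is taken from the class above.

```python
def gae_advantages(rewards, values, masks, gamma, lam):
    """Mirror of the GAE loop in get_losses, for one environment and plain lists."""
    T = len(rewards)
    advantages = [0.0] * T
    gae = 0.0
    # The last step has no successor value in the buffer, so it keeps advantage 0.
    for t in reversed(range(T - 1)):
        td_error = rewards[t] + gamma * masks[t] * values[t + 1] - values[t]
        gae = td_error + gamma * lam * masks[t] * gae
        advantages[t] = gae
    return advantages


rewards = [1.0, 1.0, 1.0, 1.0]
values = [0.5, 0.5, 0.5, 0.5]
ongoing = [1.0, 1.0, 1.0, 1.0]  # mask 1: the episode continues after this step

# lam=0 keeps only the one-step TD error of each transition.
print(gae_advantages(rewards, values, ongoing, gamma=1.0, lam=0.0))  # [1.0, 1.0, 1.0, 0.0]

# lam=1 sums all future TD errors, which resembles a Monte-Carlo return.
print(gae_advantages(rewards, values, ongoing, gamma=1.0, lam=1.0))  # [3.0, 2.0, 1.0, 0.0]

# A mask of 0 at step 1 (episode terminated there) cuts off bootstrapping
# and stops advantages from leaking across the episode boundary.
terminated_at_1 = [1.0, 0.0, 1.0, 1.0]
print(gae_advantages(rewards, values, terminated_at_1, gamma=1.0, lam=1.0))  # [1.5, 0.5, 1.0, 0.0]
```

Values of lam between 0 and 1 interpolate between these two extremes, which is the bias-variance tradeoff mentioned in the docstring.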

Using Vectorized Environments#

When you calculate the losses for the two neural networks over only one epoch, the estimates might have a high variance. With vectorized environments, we can play n_envs environments in parallel and thus get up to a linear speedup (meaning that in theory, we collect samples n_envs times quicker) that we can use to calculate the loss for the current policy and critic network. When we use more samples to calculate the loss, it has a lower variance and therefore leads to quicker learning.
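The variance argument above can be illustrated with a small simulation. This sketch assumes independent Gaussian noise on each sample, which is a simplification: transitions collected by the same policy are correlated, so the real gain is usually smaller. The function name `spread_of_batch_means` is hypothetical.

```python
import random
import statistics


def spread_of_batch_means(batch_size, n_trials, rng):
    """Standard deviation of the mean of `batch_size` noisy samples, over many trials."""
    means = []
    for _ in range(n_trials):
        batch = [rng.gauss(0.0, 1.0) for _ in range(batch_size)]
        means.append(statistics.fmean(batch))
    return statistics.stdev(means)


rng = random.Random(0)
# An estimate built from one sample versus one built from ten samples,
# for example one environment versus ten parallel environments.
spread_single = spread_of_batch_means(batch_size=1, n_trials=2000, rng=rng)
spread_batched = spread_of_batch_means(batch_size=10, n_trials=2000, rng=rng)

print(f"spread with 1 sample:   {spread_single:.3f}")
print(f"spread with 10 samples: {spread_batched:.3f}")
```

Under the independence assumption, the spread shrinks roughly with the square root of the number of samples, so ten samples give an estimate about three times less noisy than one.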

A2C is a synchronous method, meaning that the parameter updates to the networks take place deterministically (after each sampling phase), but we can still make use of asynchronous vector envs to spawn multiple processes for parallel environment execution.

The simplest way to create vector environments is by calling gym.make_vec, which creates multiple instances of the same environment:

envs = gym.make_vec("LunarLander-v3", num_envs=3, max_episode_steps=600)

Domain Randomization#

If we want to randomize the environment for training to get more robust agents (that can deal with different parameterizations of an environment and therefore might have a higher degree of generalization), we can set the desired parameters manually or use a pseudo-random number generator to generate them.

Manually setting up 3 parallel ‘LunarLander-v3’ envs with different parameters:

envs = gym.vector.AsyncVectorEnv(
    [
        lambda: gym.make(
            "LunarLander-v3",
            gravity=-10.0,
            enable_wind=True,
            wind_power=15.0,
            turbulence_power=1.5,
            max_episode_steps=600,
        ),
        lambda: gym.make(
            "LunarLander-v3",
            gravity=-9.8,
            enable_wind=True,
            wind_power=10.0,
            turbulence_power=1.3,
            max_episode_steps=600,
        ),
        lambda: gym.make(
            "LunarLander-v3", gravity=-7.0, enable_wind=False, max_episode_steps=600
        ),
    ]
)

Randomly generating the parameters for 3 parallel ‘LunarLander-v3’ envs, using np.clip to stay in the recommended parameter space:

envs = gym.vector.AsyncVectorEnv(
    [
        lambda: gym.make(
            "LunarLander-v3",
            gravity=np.clip(
                np.random.normal(loc=-10.0, scale=1.0), a_min=-11.99, a_max=-0.01
            ),
            enable_wind=np.random.choice([True, False]),
            wind_power=np.clip(
                np.random.normal(loc=15.0, scale=1.0), a_min=0.01, a_max=19.99
            ),
            turbulence_power=np.clip(
                np.random.normal(loc=1.5, scale=0.5), a_min=0.01, a_max=1.99
            ),
            max_episode_steps=600,
        )
        for i in range(3)
    ]
)

Here we are using normal distributions with the standard parameterization of the environment as the mean and an arbitrary standard deviation (scale). Depending on the problem, you can experiment with higher variance and use different distributions as well.

If you are training on the same n_envs environments for the entire training time, and n_envs is a relatively low number (in proportion to how complex the environment is), you might still get some overfitting to the specific parameterizations that you picked. To mitigate this, you can either pick a high number of randomly parameterized environments or remake your environments every couple of sampling phases to generate a new set of pseudo-random parameters.
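One way to prepare for remaking the environments with fresh parameters is to separate the parameter sampling from the environment creation. The sketch below does only the sampling part in plain Python, with the same means and bounds as the snippets above; the helper names `clamp`, `sample_lander_params` and `sample_param_sets` are hypothetical and not part of Gymnasium.

```python
import random


def clamp(value, low, high):
    """Plain-Python equivalent of np.clip for a single number."""
    return max(low, min(high, value))


def sample_lander_params(rng):
    """Draw one LunarLander parameterization with the means and bounds used above."""
    return {
        "gravity": clamp(rng.gauss(-10.0, 1.0), -11.99, -0.01),
        "enable_wind": rng.choice([True, False]),
        "wind_power": clamp(rng.gauss(15.0, 1.0), 0.01, 19.99),
        "turbulence_power": clamp(rng.gauss(1.5, 0.5), 0.01, 1.99),
    }


def sample_param_sets(n_envs, seed):
    """Sample n_envs parameter dicts up front from a single seeded generator."""
    rng = random.Random(seed)
    return [sample_lander_params(rng) for _ in range(n_envs)]


param_sets = sample_param_sets(n_envs=3, seed=0)
for params in param_sets:
    print(params)
```

Each dict could then be unpacked into gym.make("LunarLander-v3", **params, max_episode_steps=600) inside an environment factory, binding it eagerly (for example with lambda p=params: ...) so that every factory keeps its own parameters. Calling sample_param_sets with a new seed every few sampling phases would yield a fresh set of parameterizations.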

Setup#

# environment hyperparams
n_envs = 10
n_updates = 1000
n_steps_per_update = 128
randomize_domain = False

# agent hyperparams
gamma = 0.999
lam = 0.95  # hyperparameter for GAE
ent_coef = 0.01  # coefficient for the entropy bonus (to encourage exploration)
actor_lr = 0.001
critic_lr = 0.005

# Note: the actor has a slower learning rate so that the value targets become
# more stationary and are therefore easier to estimate for the critic

# environment setup
if randomize_domain:
    envs = gym.vector.AsyncVectorEnv(
        [
            lambda: gym.make(
                "LunarLander-v3",
                gravity=np.clip(
                    np.random.normal(loc=-10.0, scale=1.0), a_min=-11.99, a_max=-0.01
                ),
                enable_wind=np.random.choice([True, False]),
                wind_power=np.clip(
                    np.random.normal(loc=15.0, scale=1.0), a_min=0.01, a_max=19.99
                ),
                turbulence_power=np.clip(
                    np.random.normal(loc=1.5, scale=0.5), a_min=0.01, a_max=1.99
                ),
                max_episode_steps=600,
            )
            for i in range(n_envs)
        ]
    )

else:
    envs = gym.make_vec("LunarLander-v3", num_envs=n_envs, max_episode_steps=600)


obs_shape = envs.single_observation_space.shape[0]
action_shape = envs.single_action_space.n

# set the device
use_cuda = False
if use_cuda:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
else:
    device = torch.device("cpu")

# init the agent
agent = A2C(obs_shape, action_shape, device, critic_lr, actor_lr, n_envs)

Training the A2C Agent#

For our training loop, we are using the RecordEpisodeStatistics wrapper to record the episode lengths and returns, and we are also saving the losses and entropies to plot them after the agent has finished training.

You may notice that we don’t reset the vectorized envs at the start of each episode like we would usually do. This is because each environment resets automatically once its episode finishes (each environment takes a different number of timesteps to finish an episode because of the random seeds). As a result, we are not collecting data in whole episodes, but rather playing a fixed number of steps (n_steps_per_update) in each environment (as an example, this could mean that we play 20 timesteps to finish an episode and then use the rest of the timesteps to begin a new one).
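The automatic reset behaviour can be illustrated with a toy stand-in for a vector environment. Both classes below are hypothetical and written only for this illustration. In particular, the toy resets a finished environment within the same step call, whereas the exact step at which Gymnasium returns the reset observation depends on its version and autoreset mode.

```python
class ToyEpisode:
    """Hypothetical single environment whose episodes last a fixed number of steps."""

    def __init__(self, episode_length):
        self.episode_length = episode_length
        self.t = 0

    def reset(self):
        self.t = 0
        return self.t

    def step(self):
        self.t += 1
        terminated = self.t >= self.episode_length
        return self.t, terminated


class ToyAutoResetVector:
    """Hypothetical stand-in for a vector env: steps all envs and resets finished ones."""

    def __init__(self, envs):
        self.envs = envs

    def reset(self):
        return [env.reset() for env in self.envs]

    def step(self):
        observations, terminations = [], []
        for env in self.envs:
            obs, terminated = env.step()
            if terminated:
                obs = env.reset()  # the finished env starts a new episode right away
            observations.append(obs)
            terminations.append(terminated)
        return observations, terminations


# Two envs with different episode lengths, sampled for a fixed number of steps.
vec = ToyAutoResetVector([ToyEpisode(3), ToyEpisode(5)])
vec.reset()
n_steps_per_update = 8
episodes_finished = [0, 0]
for _ in range(n_steps_per_update):
    _, terminations = vec.step()
    for i, terminated in enumerate(terminations):
        episodes_finished[i] += int(terminated)

print(episodes_finished)  # [2, 1]
```

Within one sampling phase of eight steps, the first toy environment completes two episodes and is partway through a third, while the second completes one. The minibatch therefore mixes transitions from several episodes, which is why the masks are needed when computing the advantages.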

# create a wrapper environment to save episode returns and episode lengths
envs_wrapper = gym.wrappers.vector.RecordEpisodeStatistics(
    envs, buffer_length=n_envs * n_updates
)

critic_losses = []
actor_losses = []
entropies = []

# use tqdm to get a progress bar for training
for sample_phase in tqdm(range(n_updates)):
    # we don't have to reset the envs, they just continue playing
    # until the episode is over and then reset automatically

    # reset lists that collect experiences of an episode (sample phase)
    ep_value_preds = torch.zeros(n_steps_per_update, n_envs, device=device)
    ep_rewards = torch.zeros(n_steps_per_update, n_envs, device=device)
    ep_action_log_probs = torch.zeros(n_steps_per_update, n_envs, device=device)
    masks = torch.zeros(n_steps_per_update, n_envs, device=device)

    # at the start of training reset all envs to get an initial state
    if sample_phase == 0:
        states, info = envs_wrapper.reset(seed=42)

    # play n steps in our parallel environments to collect data
    for step in range(n_steps_per_update):
        # select an action A_{t} using S_{t} as input for the agent
        actions, action_log_probs, state_value_preds, entropy = agent.select_action(
            states
        )

        # perform the action A_{t} in the environment to get S_{t+1} and R_{t+1}
        states, rewards, terminated, truncated, infos = envs_wrapper.step(
            actions.cpu().numpy()
        )

        ep_value_preds[step] = torch.squeeze(state_value_preds)
        ep_rewards[step] = torch.tensor(rewards, device=device)
        ep_action_log_probs[step] = action_log_probs

        # add a mask (for the return calculation later);
        # for each env the mask is 1 if the episode is ongoing and 0 if it is terminated (not by truncation!)
        masks[step] = torch.tensor([not term for term in terminated])

    # calculate the losses for actor and critic
    critic_loss, actor_loss = agent.get_losses(
        ep_rewards,
        ep_action_log_probs,
        ep_value_preds,
        entropy,
        masks,
        gamma,
        lam,
        ent_coef,
        device,
    )

    # update the actor and critic networks
    agent.update_parameters(critic_loss, actor_loss)

    # log the losses and entropy
    critic_losses.append(critic_loss.detach().cpu().numpy())
    actor_losses.append(actor_loss.detach().cpu().numpy())
    entropies.append(entropy.detach().mean().cpu().numpy())

Plotting#

""" plot the results """

# %matplotlib inline

rolling_length = 20
fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(12, 5))
fig.suptitle(
    f"Training plots for {agent.__class__.__name__} in the LunarLander-v3 environment \n \
             (n_envs={n_envs}, n_steps_per_update={n_steps_per_update}, randomize_domain={randomize_domain})"
)

# episode return
axs[0][0].set_title("Episode Returns")
episode_returns_moving_average = (
    np.convolve(
        np.array(envs_wrapper.return_queue).flatten(),
        np.ones(rolling_length),
        mode="valid",
    )
    / rolling_length
)
axs[0][0].plot(
    np.arange(len(episode_returns_moving_average)) / n_envs,
    episode_returns_moving_average,
)
axs[0][0].set_xlabel("Number of episodes")

# entropy
axs[1][0].set_title("Entropy")
entropy_moving_average = (
    np.convolve(np.array(entropies), np.ones(rolling_length), mode="valid")
    / rolling_length
)
axs[1][0].plot(entropy_moving_average)
axs[1][0].set_xlabel("Number of updates")


# critic loss
axs[0][1].set_title("Critic Loss")
critic_losses_moving_average = (
    np.convolve(
        np.array(critic_losses).flatten(), np.ones(rolling_length), mode="valid"
    )
    / rolling_length
)
axs[0][1].plot(critic_losses_moving_average)
axs[0][1].set_xlabel("Number of updates")


# actor loss
axs[1][1].set_title("Actor Loss")
actor_losses_moving_average = (
    np.convolve(np.array(actor_losses).flatten(), np.ones(rolling_length), mode="valid")
    / rolling_length
)
axs[1][1].plot(actor_losses_moving_average)
axs[1][1].set_xlabel("Number of updates")

plt.tight_layout()
plt.show()
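The smoothing used for all four curves is a simple moving average. As a minimal sketch, the plain-Python function below (the name `moving_average` is hypothetical) computes the same values as np.convolve(values, np.ones(window), mode="valid") / window, which makes it easy to check the result by hand.

```python
def moving_average(values, window):
    """Mean over every full window of `window` consecutive values."""
    return [
        sum(values[i : i + window]) / window
        for i in range(len(values) - window + 1)
    ]


returns = [1.0, 2.0, 3.0, 4.0, 5.0]
smoothed = moving_average(returns, window=2)

print(smoothed)  # [1.5, 2.5, 3.5, 4.5]
print(len(smoothed))  # 4, i.e. len(returns) - window + 1
```

Because only full windows are kept, each plotted curve is rolling_length - 1 points shorter than the raw series it was computed from.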
[Figure: training plots (Episode Returns, Critic Loss, Entropy, Actor Loss)]

Performance Analysis of Synchronous and Asynchronous Vectorized Environments#


Asynchronous environments can lead to quicker training times and a higher speedup for data collection compared to synchronous environments. This is because asynchronous vector environments step their sub-environments in parallel in separate processes, while synchronous vector environments step them serially in a single process.

[Figure: performance plots]

According to the Karp-Flatt metric (a metric used in parallel computing to estimate the limit for the speedup when scaling up the number of parallel processes, here the number of environments), the estimated maximum speedup for asynchronous environments is 57, while the estimated maximum speedup for synchronous environments is 21. This suggests that asynchronous environments have significantly faster training times compared to synchronous environments (see graphs).
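For readers unfamiliar with the metric, here is a minimal sketch of how the Karp-Flatt serial fraction is computed from a measured speedup, and how a speedup limit can be read off from it via Amdahl's law. The measurement used below is hypothetical, and it is an assumption that the values 57 and 21 quoted above were derived in this way; the underlying measurements are not reproduced here.

```python
def karp_flatt(speedup, n_processes):
    """Experimentally determined serial fraction e for a measured speedup."""
    return (1.0 / speedup - 1.0 / n_processes) / (1.0 - 1.0 / n_processes)


def estimated_max_speedup(speedup, n_processes):
    """Amdahl-style limit 1 / e as the number of processes grows without bound."""
    return 1.0 / karp_flatt(speedup, n_processes)


# Hypothetical measurement: 10 parallel environments collect samples 8x faster than one.
e = karp_flatt(speedup=8.0, n_processes=10)
limit = estimated_max_speedup(speedup=8.0, n_processes=10)

print(round(e, 4))  # 0.0278
print(round(limit, 1))  # 36.0
```

A smaller serial fraction means less work that cannot be parallelized, and hence a higher ceiling on the achievable speedup.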

[Figure: Karp-Flatt metric]

However, it is important to note that increasing the number of parallel vector environments can lead to slower training times after a certain number of environments (see plot below, where the agent was trained until the mean training returns were above -120). The slower training times might occur because the gradient estimates are already good enough after a relatively low number of environments (especially if the environment is not very complex). In this case, increasing the number of environments does not increase the learning speed, and actually increases the runtime, possibly due to the additional time needed to calculate the gradients. For LunarLander-v3, the best performing configuration used an AsyncVectorEnv with 10 parallel environments, but environments with a higher complexity may require more parallel environments to achieve optimal performance.

[Figure: runtime until threshold]

Saving/Loading Weights#

save_weights = False
load_weights = False

actor_weights_path = "weights/actor_weights.h5"
critic_weights_path = "weights/critic_weights.h5"

if not os.path.exists("weights"):
    os.mkdir("weights")

""" save network weights """
if save_weights:
    torch.save(agent.actor.state_dict(), actor_weights_path)
    torch.save(agent.critic.state_dict(), critic_weights_path)


""" load network weights """
if load_weights:
    agent = A2C(obs_shape, action_shape, device, critic_lr, actor_lr, n_envs)

    agent.actor.load_state_dict(torch.load(actor_weights_path))
    agent.critic.load_state_dict(torch.load(critic_weights_path))
    agent.actor.eval()
    agent.critic.eval()

Showcase the Agent#

""" play a couple of showcase episodes """

n_showcase_episodes = 3

for episode in range(n_showcase_episodes):
    print(f"starting episode {episode}...")

    # create a new sample environment to get new random parameters
    if randomize_domain:
        env = gym.make(
            "LunarLander-v3",
            render_mode="human",
            gravity=np.clip(
                np.random.normal(loc=-10.0, scale=2.0), a_min=-11.99, a_max=-0.01
            ),
            enable_wind=np.random.choice([True, False]),
            wind_power=np.clip(
                np.random.normal(loc=15.0, scale=2.0), a_min=0.01, a_max=19.99
            ),
            turbulence_power=np.clip(
                np.random.normal(loc=1.5, scale=1.0), a_min=0.01, a_max=1.99
            ),
            max_episode_steps=500,
        )
    else:
        env = gym.make("LunarLander-v3", render_mode="human", max_episode_steps=500)

    # get an initial state
    state, info = env.reset()

    # play one episode
    done = False
    while not done:
        # select an action A_{t} using S_{t} as input for the agent
        with torch.no_grad():
            action, _, _, _ = agent.select_action(state[None, :])

        # perform the action A_{t} in the environment to get S_{t+1} and R_{t+1}
        state, reward, terminated, truncated, info = env.step(action.item())

        # update if the environment is done
        done = terminated or truncated

    # close this episode's environment before the next one is created
    env.close()

Try playing the environment yourself#

# from gymnasium.utils.play import play
#
# play(gym.make('LunarLander-v3', render_mode='rgb_array'),
#     keys_to_action={'w': 2, 'a': 1, 'd': 3}, noop=0)

References#

[1] V. Mnih, A. P. Badia, M. Mirza, A. Graves, T. P. Lillicrap, T. Harley, D. Silver, K. Kavukcuoglu. “Asynchronous Methods for Deep Reinforcement Learning” ICML (2016).

[2] J. Schulman, P. Moritz, S. Levine, M. Jordan and P. Abbeel. “High-dimensional continuous control using generalized advantage estimation.” ICLR (2016).

[3] Gymnasium Documentation: Vector environments. (URL: https://gymnasium.farama.org/api/vector/)