Shortcuts

强化学习 (DQN) 教程

Created On: Mar 24, 2017 | Last Updated: Jun 18, 2024 | Last Verified: Nov 05, 2024

作者: Adam Paszke

Mark Towers

本教程展示了如何使用 PyTorch 在 Gymnasium 的 CartPole-v1 任务上训练深度Q学习(DQN)智能体。

您可能会觉得阅读原始的 深度Q学习 (DQN) 论文很有帮助。

任务

智能体需要在两个动作之间进行选择——使推车向左或向右移动——使与之连接的杆子保持直立。您可以在 Gymnasium 的网站 上找到有关此环境以及其他更具挑战性的环境的更多信息。

CartPole

CartPole

当智能体观察环境的当前状态并选择一个动作时,环境会 过渡 到一个新的状态,同时返回一个指示动作后果的奖励。在任务中,每个增加的时间步都会获得 +1 的奖励,如果杆子倾倒过多或推车超过中心 2.4 单位的距离,环境将结束。表现更好的场景会运行更长的时间,从而累积更大的回报。

CartPole 任务的设计使得智能体的输入是 4 个表示环境状态(位置、速度等)的实数值。我们直接使用这 4 个输入,将它们通过一个具有 2 个输出的简单全连接网络,每个输出对应一个动作。该网络被训练以预测每个动作的期望值,给定输入状态,选择期望值最高的动作。

首先,让我们导入需要的包。首先我们需要使用 pip 安装 gymnasium 来构建环境。这是原始 OpenAI Gym 项目的一个分支,由其自 Gym v0.19 版本起的团队维护。如果您在 Google Colab 上运行,请执行:

%%bash
pip3 install gymnasium[classic_control]

我们还将使用 PyTorch 的以下组件:

  • 神经网络 (torch.nn

  • 优化工具 (torch.optim

  • 自动微分 (torch.autograd

import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

env = gym.make("CartPole-v1")

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if GPU is to be used
device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)

重放记忆

我们将使用经验回放记忆来训练我们的 DQN。它存储了智能体观察到的过渡,使我们可以稍后重新利用这些数据。通过随机采样,这些构建批次的过渡行为可以去相关化。这被证明能够显著稳定和改善 DQN 的培训过程。

为此,我们需要两个类:

  • Transition - 一个命名元组,表示我们环境中的单次过渡。它本质上将 (状态,动作) 映射到 (下一个状态,奖励) 的结果,状态描述为后面提到的屏幕差异图像。

  • ReplayMemory - 一个有界大小的循环缓存,用于存储最近观察到的过渡。它还实现了 .sample() 方法,用于随机选择批量的过渡进行训练。

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

现在,让我们定义我们的模型。但首先,让我们快速回顾一下什么是 DQN。

DQN 算法

我们的环境是确定性的,因此这里呈现的所有公式也都被简化为确定性形式。在强化学习文献中,它们还会包含对环境中随机过渡的期望值。

我们的目标是训练一个策略,该策略试图最大化折扣后的累积奖励:\(R_{t_0} = \sum_{t=t_0}^{\infty} \gamma^{t - t_0} r_t\),其中 \(R_{t_0}\) 也被称为 回报。折扣因子 \(\gamma\) 应在 \(0\)\(1\) 之间的常量,以确保总和收敛。较低的 \(\gamma\) 值使得不确定的远期未来奖励对智能体的价值低于近未来可靠的奖励。它还鼓励智能体在时间上更靠近的奖励比远未来奖励等效的时间点更重要。

Q学习的核心理念是,如果我们拥有一个函数 \(Q^*: State \times Action \rightarrow \mathbb{R}\),它可以告诉我们在给定状态下采取某个动作后我们将获得的回报,那么我们可以轻松构建一个最大化奖励的策略:

\[\pi^*(s) = \arg\!\max_a \ Q^*(s, a) \]

然而,我们并不了解世界上的所有信息,因此无法访问 \(Q^*\)。但由于神经网络是通用函数逼近器,我们可以简单地创建一个网络,并训练它以类似于 \(Q^*\)

对于我们的训练更新规则,我们将使用一个事实:每个策略的 \(Q\) 函数都遵循贝尔曼方程:

\[Q^{\pi}(s, a) = r + \gamma Q^{\pi}(s', \pi(s')) \]

方程两边之间的差异称为时间差分误差 \(\delta\)

\[\delta = Q(s, a) - (r + \gamma \max_a' Q(s', a)) \]

为了最小化此误差,我们将使用 Huber 损失。Huber 损失在误差较小时表现得像均方误差,而在误差较大时表现得像平均绝对误差——这使它在 \(Q\) 的估计非常嘈杂时更能对抗异常值的影响。我们在一个从回放记忆中随机选择的过渡批次 \(B\) 上计算此误差:

\[\mathcal{L} = \frac{1}{|B|}\sum_{(s, a, s', r) \ \in \ B} \mathcal{L}(\delta)\]
\[\text{其中} \quad \mathcal{L}(\delta) = \begin{cases} \frac{1}{2}{\delta^2} & \text{对于 } |\delta| \le 1, \\ |\delta| - \frac{1}{2} & \text{否则.} \end{cases}\]

Q网络

我们的模型将是一个前馈神经网络,它接收当前和之前屏幕块的差异作为输入。它有两个输出,分别表示 \(Q(s, \mathrm{left})\)\(Q(s, \mathrm{right})\) 是网络的输入)。实际上,网络试图预测每个动作给定当前输入的 期望回报

class DQN(nn.Module):

    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

训练网络

超参数和工具

此单元实例化了我们的模型及其优化器,并定义了一些工具:

  • select_action - 将根据 epsilon 贪婪策略选择一个动作。简单来说,我们有时会使用模型来选择动作,有时我们会随机选择一个动作。随机选择动作的概率将从 EPS_START 开始指数递减至 EPS_ENDEPS_DECAY 控制递减的速率。

  • plot_durations - 一个帮助工具,用于绘制每集的持续时间,以及过去 100 集的平均值(正式评估中使用的衡量标准)。图表将位于包含主要训练循环的单元下方,并在每一集之后更新。

# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)

policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return the largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1).indices.view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


episode_durations = []


def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

训练循环

最后是训练模型的代码。

此处您可以找到一个 optimize_model 函数,它执行优化的单步操作。它首先对一个批次进行采样,将所有张量拼接成一个单一的张量,计算 \(Q(s_t, a_t)\)\(V(s_{t+1}) = \max_a Q(s_{t+1}, a)\),并将它们组合到我们的损失中。根据定义,如果 \(s\) 是终止状态,我们将设置 \(V(s) = 0\)。我们还使用一个目标网络来计算 \(V(s_{t+1})\) 以增强稳定性。目标网络在每一步通过一个 软更新 控制的超参数 TAU 进行更新,该参数之前定义过。

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1).values
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

以下是主要的训练循环。在开始时我们重置环境并获取初始 state 张量。然后我们采样一个动作,执行它,观察下一个状态和奖励(总为 1),并优化我们的模型一次。当一集结束(我们的模型失败)时,我们重新开始循环。

以下将 num_episodes 设置为 600,如果有 GPU 可用;否则,将安排 50 集以使训练时间不会太长。然而,50 集不足以观察到在 CartPole 上的良好表现。您应该会看到模型在 600 集训练后能够持续达到 500 步。训练 RL 智能体可能是一个噪声较大的过程,因此如果没有观察到收敛,可以重新启动训练以获得更好的结果。

if torch.cuda.is_available() or torch.backends.mps.is_available():
    num_episodes = 600
else:
    num_episodes = 50

for i_episode in range(num_episodes):
    # Initialize the environment and get its state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()

这里是一个图示,展示了整体结果的数据流。

../_images/reinforcement_learning_diagram.jpg

动作的选择可以随机生成,也可以基于策略,在 gym 环境中获得下一步的样本。我们将在回放记忆中记录结果,并在每次迭代中运行优化步骤。优化会从回放记忆中选取随机批次进行新策略的训练。“较旧”的 target_net 也会在优化中用于计算预期 Q 值。在每一步对其权重进行软更新。

**脚本的总运行时间:**(0分钟0.000秒)

通过Sphinx-Gallery生成的图集

文档

访问 PyTorch 的详细开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源