Train a Mario-Playing RL Agent

Original: https://pytorch.org/tutorials/intermediate/mario_rl_tutorial.html

Authors: Yuansong Feng, Suraj Subramanian, Howard Wang, Steven Guo.

This tutorial walks you through the fundamentals of deep reinforcement learning. By the end, you will implement an AI-powered Mario (using a Double Deep Q-Network) that can play the game by itself.

Although no prior knowledge of RL is required for this tutorial, you can familiarize yourself with these RL concepts and keep the handy cheatsheet as your companion. The full code is available here.

(Figure: Mario gameplay animation)

    # !pip install gym-super-mario-bros==7.3.0
    import torch
    from torch import nn
    from torchvision import transforms as T
    from PIL import Image
    import numpy as np
    from pathlib import Path
    from collections import deque
    import random, datetime, os, copy

    # Gym is an OpenAI toolkit for RL
    import gym
    from gym.spaces import Box
    from gym.wrappers import FrameStack

    # NES Emulator for OpenAI Gym
    from nes_py.wrappers import JoypadSpace

    # Super Mario environment for OpenAI Gym
    import gym_super_mario_bros

RL Definitions

Environment: The world that an agent interacts with and learns from.

Action a: How the agent responds to the environment. The set of all possible actions is called the action-space.

State s: The current characteristics of the environment. The set of all possible states the environment can be in is called the state-space.

Reward r: The reward is the key feedback from the environment to the agent. It is what drives the agent to learn and to change its future actions. An aggregation of rewards over multiple time steps is called a return.

Optimal Action-Value function Q*(s, a): Gives the expected return if you start in state s, take an arbitrary action a, and then for each future time step take the action that maximizes returns. Q can be said to stand for the "quality" of the action in a state. We try to approximate this function.
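For reference (this equation is an addition and is not spelled out in the tutorial text), Q* satisfies the Bellman optimality equation, where γ is the discount factor and s' is the next state:

$$Q^{*}(s, a) = \mathbb{E}\left[\, r + \gamma \max_{a'} Q^{*}(s', a') \,\right]$$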

Environment

Initialize the Environment

In Mario, the environment consists of tubes, mushrooms, and other components.

When Mario makes an action, the environment responds with the changed (next) state, reward, and other info.

    # Initialize Super Mario environment
    env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0")

    # Limit the action-space to
    #   0. walk right
    #   1. jump right
    env = JoypadSpace(env, [["right"], ["right", "A"]])

    env.reset()
    next_state, reward, done, info = env.step(action=0)
    print(f"{next_state.shape},\n {reward},\n {done},\n {info}")

Out:

    (240, 256, 3),
     0,
     False,
     {'coins': 0, 'flag_get': False, 'life': 2, 'score': 0, 'stage': 1, 'status': 'small', 'time': 400, 'world': 1, 'x_pos': 40, 'x_pos_screen': 40, 'y_pos': 79}

Preprocess the Environment

Environment data is returned to the agent in next_state. As you saw above, each state is represented by a [3, 240, 256] size array. Often that is more information than our agent needs; for instance, Mario's actions do not depend on the color of the pipes or the sky!

We use Wrappers to preprocess environment data before sending it to the agent.

GrayScaleObservation is a common wrapper that transforms an RGB image to grayscale; doing so reduces the size of the state representation without losing useful information. The size of each state is now: [1, 240, 256].

ResizeObservation downsamples each observation into a square image. New size: [1, 84, 84].

SkipFrame is a custom wrapper that inherits from gym.Wrapper and implements the step() function. Because consecutive frames don't vary much, we can skip n intermediate frames without losing much information. The n-th frame aggregates the rewards accumulated over each skipped frame.

FrameStack is a wrapper that allows us to squash consecutive frames of the environment into a single observation point to feed to our learning model. This way, we can identify whether Mario was landing or jumping based on his direction of movement in the previous several frames.

    class SkipFrame(gym.Wrapper):
        def __init__(self, env, skip):
            """Return only every `skip`-th frame"""
            super().__init__(env)
            self._skip = skip

        def step(self, action):
            """Repeat action, and sum reward"""
            total_reward = 0.0
            done = False
            for i in range(self._skip):
                # Accumulate reward and repeat the same action
                obs, reward, done, info = self.env.step(action)
                total_reward += reward
                if done:
                    break
            return obs, total_reward, done, info


    class GrayScaleObservation(gym.ObservationWrapper):
        def __init__(self, env):
            super().__init__(env)
            obs_shape = self.observation_space.shape[:2]
            self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

        def permute_orientation(self, observation):
            # permute [H, W, C] array to [C, H, W] tensor
            observation = np.transpose(observation, (2, 0, 1))
            observation = torch.tensor(observation.copy(), dtype=torch.float)
            return observation

        def observation(self, observation):
            observation = self.permute_orientation(observation)
            transform = T.Grayscale()
            observation = transform(observation)
            return observation


    class ResizeObservation(gym.ObservationWrapper):
        def __init__(self, env, shape):
            super().__init__(env)
            if isinstance(shape, int):
                self.shape = (shape, shape)
            else:
                self.shape = tuple(shape)

            obs_shape = self.shape + self.observation_space.shape[2:]
            self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

        def observation(self, observation):
            transforms = T.Compose(
                [T.Resize(self.shape), T.Normalize(0, 255)]
            )
            observation = transforms(observation).squeeze(0)
            return observation


    # Apply Wrappers to environment
    env = SkipFrame(env, skip=4)
    env = GrayScaleObservation(env)
    env = ResizeObservation(env, shape=84)
    env = FrameStack(env, num_stack=4)

After applying the above wrappers to the environment, the final wrapped state consists of 4 consecutive grayscale frames stacked together, as shown in the image on the left. Each time Mario makes an action, the environment responds with a state of this structure. The structure is represented by a 3D array of size [4, 84, 84].

(Figure: the final wrapped state of 4 stacked grayscale frames)
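As a quick sanity check (this snippet is an addition, not part of the original tutorial), you can reset the wrapped environment and inspect the stacked observation directly. FrameStack returns a LazyFrames object, so we convert it with __array__(), the same way the agent code below does:

    # Minimal sketch: verify the shape of the wrapped observation
    state = env.reset()
    print(state.__array__().shape)  # expected: (4, 84, 84)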

Agent

We create a class Mario to represent our agent in the game. Mario should be able to:

  • Act according to the optimal action policy based on the current state (of the environment).
  • Remember experiences. An experience = (current state, current action, reward, next state). Mario caches and later recalls his experiences to update his action policy.
  • Learn a better action policy over time.
    class Mario:
        def __init__(self):
            pass

        def act(self, state):
            """Given a state, choose an epsilon-greedy action"""
            pass

        def cache(self, experience):
            """Add the experience to memory"""
            pass

        def recall(self):
            """Sample experiences from memory"""
            pass

        def learn(self):
            """Update online action value (Q) function with a batch of experiences"""
            pass

In the following sections, we will populate Mario's parameters and define his functions.

Act

For any given state, an agent can choose to do the most optimal action (exploit) or a random action (explore).

Mario randomly explores with a chance of self.exploration_rate; when he chooses to exploit, he relies on MarioNet (implemented in the Learn section) to provide the most optimal action.
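For reference (this formula is an addition, not spelled out in the tutorial text), the epsilon-greedy rule implemented below can be written as:

$$a = \begin{cases} \text{random action from the action-space} & \text{with probability } \epsilon \\ \mathop{\arg\max}_{a'} Q_{online}(s, a') & \text{with probability } 1 - \epsilon \end{cases}$$

where ε is self.exploration_rate, which decays toward exploration_rate_min after every step.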

    class Mario:
        def __init__(self, state_dim, action_dim, save_dir):
            self.state_dim = state_dim
            self.action_dim = action_dim
            self.save_dir = save_dir

            self.use_cuda = torch.cuda.is_available()

            # Mario's DNN to predict the most optimal action - we implement this in the Learn section
            self.net = MarioNet(self.state_dim, self.action_dim).float()
            if self.use_cuda:
                self.net = self.net.to(device="cuda")

            self.exploration_rate = 1
            self.exploration_rate_decay = 0.99999975
            self.exploration_rate_min = 0.1
            self.curr_step = 0

            self.save_every = 5e5  # no. of experiences between saving Mario Net

        def act(self, state):
            """
            Given a state, choose an epsilon-greedy action and update value of step.

            Inputs:
                state (LazyFrame): A single observation of the current state, dimension is (state_dim)
            Outputs:
                action_idx (int): An integer representing which action Mario will perform
            """
            # EXPLORE
            if np.random.rand() < self.exploration_rate:
                action_idx = np.random.randint(self.action_dim)

            # EXPLOIT
            else:
                state = state.__array__()
                if self.use_cuda:
                    state = torch.tensor(state).cuda()
                else:
                    state = torch.tensor(state)
                state = state.unsqueeze(0)
                action_values = self.net(state, model="online")
                action_idx = torch.argmax(action_values, axis=1).item()

            # decrease exploration_rate
            self.exploration_rate *= self.exploration_rate_decay
            self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)

            # increment step
            self.curr_step += 1
            return action_idx

Cache and Recall

These two functions serve as Mario's "memory" process.

cache(): Each time Mario performs an action, he stores the experience to his memory. His experience includes the current state, the action performed, the reward from the action, the next state, and whether the game is done.

recall(): Mario randomly samples a batch of experiences from his memory, and uses that to learn the game.

    class Mario(Mario):  # subclassing for continuity
        def __init__(self, state_dim, action_dim, save_dir):
            super().__init__(state_dim, action_dim, save_dir)
            self.memory = deque(maxlen=100000)
            self.batch_size = 32

        def cache(self, state, next_state, action, reward, done):
            """
            Store the experience to self.memory (replay buffer)

            Inputs:
                state (LazyFrame),
                next_state (LazyFrame),
                action (int),
                reward (float),
                done (bool)
            """
            state = state.__array__()
            next_state = next_state.__array__()

            if self.use_cuda:
                state = torch.tensor(state).cuda()
                next_state = torch.tensor(next_state).cuda()
                action = torch.tensor([action]).cuda()
                reward = torch.tensor([reward]).cuda()
                done = torch.tensor([done]).cuda()
            else:
                state = torch.tensor(state)
                next_state = torch.tensor(next_state)
                action = torch.tensor([action])
                reward = torch.tensor([reward])
                done = torch.tensor([done])

            self.memory.append((state, next_state, action, reward, done,))

        def recall(self):
            """
            Retrieve a batch of experiences from memory
            """
            batch = random.sample(self.memory, self.batch_size)
            state, next_state, action, reward, done = map(torch.stack, zip(*batch))
            return state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()

Learn

Mario uses the DDQN algorithm under the hood. DDQN uses two ConvNets, Q_online and Q_target, that independently approximate the optimal action-value function.

In our implementation, Q_online and Q_target share the same architecture: the target network starts as a deep copy of the online network. θ_target (the parameters of Q_target) is frozen to prevent updates by backprop. Instead, it is periodically synced with θ_online (more on this later).

Neural Network

    class MarioNet(nn.Module):
        """mini cnn structure
        input -> (conv2d + relu) x 3 -> flatten -> (dense + relu) x 2 -> output
        """

        def __init__(self, input_dim, output_dim):
            super().__init__()
            c, h, w = input_dim

            if h != 84:
                raise ValueError(f"Expecting input height: 84, got: {h}")
            if w != 84:
                raise ValueError(f"Expecting input width: 84, got: {w}")

            self.online = nn.Sequential(
                nn.Conv2d(in_channels=c, out_channels=32, kernel_size=8, stride=4),
                nn.ReLU(),
                nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
                nn.ReLU(),
                nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
                nn.ReLU(),
                nn.Flatten(),
                nn.Linear(3136, 512),
                nn.ReLU(),
                nn.Linear(512, output_dim),
            )

            self.target = copy.deepcopy(self.online)

            # Q_target parameters are frozen.
            for p in self.target.parameters():
                p.requires_grad = False

        def forward(self, input, model):
            if model == "online":
                return self.online(input)
            elif model == "target":
                return self.target(input)

TD Estimate & TD Target

Two values are involved in learning:

TD Estimate - the predicted optimal Q* for a given state s:

$${TD}_e = Q_{online}^{*}(s, a)$$

TD Target - an aggregation of the current reward and the estimated Q* in the next state s':

$$a' = \mathop{\arg\max}_{a} Q_{online}(s', a)$$

$${TD}_t = r + \gamma \, Q_{target}^{*}(s', a')$$

Because we don't know what the next action a' will be, we use the action a' that maximizes Q_online in the next state s'.

Notice we use the @torch.no_grad() decorator on td_target() to disable gradient calculation here (because we don't need to backpropagate on θ_target).

    class Mario(Mario):
        def __init__(self, state_dim, action_dim, save_dir):
            super().__init__(state_dim, action_dim, save_dir)
            self.gamma = 0.9

        def td_estimate(self, state, action):
            current_Q = self.net(state, model="online")[
                np.arange(0, self.batch_size), action
            ]  # Q_online(s,a)
            return current_Q

        @torch.no_grad()
        def td_target(self, reward, next_state, done):
            next_state_Q = self.net(next_state, model="online")
            best_action = torch.argmax(next_state_Q, axis=1)
            next_Q = self.net(next_state, model="target")[
                np.arange(0, self.batch_size), best_action
            ]
            return (reward + (1 - done.float()) * self.gamma * next_Q).float()

Updating the Model

As Mario samples inputs from his replay buffer, we compute TD_t and TD_e and backpropagate this loss down Q_online to update its parameters θ_online (α is the learning rate lr passed to the optimizer):

$$\theta_{online} \leftarrow \theta_{online} + \alpha \nabla \left({TD}_e - {TD}_t\right)$$

θ_target does not update through backpropagation. Instead, we periodically copy θ_online to θ_target:

$$\theta_{target} \leftarrow \theta_{online}$$

    class Mario(Mario):
        def __init__(self, state_dim, action_dim, save_dir):
            super().__init__(state_dim, action_dim, save_dir)
            self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025)
            self.loss_fn = torch.nn.SmoothL1Loss()

        def update_Q_online(self, td_estimate, td_target):
            loss = self.loss_fn(td_estimate, td_target)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            return loss.item()

        def sync_Q_target(self):
            self.net.target.load_state_dict(self.net.online.state_dict())

Save a Checkpoint

    class Mario(Mario):
        def save(self):
            save_path = (
                self.save_dir / f"mario_net_{int(self.curr_step // self.save_every)}.chkpt"
            )
            torch.save(
                dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate),
                save_path,
            )
            print(f"MarioNet saved to {save_path} at step {self.curr_step}")

Putting It All Together

    class Mario(Mario):
        def __init__(self, state_dim, action_dim, save_dir):
            super().__init__(state_dim, action_dim, save_dir)
            self.burnin = 1e4  # min. experiences before training
            self.learn_every = 3  # no. of experiences between updates to Q_online
            self.sync_every = 1e4  # no. of experiences between Q_target & Q_online sync

        def learn(self):
            if self.curr_step % self.sync_every == 0:
                self.sync_Q_target()

            if self.curr_step % self.save_every == 0:
                self.save()

            if self.curr_step < self.burnin:
                return None, None

            if self.curr_step % self.learn_every != 0:
                return None, None

            # Sample from memory
            state, next_state, action, reward, done = self.recall()

            # Get TD Estimate
            td_est = self.td_estimate(state, action)

            # Get TD Target
            td_tgt = self.td_target(reward, next_state, done)

            # Backpropagate loss through Q_online
            loss = self.update_Q_online(td_est, td_tgt)

            return (td_est.mean().item(), loss)

Logging

    import numpy as np
    import time, datetime
    import matplotlib.pyplot as plt


    class MetricLogger:
        def __init__(self, save_dir):
            self.save_log = save_dir / "log"
            with open(self.save_log, "w") as f:
                f.write(
                    f"{'Episode':>8}{'Step':>8}{'Epsilon':>10}{'MeanReward':>15}"
                    f"{'MeanLength':>15}{'MeanLoss':>15}{'MeanQValue':>15}"
                    f"{'TimeDelta':>15}{'Time':>20}\n"
                )
            self.ep_rewards_plot = save_dir / "reward_plot.jpg"
            self.ep_lengths_plot = save_dir / "length_plot.jpg"
            self.ep_avg_losses_plot = save_dir / "loss_plot.jpg"
            self.ep_avg_qs_plot = save_dir / "q_plot.jpg"

            # History metrics
            self.ep_rewards = []
            self.ep_lengths = []
            self.ep_avg_losses = []
            self.ep_avg_qs = []

            # Moving averages, added for every call to record()
            self.moving_avg_ep_rewards = []
            self.moving_avg_ep_lengths = []
            self.moving_avg_ep_avg_losses = []
            self.moving_avg_ep_avg_qs = []

            # Current episode metric
            self.init_episode()

            # Timing
            self.record_time = time.time()

        def log_step(self, reward, loss, q):
            self.curr_ep_reward += reward
            self.curr_ep_length += 1
            if loss:
                self.curr_ep_loss += loss
                self.curr_ep_q += q
                self.curr_ep_loss_length += 1

        def log_episode(self):
            "Mark end of episode"
            self.ep_rewards.append(self.curr_ep_reward)
            self.ep_lengths.append(self.curr_ep_length)
            if self.curr_ep_loss_length == 0:
                ep_avg_loss = 0
                ep_avg_q = 0
            else:
                ep_avg_loss = np.round(self.curr_ep_loss / self.curr_ep_loss_length, 5)
                ep_avg_q = np.round(self.curr_ep_q / self.curr_ep_loss_length, 5)
            self.ep_avg_losses.append(ep_avg_loss)
            self.ep_avg_qs.append(ep_avg_q)

            self.init_episode()

        def init_episode(self):
            self.curr_ep_reward = 0.0
            self.curr_ep_length = 0
            self.curr_ep_loss = 0.0
            self.curr_ep_q = 0.0
            self.curr_ep_loss_length = 0

        def record(self, episode, epsilon, step):
            mean_ep_reward = np.round(np.mean(self.ep_rewards[-100:]), 3)
            mean_ep_length = np.round(np.mean(self.ep_lengths[-100:]), 3)
            mean_ep_loss = np.round(np.mean(self.ep_avg_losses[-100:]), 3)
            mean_ep_q = np.round(np.mean(self.ep_avg_qs[-100:]), 3)
            self.moving_avg_ep_rewards.append(mean_ep_reward)
            self.moving_avg_ep_lengths.append(mean_ep_length)
            self.moving_avg_ep_avg_losses.append(mean_ep_loss)
            self.moving_avg_ep_avg_qs.append(mean_ep_q)

            last_record_time = self.record_time
            self.record_time = time.time()
            time_since_last_record = np.round(self.record_time - last_record_time, 3)

            print(
                f"Episode {episode} - "
                f"Step {step} - "
                f"Epsilon {epsilon} - "
                f"Mean Reward {mean_ep_reward} - "
                f"Mean Length {mean_ep_length} - "
                f"Mean Loss {mean_ep_loss} - "
                f"Mean Q Value {mean_ep_q} - "
                f"Time Delta {time_since_last_record} - "
                f"Time {datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
            )

            with open(self.save_log, "a") as f:
                f.write(
                    f"{episode:8d}{step:8d}{epsilon:10.3f}"
                    f"{mean_ep_reward:15.3f}{mean_ep_length:15.3f}{mean_ep_loss:15.3f}{mean_ep_q:15.3f}"
                    f"{time_since_last_record:15.3f}"
                    f"{datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'):>20}\n"
                )

            for metric in ["ep_rewards", "ep_lengths", "ep_avg_losses", "ep_avg_qs"]:
                plt.plot(getattr(self, f"moving_avg_{metric}"))
                plt.savefig(getattr(self, f"{metric}_plot"))
                plt.clf()

Let's Play!

In this example we run the training loop for 10 episodes, but for Mario to truly learn the ways of his world, we suggest running the loop for at least 40,000 episodes!

    use_cuda = torch.cuda.is_available()
    print(f"Using CUDA: {use_cuda}")
    print()

    save_dir = Path("checkpoints") / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
    save_dir.mkdir(parents=True)

    mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)

    logger = MetricLogger(save_dir)

    episodes = 10
    for e in range(episodes):

        state = env.reset()

        # Play the game!
        while True:

            # Run agent on the state
            action = mario.act(state)

            # Agent performs action
            next_state, reward, done, info = env.step(action)

            # Remember
            mario.cache(state, next_state, action, reward, done)

            # Learn
            q, loss = mario.learn()

            # Logging
            logger.log_step(reward, loss, q)

            # Update state
            state = next_state

            # Check if end of game
            if done or info["flag_get"]:
                break

        logger.log_episode()

        if e % 20 == 0:
            logger.record(episode=e, epsilon=mario.exploration_rate, step=mario.curr_step)

(Figure: ../_img/sphx_glr_mario_rl_tutorial_001.png)

Out:

    Using CUDA: True

    Episode 0 - Step 40 - Epsilon 0.9999900000487484 - Mean Reward 231.0 - Mean Length 40.0 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 0.444 - Time 2021-01-05T20:23:08

Conclusion

In this tutorial, we saw how we can use PyTorch to train a game-playing AI. You can use the same methods to train an AI to play any of the games at OpenAI Gym. We hope you enjoyed this tutorial; feel free to reach us at our Github!

Total running time of the script: (0 minutes 21.485 seconds)

Download Python source code: mario_rl_tutorial.py

Download Jupyter notebook: mario_rl_tutorial.ipynb
