Inverted Pendulum, DQN Algorithm - Learning Deep Reinforcement Learning by Doing: PyTorch Programming Practice (5)

Posted: 2020-12-31 23:23:48


Contents:
0. Related articles in this series
1. Agent.py
2. Brain.py
3. Environment.py
4. Val.py
5. ReplayMemory.py
6. main.py
7. Final result
8. Code download
9. References

0. Related articles in this series

Maze, Random Exploration - Learning Deep Reinforcement Learning by Doing: PyTorch Programming Practice (1)

Maze, Sarsa Algorithm - Learning Deep Reinforcement Learning by Doing: PyTorch Programming Practice (2)

Maze, Q-Learning Algorithm - Learning Deep Reinforcement Learning by Doing: PyTorch Programming Practice (3)

Inverted Pendulum, Q-Learning Algorithm - Learning Deep Reinforcement Learning by Doing: PyTorch Programming Practice (4)

1. Agent.py

import Brain


# Agent for the CartPole (inverted pendulum on a cart) task
class Agent:
    def __init__(self, num_states, num_actions):
        # Create a brain that makes decisions for the agent
        self.brain = Brain.Brain(num_states, num_actions)

    # Update the Q-function
    def update_Q_function(self):
        self.brain.replay()

    # Decide the next action
    def get_action(self, state, episode):
        action = self.brain.decide_action(state, episode)
        return action

    # Store state, action, state_next and reward in the replay memory
    def memorize(self, state, action, state_next, reward):
        self.brain.memory.push(state, action, state_next, reward)
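Assuming Brain.py, ReplayMemory.py and Val.py from the later sections are on the import path, a minimal smoke test of this interface could look like the sketch below; the dummy tensors and the episode number are invented for illustration only.

import torch
import Val
import Agent

Val._init()
Val.set_value('GAMMA', 0.99)

agent = Agent.Agent(num_states=4, num_actions=2)   # CartPole: 4 observation values, 2 actions

state = torch.zeros(1, 4)                           # dummy observation, shape (1, num_states)
action = agent.get_action(state, episode=0)         # epsilon-greedy action, shape (1, 1)
next_state = torch.zeros(1, 4)
reward = torch.FloatTensor([0.0])

agent.memorize(state, action, next_state, reward)   # store the transition
agent.update_Q_function()                           # no-op until the memory holds BATCH_SIZE items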

2. Brain.py

import numpy as np
from ReplayMemory import ReplayMemory, Transition
import Val
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000
lr = 0.0001


# The Brain replaces the Q-table of the previous articles with a neural network (DQN)
class Brain:
    # Create the brain that makes decisions for the agent
    def __init__(self, num_states, num_actions):
        # Number of available actions
        self.num_actions = num_actions
        # Object that stores experience
        self.memory = ReplayMemory(CAPACITY)
        # Build the neural network
        self.model = nn.Sequential()
        self.model.add_module('fc1', nn.Linear(num_states, 32))
        self.model.add_module('relu1', nn.ReLU())
        self.model.add_module('fc2', nn.Linear(32, 32))
        self.model.add_module('relu2', nn.ReLU())
        self.model.add_module('fc3', nn.Linear(32, num_actions))
        # Print the network structure
        print(self.model)
        # Choose the optimizer
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

    # Learn the network parameters with Experience Replay
    def replay(self):
        # 1. Check the size of the replay memory
        if len(self.memory) < BATCH_SIZE:
            return

        # 2. Create a mini-batch
        transitions = self.memory.sample(BATCH_SIZE)
        # 2.2 Reshape the list of Transitions into one Transition of batches
        batch = Transition(*zip(*transitions))
        # 2.3 Concatenate the elements of each variable into batch tensors
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

        # 3. Compute Q(s_t, a_t) and the supervision signal
        # 3.1 Switch the network to inference mode
        self.model.eval()
        # 3.2 Q(s_t, a_t) as predicted by the network
        state_action_values = self.model(state_batch).gather(1, action_batch)
        # 3.3 Compute max_a Q(s_{t+1}, a) for non-final next states
        #     (a bool mask is used; indexing with ByteTensor is deprecated in recent PyTorch)
        non_final_mask = torch.BoolTensor(tuple(map(lambda s: s is not None, batch.next_state)))
        next_state_values = torch.zeros(BATCH_SIZE)
        next_state_values[non_final_mask] = self.model(non_final_next_states).max(1)[0].detach()
        # 3.4 The Q-learning target: r_{t+1} + gamma * max_a Q(s_{t+1}, a)
        expected_state_action_values = reward_batch + Val.get_value('GAMMA') * next_state_values

        # 4. Update the network parameters
        # 4.1 Switch the network to training mode
        self.model.train()
        # 4.2 Compute the loss (smooth L1 / Huber)
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
        # 4.3 Back-propagate and take an optimizer step
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    # Decide an action with the epsilon-greedy method, gradually favouring the greedy action
    def decide_action(self, state, episode):
        epsilon = 0.5 * (1 / (episode + 1))
        if epsilon <= np.random.uniform(0, 1):
            self.model.eval()
            with torch.no_grad():
                action = self.model(state).max(1)[1].view(1, 1)
        else:
            action = torch.LongTensor([[random.randrange(self.num_actions)]])  # random action 0 or 1
        return action
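decide_action anneals exploration with epsilon = 0.5 * (1 / (episode + 1)), so early episodes are mostly random while later ones are almost purely greedy. The following stand-alone snippet, not part of the original code, simply prints the schedule for a few episode numbers.

# Illustration of the exploration schedule used in decide_action
for episode in [0, 1, 4, 9, 49, 99, 499]:
    epsilon = 0.5 * (1 / (episode + 1))
    print(f'episode {episode:3d}: epsilon = {epsilon:.4f}')
# episode 0 -> 0.5000, episode 9 -> 0.0500, episode 499 -> 0.0010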

3. Environment.py

import numpy as np
import matplotlib.pyplot as plt
import torch
import datetime
import gym
import Agent
import Val

# Reference URL: /github/patrickmineault/xcorr-notebooks/blob/master/Render%20OpenAI%20gym%20as%20GIF.ipynb
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display
from IPython.display import HTML


# Environment class: learning is considered successful when the pole stays up for 195 steps
# or more in 10 consecutive episodes; one final episode is then run to record an animation.
class Environment:
    def __init__(self):
        self.env = gym.make(Val.get_value('ENV'))          # the task to run
        num_states = self.env.observation_space.shape[0]   # number of state variables
        num_actions = self.env.action_space.n              # number of CartPole actions (2)
        self.agent = Agent.Agent(num_states, num_actions)  # the agent that acts in the environment

    # Save the recorded frames as an animation
    def display_frames_as_gif(self, frames):
        """Displays a list of frames as a gif, with controls"""
        plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi=72)
        patch = plt.imshow(frames[0])
        plt.axis('off')

        def animate(i):
            patch.set_data(frames[i])

        anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
        # anim.save('result/cartpole_QLearning.mp4')  # save as video
        anim.save('result/cartpole_QLearning' + datetime.datetime.now().strftime('-%m-%d-%H-%M-%S') + '.gif',
                  writer='pillow')
        # display(display_animation(anim, default_mode='loop'))

    '''
    observation, reward, done, info = env.step(action) advances the game by one step.
      observation: state of the cart and pole (cart position, cart velocity, pole angle, pole angular velocity)
      reward: the immediate reward
      done: True when the episode has ended
      info: debugging information
    '''
    def run(self):
        episode_10_list = np.zeros(10)      # step counts of the last 10 episodes
        complete_episodes = 0               # number of consecutive episodes lasting 195+ steps
        is_episode_final = False            # flag for the final episode
        frames = []                         # frames used for the animation

        for episode in range(Val.get_value('NUM_EPISODES')):   # maximum number of episodes
            observation = self.env.reset()                      # reset the environment
            state = observation
            state = torch.from_numpy(state).type(torch.FloatTensor)
            state = torch.unsqueeze(state, 0)

            for step in range(Val.get_value('MAX_STEPS')):      # loop within one episode
                if is_episode_final is True:
                    frames.append(self.env.render(mode='rgb_array'))

                # Choose an action
                action = self.agent.get_action(state, episode)

                # Execute action a_t and obtain s_{t+1}, r_{t+1}
                observation_next, _, done, _ = self.env.step(action.item())

                # Assign the reward
                if done:
                    state_next = None
                    episode_10_list = np.hstack((episode_10_list[1:], step + 1))
                    # done becomes True when 200 steps are exceeded or the pole tilts past a threshold
                    if step < Val.get_value('NUM_KEEP_TIMES'):
                        reward = torch.FloatTensor([-1.0])  # fell over midway: penalty of -1
                        complete_episodes = 0               # reset the consecutive-success counter
                    else:
                        reward = torch.FloatTensor([1.0])   # stood until the end: reward of 1
                        complete_episodes = complete_episodes + 1  # extend the consecutive-success streak
                else:
                    reward = torch.FloatTensor([0.0])       # no reward for intermediate steps
                    state_next = observation_next
                    state_next = torch.from_numpy(state_next).type(torch.FloatTensor)
                    state_next = torch.unsqueeze(state_next, 0)

                # Add the experience to the replay memory
                self.agent.memorize(state, action, state_next, reward)

                # Update the Q-function with experience replay
                self.agent.update_Q_function()

                # Update the observation
                state = state_next

                if done:
                    print('%d Episode: Finished after %d steps : average steps over the last 10 episodes = %.1f'
                          % (episode, step + 1, episode_10_list.mean()))
                    break

            # In the final episode, save and plot the animation
            if is_episode_final is True:
                self.display_frames_as_gif(frames)
                break

            if complete_episodes >= 10:
                print('Succeeded in 10 consecutive episodes')
                is_episode_final = True
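The reward shaping in run() is the key trick: intermediate steps earn 0, an episode that ends before NUM_KEEP_TIMES (195) steps is penalised with -1, and one that survives at least that long earns +1. The helper below is a hypothetical stand-alone restatement of that rule, not part of the original file.

import torch

def shaped_reward(done, step, num_keep_times=195):
    if not done:
        return torch.FloatTensor([0.0])    # intermediate step: no reward
    if step < num_keep_times:
        return torch.FloatTensor([-1.0])   # fell over early: penalty
    return torch.FloatTensor([1.0])        # stood long enough: bonus

print(shaped_reward(done=True, step=50))    # tensor([-1.])
print(shaped_reward(done=True, step=199))   # tensor([1.])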

4. Val.py

# _*_ coding:utf-8 _*_
'''
Cross-module store of global constants.
In main, import Val and call Val._init() once before using set_value / get_value.
'''


def _init():
    global _global_dict
    _global_dict = {}


def set_value(key, value):
    _global_dict[key] = value


def get_value(key, defValue=None):
    try:
        return _global_dict[key]
    except KeyError:
        return defValue  # fall back to the caller-supplied default for unknown keys
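A tiny usage sketch of this module with made-up values (main.py below follows the same pattern):

import Val

Val._init()
Val.set_value('GAMMA', 0.99)
print(Val.get_value('GAMMA'))        # 0.99
print(Val.get_value('MISSING', 0))   # 0: unknown keys fall back to the supplied default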

5. ReplayMemory.py

from collections import namedtuple
import random

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))


# Memory class that stores experience for replay
class ReplayMemory:
    def __init__(self, CAPACITY):
        self.capacity = CAPACITY
        self.memory = []
        self.index = 0

    def push(self, state, action, state_next, reward):
        # Grow the buffer until it is full, then overwrite the oldest entry
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.index] = Transition(state, action, state_next, reward)
        self.index = (self.index + 1) % self.capacity

    def sample(self, batch_size):
        # Return a random mini-batch of stored transitions
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
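ReplayMemory behaves as a ring buffer: it grows until it reaches its capacity and then overwrites the oldest transition. A short, hypothetical check of that behaviour (the tensor shapes mirror what Environment.py stores):

import torch
from ReplayMemory import ReplayMemory

memory = ReplayMemory(3)
for i in range(5):
    state = torch.FloatTensor([[float(i), 0.0, 0.0, 0.0]])
    action = torch.LongTensor([[i % 2]])
    next_state = torch.FloatTensor([[float(i) + 1.0, 0.0, 0.0, 0.0]])
    reward = torch.FloatTensor([1.0])
    memory.push(state, action, next_state, reward)

print(len(memory))        # 3: only the newest `capacity` transitions are kept
batch = memory.sample(2)  # random mini-batch of 2 Transition tuples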

6. main.py

# Import the packages used
import Environment
import Val

if __name__ == '__main__':
    Val._init()
    # Define the constants
    Val.set_value('ENV', 'CartPole-v0')    # name of the task to run
    Val.set_value('GAMMA', 0.99)           # discount factor
    Val.set_value('NUM_KEEP_TIMES', 195)   # steps the pole must stay up for an episode to count as a success
    Val.set_value('MAX_STEPS', 500)        # steps per episode (with only 200, the cart tends to drift to one side)
    Val.set_value('NUM_EPISODES', 500)     # maximum number of episodes

    cartpole_env = Environment.Environment()
    cartpole_env.run()

7. Final result

8. Code download

Go to the download page

9. References

[1] 边做边学深度强化学习:PyTorch程序设计实践 (Learning Deep Reinforcement Learning by Doing: PyTorch Programming Practice)
