
Classic Reinforcement Learning Algorithm Notes (12): Proximal Policy Optimization (PPO) Implementation Based on A2C (Part 2)

Date: 2022-01-23 00:28:12


This post implements a PPO algorithm on top of the A2C framework and applies it to a task with a continuous action space.

import torch
import torch.nn as nn
from torch.distributions import MultivariateNormal
import gym
import numpy as np

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

class Memory:
    def __init__(self):
        self.actions = []
        self.states = []
        self.logprobs = []
        self.rewards = []
        self.is_terminals = []

    def clear_memory(self):
        del self.actions[:]
        del self.states[:]
        del self.logprobs[:]
        del self.rewards[:]
        del self.is_terminals[:]

The difference from the A2C implementation in the previous post lies in how actions are selected: the Actor outputs the mean vector of a multivariate Gaussian distribution, while the covariance matrix is specified by hand (the variance could, of course, also be learned).
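As a minimal standalone sketch of that sampling step (the dimensions and standard deviation below are placeholders, not values taken from the environment), constructing a diagonal-covariance MultivariateNormal and drawing one action looks like this:

import torch
from torch.distributions import MultivariateNormal

action_dim = 4                                    # placeholder action dimension
action_std = 0.5                                  # hand-picked standard deviation
action_mean = torch.zeros(action_dim)             # in the real model this comes from the actor network
cov_mat = torch.diag(torch.full((action_dim,), action_std * action_std))

dist = MultivariateNormal(action_mean, cov_mat)   # Gaussian policy with fixed diagonal covariance
action = dist.sample()                            # sample one action vector
logprob = dist.log_prob(action)                   # log-probability, later used for the PPO ratio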

class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, action_std):
        super(ActorCritic, self).__init__()
        # action mean range -1 to 1
        self.actor = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.Tanh(),
            nn.Linear(64, 32),
            nn.Tanh(),
            nn.Linear(32, action_dim),
            nn.Tanh()
        )
        # critic
        self.critic = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.Tanh(),
            nn.Linear(64, 32),
            nn.Tanh(),
            nn.Linear(32, 1)
        )
        self.action_var = torch.full((action_dim,), action_std*action_std).to(device)

    def forward(self):
        raise NotImplementedError

    def act(self, state, memory):
        action_mean = self.actor(state)
        cov_mat = torch.diag(self.action_var).to(device)
        dist = MultivariateNormal(action_mean, cov_mat)
        action = dist.sample()
        action_logprob = dist.log_prob(action)

        memory.states.append(state)
        memory.actions.append(action)
        memory.logprobs.append(action_logprob)

        return action.detach()

    def evaluate(self, state, action):
        action_mean = torch.squeeze(self.actor(state))
        action_var = self.action_var.expand_as(action_mean)
        cov_mat = torch.diag_embed(action_var).to(device)
        dist = MultivariateNormal(action_mean, cov_mat)

        action_logprobs = dist.log_prob(torch.squeeze(action))
        dist_entropy = dist.entropy()
        state_value = self.critic(state)

        return action_logprobs, torch.squeeze(state_value), dist_entropy

class PPO:
    def __init__(self, state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip):
        self.lr = lr
        self.betas = betas
        self.gamma = gamma
        self.eps_clip = eps_clip
        self.K_epochs = K_epochs

        self.policy = ActorCritic(state_dim, action_dim, action_std).to(device)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr, betas=betas)
        self.policy_old = ActorCritic(state_dim, action_dim, action_std).to(device)
        self.policy_old.load_state_dict(self.policy.state_dict())

        self.MseLoss = nn.MSELoss()

    def select_action(self, state, memory):
        state = torch.FloatTensor(state.reshape(1, -1)).to(device)
        return self.policy_old.act(state, memory).cpu().data.numpy().flatten()

    def update(self, memory):
        # Monte Carlo estimate of rewards:
        rewards = []
        discounted_reward = 0
        for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):
            if is_terminal:
                discounted_reward = 0
            discounted_reward = reward + (self.gamma * discounted_reward)
            rewards.insert(0, discounted_reward)

        # Normalizing the rewards:
        rewards = torch.tensor(rewards).to(device)
        rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5)

        # convert list to tensor
        old_states = torch.squeeze(torch.stack(memory.states).to(device)).detach()
        old_actions = torch.squeeze(torch.stack(memory.actions).to(device)).detach()
        old_logprobs = torch.squeeze(torch.stack(memory.logprobs)).to(device).detach()

        # Optimize policy for K epochs:
        for _ in range(self.K_epochs):
            # Evaluating old actions and values:
            logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)

            # Finding the ratio (pi_theta / pi_theta__old):
            ratios = torch.exp(logprobs - old_logprobs.detach())

            # Finding Surrogate Loss:
            advantages = rewards - state_values.detach()
            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 1-self.eps_clip, 1+self.eps_clip) * advantages
            loss = -torch.min(surr1, surr2) + 0.5*self.MseLoss(state_values, rewards) - 0.01*dist_entropy

            # take gradient step
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()

        # Copy new weights into old policy:
        self.policy_old.load_state_dict(self.policy.state_dict())
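For reference, the K_epochs loop above implements the standard PPO clipped surrogate objective, L_CLIP(theta) = E_t[ min( r_t(theta) * A_t, clip(r_t(theta), 1 - eps, 1 + eps) * A_t ) ], with the probability ratio r_t(theta) = pi_theta(a_t | s_t) / pi_theta_old(a_t | s_t). Here the advantage A_t is estimated simply as the normalized Monte Carlo return minus the critic's value estimate, and the final loss adds a value-function MSE term (weight 0.5) and an entropy bonus (weight 0.01).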

def main():
    ############## Hyperparameters ##############
    env_name = "BipedalWalker-v2"
    render = False
    solved_reward = 300        # stop training if avg_reward > solved_reward
    log_interval = 20          # print avg reward in the interval
    max_episodes = 10000       # max training episodes
    max_timesteps = 1500       # max timesteps in one episode
    update_timestep = 4000     # update policy every n timesteps
    action_std = 0.5           # constant std for action distribution (Multivariate Normal)
    K_epochs = 80              # update policy for K epochs
    eps_clip = 0.2             # clip parameter for PPO
    gamma = 0.99               # discount factor
    lr = 0.0003                # parameters for Adam optimizer
    betas = (0.9, 0.999)
    random_seed = None
    #############################################

    # creating environment
    env = gym.make(env_name)
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]

    if random_seed:
        print("Random Seed: {}".format(random_seed))
        torch.manual_seed(random_seed)
        env.seed(random_seed)
        np.random.seed(random_seed)

    memory = Memory()
    ppo = PPO(state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip)
    print(lr, betas)

    # logging variables
    running_reward = 0
    avg_length = 0
    time_step = 0

    # training loop
    for i_episode in range(1, max_episodes+1):
        state = env.reset()
        for t in range(max_timesteps):
            time_step += 1
            # Running policy_old:
            action = ppo.select_action(state, memory)
            state, reward, done, _ = env.step(action)

            # Saving reward and is_terminals:
            memory.rewards.append(reward)
            memory.is_terminals.append(done)

            # update if its time
            if time_step % update_timestep == 0:
                ppo.update(memory)
                memory.clear_memory()
                time_step = 0
            running_reward += reward
            if render:
                env.render()
            if done:
                break

        avg_length += t

        # stop training if avg_reward > solved_reward
        if running_reward > (log_interval*solved_reward):
            print("########## Solved! ##########")
            torch.save(ppo.policy.state_dict(), './PPO_continuous_solved_{}.pth'.format(env_name))
            break

        # save every 500 episodes
        if i_episode % 500 == 0:
            torch.save(ppo.policy.state_dict(), './PPO_continuous_{}.pth'.format(env_name))

        # logging
        if i_episode % log_interval == 0:
            avg_length = int(avg_length/log_interval)
            running_reward = int((running_reward/log_interval))

            print('Episode {} \t Avg length: {} \t Avg reward: {}'.format(i_episode, avg_length, running_reward))
            running_reward = 0
            avg_length = 0

if __name__ == '__main__':
    main()
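Note that update_timestep = 4000 means the policy is updated on a fixed budget of environment steps rather than at episode boundaries, so a single PPO batch can span several episodes; the is_terminals flags stored in Memory are what let update() reset the discounted return whenever it crosses an episode boundary.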

Test code

import gym
from PPO_continuous import PPO, Memory
from PIL import Image
import torch

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

def test():
    ############## Hyperparameters ##############
    env_name = "BipedalWalker-v2"
    env = gym.make(env_name)
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]

    n_episodes = 3           # num of episodes to run
    max_timesteps = 1500     # max timesteps in one episode
    render = True            # render the environment
    save_gif = False         # png images are saved in gif folder

    # filename and directory to load model from
    filename = "PPO_continuous_" + env_name + ".pth"
    directory = "./preTrained/"

    action_std = 0.5         # constant std for action distribution (Multivariate Normal)
    K_epochs = 80            # update policy for K epochs
    eps_clip = 0.2           # clip parameter for PPO
    gamma = 0.99             # discount factor
    lr = 0.0003              # parameters for Adam optimizer
    betas = (0.9, 0.999)
    #############################################

    memory = Memory()
    ppo = PPO(state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip)
    ppo.policy_old.load_state_dict(torch.load(directory + filename))

    for ep in range(1, n_episodes+1):
        ep_reward = 0
        state = env.reset()
        for t in range(max_timesteps):
            action = ppo.select_action(state, memory)
            state, reward, done, _ = env.step(action)
            ep_reward += reward
            if render:
                env.render()
            if save_gif:
                img = env.render(mode='rgb_array')
                img = Image.fromarray(img)
                img.save('./gif/{}.jpg'.format(t))
            if done:
                break

        print('Episode: {}\tReward: {}'.format(ep, int(ep_reward)))
        ep_reward = 0
        env.close()

if __name__ == '__main__':
    test()
