
Comparison and Implementation of Deep Reinforcement Learning Algorithms (Vanilla DQN, DDQN, PPO, A3C, etc.)



The theoretical comparison of the different algorithms draws on a CSDN blog post; the implementations are written in Python. The test case is the "CartPole-v1" environment provided by OpenAI Gym, and the code was developed and run on the Google stack (Colab, TensorFlow 2, wandb).
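All four implementations interact with the environment through the same loop. As a minimal sketch of that interface (assuming the classic gym API used throughout the code below, where env.step returns a 4-tuple; the random policy here is just a placeholder):

import gym

# Smoke test of the CartPole-v1 environment used by every algorithm below.
env = gym.make('CartPole-v1')
state = env.reset()
done, total_reward = False, 0
while not done:
    action = env.action_space.sample()          # random action in place of a learned policy
    state, reward, done, _ = env.step(action)   # classic 4-tuple step API
    total_reward += reward
print('random-policy episode reward:', total_reward)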

1. Vanilla DQN

Original paper:

[DQN] Playing Atari with Deep Reinforcement Learning [1]

Off-policy, discrete action space, model-free.

Algorithm:
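Before the full implementation, it helps to isolate what the algorithm actually regresses: vanilla DQN fits the online Q-network toward a Bellman target built from a separate target network, on minibatches drawn from a replay buffer. A minimal sketch of that target (the function name and arguments are illustrative; compare with Agent.replay() in the code below):

import numpy as np

def dqn_targets(target_net, states, actions, rewards, next_states, dones, gamma=0.95):
    # Q(s, a) is regressed toward r + gamma * max_a' Q_target(s', a') for non-terminal s'.
    targets = target_net.predict(states)
    next_q = target_net.predict(next_states).max(axis=1)
    targets[np.arange(len(actions)), actions] = rewards + (1 - dones) * gamma * next_q
    return targets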

Python implementation:

import wandb
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam

import gym
import argparse
import numpy as np
from collections import deque
import random

tf.keras.backend.set_floatx('float64')
wandb.init(name='DQN', project="deep-rl-tf2")

parser = argparse.ArgumentParser()
parser.add_argument('--gamma', type=float, default=0.95)
parser.add_argument('--lr', type=float, default=0.005)
parser.add_argument('--batch_size', type=int, default=32)
parser.add_argument('--eps', type=float, default=1.0)
parser.add_argument('--eps_decay', type=float, default=0.995)
parser.add_argument('--eps_min', type=float, default=0.01)

args = parser.parse_args()


class ReplayBuffer:
    # fixed-size FIFO buffer of (s, a, r, s', done) transitions
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def put(self, state, action, reward, next_state, done):
        self.buffer.append([state, action, reward, next_state, done])

    def sample(self):
        sample = random.sample(self.buffer, args.batch_size)
        states, actions, rewards, next_states, done = map(np.asarray, zip(*sample))
        states = np.array(states).reshape(args.batch_size, -1)
        next_states = np.array(next_states).reshape(args.batch_size, -1)
        return states, actions, rewards, next_states, done

    def size(self):
        return len(self.buffer)


class ActionStateModel:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.epsilon = args.eps

        self.model = self.create_model()

    def create_model(self):
        model = tf.keras.Sequential([
            Input((self.state_dim,)),
            Dense(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(self.action_dim)
        ])
        model.compile(loss='mse', optimizer=Adam(args.lr))
        return model

    def predict(self, state):
        return self.model.predict(state)

    def get_action(self, state):
        # epsilon-greedy exploration with a decaying epsilon
        state = np.reshape(state, [1, self.state_dim])
        self.epsilon *= args.eps_decay
        self.epsilon = max(self.epsilon, args.eps_min)
        q_value = self.predict(state)[0]
        if np.random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)
        return np.argmax(q_value)

    def train(self, states, targets):
        self.model.fit(states, targets, epochs=1, verbose=0)


class Agent:
    def __init__(self, env):
        self.env = env
        self.state_dim = self.env.observation_space.shape[0]
        self.action_dim = self.env.action_space.n

        self.model = ActionStateModel(self.state_dim, self.action_dim)
        self.target_model = ActionStateModel(self.state_dim, self.action_dim)
        self.target_update()

        self.buffer = ReplayBuffer()

    def target_update(self):
        # copy the online network's weights into the target network
        weights = self.model.model.get_weights()
        self.target_model.model.set_weights(weights)

    def replay(self):
        for _ in range(10):
            states, actions, rewards, next_states, done = self.buffer.sample()
            targets = self.target_model.predict(states)
            # TD target: r + gamma * max_a' Q_target(s', a') for non-terminal transitions
            next_q_values = self.target_model.predict(next_states).max(axis=1)
            targets[range(args.batch_size), actions] = rewards + (1 - done) * next_q_values * args.gamma
            self.model.train(states, targets)

    def train(self, max_episodes=10):
        for ep in range(max_episodes):
            done, total_reward = False, 0
            state = self.env.reset()
            while not done:
                action = self.model.get_action(state)
                next_state, reward, done, _ = self.env.step(action)
                self.buffer.put(state, action, reward * 0.01, next_state, done)
                total_reward += reward
                state = next_state
            if self.buffer.size() >= args.batch_size:
                self.replay()
            self.target_update()
            print('EP{} EpisodeReward={}'.format(ep, total_reward))
            wandb.log({'Reward': total_reward})


env = gym.make('CartPole-v1')
agent = Agent(env)
agent.train(max_episodes=100)

2. Double DQN (DDQN)

Original paper:

Deep Reinforcement Learning with Double Q-learning [2]

Model-free, off-policy, discrete action space.

Algorithm:
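Double DQN changes only how the bootstrap target is built: the online network chooses the next action and the target network evaluates it, which reduces the overestimation bias introduced by taking a max over noisy Q-estimates. A minimal sketch (names are illustrative; the same decoupling appears in Agent.replay() below):

import numpy as np

def double_dqn_targets(online_net, target_net, states, actions, rewards, next_states, dones, gamma=0.95):
    targets = target_net.predict(states)
    best_next = np.argmax(online_net.predict(next_states), axis=1)                   # action selection: online network
    next_q = target_net.predict(next_states)[np.arange(len(best_next)), best_next]   # action evaluation: target network
    targets[np.arange(len(actions)), actions] = rewards + (1 - dones) * gamma * next_q
    return targets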

Python implementation:

import wandb
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Flatten, Lambda
from tensorflow.keras.optimizers import Adam

import gym
import argparse
import numpy as np
from collections import deque
import random

tf.keras.backend.set_floatx('float64')
wandb.init(name='DoubleDQN', project="deep-rl-tf2")

parser = argparse.ArgumentParser()
parser.add_argument('--gamma', type=float, default=0.95)
parser.add_argument('--lr', type=float, default=0.005)
parser.add_argument('--batch_size', type=int, default=32)
parser.add_argument('--eps', type=float, default=1.0)
parser.add_argument('--eps_decay', type=float, default=0.995)
parser.add_argument('--eps_min', type=float, default=0.01)

args = parser.parse_args()


class ReplayBuffer:
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def put(self, state, action, reward, next_state, done):
        self.buffer.append([state, action, reward, next_state, done])

    def sample(self):
        sample = random.sample(self.buffer, args.batch_size)
        states, actions, rewards, next_states, done = map(np.asarray, zip(*sample))
        states = np.array(states).reshape(args.batch_size, -1)
        next_states = np.array(next_states).reshape(args.batch_size, -1)
        return states, actions, rewards, next_states, done

    def size(self):
        return len(self.buffer)


class ActionStateModel:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.epsilon = args.eps

        self.model = self.create_model()

    def create_model(self):
        model = tf.keras.Sequential([
            Input((self.state_dim,)),
            Dense(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(self.action_dim)
        ])
        model.compile(loss='mse', optimizer=Adam(args.lr))
        return model

    def predict(self, state):
        return self.model.predict(state)

    def get_action(self, state):
        # epsilon-greedy exploration with a decaying epsilon
        state = np.reshape(state, [1, self.state_dim])
        self.epsilon *= args.eps_decay
        self.epsilon = max(self.epsilon, args.eps_min)
        q_value = self.predict(state)[0]
        if np.random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)
        return np.argmax(q_value)

    def train(self, states, targets):
        self.model.fit(states, targets, epochs=1, verbose=0)


class Agent:
    def __init__(self, env):
        self.env = env
        self.state_dim = self.env.observation_space.shape[0]
        self.action_dim = self.env.action_space.n

        self.model = ActionStateModel(self.state_dim, self.action_dim)
        self.target_model = ActionStateModel(self.state_dim, self.action_dim)
        self.target_update()

        self.buffer = ReplayBuffer()

    def target_update(self):
        weights = self.model.model.get_weights()
        self.target_model.model.set_weights(weights)

    def replay(self):
        for _ in range(10):
            states, actions, rewards, next_states, done = self.buffer.sample()
            targets = self.target_model.predict(states)
            # Double DQN target: the online network selects the next action,
            # the target network evaluates it.
            next_q_values = self.target_model.predict(next_states)[
                range(args.batch_size),
                np.argmax(self.model.predict(next_states), axis=1)]
            targets[range(args.batch_size), actions] = rewards + (1 - done) * next_q_values * args.gamma
            self.model.train(states, targets)

    def train(self, max_episodes=1000):
        for ep in range(max_episodes):
            done, total_reward = False, 0
            state = self.env.reset()
            while not done:
                action = self.model.get_action(state)
                next_state, reward, done, _ = self.env.step(action)
                self.buffer.put(state, action, reward * 0.01, next_state, done)
                total_reward += reward
                state = next_state
            if self.buffer.size() >= args.batch_size:
                self.replay()
            self.target_update()
            print('EP{} EpisodeReward={}'.format(ep, total_reward))
            wandb.log({'Reward': total_reward})


def main():
    env = gym.make('CartPole-v1')
    agent = Agent(env)
    agent.train(max_episodes=1000)


if __name__ == "__main__":
    main()

3. A3C

Original paper:

Asynchronous Methods for Deep Reinforcement Learning [4]

Algorithm:
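A3C is an on-policy, asynchronous actor-critic method: several worker threads each collect a short rollout, build n-step value targets and advantages against their local copies of the actor and critic, then apply gradients to the shared global networks under a lock before re-syncing. A minimal sketch of the n-step target and advantage (illustrative names; compare with n_step_td_target in the code below):

import numpy as np

def n_step_returns(rewards, bootstrap_value, done, gamma=0.99):
    # Discounted reward-to-go, bootstrapped with V(s_T) when the rollout was cut off mid-episode.
    returns = np.zeros_like(rewards, dtype=np.float64)
    running = 0.0 if done else float(bootstrap_value)
    for k in reversed(range(len(rewards))):
        running = rewards[k] + gamma * running
        returns[k] = running
    return returns

# The critic is trained toward these returns; the actor is trained on
# -log pi(a|s) * (returns - V(s)), plus a small entropy bonus for exploration.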

Python implementation:

import wandb
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense

import gym
import argparse
import numpy as np
from threading import Thread, Lock
from multiprocessing import cpu_count

tf.keras.backend.set_floatx('float64')
wandb.init(name='A3C', project="deep-rl-tf2")

parser = argparse.ArgumentParser()
parser.add_argument('--gamma', type=float, default=0.99)
parser.add_argument('--update_interval', type=int, default=5)
parser.add_argument('--actor_lr', type=float, default=0.0005)
parser.add_argument('--critic_lr', type=float, default=0.001)

args = parser.parse_args()

CUR_EPISODE = 0


class Actor:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.model = self.create_model()
        self.opt = tf.keras.optimizers.Adam(args.actor_lr)
        self.entropy_beta = 0.01

    def create_model(self):
        return tf.keras.Sequential([
            Input((self.state_dim,)),
            Dense(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(self.action_dim, activation='softmax')
        ])

    def compute_loss(self, actions, logits, advantages):
        # cross-entropy of the taken actions, weighted by the advantages (policy-gradient term),
        # minus an entropy bonus that encourages exploration
        ce_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        entropy_loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
        actions = tf.cast(actions, tf.int32)
        policy_loss = ce_loss(actions, logits, sample_weight=tf.stop_gradient(advantages))
        entropy = entropy_loss(logits, logits)
        return policy_loss - self.entropy_beta * entropy

    def train(self, states, actions, advantages):
        with tf.GradientTape() as tape:
            logits = self.model(states, training=True)
            loss = self.compute_loss(actions, logits, advantages)
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss


class Critic:
    def __init__(self, state_dim):
        self.state_dim = state_dim
        self.model = self.create_model()
        self.opt = tf.keras.optimizers.Adam(args.critic_lr)

    def create_model(self):
        return tf.keras.Sequential([
            Input((self.state_dim,)),
            Dense(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(16, activation='relu'),
            Dense(1, activation='linear')
        ])

    def compute_loss(self, v_pred, td_targets):
        mse = tf.keras.losses.MeanSquaredError()
        return mse(td_targets, v_pred)

    def train(self, states, td_targets):
        with tf.GradientTape() as tape:
            v_pred = self.model(states, training=True)
            assert v_pred.shape == td_targets.shape
            loss = self.compute_loss(v_pred, tf.stop_gradient(td_targets))
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss


class Agent:
    def __init__(self, env_name):
        env = gym.make(env_name)
        self.env_name = env_name
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n

        self.global_actor = Actor(self.state_dim, self.action_dim)
        self.global_critic = Critic(self.state_dim)
        self.num_workers = cpu_count()

    def train(self, max_episodes=1000):
        workers = []

        for i in range(self.num_workers):
            env = gym.make(self.env_name)
            workers.append(WorkerAgent(env, self.global_actor, self.global_critic, max_episodes))

        for worker in workers:
            worker.start()

        for worker in workers:
            worker.join()


class WorkerAgent(Thread):
    def __init__(self, env, global_actor, global_critic, max_episodes):
        Thread.__init__(self)
        self.lock = Lock()
        self.env = env
        self.state_dim = self.env.observation_space.shape[0]
        self.action_dim = self.env.action_space.n

        self.max_episodes = max_episodes
        self.global_actor = global_actor
        self.global_critic = global_critic

        # local copies of the global networks
        self.actor = Actor(self.state_dim, self.action_dim)
        self.critic = Critic(self.state_dim)

        self.actor.model.set_weights(self.global_actor.model.get_weights())
        self.critic.model.set_weights(self.global_critic.model.get_weights())

    def n_step_td_target(self, rewards, next_v_value, done):
        # discounted n-step return, bootstrapped with V(s_T) if the rollout did not terminate
        td_targets = np.zeros_like(rewards)
        cumulative = 0
        if not done:
            cumulative = next_v_value

        for k in reversed(range(0, len(rewards))):
            cumulative = args.gamma * cumulative + rewards[k]
            td_targets[k] = cumulative
        return td_targets

    def advantage(self, td_targets, baselines):
        return td_targets - baselines

    def list_to_batch(self, list):
        batch = list[0]
        for elem in list[1:]:
            batch = np.append(batch, elem, axis=0)
        return batch

    def train(self):
        global CUR_EPISODE

        while self.max_episodes >= CUR_EPISODE:
            state_batch = []
            action_batch = []
            reward_batch = []
            episode_reward, done = 0, False

            state = self.env.reset()

            while not done:
                # self.env.render()
                probs = self.actor.model.predict(np.reshape(state, [1, self.state_dim]))
                action = np.random.choice(self.action_dim, p=probs[0])

                next_state, reward, done, _ = self.env.step(action)

                state = np.reshape(state, [1, self.state_dim])
                action = np.reshape(action, [1, 1])
                next_state = np.reshape(next_state, [1, self.state_dim])
                reward = np.reshape(reward, [1, 1])

                state_batch.append(state)
                action_batch.append(action)
                reward_batch.append(reward)

                if len(state_batch) >= args.update_interval or done:
                    states = self.list_to_batch(state_batch)
                    actions = self.list_to_batch(action_batch)
                    rewards = self.list_to_batch(reward_batch)

                    next_v_value = self.critic.model.predict(next_state)
                    td_targets = self.n_step_td_target(rewards, next_v_value, done)
                    advantages = td_targets - self.critic.model.predict(states)

                    # update the shared global networks under a lock, then re-sync the local copies
                    with self.lock:
                        actor_loss = self.global_actor.train(states, actions, advantages)
                        critic_loss = self.global_critic.train(states, td_targets)

                        self.actor.model.set_weights(self.global_actor.model.get_weights())
                        self.critic.model.set_weights(self.global_critic.model.get_weights())

                    state_batch = []
                    action_batch = []
                    reward_batch = []

                episode_reward += reward[0][0]
                state = next_state[0]

            print('EP{} EpisodeReward={}'.format(CUR_EPISODE, episode_reward))
            wandb.log({'Reward': episode_reward})
            CUR_EPISODE += 1

    def run(self):
        self.train()


def main():
    env_name = 'CartPole-v1'
    agent = Agent(env_name)
    agent.train()


if __name__ == "__main__":
    main()

4. PPO

Original paper:

Proximal Policy Optimization Algorithms

On-policy, actor-critic, supports both discrete and continuous action spaces.

Algorithm:
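PPO's actor loss clips the probability ratio between the new and old policies so the same batch of experience can be reused for several gradient epochs without the policy drifting too far from the one that collected the data. A minimal sketch of the clipped surrogate objective (illustrative names; the implementation below expresses the same idea through Actor.compute_loss):

import tensorflow as tf

def ppo_clip_loss(old_log_probs, new_log_probs, advantages, clip_ratio=0.1):
    ratio = tf.exp(new_log_probs - tf.stop_gradient(old_log_probs))      # pi_new(a|s) / pi_old(a|s)
    clipped = tf.clip_by_value(ratio, 1.0 - clip_ratio, 1.0 + clip_ratio)
    # Maximize the clipped surrogate, i.e. minimize its negative.
    return -tf.reduce_mean(tf.minimum(ratio * advantages, clipped * advantages))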

Python implementation:

import wandb
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense

import gym
import argparse
import numpy as np

tf.keras.backend.set_floatx('float64')
wandb.init(name='PPO', project="deep-rl-tf2")

parser = argparse.ArgumentParser()
parser.add_argument('--gamma', type=float, default=0.99)
parser.add_argument('--update_interval', type=int, default=5)
parser.add_argument('--actor_lr', type=float, default=0.0005)
parser.add_argument('--critic_lr', type=float, default=0.001)
parser.add_argument('--clip_ratio', type=float, default=0.1)
parser.add_argument('--lmbda', type=float, default=0.95)
parser.add_argument('--epochs', type=int, default=3)

args = parser.parse_args()


class Actor:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.model = self.create_model()
        self.opt = tf.keras.optimizers.Adam(args.actor_lr)

    def create_model(self):
        return tf.keras.Sequential([
            Input((self.state_dim,)),
            Dense(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(self.action_dim, activation='softmax')
        ])

    def compute_loss(self, old_policy, new_policy, actions, gaes):
        # clipped surrogate objective: limit how far the new policy can move from the old one
        gaes = tf.stop_gradient(gaes)
        old_log_p = tf.math.log(tf.reduce_sum(old_policy * actions))
        old_log_p = tf.stop_gradient(old_log_p)
        log_p = tf.math.log(tf.reduce_sum(new_policy * actions))
        ratio = tf.math.exp(log_p - old_log_p)
        clipped_ratio = tf.clip_by_value(ratio, 1 - args.clip_ratio, 1 + args.clip_ratio)
        surrogate = -tf.minimum(ratio * gaes, clipped_ratio * gaes)
        return tf.reduce_mean(surrogate)

    def train(self, old_policy, states, actions, gaes):
        actions = tf.one_hot(actions, self.action_dim)
        actions = tf.reshape(actions, [-1, self.action_dim])
        actions = tf.cast(actions, tf.float64)

        with tf.GradientTape() as tape:
            logits = self.model(states, training=True)
            loss = self.compute_loss(old_policy, logits, actions, gaes)
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss


class Critic:
    def __init__(self, state_dim):
        self.state_dim = state_dim
        self.model = self.create_model()
        self.opt = tf.keras.optimizers.Adam(args.critic_lr)

    def create_model(self):
        return tf.keras.Sequential([
            Input((self.state_dim,)),
            Dense(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(16, activation='relu'),
            Dense(1, activation='linear')
        ])

    def compute_loss(self, v_pred, td_targets):
        mse = tf.keras.losses.MeanSquaredError()
        return mse(td_targets, v_pred)

    def train(self, states, td_targets):
        with tf.GradientTape() as tape:
            v_pred = self.model(states, training=True)
            assert v_pred.shape == td_targets.shape
            loss = self.compute_loss(v_pred, tf.stop_gradient(td_targets))
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss


class Agent:
    def __init__(self, env):
        self.env = env
        self.state_dim = self.env.observation_space.shape[0]
        self.action_dim = self.env.action_space.n

        self.actor = Actor(self.state_dim, self.action_dim)
        self.critic = Critic(self.state_dim)

    def gae_target(self, rewards, v_values, next_v_value, done):
        # generalized advantage estimation (GAE) plus the corresponding value targets
        n_step_targets = np.zeros_like(rewards)
        gae = np.zeros_like(rewards)
        gae_cumulative = 0
        forward_val = 0

        if not done:
            forward_val = next_v_value

        for k in reversed(range(0, len(rewards))):
            delta = rewards[k] + args.gamma * forward_val - v_values[k]
            gae_cumulative = args.gamma * args.lmbda * gae_cumulative + delta
            gae[k] = gae_cumulative
            forward_val = v_values[k]
            n_step_targets[k] = gae[k] + v_values[k]
        return gae, n_step_targets

    def list_to_batch(self, list):
        batch = list[0]
        for elem in list[1:]:
            batch = np.append(batch, elem, axis=0)
        return batch

    def train(self, max_episodes=1000):
        for ep in range(max_episodes):
            state_batch = []
            action_batch = []
            reward_batch = []
            old_policy_batch = []

            episode_reward, done = 0, False

            state = self.env.reset()

            while not done:
                # self.env.render()
                probs = self.actor.model.predict(np.reshape(state, [1, self.state_dim]))
                action = np.random.choice(self.action_dim, p=probs[0])

                next_state, reward, done, _ = self.env.step(action)

                state = np.reshape(state, [1, self.state_dim])
                action = np.reshape(action, [1, 1])
                next_state = np.reshape(next_state, [1, self.state_dim])
                reward = np.reshape(reward, [1, 1])

                state_batch.append(state)
                action_batch.append(action)
                reward_batch.append(reward * 0.01)
                old_policy_batch.append(probs)

                if len(state_batch) >= args.update_interval or done:
                    states = self.list_to_batch(state_batch)
                    actions = self.list_to_batch(action_batch)
                    rewards = self.list_to_batch(reward_batch)
                    old_policys = self.list_to_batch(old_policy_batch)

                    v_values = self.critic.model.predict(states)
                    next_v_value = self.critic.model.predict(next_state)

                    gaes, td_targets = self.gae_target(rewards, v_values, next_v_value, done)

                    # reuse the same batch for several epochs, as PPO allows
                    for epoch in range(args.epochs):
                        actor_loss = self.actor.train(old_policys, states, actions, gaes)
                        critic_loss = self.critic.train(states, td_targets)

                    state_batch = []
                    action_batch = []
                    reward_batch = []
                    old_policy_batch = []

                episode_reward += reward[0][0]
                state = next_state[0]

            print('EP{} EpisodeReward={}'.format(ep, episode_reward))
            wandb.log({'Reward': episode_reward})


def main():
    env_name = 'CartPole-v1'
    env = gym.make(env_name)
    agent = Agent(env)
    agent.train()


if __name__ == "__main__":
    main()
