目录
DDPG是最简单、最经典的Actor_Critic算法之一。它解决了DQN无法处理的连续动作空间上的强化学习问题。
禁止转载,侵权必究!Update 2020.11.27
前言
Actor-Critic网络是一类神经网络算法的统称,比如著名的AlphaGo就属于这一类。DPG、DDPG网络都是这类网络。Actor神经网络负责输出动作,Critic神经网络负责给此动作评分,最后通过Reward,Critic评分会越来越准,反过来影响Actor网络输出更好的动作。
DPG(确定性行为策略)网络是D.Silver等科学家在2014年提出的,在DPG网络之前业界普遍认为环境模型无关的确定性策略是不存在的,而D.Silver等通过严密的数学推导证明了确定性策略是存在的。
DDPG可以认为是DQN和DPG的组合算法。它和DQN不同之处在于,它把神经网络分成了Actor-Critic两个神经网络,又借鉴了DPG网络的策略梯度优化思想,Actor网络不在像DQN的Q网络那样输出动作概率值,而是输出确定的动作。而Critic网络负责输出策略。
教学环境
gym仿真器。
DDPG算法实现
PARL库给了这些经典算法很好的封装,因此,我们不用太关心Agent和Algorithm的实现。我们可以把重点放在建模上了,如下:
1.Model层
实现两个神经网络Actor-Critic。
导入依赖:
import paddle.fluid as fluid
import parl
from parl import layers
Model主类:
class Model(parl.Model):
def __init__(self, act_dim):
self.actor_model = ActorModel(act_dim)
self.critic_model = CriticModel()
def policy(self, obs):
return self.actor_model.policy(obs)
def value(self, obs, act):
return self.critic_model.value(obs, act)
def get_actor_params(self):
return self.actor_model.parameters()
ActorModel类:
class ActorModel(parl.Model):
def __init__(self, act_dim):
hid_size = 100
self.fc1 = layers.fc(size=hid_size, act='relu')
self.fc2 = layers.fc(size=act_dim, act='tanh')
# 它实现了父类的policy方法,代表它是算法中的策略网络
def policy(self, obs):
hid = self.fc1(obs)
means = self.fc2(hid)
return means
CriticModel类:
class CriticModel(parl.Model):
def __init__(self):
hid_size = 100
self.fc1 = layers.fc(size=hid_size, act='relu')
self.fc2 = layers.fc(size=1, act=None)
# 显然它是Q神经网络
def value(self, obs, act):
concat = layers.concat([obs, act], axis=1)
hid = self.fc1(concat)
Q = self.fc2(hid)
Q = layers.squeeze(Q, axes=[1])
return Q
2. main函数
def main():
# 模拟器环境初始化
env = ContinuousCartPoleEnv()
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]
# 引入模型
model = Model(act_dim)
# 引入DDPG算法,这里PARL库已经封装好了
algorithm = parl.algorithms.DDPG(
model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
# 引入Agent,跟之前的Agent类似,没有变化
agent = Agent(algorithm, obs_dim, act_dim)
# 引入PARL库封装好的ReplayMemory
rpm = ReplayMemory(MEMORY_SIZE, obs_dim, act_dim)
# 初始化Q内存
while rpm.size() < MEMORY_WARMUP_SIZE:
run_train_episode(env, agent, rpm)
# 开始训练
episode = 0
while episode < 30000:
for i in range(50):
train_reward = run_train_episode(env, agent, rpm)
episode += 1
# logger.info('Episode: {} Reward: {}'.format(episode, train_reward))
evaluate_reward = run_evaluate_episode(env, agent, False)
logger.info('Episode {}, Evaluate reward: {}'.format(
episode, evaluate_reward))
if( evaluate_reward == 200 ):
break
agent.save('./model_dir')