目录
手机游戏AI实战项目中,Agent速度是非常慢的,因此并行能力十分重要,本文介绍并行AI算法A3C/A2C。
禁止转载,侵权必究!
前言
DQN,DDPG算法都使用了ReplyMemory来存储和回放经验。这是off-policy类型算法的常用技巧,本章将介绍另外一种思路,A3C算法设计了多个异步多线程的Actor-Critic,每个agent在自己的线程中运行。然后再全局共享学习到的网络参数。这样每时每刻都有大量的on-policy数据,并且参数没有相互关联(参考DDQN算法对DQN算法优化原理)。因此A3C算法通过多线程异步+共享全局参数达到了类似ReplyMemory类似效果,既有大量的数据解决训练过程不稳定问题,又解决了参数关联性的问题。
A3C作者声称:用多CPU达到甚至超过使用GPU训练的效率和效果。
注:NVIDIA公司的算法专家表示不服,提出了GA3C算法,证明把Agent的深度神经网络计算部分转移到GPU实现算法更快!有兴趣请参考相关论文。
A2C是A3C的简化升级版本。
One-Step VS N-Step问题
如我们训练的Flask_DQN游戏智能体,当我们从state1转移到state2时,它有一个过场动画,被定义为状态other。我们在工程实践中不可能预估过场动画的时长,那么从state1转移到state2过程中,我们只能执行N个动作,也就是N个step。那么这种情况下我们要如何设计reward呢?我们在Flask_DQN游戏智能体开发中我们设计了从state1转移到other就拿到全部的reward,而从other转移到任意其他state都不能获得reward。这是一种软件工程的trick,不优雅!!!在A3C算法内部已经用了N-Step的R传播的方法解决了这个问题。
知识预备
在PG算法中,我们的Agent又被称为Actor,Actor对于一个特定的任务,都有自己的一个策略π,策略π通常用一个神经网络表示,其参数为θ。从一个特定的状态state出发,一直到任务的结束,被称为一个完整的eposide,在每一步,我们都能获得一个奖励r,一个完整的任务所获得的最终奖励被称为R。
如果我们用Q函数来代替R,同时我们创建一个Critic网络来计算Q函数值,那么我们就得到了Actor-Critic方法。
Q函数增加一个基线,使得反馈有正有负,这里的基线通常用状态价值函数V来表示。但是,这样的话我们需要同时计算Q函数和V函数,不好算。Q函数可以用Step t+1的V函数加上从Step t到Step t+1的r来代替。这样就可以得到不含Q函数的表达式,这个表达式我们叫Advantage (优势函数),此时的Critic网络变为计算优势函数A的网络。
A3C全称为异步优势动作评价算法(Asynchronous advantage actor-critic)。其中Advantage就是指优势函数A。因此我们很容可以推导A3C实质就是求解πθ网络和Aπ(s, a)网络。
需要重点澄清的是: A2C算法是A3C算法的改进版本。可以更高效利用单机的多CPU。
当A3C算法在论文中被提出来的时候,对比了四种算法:一种是异步Sarsa、一种是异步Q-learning、一种是普通DQN、一种是A3C。论文作者认为“异步更新”是算法优秀的重要原因。论文发表后,各路大神挑战”异步更新“是A3C模型表现更好的关键因素的说法?有趣的是,多线程是算法快的原因,但是”异步更新“反而是算法的缺点。所以科学家提出新算法A2C算法(Advantage actor-critic),可以更有效利用CPU资源。打脸,啪啪啪!
A2C 也会构建多个进程,包括多个并行的 worker,与独立的环境进行交互,收集独立的经验。但是A2C每个episode会同步等待所有Actor回传参数。A3C 中各个 agent 都是异步独立更新,每个 agent 使用不同的策略,实验证明这种异步更新并不是最优解(有科学家撰文阐述过其中原因,请自行百度)。而A2C采用同步更新的策略,就可以解决参数更新一致性问题,起到了优化的效果。
补充:DQN算法的问题:
第一,训练效率低,单agent在手机网络游戏AI领域受制于手机游戏主流形式获取reward很慢(需要玩家操作非常快的游戏很少)。反之,PC单机游戏《星际争霸》获取reward是非常快的(DeepMind)。
第二,DQN算法不稳定性,很多相关论文都论述过。从训练过程直观来看,前一个episode明明拿到了reward,下个episode还是没学会。根本原因是:ReplyMemory需要做batch采样,传递reward效率太低,尤其是在N-step问题存在的场景。
A2C算法详解
Main函数
# 实例化Learner
learner = Learner(config)
while not learner.should_stop():
start = time.time()
while time.time() - start < config['log_metrics_interval_s']:
learner.step()
learner.log_metrics()
1.Learner类定义
def __init__(self, config):
#=========== 创建Agent ==========
...
#========== 创建工作线程中 Actor ===========
...
self.create_actors()
创建Agent
可以看到PARL框架在A2C算法实现里面用的是A3C算法,因为它们是同源的算法,只有点不同(后面step函数中会讲到)
env = gym.make(config['env_name'])
env = wrap_deepmind(env, dim=config['env_dim'], obs_format='NCHW')
obs_shape = env.observation_space.shape
act_dim = env.action_space.n
self.config['obs_shape'] = obs_shape
self.config['act_dim'] = act_dim
model = AtariModel(act_dim)
algorithm = parl.algorithms.A3C(
model, vf_loss_coeff=config['vf_loss_coeff'])
self.agent = AtariAgent(algorithm, config)
创建工作线程中的Actor
连接到集群去执行run_remote_sample函数
def create_actors(self):
# 先把自己连接到XPARL集群上去
parl.connect(self.config['master_address'])
for i in six.moves.range(self.config['actor_num']):
...
remote_thread = threading.Thread(
# 在工作线程中运行run_remote_sample函数
# 通过params_queue传递模型的参数
target=self.run_remote_sample, args=(params_queue, ))
remote_thread.setDaemon(True)
remote_thread.start()
...
run_remote_sample函数
各个Actor真正执行的函数:
def run_remote_sample(self, params_queue):
...
while True:
...
batch = remote_actor.sample()
# 通过sample_data_queue传递sample函数返回的数据
self.sample_data_queue.put(batch)
step函数
因为A2C算法会同步等待所有Agent(Actor)完成一轮训练后把π网络的参数θ同步上来更新全局的π网络参数。就像楼梯台阶一样,因此这个重要的函数叫step。
def step(self):
# 同步模型参数
latest_params = self.agent.get_weights()
for params_queue in self.params_queues:
params_queue.put(latest_params)
train_batch = defaultdict(list)
# 汇总Actor sample回来的数据
for i in range(self.config['actor_num']):
sample_data = self.sample_data_queue.get()
for key, value in sample_data.items():
train_batch[key].append(value)
self.sample_total_steps += sample_data['obs'].shape[0]
for key, value in train_batch.items():
train_batch[key] = np.concatenate(value)
with self.learn_time_stat:
# 执行A3C算法的learn方法
total_loss, pi_loss, vf_loss, entropy, lr, entropy_coeff = self.agent.learn(
obs_np=train_batch['obs'],
actions_np=train_batch['actions'],
advantages_np=train_batch['advantages'],
target_values_np=train_batch['target_values'])
self.total_loss_stat.add(total_loss)
self.pi_loss_stat.add(pi_loss)
self.vf_loss_stat.add(vf_loss)
self.entropy_stat.add(entropy)
self.lr = lr
self.entropy_coeff = entropy_coeff
2.Actor&Agent类
注解@parl.remote_class表明Actor类是在独立的本机进程中执行(因为A2C是利用本机多CPU)。如果购买或者部署了PARL分布式集群,那么Actor实际是在远程server中运行了。
注意,Actor的init方法中保存了env数组,用同样的参数实例化了模型、用同样的模型实例化了算法并作为参数传入到了agent中。
@parl.remote_class
class Actor(object):
def __init__(self, config):
...
# Actor保存了env数组
self.envs = []
for _ in range(config['env_num']):
env = gym.make(config['env_name'])
env = wrap_deepmind(env, dim=config['env_dim'], obs_format='NCHW')
self.envs.append(env)
...
model = AtariModel(act_dim)
algorithm = parl.algorithms.A3C(
model, vf_loss_coeff=config['vf_loss_coeff'])
self.agent = AtariAgent(algorithm, config)
class AtariAgent(parl.Agent):
def __init__(self, algorithm, config):
...
def build_program(self):
self.sample_program = fluid.Program()
self.predict_program = fluid.Program()
self.value_program = fluid.Program()
self.learn_program = fluid.Program()
...
def sample(self, obs_np):
...
def predict(self, obs_np):
...
def value(self, obs_np):
...
def learn(self, obs_np, actions_np, advantages_np, target_values_np):
...
每个Actor对应一个Agent
sample函数
Actor中的sample函数会调用agent的sample函数和agent的value函数来分别更新本地的π网络和v网络,最终返回sample_data给中心节点。
...
actions_batch, values_batch =
self.agent.sample(np.stack(self.obs_batch))
...
next_value = self.agent.value(next_obs)
...
sample_data的数据结构:
sample_data['obs'].extend(env_sample_data[env_id]['obs'])
sample_data['actions'].extend(env_sample_data[env_id]['actions'])
sample_data['advantages'].extend(advantages)
sample_data['target_values'].extend(target_values)
其中优势函数的的计算如下:
# gae:generalized advantage estimator
advantages = calc_gae(rewards, values, next_value,
self.config['gamma'],
self.config['lambda'])
target_values = advantages + values
3.Model类
def __init__(self, act_dim):
self.conv1 = layers.conv2d(
num_filters=32, filter_size=8, stride=4, padding=1, act='relu')
self.conv2 = layers.conv2d(
num_filters=64, filter_size=4, stride=2, padding=2, act='relu')
self.conv3 = layers.conv2d(
num_filters=64, filter_size=3, stride=1, padding=0, act='relu')
self.fc = layers.fc(size=512, act='relu')
self.policy_fc = layers.fc(size=act_dim)
self.value_fc = layers.fc(size=1)
显然,A2C的动作策略网络π和评价值网络v使用的相同的卷积神经网络,只是在输出的时候根据输出的要求,做了不同的处理。
4.VectorEnv类
这个类是PARL对env环境的封装。我们的实际应用环境必须实现此类定义的两个方法,如下所示:
class VectorEnv(object):
def __init__(self, envs):
def reset(self):
...
def step(self, actions):
# env需要实现step方法
obs, reward, done, info = self.envs[env_id].step(actions[env_id])
...
if done:
# env需要实现reset方法
obs = self.envs[env_id].reset()
...
return obs_batch, reward_batch, done_batch, info_batch
模拟器的源数据是由此类中的step方法批量返回。
编写游戏模拟器
ArKnight_A2C_Simulator
修改Learner的初始化方法
#=========== Create Agent ==========
game = ArKnights()
env = PMGE(game)
obs_shape = (3, 108, 192)
act_dim = 650
定义新的env.py
class PMGE(object):
def __init__(self, game):
self.game = game
def step(self, action):
# 模拟器简化了状态判断
# 实际项目应该实时生成:当前屏幕--> stateCode 的关系
s1 = [ self.game.stateCode ]
# 产生状态变化
self.game.act(action, s1)
reward = self.game.getScore(s1)
isOver = self.game.gameOver()
next_obs = self.game.render()
# 为了匹配标准的API
return next_obs, reward, isOver, 0
def reset(self):
return self.game.reset()
修改Actor
class Actor(object):
def __init__(self, config):
self.config = config
self.envs = []
for _ in range(config['env_num']):
game = ArKnights()
env = PMGE(game)
self.envs.append(env)
self.vector_env = VectorEnv(self.envs)
self.obs_batch = self.vector_env.reset()
model = Model(config['act_dim'])
algorithm = parl.algorithms.A3C(
model, vf_loss_coeff=config['vf_loss_coeff'])
self.agent = Agent(algorithm, config)
定义模拟环境
class ArKnights(object):
def __init__(self):
"""
游戏《明日方舟》智能体定义
"""
self.stateCode = 990
# 1920x1080 ----- 1920/80 x 1080/40 = 24x27
self.tap_dim = 24*27
self.swipe_dim = 4 # 上下左右
def render(self):
imgDir = IMAGE_DIR + str(self.stateCode) + '/'
filenames = os.listdir(imgDir)
# 在stateCode目录下随机取一张图片
filename = random.choice(filenames)
return self.transform_img(imgDir + filename)
def act(self, action, stateCode):
if stateCode[0] == 990:
if action in [442,443,444,445,466,467,468,469]:
self.stateCode = 970
if stateCode[0] == 970:
if action in [111,112,113,114,115,
135,136,137,138,139,
159,160,161,162,163,
183,184,185,186,187,
207,208,209,210,211]:
self.stateCode = 965
def getScore(self, s1):
# 状态没变扣一分
if s1[0] == self.stateCode:
return -1
return 1
def gameOver(self):
code = self.stateCode
# if (code == 910 or code == 1010):
# for debug 让算法快速收敛
if (code == 965):
return True
return False
def reset(self):
self.stateCode = 990
imgDir = IMAGE_DIR + str(self.stateCode) + '/'
filenames = os.listdir(imgDir)
# 在990目录下随机取一张图片
filename = random.choice(filenames)
return self.transform_img(imgDir + filename)
def transform_img(self, filepath):
# 直接读取 (h,w)
img = cv2.imread(filepath, cv2.IMREAD_COLOR)
# 将图片尺寸缩放道 (image, (w,h)) 192x108
img = cv2.resize(img, (192, 108))
# 因为cv2的数组长宽是反的,所以用numpy转置一下 (C,H,W)
img = np.transpose(img, (2, 0, 1))
obs = img.astype('float32')
return obs
然后经过大约10万个steps就可以训练出模型predict_program。
定义真机环境
import time
import cv2
from PIL import Image
import numpy as np
from adbutil import AdbUtil
from resnet import ResNet
import paddle
import paddle.fluid as fluid
class ArKnights(object):
def __init__(self):
self.adbutil = AdbUtil()
# 加载推理模型
with fluid.dygraph.guard():
# 加载状态推断引擎
self.model = ResNet('resnet', 50)
#加载模型参数
model_state_dict, _ = fluid.load_dygraph("arknights")
self.model.load_dict(model_state_dict)
self.model.eval()
def _restart(self):
"""
打开游戏进程
如果已经打开,先关闭再重新打开
"""
self.adbutil.stopArKnights()
self.adbutil.startArKnights()
# 每隔1秒在屏幕中心点击1下,持续20秒
self.adbutil.taptap(960,540,20,1)
def _stop(self):
"""
关闭游戏进程
"""
self.adbutil.stopArKnights()
def act(self, action):
# 点击动作code映射成动作
if action < 648:
x = (action % 24) * 80 + 40 # 取余
y = (action // 24) * 40 + 20 # 取商
self.adbutil.taptap(x,y,1,0.01) # x,y,count,frequency
elif action == 648:
self.adbutil.rightswipeswipe(2,0.5)
elif action == 649:
self.adbutil.leftswipeswipe(2,0.5)
else:
raise("No such action error!" + str(action))
time.sleep(2) # 等动作执行完
def render(self):
# TODO check shape
img = self.adbutil.screencap()
img = img.resize((192, 108), Image.ANTIALIAS)
# 因为图片的数组长宽是反的,所以用numpy转置一下 (C,H,W)
img = np.transpose(img, (2, 0, 1))
obs = img.astype('float32')
return obs
def reset(self):
self._restart()
return self.render()
def gameOver(self):
state = self.inferState()
print("state"+str(state))
if state[0] == 965:
return True
else:
return False
def inferState(self):
"""
图片推断
"""
...
这里的游戏状态推断引擎,就是ARKNIGHT_CLASSIFY项目输出的推理模型。
评估和部署
def test():
game = ArKnights()
env = PMGE(game)
obs_shape = (3, 108, 192)
act_dim = 650
config['obs_shape'] = obs_shape
config['act_dim'] = act_dim
model = Model(act_dim)
algorithm = parl.algorithms.A3C(model, vf_loss_coeff=config['vf_loss_coeff'])
agent = Agent(algorithm, config)
agent.restore("./model_dir")
# 初始状态
obs = env.reset()
MAX_STEP = 20
step = 0
while True:
state_code = env.game.stateCode
action = agent.predict(obs)
obs, reward, isOver, _ = env.step(action)
next_state_code = env.game.stateCode
step += 1
logger.info("evaluate state_code:{}, action:{} next_state_code:{}, reward:{}, isOver:{}".format(state_code, action, next_state_code, reward, isOver))
if isOver or step >MAX_STEP:
logger.info("GameOver, state:{}".format(next_state_code))
break;
可以看到只用了2步,算法就成功达到了设定的终止状态[965]。
新建部署项目ArKnight_A2C,把模型导入,效果如下。
实战效果演示
A2C玩《明日方舟》: 提取码: rzyk
参考资料
A2C算法的数学推导视频,讲得非常清晰,务必观看。
原版A3C论文: 提取码: zcp7