手机游戏AI实战项目中,Agent速度是非常慢的,因此并行能力十分重要,本文介绍并行AI算法A3C/A2C。

禁止转载,侵权必究!

前言

DQN,DDPG算法都使用了ReplyMemory来存储和回放经验。这是off-policy类型算法的常用技巧,本章将介绍另外一种思路,A3C算法设计了多个异步多线程的Actor-Critic,每个agent在自己的线程中运行。然后再全局共享学习到的网络参数。这样每时每刻都有大量的on-policy数据,并且参数没有相互关联(参考DDQN算法对DQN算法优化原理)。因此A3C算法通过多线程异步+共享全局参数达到了类似ReplyMemory类似效果,既有大量的数据解决训练过程不稳定问题,又解决了参数关联性的问题。

A3C作者声称:用多CPU达到甚至超过使用GPU训练的效率和效果。

注:NVIDIA公司的算法专家表示不服,提出了GA3C算法,证明把Agent的深度神经网络计算部分转移到GPU实现算法更快!有兴趣请参考相关论文。

A2C是A3C的简化升级版本。

One-Step VS N-Step问题

如我们训练的Flask_DQN游戏智能体,当我们从state1转移到state2时,它有一个过场动画,被定义为状态other。我们在工程实践中不可能预估过场动画的时长,那么从state1转移到state2过程中,我们只能执行N个动作,也就是N个step。那么这种情况下我们要如何设计reward呢?我们在Flask_DQN游戏智能体开发中我们设计了从state1转移到other就拿到全部的reward,而从other转移到任意其他state都不能获得reward。这是一种软件工程的trick,不优雅!!!在A3C算法内部已经用了N-Step的R传播的方法解决了这个问题。

知识预备

在PG算法中,我们的Agent又被称为Actor,Actor对于一个特定的任务,都有自己的一个策略π,策略π通常用一个神经网络表示,其参数为θ。从一个特定的状态state出发,一直到任务的结束,被称为一个完整的eposide,在每一步,我们都能获得一个奖励r,一个完整的任务所获得的最终奖励被称为R。

如果我们用Q函数来代替R,同时我们创建一个Critic网络来计算Q函数值,那么我们就得到了Actor-Critic方法。

Q函数增加一个基线,使得反馈有正有负,这里的基线通常用状态价值函数V来表示。但是,这样的话我们需要同时计算Q函数和V函数,不好算。Q函数可以用Step t+1的V函数加上从Step t到Step t+1的r来代替。这样就可以得到不含Q函数的表达式,这个表达式我们叫Advantage (优势函数),此时的Critic网络变为计算优势函数A的网络。

A3C全称为异步优势动作评价算法(Asynchronous advantage actor-critic)。其中Advantage就是指优势函数A。因此我们很容可以推导A3C实质就是求解πθ网络和Aπ(s, a)网络。

需要重点澄清的是: A2C算法是A3C算法的改进版本。可以更高效利用单机的多CPU。

当A3C算法在论文中被提出来的时候,对比了四种算法:一种是异步Sarsa、一种是异步Q-learning、一种是普通DQN、一种是A3C。论文作者认为“异步更新”是算法优秀的重要原因。论文发表后,各路大神挑战”异步更新“是A3C模型表现更好的关键因素的说法?有趣的是,多线程是算法快的原因,但是”异步更新“反而是算法的缺点。所以科学家提出新算法A2C算法(Advantage actor-critic),可以更有效利用CPU资源。打脸,啪啪啪!

A2C 也会构建多个进程,包括多个并行的 worker,与独立的环境进行交互,收集独立的经验。但是A2C每个episode会同步等待所有Actor回传参数。A3C 中各个 agent 都是异步独立更新,每个 agent 使用不同的策略,实验证明这种异步更新并不是最优解(有科学家撰文阐述过其中原因,请自行百度)。而A2C采用同步更新的策略,就可以解决参数更新一致性问题,起到了优化的效果。

补充:DQN算法的问题:

第一,训练效率低,单agent在手机网络游戏AI领域受制于手机游戏主流形式获取reward很慢(需要玩家操作非常快的游戏很少)。反之,PC单机游戏《星际争霸》获取reward是非常快的(DeepMind)。

第二,DQN算法不稳定性,很多相关论文都论述过。从训练过程直观来看,前一个episode明明拿到了reward,下个episode还是没学会。根本原因是:ReplyMemory需要做batch采样,传递reward效率太低,尤其是在N-step问题存在的场景。

A2C算法详解

Main函数

    # 实例化Learner
    learner = Learner(config)
    while not learner.should_stop():
        start = time.time()
        while time.time() - start < config['log_metrics_interval_s']:
            learner.step()
        learner.log_metrics()

1.Learner类定义

def __init__(self, config):
   #=========== 创建Agent ==========
   ...
   #========== 创建工作线程中 Actor ===========
   ...
   self.create_actors()

创建Agent

可以看到PARL框架在A2C算法实现里面用的是A3C算法,因为它们是同源的算法,只有点不同(后面step函数中会讲到)

        env = gym.make(config['env_name'])
        env = wrap_deepmind(env, dim=config['env_dim'], obs_format='NCHW')
        obs_shape = env.observation_space.shape
        act_dim = env.action_space.n
        self.config['obs_shape'] = obs_shape
        self.config['act_dim'] = act_dim

        model = AtariModel(act_dim)
        algorithm = parl.algorithms.A3C(
            model, vf_loss_coeff=config['vf_loss_coeff'])
        self.agent = AtariAgent(algorithm, config)

创建工作线程中的Actor

连接到集群去执行run_remote_sample函数

    def create_actors(self):
        # 先把自己连接到XPARL集群上去
        parl.connect(self.config['master_address'])
        for i in six.moves.range(self.config['actor_num']):
            ...
            remote_thread = threading.Thread(
                # 在工作线程中运行run_remote_sample函数
                # 通过params_queue传递模型的参数
                target=self.run_remote_sample, args=(params_queue, ))
            remote_thread.setDaemon(True)
            remote_thread.start()
        ...

run_remote_sample函数

各个Actor真正执行的函数:

def run_remote_sample(self, params_queue):
    ...
    while True:
       ...
       batch = remote_actor.sample()
       # 通过sample_data_queue传递sample函数返回的数据
       self.sample_data_queue.put(batch)

step函数

因为A2C算法会同步等待所有Agent(Actor)完成一轮训练后把π网络的参数θ同步上来更新全局的π网络参数。就像楼梯台阶一样,因此这个重要的函数叫step。

    def step(self):
        # 同步模型参数
        latest_params = self.agent.get_weights()
        for params_queue in self.params_queues:
            params_queue.put(latest_params)

        train_batch = defaultdict(list)
        # 汇总Actor sample回来的数据
        for i in range(self.config['actor_num']):
            sample_data = self.sample_data_queue.get()
            for key, value in sample_data.items():
                train_batch[key].append(value)

            self.sample_total_steps += sample_data['obs'].shape[0]

        for key, value in train_batch.items():
            train_batch[key] = np.concatenate(value)

        with self.learn_time_stat:
            # 执行A3C算法的learn方法
            total_loss, pi_loss, vf_loss, entropy, lr, entropy_coeff = self.agent.learn(
                obs_np=train_batch['obs'],
                actions_np=train_batch['actions'],
                advantages_np=train_batch['advantages'],
                target_values_np=train_batch['target_values'])

        self.total_loss_stat.add(total_loss)
        self.pi_loss_stat.add(pi_loss)
        self.vf_loss_stat.add(vf_loss)
        self.entropy_stat.add(entropy)
        self.lr = lr
        self.entropy_coeff = entropy_coeff

2.Actor&Agent类

注解@parl.remote_class表明Actor类是在独立的本机进程中执行(因为A2C是利用本机多CPU)。如果购买或者部署了PARL分布式集群,那么Actor实际是在远程server中运行了。

注意,Actor的init方法中保存了env数组,用同样的参数实例化了模型、用同样的模型实例化了算法并作为参数传入到了agent中

@parl.remote_class
class Actor(object):
    def __init__(self, config):
        ...
        # Actor保存了env数组
        self.envs = []
        for _ in range(config['env_num']):
            env = gym.make(config['env_name'])
            env = wrap_deepmind(env, dim=config['env_dim'], obs_format='NCHW')
            self.envs.append(env)
        ...
        model = AtariModel(act_dim)
        algorithm = parl.algorithms.A3C(
            model, vf_loss_coeff=config['vf_loss_coeff'])
        self.agent = AtariAgent(algorithm, config)
class AtariAgent(parl.Agent):
    def __init__(self, algorithm, config):
        ...

    def build_program(self):
        self.sample_program = fluid.Program()
        self.predict_program = fluid.Program()
        self.value_program = fluid.Program()
        self.learn_program = fluid.Program()
        ...

    def sample(self, obs_np):
        ...

    def predict(self, obs_np):
        ...

    def value(self, obs_np):
        ...

    def learn(self, obs_np, actions_np, advantages_np, target_values_np):
        ...

每个Actor对应一个Agent

sample函数

Actor中的sample函数会调用agent的sample函数和agent的value函数来分别更新本地的π网络和v网络,最终返回sample_data给中心节点。

...
    actions_batch, values_batch = 
    self.agent.sample(np.stack(self.obs_batch))
...
    next_value = self.agent.value(next_obs)
...

sample_data的数据结构:

sample_data['obs'].extend(env_sample_data[env_id]['obs'])
sample_data['actions'].extend(env_sample_data[env_id]['actions'])
sample_data['advantages'].extend(advantages)
sample_data['target_values'].extend(target_values)

其中优势函数的的计算如下:

# gae:generalized advantage estimator
advantages = calc_gae(rewards, values, next_value,
                      self.config['gamma'],
                      self.config['lambda'])
target_values = advantages + values

3.Model类

    def __init__(self, act_dim):

        self.conv1 = layers.conv2d(
            num_filters=32, filter_size=8, stride=4, padding=1, act='relu')
        self.conv2 = layers.conv2d(
            num_filters=64, filter_size=4, stride=2, padding=2, act='relu')
        self.conv3 = layers.conv2d(
            num_filters=64, filter_size=3, stride=1, padding=0, act='relu')

        self.fc = layers.fc(size=512, act='relu')

        self.policy_fc = layers.fc(size=act_dim)
        self.value_fc = layers.fc(size=1)

显然,A2C的动作策略网络π和评价值网络v使用的相同的卷积神经网络,只是在输出的时候根据输出的要求,做了不同的处理。

4.VectorEnv类

这个类是PARL对env环境的封装。我们的实际应用环境必须实现此类定义的两个方法,如下所示:

class VectorEnv(object):
    def __init__(self, envs):
    def reset(self):
        ...
    def step(self, actions):
            # env需要实现step方法
            obs, reward, done, info = self.envs[env_id].step(actions[env_id])
        ...
            if done:
                # env需要实现reset方法
                obs = self.envs[env_id].reset()
        ...
    return obs_batch, reward_batch, done_batch, info_batch

模拟器的源数据是由此类中的step方法批量返回。

编写游戏模拟器

ArKnight_A2C_Simulator

修改Learner的初始化方法

        #=========== Create Agent ==========
        game = ArKnights()
        env = PMGE(game)
        obs_shape = (3, 108, 192)
        act_dim = 650

定义新的env.py

class PMGE(object):
    def __init__(self, game):
        self.game = game

    def step(self, action):
        # 模拟器简化了状态判断
        # 实际项目应该实时生成:当前屏幕--> stateCode 的关系
        s1 = [ self.game.stateCode ]

        # 产生状态变化
        self.game.act(action, s1)
        reward = self.game.getScore(s1)
        isOver = self.game.gameOver()
        next_obs = self.game.render()
        # 为了匹配标准的API
        return next_obs, reward, isOver, 0

    def reset(self):
        return self.game.reset()

修改Actor

class Actor(object):
    def __init__(self, config):
        self.config = config

        self.envs = []
        for _ in range(config['env_num']):
            game = ArKnights()
            env = PMGE(game)
            self.envs.append(env)
        self.vector_env = VectorEnv(self.envs)

        self.obs_batch = self.vector_env.reset()

        model = Model(config['act_dim'])
        algorithm = parl.algorithms.A3C(
            model, vf_loss_coeff=config['vf_loss_coeff'])
        self.agent = Agent(algorithm, config)

定义模拟环境

class ArKnights(object):
    def __init__(self):
        """
        游戏《明日方舟》智能体定义
        """
        self.stateCode = 990
        # 1920x1080 ----- 1920/80 x 1080/40 = 24x27
        self.tap_dim = 24*27
        self.swipe_dim = 4 # 上下左右

    def render(self):
        imgDir = IMAGE_DIR + str(self.stateCode) + '/'
        filenames = os.listdir(imgDir)
        # 在stateCode目录下随机取一张图片
        filename = random.choice(filenames)
        return self.transform_img(imgDir + filename)

    def act(self, action, stateCode):
        if stateCode[0] == 990:
            if action in [442,443,444,445,466,467,468,469]:
                self.stateCode = 970
                
        if stateCode[0] == 970:
            if action in [111,112,113,114,115,
                          135,136,137,138,139,
                          159,160,161,162,163,
                          183,184,185,186,187,
                          207,208,209,210,211]:
                self.stateCode = 965

    def getScore(self, s1):
        # 状态没变扣一分
        if s1[0] == self.stateCode:
            return -1
        return 1

    def gameOver(self):
        code = self.stateCode
        # if (code == 910 or code == 1010):
        # for debug 让算法快速收敛
        if (code == 965):
            return True
        return False

    def reset(self):
        self.stateCode = 990
        imgDir = IMAGE_DIR + str(self.stateCode) + '/'
        filenames = os.listdir(imgDir)
        # 在990目录下随机取一张图片
        filename = random.choice(filenames)
        return self.transform_img(imgDir + filename)

    def transform_img(self, filepath):
        # 直接读取 (h,w)
        img = cv2.imread(filepath, cv2.IMREAD_COLOR)
        # 将图片尺寸缩放道 (image, (w,h)) 192x108
        img = cv2.resize(img, (192, 108))
        # 因为cv2的数组长宽是反的,所以用numpy转置一下 (C,H,W)
        img = np.transpose(img, (2, 0, 1))
        obs = img.astype('float32')
        return obs     

然后经过大约10万个steps就可以训练出模型predict_program

定义真机环境

import time
import cv2
from PIL import Image
import numpy as np
from adbutil import AdbUtil
from resnet import ResNet
import paddle
import paddle.fluid as fluid

class ArKnights(object):
    def __init__(self):
        self.adbutil = AdbUtil()

        # 加载推理模型
        with fluid.dygraph.guard():
            # 加载状态推断引擎
            self.model = ResNet('resnet', 50)
            #加载模型参数
            model_state_dict, _ = fluid.load_dygraph("arknights")
            self.model.load_dict(model_state_dict)
            self.model.eval()

    def _restart(self):
        """
        打开游戏进程
        如果已经打开,先关闭再重新打开
        """
        self.adbutil.stopArKnights()
        self.adbutil.startArKnights()
        # 每隔1秒在屏幕中心点击1下,持续20秒
        self.adbutil.taptap(960,540,20,1)

    def _stop(self):
        """
        关闭游戏进程
        """
        self.adbutil.stopArKnights()

    def act(self, action):
        # 点击动作code映射成动作
        if action < 648:
            x = (action % 24) * 80 + 40 # 取余
            y = (action // 24) * 40 + 20 # 取商
            self.adbutil.taptap(x,y,1,0.01) # x,y,count,frequency
        elif action == 648:
            self.adbutil.rightswipeswipe(2,0.5)
        elif action == 649:      
            self.adbutil.leftswipeswipe(2,0.5)
        else:
            raise("No such action error!" + str(action))
        time.sleep(2) # 等动作执行完

    def render(self):
        # TODO check shape
        img = self.adbutil.screencap()
        img = img.resize((192, 108), Image.ANTIALIAS)
        # 因为图片的数组长宽是反的,所以用numpy转置一下 (C,H,W)
        img = np.transpose(img, (2, 0, 1))
        obs = img.astype('float32')
        return obs

    def reset(self):
        self._restart()
        return self.render()

    def gameOver(self):
        state = self.inferState()
        print("state"+str(state))
        if state[0] == 965:
            return True
        else:
            return False

    def inferState(self):
        """
        图片推断
        """
        ...

这里的游戏状态推断引擎,就是ARKNIGHT_CLASSIFY项目输出的推理模型。

评估和部署

def test():
    game = ArKnights()
    env = PMGE(game)
    obs_shape = (3, 108, 192)
    act_dim = 650
    config['obs_shape'] = obs_shape
    config['act_dim'] = act_dim

    model = Model(act_dim)
    algorithm = parl.algorithms.A3C(model, vf_loss_coeff=config['vf_loss_coeff'])
    agent = Agent(algorithm, config)
    agent.restore("./model_dir")

    # 初始状态
    obs = env.reset()
    MAX_STEP = 20
    step = 0
    while True:
        state_code = env.game.stateCode
        action = agent.predict(obs)
        obs, reward, isOver, _ = env.step(action)
        next_state_code = env.game.stateCode
        step += 1
        logger.info("evaluate state_code:{}, action:{} next_state_code:{}, reward:{}, isOver:{}".format(state_code, action, next_state_code, reward, isOver))
        if isOver or step >MAX_STEP:
            logger.info("GameOver, state:{}".format(next_state_code))
            break;

可以看到只用了2步,算法就成功达到了设定的终止状态[965]。

新建部署项目ArKnight_A2C,把模型导入,效果如下。

实战效果演示

A2C玩《明日方舟》: 提取码: rzyk

参考资料

A2C算法的数学推导视频,讲得非常清晰,务必观看。

原版A3C论文: 提取码: zcp7