Python 还能实现哪些 AI 游戏?附上代码一起来一把!

2020 年 6 月 18 日 THU数据派


来源:AI科技大本营

作者:李秋键

责编:Carol

本文约 4562字 ,建议阅读 10分钟
以DQN算法为例介绍如何用Python 实现AI 游戏,赶快来试一下吧。

人工智能作为当前热门在我们生活中得到了广泛应用,尤其是在智能游戏方面,有的已经达到了可以和职业选手匹敌的效果。而DQN算法作为智能游戏的经典选择算法,其主要是通过奖励惩罚机制来迭代模型,来达到更接近于人类学习的效果。
那在强化学习中, 神经网络是如何被训练的呢? 首先, 我们需要 a1, a2 正确的Q值, 这个 Q 值我们就用之前在 Q learning 中的 Q 现实来代替. 同样我们还需要一个Q估计来实现神经网络的更新. 所以神经网络的的参数就是老的NN参数加学习率 alpha乘以Q现实和Q估计的差距。
我们通过 NN 预测出Q(s2, a1) 和 Q(s2,a2) 的值, 这就是 Q 估计. 然后我们选取 Q 估计中最大值的动作来换取环境中的奖励 reward. 而 Q 现实中也包含从神经网络分析出来的两个 Q 估计值, 不过这个 Q 估计是针对于下一步在 s’ 的估计. 最后再通过刚刚所说的算法更新神经网络中的参数. 
DQN是第一个将深度学习模型与强化学习结合在一起从而成功地直接从高维的输入学习控制策略。
  • 创新点:

基于Q-Learning构造Loss Function(不算很新,过往使用线性和非线性函数拟合Q-Table时就是这样做)。
通过experience replay(经验池)解决相关性及非静态分布问题;
使用TargetNet解决稳定性问题。
  • 优点:

算法通用性,可玩不同游戏;
End-to-End 训练方式;
可生产大量样本供监督学习。
  • 缺点:

无法应用于连续动作控制;
只能处理只需短时记忆问题,无法处理需长时记忆问题(后续研究提出了使用LSTM等改进方法);
CNN不一定收敛,需精良调参。
整体的程序效果如下:
  实验前的准备
首先我们使用的python版本是3.6.5所用到的库有cv2库用来图像处理;
Numpy库用来矩阵运算;TensorFlow框架用来训练和加载模型。Collection库用于高性能的数据结构。
程序的搭建

1、游戏结构设定:

我们在DQN训练前需要有自己设定好的程序,即在这里为弹珠游戏。在游戏整体框架搭建完成后,对于计算机的决策方式我们需要给他一个初始化的决策算法为了达到更快的训练效果。
程序结构的部分代码如下:
   
   
     
def __init__(self):
         self.__initGame()
         # 初始化一些变量
         self.loseReward = - 1
         self.winReward =  1
         self.hitReward =  0
         self.paddleSpeed =  15
         self.ballSpeed = ( 77)
         self.paddle_1_score =  0
         self.paddle_2_score =  0
         self.paddle_1_speed =  0.
         self.paddle_2_speed =  0.
         self.__reset()
     '''
    更新一帧
    action: [keep, up, down]
    '''

#     更新ball的位置
         self.ball_pos =  self.ball_pos[ 0] +  self.ballSpeed[ 0],  self.ball_pos[ 1] +  self.ballSpeed[ 1]
         # 获取当前场景(只取左半边)
        image = pygame.surfarray.array3d(pygame.display.get_surface())
         # image = image[321:, :]
        pygame.display.update()
        terminal = False
         if max( self.paddle_1_score,  self.paddle_2_score) >=  20:
             self.paddle_1_score =  0
             self.paddle_2_score =  0
            terminal = True
         return image, reward, terminal
def update_frame(self, action):
        assert len(action) ==  3
        pygame.event.pump()
        reward =  0
         # 绑定一些对象
         self.score1Render =  self.font.render(str( self.paddle_1_score), True, ( 255255255))
         self.score2Render =  self.font.render(str( self.paddle_2_score), True, ( 255255255))
         self.screen.blit( self.background, ( 00))
        pygame.draw.rect( self.screen, ( 255255255), pygame.Rect(( 55), ( 630470)),  2)
        pygame.draw.aaline( self.screen, ( 255255255), ( 3205), ( 320475))
         self.screen.blit( self.paddle_1,  self.paddle_1_pos)
         self.screen.blit( self.paddle_2,  self.paddle_2_pos)
         self.screen.blit( self.ball,  self.ball_pos)
         self.screen.blit( self.score1Render, ( 240210))
         self.screen.blit( self.score2Render, ( 370210))
'''
    游戏初始化
    '''

     def __initGame(self):
        pygame.init()
         self.screen = pygame.display.set_mode(( 640480),  032)
         self.background = pygame.Surface(( 640480)).convert()
         self.background.fill(( 000))
         self.paddle_1 = pygame.Surface(( 1050)).convert()
         self.paddle_1.fill(( 0255255))
         self.paddle_2 = pygame.Surface(( 1050)).convert()
         self.paddle_2.fill(( 2552550))
        ball_surface = pygame.Surface(( 1515))
        pygame.draw.circle(ball_surface, ( 255255255), ( 77), ( 7))
         self.ball = ball_surface.convert()
         self.ball.set_colorkey(( 000))
         self.font = pygame.font.SysFont( "calibri"40)
     '''
    重置球和球拍的位置
    '''

     def __reset(self):
         self.paddle_1_pos = ( 10.,  215.)
         self.paddle_2_pos = ( 620.,  215.)
         self.ball_pos = ( 312.5232.5)

2、行动决策机制:

首先在程序框架中设定不同的行动作为训练对象
   
   
     
# 行动paddle_1(训练对象)
if action[ 0] ==  1:
     self.paddle_1_speed =  0
elif action[ 1] ==  1:
     self.paddle_1_speed = - self.paddleSpeed
elif action[ 2] ==  1:
     self.paddle_1_speed =  self.paddleSpeed
self.paddle_1_pos =  self.paddle_1_pos[ 0], max(min( self.paddle_1_speed +  self.paddle_1_pos[ 1],  420),  10)
接着设置一个简单的初始化决策。根据结果判断奖励和惩罚机制,即球撞到拍上奖励,撞到墙上等等惩罚:
其中代码如下:
   
   
     
# 行动paddle_2(设置一个简单的算法使paddle_2的表现较优, 非训练对象)
         if  self.ball_pos[ 0] >=  305.:
             if not  self.paddle_2_pos[ 1] ==  self.ball_pos[ 1] +  7.5:
                 if  self.paddle_2_pos[ 1] <  self.ball_pos[ 1] +  7.5:
                     self.paddle_2_speed =  self.paddleSpeed
                     self.paddle_2_pos =  self.paddle_2_pos[ 0], max(min( self.paddle_2_pos[ 1] +  self.paddle_2_speed,  420),  10)
                 if  self.paddle_2_pos[ 1] >  self.ball_pos[ 1] -  42.5:
                     self.paddle_2_speed = - self.paddleSpeed
                     self.paddle_2_pos =  self.paddle_2_pos[ 0], max(min( self.paddle_2_pos[ 1] +  self.paddle_2_speed,  420),  10)
             else:
                 self.paddle_2_pos =  self.paddle_2_pos[ 0], max(min( self.paddle_2_pos[ 1] +  7.5420),  10)
         # 行动ball
         #   球撞拍上
         if  self.ball_pos[ 0] <=  self.paddle_1_pos[ 0] +  10.:
             if  self.ball_pos[ 1] +  7.5 >=  self.paddle_1_pos[ 1and  self.ball_pos[ 1] <=  self.paddle_1_pos[ 1] +  42.5:
                 self.ball_pos =  20.self.ball_pos[ 1]
                 self.ballSpeed = - self.ballSpeed[ 0],  self.ballSpeed[ 1]
                reward =  self.hitReward
         if  self.ball_pos[ 0] +  15 >=  self.paddle_2_pos[ 0]:
             if  self.ball_pos[ 1] +  7.5 >=  self.paddle_2_pos[ 1and  self.ball_pos[ 1] <=  self.paddle_2_pos[ 1] +  42.5:
                 self.ball_pos =  605.self.ball_pos[ 1]
                 self.ballSpeed = - self.ballSpeed[ 0],  self.ballSpeed[ 1]
         #   拍未接到球(另外一个拍得分)
         if  self.ball_pos[ 0] <  5.:
             self.paddle_2_score +=  1
            reward =  self.loseReward
             self.__reset()
        elif  self.ball_pos[ 0] >  620.:
             self.paddle_1_score +=  1
            reward =  self.winReward
             self.__reset()
         #   球撞墙上
         if  self.ball_pos[ 1] <=  10.:
             self.ballSpeed =  self.ballSpeed[ 0], - self.ballSpeed[ 1]
             self.ball_pos =  self.ball_pos[ 0],  10
        elif  self.ball_pos[ 1] >=  455:
             self.ballSpeed =  self.ballSpeed[ 0], - self.ballSpeed[ 1]
             self.ball_pos =  self.ball_pos[ 0],  455
3、DQN算法搭建:
为了方便整体算法的调用,我们首先定义神经网络的函数,包括卷积层损失等函数定义具体如下可见:
   
   
     
'''
    获得初始化weight权重
    '''

     def init_weight_variable(self, shape):
         return tf.Variable(tf.truncated_normal(shape, stddev= 0.01))
     '''
    获得初始化bias权重
    '''

     def init_bias_variable(self, shape):
         return tf.Variable(tf.constant( 0.01, shape=shape))
     '''
    卷积层
    '''

     def conv2D(self, x, W, stride):
         return tf.nn.conv2d(x, W, strides=[ 1, stride, stride,  1], padding= "SAME")
     '''
    池化层
    '''

     def maxpool(self, x):
         return tf.nn.max_pool(x, ksize=[ 1221], strides=[ 1221], padding= 'SAME')
     '''
    计算损失
    '''

     def compute_loss(self, q_values, action_now, target_q_values):
        tmp = tf.reduce_sum(tf.multiply(q_values, action_now), reduction_indices= 1)
        loss = tf.reduce_mean(tf.square(target_q_values - tmp))
         return loss
     '''
    下一帧
    '''

     def next_frame(self, action_now, scene_now, gameState):
        x_now, reward, terminal = gameState.update_frame(action_now)
        x_now = cv2.cvtColor(cv2.resize(x_now, ( 8080)), cv2.COLOR_BGR2GRAY)
        _, x_now = cv2.threshold(x_now,  127255, cv2.THRESH_BINARY)
        x_now = np.reshape(x_now, ( 80801))
        scene_next = np.append(x_now, scene_now[:, :,  0: 3], axis= 2)
         return scene_next, reward, terminal
     '''
    计算target_q_values
    '''

     def compute_target_q_values(self, reward_batch, q_values_batch, minibatch):
        target_q_values = []
         for i  in range(len(minibatch)):
             if minibatch[i][ 4]:
                target_q_values.append(reward_batch[i])
             else:
                target_q_values.append(reward_batch[i] + self.gamma * np.max(q_values_batch[i]))
         return target_q_values
然后定义整体的类变量DQN,分别定义初始化和训练函数,其中网络层哪里主要就是神经网络层的调用。然后在训练函数里面记录当前动作和数据加载入优化器中达到模型训练效果。
其中代码如下:
   
   
     
def __init__(self, options):
         self.options = options
         self.num_action = options[ 'num_action']
         self.lr = options[ 'lr']
         self.modelDir = options[ 'modelDir']
         self.init_prob = options[ 'init_prob']
         self.end_prob = options[ 'end_prob']
         self.OBSERVE = options[ 'OBSERVE']
         self.EXPLORE = options[ 'EXPLORE']
         self.action_interval = options[ 'action_interval']
         self.REPLAY_MEMORY = options[ 'REPLAY_MEMORY']
         self.gamma = options[ 'gamma']
         self.batch_size = options[ 'batch_size']
         self.save_interval = options[ 'save_interval']
         self.logfile = options[ 'logfile']
         self.is_train = options[ 'is_train']
     '''
    训练网络
    '''

     def train(self, session):
        x, q_values_ph =  self.create_network()
        action_now_ph = tf.placeholder( 'float', [None,  self.num_action])
        target_q_values_ph = tf.placeholder( 'float', [None])
         # 计算loss
        loss =  self.compute_loss(q_values_ph, action_now_ph, target_q_values_ph)
         # 优化目标
        trainStep = tf.train.AdamOptimizer( self.lr).minimize(loss)
         # 游戏
        gameState = PongGame()
         # 用于记录数据
        dataDeque = deque()
         # 当前的动作
        action_now = np.zeros( self.num_action)
        action_now[ 0] =  1
         # 初始化游戏状态
        x_now, reward, terminal = gameState.update_frame(action_now)
        x_now = cv2.cvtColor(cv2.resize(x_now, ( 8080)), cv2.COLOR_BGR2GRAY)
         _, x_now = cv2.threshold(x_now,  127255, cv2.THRESH_BINARY)
        scene_now = np.stack((x_now, )* 4, axis= 2)
         # 读取和保存checkpoint
        saver = tf.train.Saver()
        session.run(tf.global_variables_initializer())
        checkpoint = tf.train.get_checkpoint_state( self.modelDir)
         if checkpoint  and checkpoint. model_checkpoint_path:
            saver.restore(session, checkpoint.model_checkpoint_path)
            print( '[INFO]: Load %s successfully...' % checkpoint.model_checkpoint_path)
         else:
            print( '[INFO]: No weights found, start to train a new model...')
        prob =  self.init_prob
        num_frame =  0
        logF = open( self.logfile,  'a')
         while  True:
            q_values = q_values_ph.eval(feed_dict={ x: [scene_now]})
            action_idx = get_action_idx(q_values=q_values, 
                                        prob=prob, 
                                        num_frame=num_frame, 
                                        OBSERVE= self.OBSERVE, 
                                        num_action= self.num_action)
            action_now = np.zeros( self.num_action)
            action_now[action_idx] =  1
            prob = down_prob(prob=prob, 
                             num_frame=num_frame, 
                             OBSERVE= self.OBSERVE, 
                             EXPLORE= self.EXPLORE, 
                             init_prob= self.init_prob, 
                             end_prob= self.end_prob)
             for  _  in range( self.action_interval):
                scene_next, reward, terminal =  self.next_frame(action_now=action_now, 
                                                               scene_now=scene_now,                                                            gameState=gameState)
                scene_now = scene_next
                dataDeque.append((scene_now, action_now, reward, scene_next, terminal))
                 if len(dataDeque) >  self. REPLAY_MEMORY:
                    dataDeque.popleft()
            loss_now = None
             if (num_frame >  self.OBSERVE):
                minibatch = random.sample(dataDeque,  self.batch_size)
                scene_now_batch = [mb[ 0for mb  in minibatch]
                action_batch = [mb[ 1for mb  in minibatch]
                reward_batch = [mb[ 2for mb  in minibatch]
                scene_next_batch = [mb[ 3for mb  in minibatch]
                q_values_batch = q_values_ph.eval(feed_dict={ x: scene_next_batch})
                target_q_values =  self.compute_target_q_values(reward_batch, q_values_batch, minibatch)
                trainStep.run(feed_dict={
                                             target_q_values_ph: target_q_values,
                                             action_now_ph: action_batch,
                                             x: scene_now_batch
                                            })
                loss_now = session.run(loss, feed_dict={
                                                         target_q_values_ph: target_q_values,
                                                         action_now_ph: action_batch,
                                                         x: scene_now_batch
                                                        })
            num_frame +=  1
             if num_frame %  self.save_interval ==  0:
                name =  'DQN_Pong'
                saver.save(session, os.path.join( self.modelDir, name), global_step=num_frame)
            log_content =  '<Frame>: %s, <Prob>: %s, <Action>: %s, <Reward>: %s, <Q_max>: %s, <Loss>: %s' % (str(num_frame), str(prob), str(action_idx), str(reward), str(np.max(q_values)), str(loss_now))
            logF.write(log_content +  '\n')
            print(log_content)
        logF.close()
     '''
    创建网络
    '''

     def create_network(self):
         '''
        W_conv1 = self.init_weight_variable([9, 9, 4, 16])
        b_conv1 = self.init_bias_variable([16])
        W_conv2 = self.init_weight_variable([7, 7, 16, 32])
        b_conv2 = self.init_bias_variable([32])
        W_conv3 = self.init_weight_variable([5, 5, 32, 32])
        b_conv3 = self.init_bias_variable([32])
        W_conv4 = self.init_weight_variable([5, 5, 32, 64])
        b_conv4 = self.init_bias_variable([64])
        W_conv5 = self.init_weight_variable([3, 3, 64, 64])
        b_conv5 = self.init_bias_variable([64])
        '''

        W_conv1 =  self.init_weight_variable([ 88432])
        b_conv1 =  self.init_bias_variable([ 32])
        W_conv2 =  self.init_weight_variable([ 443264])
        b_conv2 =  self.init_bias_variable([ 64])
        W_conv3 =  self.init_weight_variable([ 336464])
        b_conv3 =  self.init_bias_variable([ 64])
         # 5 * 5 * 64 = 1600
        W_fc1 =  self.init_weight_variable([ 1600512])
        b_fc1 =  self.init_bias_variable([ 512])
        W_fc2 =  self.init_weight_variable([ 512self.num_action])
        b_fc2 =  self.init_bias_variable([ self.num_action])
         # input placeholder
        x = tf.placeholder( 'float', [None,  80804])
         '''
        conv1 = tf.nn.relu(tf.layers.batch_normalization(self.conv2D(x, W_conv1, 4) + b_conv1, training=self.is_train, momentum=0.9))
        conv2 = tf.nn.relu(tf.layers.batch_normalization(self.conv2D(conv1, W_conv2, 2) + b_conv2, training=self.is_train, momentum=0.9))
        conv3 = tf.nn.relu(tf.layers.batch_normalization(self.conv2D(conv2, W_conv3, 2) + b_conv3, training=self.is_train, momentum=0.9))
        conv4 = tf.nn.relu(tf.layers.batch_normalization(self.conv2D(conv3, W_conv4, 1) + b_conv4, training=self.is_train, momentum=0.9))
        conv5 = tf.nn.relu(tf.layers.batch_normalization(self.conv2D(conv4, W_conv5, 1) + b_conv5, training=self.is_train, momentum=0.9))
        flatten = tf.reshape(conv5, [-1, 1600])
        '''

        conv1 = tf.nn.relu( self.conv2D(x, W_conv1,  4) + b_conv1)
        pool1 =  self.maxpool(conv1)
        conv2 = tf.nn.relu( self.conv2D(pool1, W_conv2,  2) + b_conv2)
        conv3 = tf.nn.relu( self.conv2D(conv2, W_conv3,  1) + b_conv3)
        flatten = tf.reshape(conv3, [- 11600])
        fc1 = tf.nn.relu(tf.layers.batch_normalization(tf.matmul(flatten, W_fc1) + b_fc1, training= self.is_train, momentum= 0. 9))
        fc2 = tf.matmul(fc1, W_fc2) + b_fc2
         return x, fc2
到这里,我们整体的程序就搭建完成,下面为我们程序的运行结果:  
源码地址:
https://pan.baidu.com/s/1ksvjIiQ0BfXOah4PIE1arg
提取码:p74p
作者简介:
李秋键,CSDN博客专家,CSDN达人课作者。硕士在读于中国矿业大学,开发有taptap竞赛获奖等等。


——END——


登录查看更多
0

相关内容

多智能体深度强化学习的若干关键科学问题
专知会员服务
176+阅读 · 2020年5月24日
【圣经书】《强化学习导论(2nd)》电子书与代码,548页pdf
专知会员服务
197+阅读 · 2020年5月22日
Sklearn 与 TensorFlow 机器学习实用指南,385页pdf
专知会员服务
127+阅读 · 2020年3月15日
Transformer文本分类代码
专知会员服务
116+阅读 · 2020年2月3日
【强化学习】深度强化学习初学者指南
专知会员服务
178+阅读 · 2019年12月14日
100行Python代码,轻松搞定神经网络
大数据文摘
4+阅读 · 2019年5月2日
除了DQN/A3C,还有哪些高级强化学习成果
论智
14+阅读 · 2018年10月28日
从零开始深度学习:利用numpy手写一个感知机
数萃大数据
10+阅读 · 2018年6月10日
关于强化学习(附代码,练习和解答)
深度学习
35+阅读 · 2018年1月30日
免费|机器学习算法Python实现
全球人工智能
5+阅读 · 2018年1月2日
循环神经网络的介绍、代码及实现
AI研习社
3+阅读 · 2017年11月21日
【强化学习】强化学习入门以及代码实现
产业智能官
18+阅读 · 2017年9月4日
机器学习(7)之感知机python实现
机器学习算法与Python学习
4+阅读 · 2017年7月23日
Arxiv
3+阅读 · 2018年10月5日
VIP会员
相关VIP内容
多智能体深度强化学习的若干关键科学问题
专知会员服务
176+阅读 · 2020年5月24日
【圣经书】《强化学习导论(2nd)》电子书与代码,548页pdf
专知会员服务
197+阅读 · 2020年5月22日
Sklearn 与 TensorFlow 机器学习实用指南,385页pdf
专知会员服务
127+阅读 · 2020年3月15日
Transformer文本分类代码
专知会员服务
116+阅读 · 2020年2月3日
【强化学习】深度强化学习初学者指南
专知会员服务
178+阅读 · 2019年12月14日
相关资讯
100行Python代码,轻松搞定神经网络
大数据文摘
4+阅读 · 2019年5月2日
除了DQN/A3C,还有哪些高级强化学习成果
论智
14+阅读 · 2018年10月28日
从零开始深度学习:利用numpy手写一个感知机
数萃大数据
10+阅读 · 2018年6月10日
关于强化学习(附代码,练习和解答)
深度学习
35+阅读 · 2018年1月30日
免费|机器学习算法Python实现
全球人工智能
5+阅读 · 2018年1月2日
循环神经网络的介绍、代码及实现
AI研习社
3+阅读 · 2017年11月21日
【强化学习】强化学习入门以及代码实现
产业智能官
18+阅读 · 2017年9月4日
机器学习(7)之感知机python实现
机器学习算法与Python学习
4+阅读 · 2017年7月23日
Top
微信扫码咨询专知VIP会员