分享

DRL:强化学习理论基础与实例

 15所 2020-10-16
  • 强化学习和监督学习、无监督学习的区别
  • RL解决什么问题
  • RL如何解决问题
  • 实例视频
  • openAI and DeepMind
  • 马尔科夫
  • 时序差分(Temporal-Difference)
  • 分类Model-freeModel-base基于概率基于价值回合更新单步更新在线学习 on-Policy离线学习 off-Policy
  • Q-learning
  • Q-learning 实例
  • Q-learning 算法更新
  • Q-learning 思维决策
  • sarsa 思维决策
  • DQN 算法更新
  • DQN 神经网络 (Tensorflow)

learning route

  • AI = DL(Deep Learning) + RL(Reinforcement Learning) == DRL(Deep Reinforcement Learning)
  • 什么是强化学习? 强化学习是学习该如何去做(learning what to do)即学习如何从一个状态映射到某一个行为,来最大化某一个数值的奖励信号。强化学习算法主体是智能体,环境是老师,老师仅仅是评分,不告诉智能体为什么错,为什么对?智能体不断的试错,不断的尝试,累计经验,学习经验。记住高分的行为,避免低分的行为。强化学习作为一门灵感来源于心理学中的行为主义理论的学科,其内容涉及概率论、统计学、逼近论、凸分析、计算复杂性理论、运筹学等多学科知识,难度之大,门槛之高,导致其发展速度特别缓慢。
  • 强化学习特征: 试错:agent需要不断的尝试,通过reward的反馈学习策略。延迟奖励:某一时刻action可能会对后面时刻reward有影响。
  • 强化学习局限性 强化学习非常依赖状态state的概念,state既是策略函数和价值函数的输入,又是环境模型model的输入和输出。
  • 强化学习适合解决什么问题 强化学习适合于解决模型未知,且当前决策会影响环境状态的(序列)决策问题。

强化学习和监督学习、无监督学习的区别

  • 监督学习一般有标签信息,而且是单步决策问题,比如分类问题。监督学习的样本一般是独立 同分布的。
  • 无监督学习没有任何标签信息,一般对应的是聚类问题。
  • 强化学习介于监督和无监督学习之间,每一步决策之后会有一个标量的反馈信号,即回报。通过最大化回报以获得一个最优策略。因此强化学习一般是多步决策,并且样本之间有强的相关性。
  • 强化学习的损失函数 累计回报:依赖于不同的问题设定,累积回报具有不同的形式。比如对于有限长度的MDP问题直接用回报和作为优化目标。对于无限长的问题,为了保证求和是有意义的,需要使用折扣累积回报或者平均回报。
  • 深度学习算是函数: 多个独立同分布样本预测值和标签值的误差,需要最小化。强化学习 的损失函数是轨迹上的累积和,需要最大化。

强化学习

RL解决什么问题

  • 就是需要连续不断地做出决策,才能实现最终目标的问题。

RL如何解决问题

  • 强化学习算法是根据与环境的交互产生的反馈,从而达到最终目标

实例

实例视频

  • 1.机器人炒股
  • 模拟人工障碍
  • 学习抓取东西
  • 学会抓取
  • 学穿衣服

openAI and DeepMind

  • https:///
  • openai:开源库 gym

基础

  • 基础概念: state、 action、 reward function value function 、policy、modle of the environment
  • 在炒股机器人中: 动作(action)是买还是卖,或者更具体的,买多少,卖多少。状态(state)是机器人对环境的观察,这就非常多样,比如说某支股票的涨跌现状、各种技术面的特征、基本面的特征等等。奖励(reward function)是机器人每做一次买卖决策之后,带来的实际收益;回报(return):一般用表示,形式化定义: ,回报的定义很有趣,即某一系列决策后,长期的奖励累加,我们给后面的奖励一个折扣系数,即对未来可能赚的钱,我们的兴趣相对少一些;价值函数(value function)是我们对回报的期望,用来衡量处于某个状态有多好;环境模型(model)用来模拟环境的行为,给定一个状态和动作,模型可以预测下一个状态和奖励。也就是说,如果我们有一个环境模型,那么每次机器人做了一次交易之后,我们就可以预测出它的下一个状态是什么,收到的奖励有多少(这个观点在炒股票这个例子中听起来不太可行)。policy:状态到行为的映射,定义agent在某一个状态下应该如何采取行为,state-> action
  • 构建智能体
import randomclass Environment:    def __init__(self):        self.steps_left = 100    def get_observation(self):        return [0.00.00.0]    def get_actions(self):        return [01]    def is_done(self):        return self.steps_left == 0    def action(self, action):        if self.is_done():            raise Exception('Game is over')        self.steps_left -= 1        return random.random()class Agent:    def __init__(self):        self.total_reward = 0.0    def step(self, env):        current_obs = env.get_observation()        actions = env.get_actions()        reward = env.action(random.choice(actions))        self.total_reward += rewardif __name__ == '__main__':    env = Environment()    agent = Agent()    while not env.is_done():        agent.step(env)    print('Total reward got: %.4f' % agent.total_reward)
  • 智能体验示
import gymif __name__ == '__main__':    env = gym.make('CartPole-v0')    env = gym.wrappers.Monitor(env, 'recording')    total_reward = 0.0    total_steps = 0    obs = env.reset()    while True:        action = env.action_space.sample()        obs, reward, done, _ = env.step(action)        total_reward += reward        total_steps += 1        if done:            break    print('Episode done in %d steps, total reward %.2f' % (total_steps, total_reward))    env.close()    env.env.close()
  • 模型
import torchimport torch.nn as nnclass OurModule(nn.Module):    def __init__(self, num_inputs, num_classes, dropout_prob=0.3):        super(OurModule, self).__init__()        self.pipe = nn.Sequential(            nn.Linear(num_inputs, 5),            nn.ReLU(),            nn.Linear(520),            nn.ReLU(),            nn.Linear(20, num_classes),            nn.Dropout(p=dropout_prob),            nn.Softmax(dim=1)        )    def forward(self, x):        return self.pipe(x)if __name__ == '__main__':    net = OurModule(num_inputs=2, num_classes=3)    print(net)    v = torch.FloatTensor([[23]])    out = net(v)    print(out)    print('Cuda's availability is %s' % torch.cuda.is_available())    if torch.cuda.is_available():        print('Data from cuda: %s' % out.to('cuda'))

马尔科夫

  • Markov Reward Processes(MRPs)
  • Markov Decision Processes (MDPs) 解决办法 动态规划(Dynamic Programming):理论上完美,但需要很强的算力和准确的环境model。蒙特卡洛(Monte Carlo Methods):不需要模型,可操作性强,但不太适合一步一步的增量计算。差分学习(Temporal-Difference Learning):不需要模型,也是增量式的,但分析起来很复杂。
  • POMDP:部分可观测马尔科夫决策问题。
  • 马尔科夫过程表示一个状态序列,每一个状态是一个随机变量,变量之间满足马尔科夫性,表示为 一个元组<S, P>,S是状态,P表示转移概率。
  • MDP表示为一个五元组<S, A, P, R, >,S是状态集合,A是动作集合,P表示转移概率,即模型, R是回报函数,表示折扣因子。

时序差分(Temporal-Difference)

  • 时序差分是强化学习的核心观点。
  • 时序差分是DP和MC方法的结合。
  • TD往往比MC高效;TD和MC都使用经验(experience)来解决预测问题。

强化学习方法

分类

Model-free

  • 不理解环境
  • Q learning, Sarsa, Policy Gradients

Model-base

  • 理解环境

基于概率

  • 基于概率是强化学习中最直接的一种, 他能通过感官分析所处的环境, 直接输出下一步要采取的各种动作的概率, 然后根据概率采取行动, 所以每种动作都有可能被选中, 只是可能性不同.
  • 连续的动作基于概率的方法是有效的
  • Policy Gradients,

基于价值

  • 基于价值的方法输出则是所有动作的价值, 我们会根据最高价值来选择动作, 相比基于概率的方法, 基于价值的决策部分更为铁定, 毫不留情, 就选价值最高的。
  • 连续的动作基于价值的方法是无能为力的
  • Q learning, Sarsa
  • Actor-Critic, actor 会基于概率做出动作, 而 critic 会对做出的动作给出动作的价值, 这样就在原有的 policy gradients 上加速了学习过程.

回合更新

  • 学习效率低
  • Monte-carlo learning 和基础版的 policy gradients

单步更新

  • 学习效率高
  • Qlearning, Sarsa, 升级版的 policy gradients 等

在线学习 on-Policy

  • 智能体:边玩边总结学习
  • 最典型的在线学习就是 Sarsa

离线学习 off-Policy

  • 智能体:从历史的记录中学习,看别人的棋谱
  • 最典型的离线学习就是 Q learning, 后来人也根据离线学习的属性, 开发了更强大的算法, 比如让计算机学会玩电动的 Deep-Q-Network.

Q-learning

Q-learning

  • 2da3885da644cb681fcc4adb2be47ac7.png Q learning 的迷人之处就是 在 Q(s1, a2) 现实 中, 也包含了一个 Q(s2) 的最大估计值, 将对下一步的衰减的最大估计和当前所得到的奖励当成这一步的现实, 很奇妙吧. 最后我们来说说这套算法中一些参数的意义. Epsilon greedy 是用在决策上的一种策略, 比如 epsilon = 0.9 时, 就说明有90% 的情况我会按照 Q 表的最优值选择行为, 10% 的时间使用随机选行为. alpha是学习率, 来决定这次的误差有多少是要被学习的, alpha是一个小于1 的数. gamma 是对未来 reward 的衰减值.
  • 572458142e9790b40543a859135490c5.png 重写一下 Q(s1) 的公式, 将 Q(s2) 拆开, 因为Q(s2)可以像 Q(s1)一样,是关于Q(s3) 的, 所以可以写成这样, 然后以此类推, 不停地这样写下去, 最后就能写成这样, 可以看出Q(s1) 是有关于之后所有的奖励, 但这些奖励正在衰减, 离 s1 越远的状态衰减越严重. 不好理解? 行, 我们想象 Qlearning 的机器人天生近视眼, gamma = 1 时, 机器人有了一副合适的眼镜, 在 s1 看到的 Q 是未来没有任何衰变的奖励, 也就是机器人能清清楚楚地看到之后所有步的全部价值, 但是当 gamma =0, 近视机器人没了眼镜, 只能摸到眼前的 reward, 同样也就只在乎最近的大奖励, 如果 gamma 从 0 变道 1, 眼镜的度数由浅变深, 对远处的价值看得越清楚, 所以机器人渐渐变得有远见, 不仅仅只看眼前的利益, 也为自己的未来着想.

Q-learning 实例

# -*- coding:utf-8 -*-# /usr/bin/python'''-------------------------------------------------   File Name   :  Q_learning_class   Description :  AIM:                   Functions: 1.                              2.    Envs        :  python ==                   pip install numpy pandas  -i https://pypi.douban.com/simple    Author      :  errol   Date        :  2020/5/18  14:35   CodeStyle   :  规范,简洁,易懂,可阅读,可维护,可移植!-------------------------------------------------   Change Activity:          2020/5/18 : 新建-------------------------------------------------'''import timeimport numpy as npimport pandas as pdnp.random.seed(2) # 随机因子class QL(object):    def __init__(self,):        '''        定义变量        '''        self.N_STATES = 6 # 探索世界的宽度        self.ACTIONS = ['left''right']  # 智能体的行为        self.EPSILON = 0.9 # 贪婪度        self.LR = 0.2 # 学习率        self.GAMMA = 0.9 # 奖励递减值        self.MAX_EPISODES = 13 # 最大回合数        self.FRESH_TIME = 0.3 # 移动每步的时间    def build_q_label(self,n_states,actions):        '''         Q values (行为值) 放在 q_table        :param n_states: 状态        :param actions: 行为        :return: table        '''        table = pd.DataFrame(np.zeros((n_states, len(actions))),  # q_table 全 0 初始            columns=actions,)  # columns 对应的是行为名称)        # print('table\n'table)        return table    # 某个state地点,选择行为    def choose_action(self,state, q_table):        '''        智能体选择动作:严格意义上讲,智能体在前期探索阶段需要把贪婪值设低些,随着时间的增长贪婪值增加。        本例子:固定成 EPSILON = 0.9, 90% 的时间是选择最优策略, 10% 的时间来探索.        :param state: 状态        :param q_table: 行为表        :return: action_name 行为名字        '''        state_actions = q_table.iloc[state, :]  # 选出这个 state 的所有 action 值        if (np.random.uniform() > self.EPSILON) or (state_actions.all() == 0):  # 非贪婪 or 或者这个 state 还没有探索过            action_name = np.random.choice(self.ACTIONS)        else:            action_name = state_actions.idxmax()    # 贪婪模式        return action_name    def get_env_feedback(self,S,A):        '''        环境也要给我们的行为一个反馈, 反馈出下个 state (S_) 和 在上个 state (S) 做出 action (A) 所得到的 reward (R)        本例子:O一定到T得到奖励R = 1,其他都是R=0.01        :param S:state        :param A: action        :return: R : 奖励        '''        # This is how agent will interact with the environment        if A == 'right':  # move right            if S == self.N_STATES - 2:  # terminate                S_ = 'terminal'                R = 1            else:                S_ = S + 1                R = 0.01   # 越向右奖励就获得        else:  # move left            R = 0            if S == 0:                S_ = S  # reach the wall                R  = 0 # 撞墙奖励设置为0            else:                S_ = S - 1                R  = -0.01 # 向左 奖励倒扣        return S_, R    def update_env(self,S, episode, step_counter):        '''        环境更新        :param S: state        :param episode:  回合数        :param step_counter: 步骤数        :return:        '''        # This is how environment be updated        env_list = ['-']*(self.N_STATES-1) + ['T']   # '---------T' our environment        if S == 'terminal':            interaction = 'Episode %s: total_steps = %s' % (episode+1, step_counter)            print('\r{}'.format(interaction), end='')            time.sleep(2)            print('\r                                'end='')        else:            env_list[S] = 'o'            interaction = ''.join(env_list)            print('\r{}'.format(interaction), end='')            time.sleep(self.FRESH_TIME)    def qlearing(self):        '''        主函数:循环        :return:        '''        q_table = self.build_q_label(self.N_STATES,self.ACTIONS) # 初始化q_table        print('q_table:\n',q_table)        for episode in range(self.MAX_EPISODES): # 回合            step_counter = 0            S = 0  # 回合初始位置            is_terminated = False  # 是否回合结束            self.update_env(S, episode, step_counter)  # 环境更新            while not is_terminated:                A = self.choose_action(S, q_table)  # 选行为                S_, R = self.get_env_feedback(S, A)  # 实施行为并得到环境的反馈                print('\n S',S,'\n A',A)                q_predict = q_table.loc[S, A]  # 估算的(状态-行为)值                if S_ != 'terminal':                    q_target = R + self.GAMMA * q_table.iloc[S_, :].max()  # 实际的(状态-行为)值 (回合没结束)                else:                    q_target = R  # 实际的(状态-行为)值 (回合结束)                    is_terminated = True  # terminate this episode                q_table.loc[S, A] += self.LR * (q_target - q_predict)  # q_table 更新                S = S_  # 探索者移动到下一个 state                self.update_env(S, episode, step_counter + 1)  # 环境更新                step_counter += 1        return q_tableif __name__ == '__main__':    new = QL()    q_table = new.qlearing()    print('\r\nQ-table:\n')    print(q_table)

Q-learning 算法更新

# -*- coding:utf-8 -*-# /usr/bin/python'''-------------------------------------------------   File Name   :  main   Description :  AIM:                   Functions: 1.                              2.    Envs        :  python ==                   pip install  -i https://pypi.douban.com/simple    Author      :  errol   Date        :  2020/5/17  22:06   CodeStyle   :  规范,简洁,易懂,可阅读,可维护,可移植!-------------------------------------------------   Change Activity:          2020/5/17 : 新建-------------------------------------------------'''from maze_env import Mazefrom RL_brain import QLearningTabledef update():    for episode in range(100):        # initial observation        observation = env.reset()        while True:            # fresh env            env.render()            # RL choose action based on observation            action = RL.choose_action(str(observation))            # RL take action and get next observation and reward            observation_, reward, done = env.step(action)            # RL learn from this transition            RL.learn(str(observation), action, reward, str(observation_))            # swap observation            observation = observation_            # break while loop when end of this episode            if done:                break    # end of game    print('game over')    env.destroy()if __name__ == '__main__':    env = Maze()    RL = QLearningTable(actions=list(range(env.n_actions)))    env.after(100, update)    env.mainloop()

Q-learning 思维决策

# -*- coding:utf-8 -*-# /usr/bin/python'''-------------------------------------------------   File Name   :  RL_brain   Description :  AIM:                   Functions: 1. 大脑更新                             2.    Envs        :  python ==                   pip install  -i https://pypi.douban.com/simple    Author      :  errol   Date        :  2020/5/17  22:03   CodeStyle   :  规范,简洁,易懂,可阅读,可维护,可移植!-------------------------------------------------   Change Activity:          2020/5/17 : 新建-------------------------------------------------'''import numpy as npimport pandas as pdclass QLearningTable:    def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):        self.actions = actions  # a list        self.lr = learning_rate        self.gamma = reward_decay        self.epsilon = e_greedy        self.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64)    def choose_action(self, observation):        self.check_state_exist(observation)        # action selection        if np.random.uniform() < self.epsilon:            # choose best action            state_action = self.q_table.loc[observation, :]            # some actions may have the same value, randomly choose on in these actions            action = np.random.choice(state_action[state_action == np.max(state_action)].index)        else:            # choose random action            action = np.random.choice(self.actions)        return action    def learn(self, s, a, r, s_):        self.check_state_exist(s_)        q_predict = self.q_table.loc[s, a]        if s_ != 'terminal':            q_target = r + self.gamma * self.q_table.loc[s_, :].max()  # next state is not terminal        else:            q_target = r  # next state is terminal        self.q_table.loc[s, a] += self.lr * (q_target - q_predict)  # update    def check_state_exist(self, state):        if state not in self.q_table.index:            # append new state to q table            self.q_table = self.q_table.append(                pd.Series(                    [0]*len(self.actions),                    index=self.q_table.columns,                    name=state,                )            )

Sarsa

  • 与Q-Learning的不同:离线学习,
  • Sarsa:是实施每个估计,在线学习,’保命为主‘

sarsa 思维决策

Sarsa(lambda)

  • 回合更新
  • lambda 是脚步衰减值, 都是一个在 0 和 1 之间的数,认为距离奖励越远的步骤越不重要。
  • 如果 lambda = 0, Sarsa-lambda 就是 Sarsa, 只更新获取到 reward 前经历的最后一步.
  • 如果 lambda = 1, Sarsa-lambda 更新的是 获取到 reward 前所有经历的步.

DQN

  • Deep Q Network 简称为 DQN. Google Deep mind 团队就是靠着这 DQN 使计算机玩电动玩得比我们还厉害
  • Q learning 是一种 off-policy 离线学习法, 它能学习当前经历着的, 也能学习过去经历过的, 甚至是学习别人的经历. 所以每次 DQN 更新的时候, 我们都可以随机抽取一些之前的经历进行学习.
  • Fixed Q-targets 也是一种打乱相关性的机理。

DQN 算法更新

  • Q learning 主框架上加了些装饰.这些装饰包括: 记忆库 (用于重复学习)神经网络计算 Q 值暂时冻结 q_target 参数 (切断相关性)

DQN 神经网络 (Tensorflow)

OpenAI gym 环境库

  • python --version python=2.7 $ pip install gympython=3.5 $ pip install gym
  • os
# MacOS:$ brew install cmake boost boost-python sdl2 swig wget# Ubuntu 14.04:$ apt-get install -y python-numpy python-dev cmake zlib1g-dev libjpeg-dev xvfb libav-tools xorg-dev python-opengl libboost-all-dev libsdl2-dev swig

Prioritized Experience Replay (DQN)

  • 使用 Prioritized replay, 就会重视这种力量的, 但值得学习的样本.

Dueling DQN

  • 它将每个动作的 Q 拆分成了 state 的 Value 加上 每个动作的 Advantage.

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多