Python menu

TensorFlow reinforcement learning (under study)

What is a Markov decision process (MDP)?
It is a state-transition rule in which the next state s(t+1) is determined by the current state s(t) and the action a(t).
When the transition is treated probabilistically, the rule becomes:
  "the transition probability to the next state s(t+1) is determined solely by the current state s(t) and the action a(t)".
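
As a concrete illustration, here is a minimal sketch of a stochastic MDP (not part of the original code; the two states, two actions, and all transition probabilities below are made-up values), showing that sampling s(t+1) requires nothing but s(t) and a(t):

import numpy as np

# P[s][a] = probability distribution over the next state, given only (s, a)
P = {
    0: {0: [0.9, 0.1], 1: [0.2, 0.8]},
    1: {0: [0.5, 0.5], 1: [0.1, 0.9]},
}

def step(s_t, a_t):
    # sample s(t+1) from P(. | s_t, a_t); older states play no role
    return np.random.choice([0, 1], p=P[s_t][a_t])

s = 0
for t in range(5):
    a = np.random.choice([0, 1])            # arbitrary policy
    s = step(s, a)
    print("t:", t, "action:", a, "next state:", s)
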
# coding: utf-8       dqn_agent.py
from collections import deque
import os

import numpy as np
import tensorflow as tf

class DQNAgent:
    """
    Multi Layer Perceptron with Experience Replay
    """

    def __init__(self, enable_actions, environment_name):
        # parameters
        self.name = os.path.splitext(os.path.basename(__file__))[0]
        self.environment_name = environment_name
        self.enable_actions = enable_actions
        self.n_actions = len(self.enable_actions)
        self.minibatch_size = 32
        self.replay_memory_size = 1000
        self.learning_rate = 0.001
        self.discount_factor = 0.9
        self.exploration = 0.1  # exploration rate (epsilon for epsilon-greedy action selection)
        self.model_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "models")
        self.model_name = "{}.ckpt".format(self.environment_name)

        # replay memory
        self.D = deque(maxlen=self.replay_memory_size)

        # model
        self.init_model()

        # variables
        self.current_loss = 0.0

    def init_model(self):
        # input layer (8 x 8)
        self.x = tf.placeholder(tf.float32, [None, 8, 8])

        # flatten (64)
        x_flat = tf.reshape(self.x, [-1, 64])

        # fully connected layer (64)
        W_fc1 = tf.Variable(tf.truncated_normal([64, 64], stddev=0.01))
        b_fc1 = tf.Variable(tf.zeros([64]))
        h_fc1 = tf.nn.relu(tf.matmul(x_flat, W_fc1) + b_fc1)

        # output layer (n_actions)
        W_out = tf.Variable(tf.truncated_normal([64, self.n_actions], stddev=0.01))
        b_out = tf.Variable(tf.zeros([self.n_actions]))
        self.y = tf.matmul(h_fc1, W_out) + b_out

        # loss function
        self.y_ = tf.placeholder(tf.float32, [None, self.n_actions])
        self.loss = tf.reduce_mean(tf.square(self.y_ - self.y))
        # train operation
        optimizer = tf.train.RMSPropOptimizer(self.learning_rate)
        self.training = optimizer.minimize(self.loss)

        # saver
        self.saver = tf.train.Saver()  # object used by the save/restore methods below
        
        # print the graph definition of the model (for inspection)
        graph = tf.get_default_graph()
        print(graph.as_graph_def())                

        # session
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())

    def Q_values(self, state):
        # Q(state, action) of all actions
        return self.sess.run(self.y, feed_dict={self.x: [state]})[0]

    def select_action(self, state, epsilon):
        if np.random.rand() <= epsilon:
            # random
            return np.random.choice(self.enable_actions)
        else:
            # max_action Q(state, action)
            return self.enable_actions[np.argmax(self.Q_values(state))]

    def store_experience(self, state, action, reward, state_1, terminal):
        self.D.append((state, action, reward, state_1, terminal))

    def experience_replay(self):  # learn from stored experiences (experience replay)
        state_minibatch = []
        y_minibatch = []

        # sample random minibatch
        minibatch_size = min(len(self.D), self.minibatch_size)
        minibatch_indexes = np.random.randint(0, len(self.D), minibatch_size)

        for j in minibatch_indexes:
            state_j, action_j, reward_j, state_j_1, terminal = self.D[j]
            action_j_index = self.enable_actions.index(action_j)

            y_j = self.Q_values(state_j)

            if terminal:
                y_j[action_j_index] = reward_j
            else:
                # reward_j + gamma * max_action' Q(state', action')
                y_j[action_j_index] = reward_j + self.discount_factor * np.max(self.Q_values(state_j_1))  # NOQA

            state_minibatch.append(state_j)
            y_minibatch.append(y_j)

        # training
        self.sess.run(self.training, feed_dict={self.x: state_minibatch, self.y_: y_minibatch})

        # for log
        self.current_loss = self.sess.run(self.loss, feed_dict={self.x: state_minibatch, self.y_: y_minibatch})

    def load_model(self, model_path=None):
        if model_path:
            print( "load from model_path:" + model_path)
            self.saver.restore(self.sess, model_path)
        else:
            print( "load from checkpoint:" + self.model_dir )
            checkpoint = tf.train.get_checkpoint_state(self.model_dir)
            if checkpoint and checkpoint.model_checkpoint_path:
                self.saver.restore(self.sess, checkpoint.model_checkpoint_path)

    def save_model(self):
        path = os.path.join(self.model_dir, self.model_name)
        print( "sessを", path, "の名前で保存")
        self.saver.save(self.sess, path)
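
For reference, a minimal usage sketch of the DQNAgent class above (TensorFlow 1.x, as in the listing; it assumes the code is saved as dqn_agent.py, and the zero 8x8 state and the environment name "demo" are placeholders used only for illustration):

import numpy as np
from dqn_agent import DQNAgent

agent = DQNAgent(enable_actions=(0, 1, 2), environment_name="demo")
state = np.zeros((8, 8))                               # any 8x8 observation
print(agent.Q_values(state))                           # Q(state, a) for the three actions
print(agent.select_action(state, agent.exploration))   # epsilon-greedy choice

agent.store_experience(state, 0, 0.0, state, False)    # (s, a, r, s', terminal)
agent.experience_replay()                              # one training step on a sampled minibatch
print(agent.current_loss)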


# catch_ball.py (environment)
import os
import numpy as np

class CatchBall:
    '''
    The scene is held in a 2-D "screen" array; the player and the ball are
    marked with 1 (all other elements are 0).
    '''
    def __init__(self):
        # parameters
        self.name = os.path.splitext(os.path.basename(__file__))[0]
        self.screen_n_rows = 8
        self.screen_n_cols = 8
        self.player_length = 3
        self.enable_actions = (0, 1, 2)
        self.frame_rate = 5

        # variables
        self.reset()

    def __update(self, action):
        """
        action:
            0: do nothing
            1: move left
            2: move right
        """
        # update player position
        if action == self.enable_actions[1]:
            # move left
            self.player_col = max(0, self.player_col - 1)
        elif action == self.enable_actions[2]:
            # move right
            self.player_col = min(self.player_col + 1, self.screen_n_cols - self.player_length)
        else:
            # do nothing
            pass

        # update ball position
        self.ball_row += 1

        # collision detection
        self.reward = 0
        self.terminal = False
        if self.ball_row == self.screen_n_rows - 1:
            self.terminal = True
            if self.player_col <= self.ball_col < self.player_col + self.player_length:
                # catch
                self.reward = 1
            else:
                # drop
                self.reward = -1

    def draw(self):  # does not actually render; rebuilds the screen array from the player/ball instance variables
        # reset screen
        self.screen = np.zeros((self.screen_n_rows, self.screen_n_cols))

        # draw player
        self.screen[self.player_row, self.player_col:self.player_col + self.player_length] = 1

        # draw ball
        self.screen[self.ball_row, self.ball_col] = 1

    def observe(self):
        self.draw()
        return self.screen, self.reward, self.terminal

    def execute_action(self, action):
        self.__update(action)

    def reset(self):
        # reset player position
        self.player_row = self.screen_n_rows - 1
        self.player_col = np.random.randint(self.screen_n_cols - self.player_length)

        # reset ball position
        self.ball_row = 0
        self.ball_col = np.random.randint(self.screen_n_cols)

        # reset other variables
        self.reward = 0
        self.terminal = False
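
A minimal sketch of driving the environment by hand (assuming the class above is saved as catch_ball.py, as the import in train.py below implies; the fixed "move right" action is an arbitrary choice used only for illustration):

from catch_ball import CatchBall

env = CatchBall()
screen, reward, terminal = env.observe()   # 8x8 array: the player is three 1s in the bottom row, the ball a single 1
while not terminal:
    env.execute_action(2)                  # 2 = move right
    screen, reward, terminal = env.observe()
print("reward:", reward)                   # +1 if the ball was caught, -1 if it was dropped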



# coding: utf-8     train.py

import numpy as np

from catch_ball import CatchBall
from dqn_agent import DQNAgent


if __name__ == "__main__":
    # parameters
    n_epochs = 1000

    # environment, agent
    env = CatchBall()
    agent = DQNAgent(env.enable_actions, env.name)


    # variables
    win = 0

    for e in range(n_epochs):
        # reset
        frame = 0
        loss = 0.0
        Q_max = 0.0
        env.reset()
        state_t_1, reward_t, terminal = env.observe()

        while not terminal:
            state_t = state_t_1

            # execute action in environment
            action_t = agent.select_action(state_t, agent.exploration)
            env.execute_action(action_t)

            # observe environment
            state_t_1, reward_t, terminal = env.observe()

            # store experience
            agent.store_experience(state_t, action_t, reward_t, state_t_1, terminal)

            # experience replay
            agent.experience_replay()

            # for log
            frame += 1
            loss += agent.current_loss
            Q_max += np.max(agent.Q_values(state_t))
            if reward_t == 1:
                win += 1

        print("EPOCH: {:03d}/{:03d} | WIN: {:03d} | LOSS: {:.4f} | Q_MAX: {:.4f}".format(
            e, n_epochs - 1, win, loss / frame, Q_max / frame))

    # save model
    agent.save_model()



Step-by-step version of train.py (for study)

import numpy as np

from catch_ball import CatchBall
from dqn_agent import DQNAgent


n_epochs = 1000

# environment, agent
env = CatchBall()
agent = DQNAgent(env.enable_actions, env.name)

# variables
win = 0


e_n=0  # epoch counter: the for loop below is unrolled by hand
#------- for e_n in range(n_epochs): loop over the training epochs

# reset
frame = 0
loss = 0.0
Q_max = 0.0
env.reset()

state_t_1, reward_t, terminal = env.observe()
print( "新しい学習対象ゲーム情報を提示",state_t_1, reward_t, terminal)

#--------------------------------- while not terminal: one episode, until a reward (+/-) ★★
state_t = state_t_1

# action_t <== decide which action to take
# execute action in environment
# action_t = agent.select_action(state_t, agent.exploration)
#===========
if np.random.rand() <= agent.exploration:
    action_t =  np.random.choice(agent.enable_actions)
    print( "ランダムの行動決定:",action_t)
else:
    yQ=agent.sess.run(agent.y, feed_dict={agent.x: [state_t]})[0]
    action_t = agent.enable_actions[np.argmax(yQ)]
    print("AIに[state_t]の状態でアクションを選ばせる:",yQ, "行動の決定 最大:", action_t )

#===========
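# The block above is the body of agent.select_action: epsilon-greedy selection.
# With probability agent.exploration (epsilon = 0.1) a random action is taken,
# otherwise the action with the largest predicted Q-value is chosen.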

env.execute_action(action_t)
print("player_col:",env.player_col,"、ball_row:",env.ball_row,
 "、terminal", env.terminal, "の変数が実行で更新")


# observe environment: rebuilds env.screen from the variables above and returns it
state_t_1, reward_t, terminal = env.observe()
print("上記変数で更新されたスクリーン:", state_t_1,
"ここでterminal変数を得る terminal:",  terminal )



# store experience: push the tuple below onto the replay deque
# (state, action taken in it, reward for that action, resulting next state, terminal flag)
agent.store_experience(state_t, action_t, reward_t, state_t_1, terminal)
print("経験を記憶! その結果の agent.D の最後要素と、記憶数:",agent.D[-1],len(agent.D) ) 


#agent.experience_replay()  # learning from experience, i.e. training that uses agent.D

================================ body of agent.experience_replay
state_minibatch = []
y_minibatch = []

# sample random minibatch
minibatch_size = min(len(agent.D), agent.minibatch_size)
minibatch_indexes = np.random.randint(0, len(agent.D), minibatch_size)

print("次のfor を、", minibatch_indexes , "で反復する")
it=iter(minibatch_indexes)  # the for loop is unrolled by hand: indexes are fetched one at a time with next()

#for-------------------------------------◆◆
j=next(it)

state_j, action_j, reward_j, state_j_1, terminal = agent.D[j]
print( "(シーン状態、左の対応アクション、左に対する報酬、これで変化するシーン、判定有無)")
print( state_j , "この状態で", action_j, "の行動時の報酬:", reward_j)
action_j_index = agent.enable_actions.index(action_j)  

y_j=agent.sess.run(agent.y, feed_dict={agent.x: [state_j]})[0]
print( "上記学習データagent.D[{}]のAIが選択した比率:{}".format(j, y_j) )
if terminal:
   y_j[action_j_index] = reward_j  # terminal is True, so the target is just the immediate reward
   print("experience_replay: terminal is True, target changed to", y_j)
else:
   yQ=agent.sess.run(agent.y, feed_dict={agent.x: [state_j_1]})[0]
   y_j[action_j_index] = reward_j + agent.discount_factor * np.max(yQ)  # NOQA
   print("reward_j + max_action Q(state_j_1) terminal がFalse で", y_j , "へ変更 reward_j:",reward_j)


state_minibatch.append(state_j)  # build the minibatch used for training
y_minibatch.append(y_j)
print("minibatch size:", len(y_minibatch))
#---------------------------------for end -◆◆


# training: run one optimization step on the minibatch built by the for loop above
agent.sess.run(agent.training, feed_dict={agent.x: state_minibatch, agent.y_: y_minibatch})


# for log
agent.current_loss = agent.sess.run(agent.loss, feed_dict={agent.x: state_minibatch, agent.y_: y_minibatch})
print( "agent.current_loss :", agent.current_loss )
#=========================== end of agent.experience_replay (learning from experience)


# for log
frame += 1
loss += agent.current_loss
Q_max += np.max(agent.Q_values(state_t))
if reward_t == 1:
    win += 1

print("terminal :", terminal )

#--------------------------------- end of while ★★
print("EPOCH: {:03d}/{:03d} | WIN: {:03d} | LOSS: {:.4f} | Q_MAX: {:.4f}".format(
            e_n, n_epochs - 1, win, loss / frame, Q_max / frame))
e_n+=1











# coding: utf-8     test.py

from __future__ import division

import argparse
import os
import sys

import matplotlib.animation as animation
import matplotlib.pyplot as plt

from catch_ball import CatchBall
from dqn_agent import DQNAgent

def init():
    img.set_array(state_t_1)
    plt.axis("off")
    return img,


def animate(step):
    global win, lose
    global state_t_1, reward_t, terminal

    if terminal:
        env.reset()

        # for log
        if reward_t == 1:
            win += 1
        elif reward_t == -1:
            lose += 1

        print("WIN: {:03d}/{:03d} ({:.1f}%)".format(win, win + lose, 100 * win / (win + lose)))

    else:
        state_t = state_t_1

        # execute action in environment
        action_t = agent.select_action(state_t, 0.0)
        env.execute_action(action_t)


    state_t_1, reward_t, terminal = env.observe()

    # animate
    img.set_array(state_t_1)
    plt.axis("off")
    return img,

if __name__ == "__main__":
    # args
    parser = argparse.ArgumentParser()
    parser.add_argument("-m", "--model_path")
    parser.add_argument("-s", "--save", dest="save", action="store_true")
    parser.set_defaults(save=False)
    args = parser.parse_args()
    print( args )
    #sys.exit()

    # environmet, agent
    env = CatchBall()
    agent = DQNAgent(env.enable_actions, env.name)
    agent.load_model(args.model_path)
    #sys.exit()

    # variables
    win, lose = 0, 0
    state_t_1, reward_t, terminal = env.observe()

    # animate
    fig = plt.figure(figsize=(env.screen_n_rows / 2, env.screen_n_cols / 2))
    fig.canvas.set_window_title("{}-{}".format(env.name, agent.name))
    img = plt.imshow(state_t_1, interpolation="none", cmap="gray")
    ani = animation.FuncAnimation(fig, animate, init_func=init, interval=(1000 / env.frame_rate), blit=True)

    if args.save:
        # save animation (requires ImageMagick)
        ani_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "tmp", "demo-{}.gif".format(env.name))
        ani.save(ani_path, writer="imagemagick", fps=env.frame_rate)
    else:
        # show animation
        plt.show()