# coding: utf-8 dqn_agent.py
from collections import deque
import os
import numpy as np
import tensorflow as tf
class DQNAgent:
"""
Multi Layer Perceptron with Experience Replay
"""
def __init__(self, enable_actions, environment_name):
# parameters
self.name = os.path.splitext(os.path.basename(__file__))[0]
self.environment_name = environment_name
self.enable_actions = enable_actions
self.n_actions = len(self.enable_actions)
self.minibatch_size = 32
self.replay_memory_size = 1000
self.learning_rate = 0.001
self.discount_factor = 0.9
self.exploration = 0.1  # exploration parameter (epsilon used for epsilon-greedy action selection)
self.model_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "models")
self.model_name = "{}.ckpt".format(self.environment_name)
# replay memory
self.D = deque(maxlen=self.replay_memory_size)
# model
self.init_model()
# variables
self.current_loss = 0.0
def init_model(self):
# input layer (8 x 8)
self.x = tf.placeholder(tf.float32, [None, 8, 8])
# flatten (64)
x_flat = tf.reshape(self.x, [-1, 64])
# fully connected layer (64)
W_fc1 = tf.Variable(tf.truncated_normal([64, 64], stddev=0.01))
b_fc1 = tf.Variable(tf.zeros([64]))
h_fc1 = tf.nn.relu(tf.matmul(x_flat, W_fc1) + b_fc1)
# output layer (n_actions)
W_out = tf.Variable(tf.truncated_normal([64, self.n_actions], stddev=0.01))
b_out = tf.Variable(tf.zeros([self.n_actions]))
self.y = tf.matmul(h_fc1, W_out) + b_out
# loss function
self.y_ = tf.placeholder(tf.float32, [None, self.n_actions])
self.loss = tf.reduce_mean(tf.square(self.y_ - self.y))
# train operation
optimizer = tf.train.RMSPropOptimizer(self.learning_rate)
self.training = optimizer.minimize(self.loss)
# saver
self.saver = tf.train.Saver()  # object used by the save/restore methods below
# dump the TensorFlow graph of the model defined above
graph = tf.get_default_graph()
print(graph.as_graph_def())
# session
self.sess = tf.Session()
self.sess.run(tf.global_variables_initializer())
def Q_values(self, state):
# Q(state, action) of all actions
return self.sess.run(self.y, feed_dict={self.x: [state]})[0]
def select_action(self, state, epsilon):
if np.random.rand() <= epsilon:
# random
return np.random.choice(self.enable_actions)
else:
# max_action Q(state, action)
return self.enable_actions[np.argmax(self.Q_values(state))]
def store_experience(self, state, action, reward, state_1, terminal):
self.D.append((state, action, reward, state_1, terminal))
def experience_replay(self):  # learn from stored experiences (experience replay)
state_minibatch = []
y_minibatch = []
# sample random minibatch
minibatch_size = min(len(self.D), self.minibatch_size)
minibatch_indexes = np.random.randint(0, len(self.D), minibatch_size)
for j in minibatch_indexes:
state_j, action_j, reward_j, state_j_1, terminal = self.D[j]
action_j_index = self.enable_actions.index(action_j)
y_j = self.Q_values(state_j)
if terminal:
y_j[action_j_index] = reward_j
else:
# reward_j + gamma * max_action' Q(state', action')  (a worked numeric example follows this class)
y_j[action_j_index] = reward_j + self.discount_factor * np.max(self.Q_values(state_j_1)) # NOQA
state_minibatch.append(state_j)
y_minibatch.append(y_j)
# training
self.sess.run(self.training, feed_dict={self.x: state_minibatch, self.y_: y_minibatch})
# for log
self.current_loss = self.sess.run(self.loss, feed_dict={self.x: state_minibatch, self.y_: y_minibatch})
def load_model(self, model_path=None):
if model_path:
print( "load from model_path:" + model_path)
self.saver.restore(self.sess, model_path)
else:
print( "load from checkpoint:" + self.model_dir )
checkpoint = tf.train.get_checkpoint_state(self.model_dir)
if checkpoint and checkpoint.model_checkpoint_path:
self.saver.restore(self.sess, checkpoint.model_checkpoint_path)
def save_model(self):
path = os.path.join(self.model_dir, self.model_name)
print( "sessを", path, "の名前で保存")
self.saver.save(self.sess, path)
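The target that experience_replay builds is the standard one-step Q-learning target: the network's current prediction is kept for every action except the one actually taken, whose entry is overwritten with reward_j + discount_factor * max Q(state_j_1), or with the bare reward on a terminal transition. Below is a minimal sketch with made-up Q-values, reusing the agent's default discount_factor of 0.9; the arrays are illustrative only, not output of the model.

import numpy as np

discount_factor = 0.9                    # same default as DQNAgent
q_state = np.array([0.10, 0.05, 0.20])   # pretend Q(state, *) from the network
q_next = np.array([0.30, 0.00, 0.15])    # pretend Q(next_state, *)
action_index = 1                         # index of the action actually taken
reward = 0.0                             # reward observed for that action
terminal = False

target = q_state.copy()
if terminal:
    target[action_index] = reward        # terminal: the target is the reward alone
else:
    target[action_index] = reward + discount_factor * np.max(q_next)
print(target)                            # roughly [0.1, 0.27, 0.2]

Because every other entry of the target equals the network's own prediction, only the taken action's Q-value contributes a meaningful error to the squared-error loss.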
# coding: utf-8 catch_ball.py (the environment)
import os
import numpy as np
class CatchBall:
'''
The scene is kept in a 2-D array called screen; the player and the ball are marked
with 1s and every other cell is 0 (a small illustrative example follows the class).
'''
def __init__(self):
# parameters
self.name = os.path.splitext(os.path.basename(__file__))[0]
self.screen_n_rows = 8
self.screen_n_cols = 8
self.player_length = 3
self.enable_actions = (0, 1, 2)
self.frame_rate = 5
# variables
self.reset()
def __update(self, action):
"""
action:
0: do nothing
1: move left
2: move right
"""
# update player position
if action == self.enable_actions[1]:
# move left
self.player_col = max(0, self.player_col - 1)
elif action == self.enable_actions[2]:
# move right
self.player_col = min(self.player_col + 1, self.screen_n_cols - self.player_length)
else:
# do nothing
pass
# update ball position
self.ball_row += 1
# collision detection
self.reward = 0
self.terminal = False
if self.ball_row == self.screen_n_rows - 1:
self.terminal = True
if self.player_col <= self.ball_col < self.player_col + self.player_length:
# catch
self.reward = 1
else:
# drop
self.reward = -1
def draw(self):  # does not actually render anything; rebuilds screen from the player and ball coordinates held in the instance variables
# reset screen
self.screen = np.zeros((self.screen_n_rows, self.screen_n_cols))
# draw player
self.screen[self.player_row, self.player_col:self.player_col + self.player_length] = 1
# draw ball
self.screen[self.ball_row, self.ball_col] = 1
def observe(self):
self.draw()
return self.screen, self.reward, self.terminal
def execute_action(self, action):
self.__update(action)
def reset(self):
# reset player position
self.player_row = self.screen_n_rows - 1
self.player_col = np.random.randint(self.screen_n_cols - self.player_length)
# reset ball position
self.ball_row = 0
self.ball_col = np.random.randint(self.screen_n_cols)
# reset other variables
self.reward = 0
self.terminal = False
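To make the screen representation in the class docstring concrete, here is a minimal sketch of the array draw() produces, with made-up positions for the 3-cell player on the bottom row and for the ball; observe() returns exactly this kind of 8 x 8 array as the state the agent sees.

import numpy as np

screen = np.zeros((8, 8))                          # screen_n_rows x screen_n_cols
player_row, player_col = 7, 2                      # player sits on the bottom row (illustrative column)
ball_row, ball_col = 3, 5                          # illustrative ball position
screen[player_row, player_col:player_col + 3] = 1  # player_length = 3
screen[ball_row, ball_col] = 1
print(screen)  # 1s mark the player (columns 2-4 of the last row) and the ball at (3, 5)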
# coding: utf-8 train.py
import numpy as np
from catch_ball import CatchBall
from dqn_agent import DQNAgent
if __name__ == "__main__":
# parameters
n_epochs = 1000
# environment, agent
env = CatchBall()
agent = DQNAgent(env.enable_actions, env.name)
# variables
win = 0
for e in range(n_epochs):
# reset
frame = 0
loss = 0.0
Q_max = 0.0
env.reset()
state_t_1, reward_t, terminal = env.observe()
while not terminal:
state_t = state_t_1
# execute action in environment
action_t = agent.select_action(state_t, agent.exploration)
env.execute_action(action_t)
# observe environment
state_t_1, reward_t, terminal = env.observe()
# store experience
agent.store_experience(state_t, action_t, reward_t, state_t_1, terminal)
# experience replay
agent.experience_replay()
# for log
frame += 1
loss += agent.current_loss
Q_max += np.max(agent.Q_values(state_t))
if reward_t == 1:
win += 1
print("EPOCH: {:03d}/{:03d} | WIN: {:03d} | LOSS: {:.4f} | Q_MAX: {:.4f}".format(
e, n_epochs - 1, win, loss / frame, Q_max / frame))
# save model
agent.save_model()
# train.py unrolled step by step (for study)
import numpy as np
from catch_ball import CatchBall
from dqn_agent import DQNAgent
n_epochs = 1000
# environment, agent
env = CatchBall()
agent = DQNAgent(env.enable_actions, env.name)
# variables
win = 0
e_n=0
#------- for e_n in range(n_epochs): the training loop over epochs
# reset
frame = 0
loss = 0.0
Q_max = 0.0
env.reset()
state_t_1, reward_t, terminal = env.observe()
print( "新しい学習対象ゲーム情報を提示",state_t_1, reward_t, terminal)
#--------------------------------- while not terminal: one episode, until a reward (+/-) is issued ★★
state_t = state_t_1
# action_t <== decide on an action
# execute action in environment
# action_t = agent.select_action(state_t, agent.exploration)
#===========
if np.random.rand() <= agent.exploration:
action_t = np.random.choice(agent.enable_actions)
print( "ランダムの行動決定:",action_t)
else:
yQ=agent.sess.run(agent.y, feed_dict={agent.x: [state_t]})[0]
action_t = agent.enable_actions[np.argmax(yQ)]
print("AIに[state_t]の状態でアクションを選ばせる:",yQ, "行動の決定 最大:", action_t )
#===========
env.execute_action(action_t)
print("player_col:",env.player_col,"、ball_row:",env.ball_row,
"、terminal", env.terminal, "の変数が実行で更新")
# observe environment: rebuild env.screen from the variables above and return it
state_t_1, reward_t, terminal = env.observe()
print("上記変数で更新されたスクリーン:", state_t_1,
"ここでterminal変数を得る terminal:", terminal )
# store experience: push the tuple below onto the replay queue (a small deque sketch follows this walkthrough)
# (state, action taken in it, reward for that action, resulting state, terminal flag)
agent.store_experience(state_t, action_t, reward_t, state_t_1, terminal)
print("経験を記憶! その結果の agent.D の最後要素と、記憶数:",agent.D[-1],len(agent.D) )
#agent.experience_replay()  # (learn from experience) i.e. training on samples drawn from agent.D
#================================ body of agent.experience_replay, expanded
state_minibatch = []
y_minibatch = []
# sample random minibatch
minibatch_size = min(len(agent.D), agent.minibatch_size)
minibatch_indexes = np.random.randint(0, len(agent.D), minibatch_size)
print("次のfor を、", minibatch_indexes , "で反復する")
it=iter(minibatch_indexes)
#for-------------------------------------◆◆
j=next(it)
state_j, action_j, reward_j, state_j_1, terminal = agent.D[j]
print( "(シーン状態、左の対応アクション、左に対する報酬、これで変化するシーン、判定有無)")
print( state_j , "この状態で", action_j, "の行動時の報酬:", reward_j)
action_j_index = agent.enable_actions.index(action_j)
y_j=agent.sess.run(agent.y, feed_dict={agent.x: [state_j]})[0]
print( "上記学習データagent.D[{}]のAIが選択した比率:{}".format(j, y_j) )
if terminal:
y_j[action_j_index] = reward_j  # terminal is True here, so the target is the reward alone
print("experience_replay: terminal is True, target changed to", y_j)
else:
yQ=agent.sess.run(agent.y, feed_dict={agent.x: [state_j_1]})[0]
y_j[action_j_index] = reward_j + agent.discount_factor * np.max(yQ) # NOQA
print("reward_j + max_action Q(state_j_1) terminal がFalse で", y_j , "へ変更 reward_j:",reward_j)
state_minibatch.append(state_j)  # build the minibatch used for training
y_minibatch.append(y_j)
print("minibatch size:", len(y_minibatch))
#--------------------------------- end of the for loop -◆◆
# training: run one gradient step on the minibatch built in the for loop above
agent.sess.run(agent.training, feed_dict={agent.x: state_minibatch, agent.y_: y_minibatch})
# for log
agent.current_loss = agent.sess.run(agent.loss, feed_dict={agent.x: state_minibatch, agent.y_: y_minibatch})
print( "agent.current_loss :", agent.current_loss )
#=========================== end of the expanded agent.experience_replay body (end of learning from experience)
# for log
frame += 1
loss += agent.current_loss
Q_max += np.max(agent.Q_values(state_t))
if reward_t == 1:
win += 1
print("terminal :", terminal )
#--------------------------------- while 最後 ★★
print("EPOCH: {:03d}/{:03d} | WIN: {:03d} | LOSS: {:.4f} | Q_MAX: {:.4f}".format(
e_n, n_epochs - 1, win, loss / frame, Q_max / frame))
e_n+=1
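The replay memory agent.D that the store-experience step above appends to is a collections.deque created with maxlen=replay_memory_size (1000), so once it is full the oldest transition is dropped automatically each time a new one is appended. A minimal sketch of that behaviour with a tiny maxlen:

from collections import deque

D = deque(maxlen=3)  # DQNAgent uses maxlen=1000 (replay_memory_size)
for t in range(5):
    D.append(("state", "action", 0, "next_state", False))  # same tuple shape as store_experience
    print(len(D))    # prints 1, 2, 3, 3, 3: the oldest entries are discarded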
# coding: utf-8 test.py
from __future__ import division
import argparse
import os
import sys
import matplotlib.animation as animation
import matplotlib.pyplot as plt
from catch_ball import CatchBall
from dqn_agent import DQNAgent
def init():
img.set_array(state_t_1)
plt.axis("off")
return img,
def animate(step):
global win, lose
global state_t_1, reward_t, terminal
if terminal:
env.reset()
# for log
if reward_t == 1:
win += 1
elif reward_t == -1:
lose += 1
print("WIN: {:03d}/{:03d} ({:.1f}%)".format(win, win + lose, 100 * win / (win + lose)))
else:
state_t = state_t_1
# execute action in environment
action_t = agent.select_action(state_t, 0.0)
env.execute_action(action_t)
state_t_1, reward_t, terminal = env.observe()
# animate
img.set_array(state_t_1)
plt.axis("off")
return img,
if __name__ == "__main__":
# args
parser = argparse.ArgumentParser()
parser.add_argument("-m", "--model_path")
parser.add_argument("-s", "--save", dest="save", action="store_true")
parser.set_defaults(save=False)
args = parser.parse_args()
print( args )
#sys.exit()
# environment, agent
env = CatchBall()
agent = DQNAgent(env.enable_actions, env.name)
agent.load_model(args.model_path)
#sys.exit()
# variables
win, lose = 0, 0
state_t_1, reward_t, terminal = env.observe()
# animate
fig = plt.figure(figsize=(env.screen_n_rows / 2, env.screen_n_cols / 2))
fig.canvas.set_window_title("{}-{}".format(env.name, agent.name))
img = plt.imshow(state_t_1, interpolation="none", cmap="gray")
ani = animation.FuncAnimation(fig, animate, init_func=init, interval=(1000 / env.frame_rate), blit=True)
if args.save:
# save animation (requires ImageMagick)
ani_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "tmp", "demo-{}.gif".format(env.name))
ani.save(ani_path, writer="imagemagick", fps=env.frame_rate)
else:
# show animation
plt.show()