import numpy as np
from Environment import Env

np.random.seed(1)


class DynaQ:

    def __init__(self, env: Env, episodes: int, epsilon: float, alpha: float, gamma: float, n_steps: int):
        # Initialize parameters
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.episodes = episodes
        self.n_steps = n_steps

        self.time_step = 0
        self.state = self.env.start
        self.steps_per_episode = []
        self.state_actions = []
        self.step_in_episode = 0

        # Initialize Q-matrix and model.
        # An action is a move to a neighbouring node (or staying at the current node);
        # model entries are (reward, next_state, time_step) tuples.
        self.Q = {}
        self.model = {}

        for state in list(self.env.G):
            self.Q[state] = {}
            self.model[state] = {}
            for action in list(self.env.G.neighbors(state)) + [state]:
                self.Q[state][action] = 0
                self.model[state][action] = (-1, action, 0)

    def reset(self) -> None:
        '''
        Resets the agent's per-episode state: the current state, the recorded
        state-action pairs and the step counter.
        '''
        self.state = self.env.start
        self.state_actions = []
        self.step_in_episode = 0

    def learn(self, epsilon_decay: float, epsilon_min: float, run: int) -> None:
        '''
        Learning method for the agent.
        Basically the Dyna-Q algorithm adapted for graphs.
        '''
        self.steps_per_episode = []
        eps = self.epsilon
        self.env.reset()

        for episode in range(self.episodes):
            done = False
            self.reset()

            # Change the environment mid-training: block node 1 at episode 70
            if episode == 70:
                self.env.block_node(1)

            # Episodes last until the goal is reached
            while not done:
                print("Run: " + str(run), "n_steps: " + str(self.n_steps), "Episode: " + str(episode),
                      "State: " + str(self.state))

                # Get action, reward and next state
                action = self.get_action(eps)
                self.state_actions.append((self.state, action))
                (done, reward, next_state) = self.env.get_state_reward(self.state, action)

                # Q-learning update (Bellman equation):
                # Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))
                q_current = self.Q[self.state][action]
                q_max = np.max(list(self.Q[next_state].values()))
                self.Q[self.state][action] = q_current + self.alpha * (reward + self.gamma * q_max - q_current)

                # Update model
                self.time_step += 1
                self.step_in_episode += 1
                self.update_model(self.state, action, reward, next_state)

                # Planning phase
                self.planning()
                self.state = next_state

            self.steps_per_episode.append(len(self.state_actions))
            self.reset()
            print("Goal")
            eps = max(epsilon_min, self.epsilon * np.exp(-epsilon_decay * episode))

            if run == 9 and self.n_steps == 50 and episode == 69:
                self.env.print_shortest_path(self.Q)
            if run == 9 and self.n_steps == 50:
                self.env.print_shortest_path(self.Q)

    def get_action(self, eps: float) -> int:
        '''
        Returns an epsilon-greedy action.
        '''
        random = np.random.uniform(0, 1)
        q = float('-inf')
        action_list = list(self.env.G.neighbors(self.state)) + [self.state]

        # greedy or not
        if random < eps:
            action = np.random.choice(action_list)
        else:
            # if all q-values are equal, choose randomly
            if len(set(self.Q[self.state].values())) == 1:
                action = np.random.choice(action_list)
            else:
                # otherwise take the action with the highest q-value
                for a in action_list:
                    tmp_q = self.Q[self.state][a]
                    if tmp_q >= q:
                        q = tmp_q
                        action = a
        return action

    def update_model(self, state: int, action: int, reward: float, next_state: int) -> None:
        '''
        Stores the reward, the next state and the current time step for the
        given state-action pair in the model.
        '''
        self.model[state][action] = (reward, next_state, self.time_step)

    def planning(self) -> None:
        '''
        Planning phase: applies the Bellman update to state-action pairs
        sampled from the model.
        '''
        for step in range(self.n_steps):
            state_rnd = np.random.choice(list(self.model.keys()))
            action_rnd = np.random.choice(list(self.env.G.neighbors(state_rnd)) + [state_rnd])
            (reward_rnd, next_state_rnd, time_step_rnd) = self.model[state_rnd][action_rnd]

            q_rnd = self.Q[state_rnd][action_rnd]
            q_max = np.max(list(self.Q[next_state_rnd].values()))

            self.Q[state_rnd][action_rnd] = q_rnd + self.alpha * (reward_rnd + self.gamma * q_max - q_rnd)
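

# The block below is a minimal, hypothetical usage sketch of this class, not part of the
# original module. It assumes Env can be constructed without arguments and exposes the
# attributes used above (G, start, reset, block_node, get_state_reward, print_shortest_path);
# the actual constructor signature and the hyperparameter values shown here are assumptions.
if __name__ == "__main__":
    env = Env()  # assumption: the graph environment builds itself with default settings
    agent = DynaQ(env, episodes=100, epsilon=1.0, alpha=0.1, gamma=0.95, n_steps=50)
    agent.learn(epsilon_decay=0.01, epsilon_min=0.05, run=0)
    print(agent.steps_per_episode)  # steps taken per episode; fewer steps means a better policy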