import numpy as np

from Environment import Env

np.random.seed(1)


class DynaQ:

    def __init__(self, env: Env, episodes: int, epsilon: float, alpha: float, gamma: float, n_steps: int):
        # Initialize parameters
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.episodes = episodes
        self.n_steps = n_steps
        self.time_step = 0
        self.state = self.env.start
        self.steps_per_episode = []
        self.state_actions = []
        self.step_in_episode = 0

        # Initialize Q-table and model; an action is moving to a neighbouring
        # node or staying on the current node
        self.Q = {}
        self.model = {}
        for state in list(self.env.G):
            self.Q[state] = {}
            self.model[state] = {}
            for action in list(self.env.G.neighbors(state)) + [state]:
                self.Q[state][action] = 0
                self.model[state][action] = (-1, action, 0)

    def reset(self) -> None:
        '''Resets the agent to the start state for a new episode.'''
        self.state = self.env.start
        self.state_actions = []
        self.step_in_episode = 0

    def learn(self, epsilon_decay: float, epsilon_min: float, run: int) -> None:
        '''
        Learning method for the agent:
        the Dyna-Q algorithm adapted for graphs.
        '''
        self.steps_per_episode = []
        eps = self.epsilon
        self.env.reset()

        for episode in range(self.episodes):
            done = False
            self.reset()

            # Change the environment mid-training by blocking node 1
            if episode == 70:
                self.env.block_node(1)

            # An episode lasts until the goal is reached
            while not done:
                print("Run: " + str(run),
                      "n_steps: " + str(self.n_steps),
                      "Episode: " + str(episode),
                      "State: " + str(self.state))

                # Get action, reward and next state
                action = self.get_action(eps)
                self.state_actions.append((self.state, action))
                (done, reward, next_state) = self.env.get_state_reward(self.state, action)

                # Q-learning update (Bellman equation)
                q_current = self.Q[self.state][action]
                q_max = np.max(list(self.Q[next_state].values()))
                self.Q[self.state][action] = q_current + self.alpha * (reward + self.gamma * q_max - q_current)

                # Update model
                self.time_step += 1
                self.step_in_episode += 1
                self.update_model(self.state, action, reward, next_state)

                # Planning phase
                self.planning()

                self.state = next_state

            self.steps_per_episode.append(len(self.state_actions))
            self.reset()
            print("Goal")

            # Decay epsilon
            eps = max(epsilon_min, self.epsilon * np.exp(-epsilon_decay * episode))

            if run == 9 and self.n_steps == 50 and episode == 69:
                self.env.print_shortest_path(self.Q)

        if run == 9 and self.n_steps == 50:
            self.env.print_shortest_path(self.Q)

    def get_action(self, eps: float) -> int:
        '''Returns an epsilon-greedy action for the current state.'''
        random = np.random.uniform(0, 1)
        q = float('-inf')
        action_list = list(self.env.G.neighbors(self.state)) + [self.state]

        # Explore or exploit
        if random < eps:
            action = np.random.choice(action_list)
        else:
            # If all Q-values are equal, pick a random action
            if len(set(self.Q[self.state].values())) == 1:
                action = np.random.choice(action_list)
            else:
                # Pick the action with the highest Q-value
                for a in action_list:
                    tmp_q = self.Q[self.state][a]
                    if tmp_q >= q:
                        q = tmp_q
                        action = a
        return action

    def update_model(self, state: int, action: int, reward: float, next_state: int) -> None:
        '''Stores reward, next state and current time step for a state-action pair in the model.'''
        self.model[state][action] = (reward, next_state, self.time_step)

    def planning(self) -> None:
        '''
        Planning phase: performs n_steps simulated Q-updates on randomly
        sampled state-action pairs using the learned model.
        '''
        for step in range(self.n_steps):
            state_rnd = np.random.choice(list(self.model.keys()))
            action_rnd = np.random.choice(list(self.env.G.neighbors(state_rnd)) + [state_rnd])
            (reward_rnd, next_state_rnd, time_step_rnd) = self.model[state_rnd][action_rnd]

            q_rnd = self.Q[state_rnd][action_rnd]
            q_max = np.max(list(self.Q[next_state_rnd].values()))
            self.Q[state_rnd][action_rnd] = q_rnd + self.alpha * (reward_rnd + self.gamma * q_max - q_rnd)
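

# Minimal usage sketch (illustrative, not part of the original module).
# It assumes an Environment.Env that exposes the interface used above
# (G, start, reset, block_node, get_state_reward, print_shortest_path);
# the Env constructor call and the hyperparameter values shown here are
# hypothetical and should be adapted to the actual environment.
if __name__ == "__main__":
    env = Env()  # hypothetical constructor signature
    agent = DynaQ(env=env, episodes=100, epsilon=1.0, alpha=0.1, gamma=0.95, n_steps=50)
    agent.learn(epsilon_decay=0.01, epsilon_min=0.05, run=0)
    print("Steps per episode:", agent.steps_per_episode)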