initial commit

This commit is contained in:
Dominik Brunmeir 2022-07-22 13:44:29 +02:00
commit 077c635934
6 changed files with 310 additions and 0 deletions

25
.gitignore vendored Normal file
View File

@ -0,0 +1,25 @@
# ---> macOS
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
__pycache__
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
run_old.py
.idea
venv

133
DynaQ.py Normal file
View File

@ -0,0 +1,133 @@
import numpy as np
from Environment import Env
# Seed NumPy's global random generator at import time so that the random
# draws made through np.random in this module are repeatable between runs.
np.random.seed(1)
class DynaQ:
    """Tabular Dyna-Q agent adapted to graph environments.

    States are the nodes of ``env.G``. The actions available in a state are
    the neighbouring nodes of that state plus the state itself.
    """

    def __init__(self, env: "Env", episodes: int, epsilon: float, alpha: float, gamma: float, n_steps: int):
        """Store the hyper-parameters and create the Q-table and the model.

        :param env: environment providing ``G``, ``start``, ``reset``,
            ``block_node`` and ``get_state_reward``
        :param episodes: number of episodes run by :meth:`learn`
        :param epsilon: initial exploration rate
        :param alpha: learning rate
        :param gamma: discount factor
        :param n_steps: number of planning updates after every real step
        """
        # Initialize parameter
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.episodes = episodes
        self.n_steps = n_steps
        self.time_step = 0
        self.state = self.env.start
        self.steps_per_episode = []
        self.state_actions = []
        self.step_in_episode = 0

        # Initialize Q-matrix and model
        self.Q = {}
        self.model = {}
        for state in list(self.env.G):
            self.Q[state] = {}
            self.model[state] = {}
            for action in list(self.env.G.neighbors(state)) + [state]:
                self.Q[state][action] = 0
                # Model entries are (reward, next_state, time_step). Until a
                # pair is really experienced, the entry defaults to reward -1
                # with the action's node as next state.
                self.model[state][action] = (-1, action, 0)

    def reset(self) -> None:
        """Put the agent back on the start node and reset the environment."""
        self.state = self.env.start
        self.state_actions = []
        self.step_in_episode = 0
        self.env.reset()

    def _td_update(self, state, action, reward, next_state) -> None:
        """Apply one Q-learning update to ``Q[state][action]``.

        Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))

        The old estimate is subtracted *inside* the term scaled by alpha, so
        the new value is a blend of the old estimate and the target.
        """
        q_current = self.Q[state][action]
        q_max = np.max(list(self.Q[next_state].values()))
        td_error = reward + self.gamma * q_max - q_current
        self.Q[state][action] = q_current + self.alpha * td_error

    def learn(self, epsilon_decay: float, epsilon_min: float, run: int) -> None:
        """Run ``self.episodes`` episodes of Dyna-Q.

        After each episode the number of steps taken is appended to
        ``self.steps_per_episode``.

        :param epsilon_decay: rate of the exponential epsilon decay
        :param epsilon_min: lower bound for epsilon
        :param run: index of the current run, only used for log output
        """
        self.steps_per_episode = []
        eps = self.epsilon
        for episode in range(self.episodes):
            done = False
            self.reset()
            # NOTE(review): reset() calls env.reset(); if that clears blocked
            # nodes (as the Env shipped with this project does), node 1 is
            # only blocked during episode 70 - confirm this is intended.
            if episode == 70:
                self.env.block_node(1)

            # Episodes last until the goal is reached
            while not done:
                print("Run: " + str(run), "n_steps: " + str(self.n_steps), "Episode: " + str(episode),
                      "State: " + str(self.state))

                # Get action, reward and next state
                action = self.get_action(eps)
                self.state_actions.append((self.state, action))
                (done, reward, next_state) = self.env.get_state_reward(self.state, action)

                # Q-learning update from the real experience
                self._td_update(self.state, action, reward, next_state)

                # Update model
                self.time_step += 1
                self.step_in_episode += 1
                self.update_model(self.state, action, reward, next_state)

                # Planning phase
                self.planning()
                self.state = next_state

            self.steps_per_episode.append(len(self.state_actions))
            self.reset()
            print("Goal")
            eps = max(epsilon_min, self.epsilon * np.exp(-epsilon_decay * episode))

    def get_action(self, eps: float) -> int:
        """Return an epsilon-greedy action for the current state.

        With probability ``eps`` (or when all Q-values of the state are
        equal) a uniformly random action is returned, otherwise the action
        with the highest Q-value. Among tied best actions the one listed
        last wins.
        """
        random = np.random.uniform(0, 1)
        action_list = list(self.env.G.neighbors(self.state)) + [self.state]

        # Explore
        if random < eps:
            return np.random.choice(action_list)

        # If all q-values are the same there is nothing to exploit yet
        if len(set(self.Q[self.state].values())) == 1:
            return np.random.choice(action_list)

        # Exploit: get action with highest q-value
        q = float('-inf')
        for a in action_list:
            tmp_q = self.Q[self.state][a]
            if tmp_q >= q:
                q = tmp_q
                action = a
        return action

    def update_model(self, state: int, action: int, reward: float, next_state) -> None:
        """Record reward, next state and current time step for a state-action pair."""
        self.model[state][action] = (reward, next_state, self.time_step)

    def planning(self) -> None:
        """Perform ``n_steps`` simulated Q-learning updates from the model.

        State and action are drawn uniformly at random from the whole model,
        which includes pairs that still hold their initial default entry.
        """
        for step in range(self.n_steps):
            state_rnd = np.random.choice(list(self.model.keys()))
            action_rnd = np.random.choice(list(self.env.G.neighbors(state_rnd)) + [state_rnd])
            (reward_rnd, next_state_rnd, time_step_rnd) = self.model[state_rnd][action_rnd]
            self._td_update(state_rnd, action_rnd, reward_rnd, next_state_rnd)

81
Environment.py Normal file
View File

@ -0,0 +1,81 @@
import typing
import networkx as nx
import matplotlib.pyplot as plt
'''
Build network with networkx library
'''
def build_network():
    """Create the 11-node example graph, draw it and return it.

    The nodes 0..10 come from a path graph whose own edges are discarded
    and replaced by the hand-written edge list below. The graph is shown
    with matplotlib before it is returned.
    """
    edge_list = [
        (0, 3), (0, 4),
        (1, 2), (1, 4), (1, 5), (1, 8), (1, 9),
        (2, 3), (2, 5), (2, 6),
        (5, 6), (5, 7),
        (7, 8),
        (8, 9), (8, 10),
        (9, 10),
    ]

    # Only the node set of the path graph is kept.
    graph = nx.path_graph(11)
    graph.remove_edges_from(graph.edges())
    graph.add_edges_from(edge_list)

    nx.draw(graph, with_labels=True)
    plt.show()
    return graph
'''
Define Environment class
- G: networkx graph
- start: start-element for agent
- goal: goal-element for agent
- blocked: list of blocked nodes
'''
class Env:
    """Graph environment with a start node, a goal node and blockable nodes."""

    def __init__(self, start: int, goal: int):
        """Build the network and remember start and goal.

        :param start: node on which the agent starts
        :param goal: node the agent has to reach
        """
        self.G = build_network()
        self.start = start
        self.goal = goal
        self.blocked = []

    def reset(self) -> None:
        """Reset the environment --> no blocked nodes."""
        self.blocked = []

    def get_state_reward(self, state: int, action: int) -> typing.Tuple[bool, int, int]:
        """Return ``(done, reward, next_state)`` for a state-action pair.

        - moving onto the goal ends the episode with reward 30
        - moving onto a blocked node costs -5 and the agent stays on ``state``
        - any other move costs -1 and leads to the node given by ``action``

        The goal check comes first, so a blocked goal still counts as reached.
        """
        if action == self.goal:
            return True, 30, action
        if self.is_node_blocked(action):
            return False, -5, state
        return False, -1, action

    def block_node(self, node: int) -> None:
        """Block a node; blocking an already blocked node changes nothing."""
        # Without this guard a node blocked twice would stay blocked after a
        # single release_node() call.
        if node not in self.blocked:
            self.blocked.append(node)

    def release_node(self, node: int) -> None:
        """Unblock a node.

        :raises ValueError: if the node is not currently blocked
        """
        self.blocked.remove(node)

    def is_node_blocked(self, node: int) -> bool:
        """Return True if the node is blocked."""
        return node in self.blocked

56
main.py Normal file
View File

@ -0,0 +1,56 @@
import numpy as np
from Environment import Env
from DynaQ import DynaQ
import matplotlib.pyplot as plt
def main():
    """Train Dyna-Q agents with 0, 10 and 50 planning steps and plot them.

    Every configuration is trained for several runs on the same environment;
    the steps needed per episode are averaged over the runs and plotted.
    """
    # Parameters
    alpha = 0.1
    gamma = 0.9
    epsilon = 1
    epsilon_decay = 0.05
    epsilon_min = 0.01
    episodes = 100
    start = 0
    goal = 10
    runs = 10

    # One (planning steps, plot label) pair per agent, in training order
    settings = (
        (0, '0 planning steps'),
        (10, '10 planning steps'),
        (50, '50 planning steps'),
    )

    # Result arrays, one accumulator per agent configuration
    totals = {n_steps: np.zeros(episodes) for n_steps, _ in settings}

    # Build environment
    env = Env(start=start, goal=goal)

    # Do `runs` runs for each agent; inside a run the agents are trained
    # one after the other in the order given by `settings`
    for run in range(runs):
        for n_steps, _ in settings:
            agent = DynaQ(env=env, alpha=alpha, gamma=gamma, epsilon=epsilon, n_steps=n_steps, episodes=episodes)
            agent.learn(epsilon_min=epsilon_min, epsilon_decay=epsilon_decay, run=run)
            totals[n_steps] += np.array(agent.steps_per_episode)

    # Take the average of the results
    averages = {n_steps: total / runs for n_steps, total in totals.items()}

    # Plot the results
    plt.figure()
    for n_steps, label in settings:
        plt.plot(range(episodes), averages[n_steps].tolist(), label=label)
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

4
readme.md Normal file
View File

@ -0,0 +1,4 @@
# Pathfinding
DynaQ-based pathfinding in a network

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
cycler==0.11.0
fonttools==4.34.4
kiwisolver==1.4.4
matplotlib==3.5.2
networkx==2.8.5
numpy==1.23.1
packaging==21.3
Pillow==9.2.0
pyparsing==3.0.9
python-dateutil==2.8.2
six==1.16.0