-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy path01_frozenlake_q_learning.py
executable file
·76 lines (65 loc) · 2.31 KB
/
01_frozenlake_q_learning.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env python3
import gym
import collections
from tensorboardX import SummaryWriter
ENV_NAME = "FrozenLake-v0"
GAMMA = 0.9
ALPHA = 0.2
TEST_EPISODES = 20
class Agent:
def __init__(self):
self.env = gym.make(ENV_NAME)
self.state = self.env.reset()
self.values = collections.defaultdict(float)
def sample_env(self):
action = self.env.action_space.sample()
old_state = self.state
new_state, reward, is_done, _ = self.env.step(action)
self.state = self.env.reset() if is_done else new_state
return (old_state, action, reward, new_state)
def best_value_and_action(self, state):
best_value, best_action = None, None
for action in range(self.env.action_space.n):
action_value = self.values[(state, action)]
if best_value is None or best_value < action_value:
best_value = action_value
best_action = action
return best_value, best_action
def value_update(self, s, a, r, next_s):
best_v, _ = self.best_value_and_action(next_s)
new_val = r + GAMMA * best_v
old_val = self.values[(s, a)]
self.values[(s, a)] = old_val * (1-ALPHA) + new_val * ALPHA
def play_episode(self, env):
total_reward = 0.0
state = env.reset()
while True:
_, action = self.best_value_and_action(state)
new_state, reward, is_done, _ = env.step(action)
total_reward += reward
if is_done:
break
state = new_state
return total_reward
if __name__ == "__main__":
test_env = gym.make(ENV_NAME)
agent = Agent()
writer = SummaryWriter(comment="-q-learning")
iter_no = 0
best_reward = 0.0
while True:
iter_no += 1
s, a, r, next_s = agent.sample_env()
agent.value_update(s, a, r, next_s)
reward = 0.0
for _ in range(TEST_EPISODES):
reward += agent.play_episode(test_env)
reward /= TEST_EPISODES
writer.add_scalar("reward", reward, iter_no)
if reward > best_reward:
print("Best reward updated %.3f -> %.3f" % (best_reward, reward))
best_reward = reward
if reward > 0.80:
print("Solved in %d iterations!" % iter_no)
break
writer.close()