-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathReplayBuffers.py
164 lines (131 loc) · 6.86 KB
/
ReplayBuffers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import numpy as np
from collections import deque, namedtuple
import random
import torch
class ReplayBuffer:
    """Fixed-size, uniformly sampled buffer of n-step experience tuples.

    Raw transitions are first collected in one sliding window per parallel
    environment. Once a window holds ``n_step`` transitions it is collapsed
    into a single n-step transition, which is what ends up in ``memory``.
    """

    def __init__(self, buffer_size, batch_size, device, seed, gamma, n_step=1, parallel_env=4):
        """Initialize a ReplayBuffer object.

        Params
        ======
            buffer_size (int): maximum number of n-step transitions kept
            batch_size (int): size of each training batch
            device: target passed to ``Tensor.to`` for sampled batches
            seed (int): seed for Python's global ``random`` module
            gamma (float): discount factor used for the n-step return
            n_step (int): number of raw transitions folded into one entry
            parallel_env (int): number of sliding windows; successive ``add``
                calls are assigned to them round-robin
        """
        self.device = device
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.experience = namedtuple(
            "Experience",
            field_names=["state", "action", "reward", "next_state", "done"],
        )
        # random.seed() returns None, so seed the global RNG first and keep
        # the actual seed value on the instance.
        random.seed(seed)
        self.seed = seed
        self.gamma = gamma
        self.n_step = n_step
        self.parallel_env = parallel_env
        self.n_step_buffer = [deque(maxlen=self.n_step) for _ in range(parallel_env)]
        # Index of the window that receives the next add() call.
        self.iter_ = 0

    def add(self, state, action, reward, next_state, done):
        """Add a raw transition; store an n-step transition once the window is full.

        Calls are routed round-robin over the ``parallel_env`` windows, so the
        caller is expected to add one transition per environment in a fixed
        order (NOTE(review): inferred from the cycling index -- confirm
        against the training loop).
        """
        if self.iter_ == self.parallel_env:
            self.iter_ = 0
        window = self.n_step_buffer[self.iter_]
        window.append((state, action, reward, next_state, done))
        if len(window) == self.n_step:
            self.memory.append(self.experience(*self.calc_multistep_return(window)))
        self.iter_ += 1

    def calc_multistep_return(self, n_step_buffer):
        """Collapse a window of raw transitions into one n-step transition.

        The discounted reward sum stops at the first terminal transition in
        the window, so rewards and states of a following episode never leak
        into the result.

        Returns
        =======
            tuple: (first state, first action, discounted return,
                    next_state and done of the last transition that was used)
        """
        n_step_return = 0
        for idx, (_, _, reward, next_state, done) in enumerate(n_step_buffer):
            n_step_return += self.gamma ** idx * reward
            if done:
                # Episode ended inside the window: do not cross the boundary.
                break
        first_state, first_action = n_step_buffer[0][0], n_step_buffer[0][1]
        return first_state, first_action, n_step_return, next_state, done

    def sample(self):
        """Randomly sample a batch of experiences from memory.

        Returns
        =======
            tuple of tensors on ``self.device``:
            (states, actions, rewards, next_states, dones). Actions are cast
            to long, everything else to float.

        Raises
        ======
            ValueError: raised by ``random.sample`` when fewer than
                ``batch_size`` experiences are stored.
        """
        experiences = random.sample(self.memory, k=self.batch_size)

        states = torch.from_numpy(np.stack([e.state for e in experiences])).float().to(self.device)
        actions = torch.from_numpy(np.vstack([e.action for e in experiences])).long().to(self.device)
        rewards = torch.from_numpy(np.vstack([e.reward for e in experiences])).float().to(self.device)
        next_states = torch.from_numpy(np.stack([e.next_state for e in experiences])).float().to(self.device)
        # Booleans go through uint8 before becoming a float 0/1 mask.
        dones = torch.from_numpy(
            np.vstack([e.done for e in experiences]).astype(np.uint8)
        ).float().to(self.device)

        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Return the current size of internal memory."""
        return len(self.memory)
class PrioritizedReplay(object):
    """Proportional prioritized experience replay with n-step returns.

    Transitions live in a ring buffer of ``capacity`` entries with one
    priority each. Sampling probability is proportional to
    ``priority ** alpha``; importance-sampling weights use an exponent beta
    that is annealed linearly towards 1 with every ``sample`` call.
    """

    def __init__(self, capacity, batch_size, seed, gamma=0.99, n_step=1, alpha=0.6, beta_start = 0.4, beta_frames=100000, parallel_env=4):
        """Initialize a PrioritizedReplay object.

        Params
        ======
            capacity (int): maximum number of stored transitions
            batch_size (int): number of transitions returned by ``sample``
            seed (int): seed for NumPy's global random state
            gamma (float): discount factor used for the n-step return
            n_step (int): number of raw transitions folded into one entry
            alpha (float): exponent applied to priorities when sampling
            beta_start (float): initial importance-sampling exponent
            beta_frames (int): number of ``sample`` calls until beta reaches 1
            parallel_env (int): number of sliding windows; successive ``add``
                calls are assigned to them round-robin
        """
        self.alpha = alpha
        self.beta_start = beta_start
        self.beta_frames = beta_frames
        self.frame = 1  # counts sample() calls, drives the beta schedule
        self.batch_size = batch_size
        self.capacity = capacity
        self.buffer = []
        self.pos = 0  # ring-buffer write position
        self.priorities = np.zeros((capacity,), dtype=np.float32)
        # np.random.seed() returns None, so seed the global state first and
        # keep the actual seed value on the instance.
        np.random.seed(seed)
        self.seed = seed
        self.n_step = n_step
        self.parallel_env = parallel_env
        self.n_step_buffer = [deque(maxlen=self.n_step) for _ in range(parallel_env)]
        # Index of the window that receives the next add() call.
        self.iter_ = 0
        self.gamma = gamma

    def calc_multistep_return(self, n_step_buffer):
        """Collapse a window of raw transitions into one n-step transition.

        The discounted reward sum stops at the first terminal transition in
        the window, so rewards and states of a following episode never leak
        into the result.

        Returns
        =======
            tuple: (first state, first action, discounted return,
                    next_state and done of the last transition that was used)
        """
        n_step_return = 0
        for idx, (_, _, reward, next_state, done) in enumerate(n_step_buffer):
            n_step_return += self.gamma ** idx * reward
            if done:
                # Episode ended inside the window: do not cross the boundary.
                break
        first_state, first_action = n_step_buffer[0][0], n_step_buffer[0][1]
        return first_state, first_action, n_step_return, next_state, done

    def beta_by_frame(self, frame_idx):
        """
        Linearly increases beta from beta_start to 1 as frame_idx goes from 0 to beta_frames.

        3.4 ANNEALING THE BIAS (Paper: PER)
        The amount of importance-sampling correction is annealed over time by
        a schedule on the exponent beta that reaches 1 only at the end of
        learning. Here beta is annealed linearly from beta_start to 1 and
        clipped at 1 afterwards.
        """
        return min(1.0, self.beta_start + frame_idx * (1.0 - self.beta_start) / self.beta_frames)

    def add(self, state, action, reward, next_state, done):
        """Add a raw transition; store an n-step transition once the window is full.

        ``state`` and ``next_state`` must expose ``ndim`` (e.g. NumPy arrays).
        Both get a leading axis of size 1 so that ``sample`` can concatenate
        stored states along axis 0. Nothing is written to the ring buffer
        until the current environment's window holds ``n_step`` transitions;
        this keeps every stored reward a genuine n-step return.
        """
        if self.iter_ == self.parallel_env:
            self.iter_ = 0
        assert state.ndim == next_state.ndim
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)

        # n_step calc
        window = self.n_step_buffer[self.iter_]
        window.append((state, action, reward, next_state, done))
        if len(window) == self.n_step:
            transition = self.calc_multistep_return(window)

            # New entries get the current maximum priority (1.0 for the very
            # first one) so they are likely to be sampled at least once.
            max_prio = self.priorities.max() if self.buffer else 1.0

            if len(self.buffer) < self.capacity:
                self.buffer.append(transition)
            else:
                # Buffer is full: pos cycles through the ring, so this
                # overwrites the oldest stored transition.
                self.buffer[self.pos] = transition

            self.priorities[self.pos] = max_prio
            # Wrap the write position back to 0 after reaching capacity.
            self.pos = (self.pos + 1) % self.capacity
        self.iter_ += 1

    def sample(self):
        """Sample a prioritized batch and its importance-sampling weights.

        Returns
        =======
            tuple: (states, actions, rewards, next_states, dones, indices,
            weights). States and next_states are concatenated arrays;
            actions, rewards and dones are tuples; indices are the buffer
            positions to pass to ``update_priorities``; weights are float32
            and normalized so the largest is 1.

        Raises
        ======
            ValueError: if the buffer is empty.
        """
        N = len(self.buffer)
        if N == 0:
            raise ValueError("cannot sample from an empty PrioritizedReplay buffer")
        if N == self.capacity:
            prios = self.priorities
        else:
            # While filling up, pos equals the number of stored entries.
            prios = self.priorities[:self.pos]

        # calc P = p^a/sum(p^a)
        probs = prios ** self.alpha
        P = probs / probs.sum()

        # draw indices according to the probabilities P
        indices = np.random.choice(N, self.batch_size, p=P)
        samples = [self.buffer[idx] for idx in indices]

        beta = self.beta_by_frame(self.frame)
        self.frame += 1

        # Compute importance-sampling weights and normalize by the maximum
        weights = (N * P[indices]) ** (-beta)
        weights /= weights.max()
        weights = np.array(weights, dtype=np.float32)

        states, actions, rewards, next_states, dones = zip(*samples)
        return np.concatenate(states), actions, rewards, np.concatenate(next_states), dones, indices, weights

    def update_priorities(self, batch_indices, batch_priorities):
        """Overwrite the priorities at ``batch_indices`` with ``batch_priorities``."""
        for idx, prio in zip(batch_indices, batch_priorities):
            self.priorities[idx] = prio

    def __len__(self):
        """Return the number of stored transitions."""
        return len(self.buffer)