import math
import akro
import numpy as np
import scipy.io as sio
from garage import Environment, EnvSpec, EnvStep, StepType
from scipy.stats import truncnorm
def rand_trans_matrix(trans_matrix=np.array([[0.35, 0.30, 0.20, 0.10, 0.05],
[0.25, 0.30, 0.25, 0.15, 0.05],
[0.10, 0.25, 0.30, 0.25, 0.10],
[0.05, 0.15, 0.25, 0.30, 0.25],
[0.05, 0.10, 0.20, 0.30, 0.35]], dtype=np.float32),
mean=0,
sd=1,
binom_range=0.2
):
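    """Randomly perturb a base transition matrix.

    Each entry is redrawn from a truncated normal restricted to within
    `binom_range` (a relative fraction) of the original entry, and each
    row is then re-normalized so that it sums to 1.
    """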
def get_truncated_normal(mean, sd, low, upp):
return truncnorm(
(low - mean) / sd, (upp - mean) / sd, loc=mean, scale=sd)
    unnormalized = [[get_truncated_normal(mean=mean, sd=sd,
                                          low=num - num * binom_range,
                                          upp=num + num * binom_range).rvs()
                     for num in row] for row in trans_matrix]
    normalized = np.array([[item / sum(row) for item in row]
                           for row in unnormalized], dtype=np.float32)
    return normalized
class CAVVelEnv(Environment):
"""A simple CAV point environment.
Args:
goal (np.ndarray): A 2D array representing the input transition matrix
never_done (bool): Never send a `done` signal, even if the
agent achieves the goal
max_episode_length (int): The maximum steps allowed for an episode.
"""
def __init__(self,
goal=np.array([[0.35, 0.30, 0.20, 0.10, 0.05],
[0.25, 0.30, 0.25, 0.15, 0.05],
[0.10, 0.25, 0.30, 0.25, 0.10],
[0.05, 0.15, 0.25, 0.30, 0.25],
[0.05, 0.10, 0.20, 0.30, 0.35]], dtype=np.float32),
never_done=False,
max_episode_length=math.inf,
CAV_pairs=2
):
self._goal = goal
self._never_done = never_done
self._step_cnt = None
self._max_episode_length = max_episode_length
self._visualize = False
self._task = {'goal': self._goal}
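        # Per-pair observation is (bandwidth, workload, distance); the flat
        # shape of 6 assumes the default of two CAV pairs.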
self._observation_space = akro.Box(low=-np.inf,
high=np.inf,
shape=(6, ),
dtype=np.float32)
self._action_space = akro.Box(low=-1,
high=1,
shape=(2, ),
dtype=np.float32)
self._spec = EnvSpec(action_space=self.action_space,
observation_space=self.observation_space,
max_episode_length=max_episode_length)
#########################################################################################################################
self._infeasible_penalty = -10
############################################### Communication parameters #################################################
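        # Nominal number of available sub-bands at each step of a 75-step
        # episode; actual availability is randomized around this basis in
        # reset() and step().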
self._bandwidth_basis = np.array([7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,
5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,
7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7], dtype=np.float32) # *1000000
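        # Cost incurred for every CAV pair that switches cooperation mode
        # between consecutive steps.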
self._switch_coeff = 0.4
############################################### optimal resource allocation ###################################
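        # Pre-computed optimal objective values (offline KKT solutions)
        # indexed by available bandwidth, per-pair workload and distance.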
self._obj_opt_2CAV = np.squeeze(sio.loadmat(
'KKT_opt/KKT_2CAV_opt_data.mat').get('KKT_obj_2CAV'))
self._obj_opt_1CAV = np.squeeze(sio.loadmat(
'KKT_opt/KKT_1CAV_opt_data.mat').get('KKT_obj_1CAV'))
############################################### CAV environment ###############################################
self._CAV_pairs = CAV_pairs
# All candidate states for transmitter-receiver distances
self._distances = np.arange(6, 36, 6)
# state transition prob. matrix for distance states
self.trans_matrix = goal
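        # Candidate workload states (number of objects to detect) and their
        # state transition probability matrix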
self.workloads = np.array([4, 5, 6, 7, 8])
self.load_trans = np.array([[0.35, 0.30, 0.20, 0.10, 0.05],
[0.25, 0.30, 0.25, 0.15, 0.05],
[0.10, 0.25, 0.30, 0.25, 0.10],
[0.05, 0.15, 0.25, 0.30, 0.25],
[0.05, 0.10, 0.20, 0.30, 0.35]])
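        # Initial workload assigned to every CAV pair in reset()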
self.O_init = 8
def reset(self):
"""Reset the environment.
Returns:
numpy.ndarray: The first observation conforming to
`observation_space`.
dict: The episode-level information.
Note that this is not part of `env_info` provided in `step()`.
It contains information of the entire episode, which could be
needed to determine the first action (e.g. in the case of
                goal-conditioned or MTRL).
"""
self._step_cnt = 0
####################### Available bandwidth (depending on HDV status) #####################################################
band_basis = self._bandwidth_basis[self._step_cnt]
        if band_basis == 7:
            self.Bandwidth_available = np.random.default_rng().choice(
                [6, 7], 1, p=[0.5, 0.5])
        elif band_basis == 2:
            self.Bandwidth_available = np.random.default_rng().choice(
                [2, 3], 1, p=[0.5, 0.5])
        else:
            self.Bandwidth_available = np.random.default_rng().choice(
                [band_basis-1, band_basis, band_basis+1], 1,
                p=[0.25, 0.5, 0.25])
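        # Normalize the observation features to roughly [0, 1]:
        # bandwidth in [2, 7], workload in [4, 8], distance in [6, 30]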
Bandwidth_norm = (self.Bandwidth_available - 2)/5
########################### CAV Workload (number of objects for detection) ##################################################################
        # Dynamic workload: number of objects to be detected by the DNN model
self.O_vehs_all = self.O_init * np.ones(self._CAV_pairs)
O_norm = (self.O_vehs_all - np.array([4]*self._CAV_pairs))/4
############################# Distance and channel gain ###############################################
self.Distance_all = 6*np.ones(self._CAV_pairs)
Distance_norm = (self.Distance_all - np.array([6]*self._CAV_pairs))/24
########################################## State ######################################################
self.curr_state = [0]*self._CAV_pairs
self.pre_action = np.array([1]*self._CAV_pairs)
for i_agent in range(self._CAV_pairs):
self.curr_state[i_agent] = np.r_[Bandwidth_norm,
O_norm[i_agent], Distance_norm[i_agent]]
first_obs = np.array(self.curr_state).flatten()
return first_obs, dict(goal=self._goal)
def step(self, action):
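        """Step the environment with an action.

        Args:
            action (np.ndarray): One continuous value per CAV pair; values
                below 0 select standalone mode, values at or above 0 select
                cooperation mode.

        Returns:
            EnvStep: The environment step resulting from the action.

        Raises:
            RuntimeError: if this method is called before `reset()`.
        """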
########################################################################### MADDPG ######################################################################################################
if self._step_cnt is None:
raise RuntimeError('reset() must be called before step()!')
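        # Threshold each continuous policy output at 0 to obtain a binary
        # cooperation decision per CAV pair (0 = standalone, 1 = cooperate)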
actions_array = np.array(action)
        actions = np.where(actions_array < 0, 0, 1)
################################### Action processing #################################################
# Number of CAV pairs in cooperation mode
Activated_CAV_pair_num = sum(actions)
Activated_index = np.where(actions == 1)
################################### Given state and action ############################################
obj_opt = 0
if Activated_CAV_pair_num > 0:
O_vehs = self.O_vehs_all[Activated_index]
Distance = self.Distance_all[Activated_index]
if Activated_CAV_pair_num > 1:
sort_index = np.array(Distance).argsort()
O_vehs_ordered = O_vehs[sort_index]
Distance_ordered = Distance[sort_index]
if Activated_CAV_pair_num == 2:
obj_opt = self._obj_opt_2CAV[int(self.Bandwidth_available-2), int(O_vehs_ordered[0]-4), int(
O_vehs_ordered[1]-4), int(Distance_ordered[0]/3-1), int(Distance_ordered[1]/3-1)]
elif Activated_CAV_pair_num == 1:
obj_opt = self._obj_opt_1CAV[int(
self.Bandwidth_available-2), int(O_vehs[0]-4), int(Distance[0]/3-1)]
####################################### Calculate the reward ##########################################
reward = 0
reward_modified = 0
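        # An optimal objective of -1 marks an infeasible allocation: fall
        # back to all-standalone actions for the switch-cost bookkeeping and
        # add an extra infeasibility penalty to the reward.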
if obj_opt == -1:
obj_modified = 0
actions_modified = np.array([0]*self._CAV_pairs)
switch_cost = self._CAV_pairs - \
np.count_nonzero(actions_modified == self.pre_action)
reward_modified = obj_modified - switch_cost * self._switch_coeff
reward = reward_modified + self._infeasible_penalty
else:
obj_modified = obj_opt
actions_modified = actions
switch_cost = self._CAV_pairs - \
np.count_nonzero(actions_modified == self.pre_action)
reward = obj_opt - switch_cost * self._switch_coeff
reward_modified = reward
self.pre_action = actions_modified
####################################### Get the next state ##########################################
####################### Available bandwidth (depending on HDV status) #####################################################
band_basis = self._bandwidth_basis[self._step_cnt]
        if band_basis == 7:
            self.Bandwidth_available = np.random.default_rng().choice(
                [6, 7], 1, p=[0.5, 0.5])
        elif band_basis == 2:
            self.Bandwidth_available = np.random.default_rng().choice(
                [2, 3], 1, p=[0.5, 0.5])
        else:
            self.Bandwidth_available = np.random.default_rng().choice(
                [band_basis-1, band_basis, band_basis+1], 1,
                p=[0.25, 0.5, 0.25])
Bandwidth_norm = (self.Bandwidth_available - 2)/5
########################### CAV Workload (number of objects for detection) ##################################################################
O_next = np.zeros([self._CAV_pairs])
for i_agent in range(self._CAV_pairs):
p_O = self.load_trans[int(self.O_vehs_all[i_agent]-4), :]
O_next[i_agent] = np.random.default_rng().choice(
self.workloads, 1, p=p_O)
self.O_vehs_all = O_next
O_norm = (self.O_vehs_all - np.array([4]*self._CAV_pairs))/4
############################# Distance and channel gain ###############################################
distance_next = np.zeros([self._CAV_pairs])
for i_agent in range(self._CAV_pairs):
p_D = self.trans_matrix[int(self.Distance_all[i_agent]/6-1), :]
distance_next[i_agent] = np.random.default_rng().choice(
self._distances, 1, p=p_D)
self.Distance_all = distance_next
Distance_norm = (self.Distance_all - np.array([6]*self._CAV_pairs))/24
##################################### State ###########################################################
        state_next = [0] * self._CAV_pairs  # per-agent observations for the next step
self._step_cnt += 1
succ = False
if self._step_cnt == self._max_episode_length:
succ = True
for i_agent in range(self._CAV_pairs):
state_next[i_agent] = np.r_[Bandwidth_norm,
O_norm[i_agent], Distance_norm[i_agent]]
self.curr_state = state_next
done = succ and not self._never_done
step_type = StepType.get_step_type(
step_cnt=self._step_cnt,
max_episode_length=self._max_episode_length,
done=done)
if step_type in (StepType.TERMINAL, StepType.TIMEOUT):
self._step_cnt = None
return EnvStep(env_spec=self.spec,
action=action,
reward=reward,
observation=np.array(state_next).flatten(),
env_info={
'task': self._task,
'success': succ
},
step_type=step_type)
# pylint: disable=no-self-use
def sample_tasks(self, num_tasks):
"""Sample a list of `num_tasks` tasks.
Args:
num_tasks (int): Number of tasks to sample.
Returns:
list[dict[str, np.ndarray]]: A list of "tasks", where each task is
a dictionary containing a single key, "goal", mapping to a
random transition matrix.
"""
tasks = [{"goal": rand_trans_matrix()} for _ in range(num_tasks)]
return tasks
def set_task(self, task):
"""Reset with a task.
Args:
task (dict[str, np.ndarray]): A task (a dictionary containing a
single key, "goal", which should be a random transition matrix).
"""
self._task = task
self._goal = task['goal']
def render(self, mode):
"""Renders the environment."""
return f'Goal: {self._goal}'
def visualize(self):
"""Creates a visualization of the environment."""
self._visualize = True
print(self.render('ascii'))
def close(self):
"""Close the env."""
@property
def action_space(self):
"""akro.Space: The action space specification."""
return self._action_space
@property
def observation_space(self):
"""akro.Space: The observation space specification."""
return self._observation_space
@property
def spec(self):
"""EnvSpec: The environment specification."""
return self._spec
@property
def render_modes(self):
"""list: A list of string representing the supported render modes."""
return [
'ascii',
]
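

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original environment). It assumes the
# garage/akro/scipy dependencies are installed and the KKT_opt/*.mat lookup
# tables are available in the working directory; the 75-step episode length
# (matching _bandwidth_basis) is only illustrative.
if __name__ == '__main__':
    env = CAVVelEnv(max_episode_length=75)
    env.set_task(env.sample_tasks(1)[0])
    first_obs, episode_info = env.reset()
    env_step = env.step(env.action_space.sample())
    print('reward:', env_step.reward)
    print('observation:', env_step.observation)
    print('step type:', env_step.step_type)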