-
Notifications
You must be signed in to change notification settings - Fork 0
/
PandiSim.py
65 lines (51 loc) · 2.51 KB
/
PandiSim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from pyspark import StorageLevel
import sys
import os
sys.path.insert(1, './utils')
import PandiSimConfigInjection as config
import SparkDependencyInjection as sdi
class PandiSim(sdi.SparkDependencyInjection, config.PandiSimConfigInjection):
    """Driver for a pandemic simulation over a graph network.

    Advances an epidemiological model step by step, re-scores vertices,
    updates edges, and optionally snapshots / restores the network state
    on HDFS between steps.

    Mixins (project-local):
        SparkDependencyInjection      -- provides ``self.spark`` (SparkSession).
        PandiSimConfigInjection       -- provides ``self.write_to`` / ``self.read_from``
                                         (HDFS-relative directories); presumably
                                         injected from config — confirm against the mixin.
    """

    # Single source of truth for the HDFS namenode URI (was duplicated in
    # take_screenshot() and read_state()).
    _HDFS_ROOT = "hdfs://namenode:9000/"

    # Behaviour flags merged with whatever the caller passes; keeps the old
    # default behaviour while guaranteeing both keys always exist.
    _DEFAULT_PARAMS = {'take_screenshots': False, 'destroy': False}

    def __init__(self, network, epi_model, scoring_model, edge_model, params=None):
        """Wire the simulator together.

        Args:
            network:       graph object exposing ``vertices`` / ``edges`` DataFrames.
            epi_model:     epidemiological model exposing ``step`` and ``next_sotw()``.
            scoring_model: model exposing ``run()`` and ``annotate(sotw)``.
            edge_model:    model exposing ``run()``.
            params:        optional dict of flags; missing keys fall back to
                           ``_DEFAULT_PARAMS``. (The original signature used a
                           mutable dict default, which is shared across calls —
                           fixed here with a None sentinel + merge.)
        """
        self.network = network
        self.epi_model = epi_model
        self.scoring_model = scoring_model
        self.edge_model = edge_model
        merged = dict(self._DEFAULT_PARAMS)
        if params:
            merged.update(params)
        self.params = merged

    def move(self):
        """Advance the simulation by one step: epi -> scoring -> edges."""
        # next_sotw() returns a pair; index [1] is the state-of-the-world used
        # for annotation (index [0] is unused here — confirm its meaning in the model).
        sotw = self.epi_model.next_sotw()[1]
        self.scoring_model.run()
        self.scoring_model.annotate(sotw)
        self.edge_model.run()

    def _perc_to_steps(self, perc):
        """Convert a fraction of the total horizon ``t_end`` into a step count."""
        return int(perc * self.params['t_end'])

    def run(self, perc=0.1):
        """Run the simulation for ``perc`` of the configured horizon.

        When the ``destroy`` flag is set and at least two steps have elapsed,
        the network state is first reloaded from HDFS before each move.
        A screenshot is (conditionally) written after every move.
        """
        stop_at = self._perc_to_steps(perc)
        for _ in range(stop_at):
            # .get() keeps this safe even if callers bypass __init__'s merge.
            if self.epi_model.step >= 2 and self.params.get('destroy', False):
                self.read_state()
            self.move()
            self.take_screenshot()

    def _snapshot_paths(self, base_dir):
        """Return (edges_path, vertices_path) on HDFS for the current step."""
        step_dir = os.path.join(self._HDFS_ROOT, base_dir, f"step_{self.epi_model.step}")
        return (os.path.join(step_dir, "edges.csv"),
                os.path.join(step_dir, "vertices.csv"))

    def take_screenshot(self):
        """Persist the current vertices/edges to HDFS as headerless CSV.

        Paths are always printed (original behaviour); the actual write only
        happens when the ``take_screenshots`` flag is enabled.
        """
        edges_fil, vertices_fil = self._snapshot_paths(self.write_to)
        print(edges_fil)
        print(vertices_fil)
        if self.params.get('take_screenshots', False):
            self.network.vertices\
                .write.format("csv").option("delimiter", ',')\
                .option('header', False).mode('overwrite').save(vertices_fil)
            self.network.edges\
                .write.format("csv").option("delimiter", ',')\
                .option('header', False).mode('overwrite').save(edges_fil)

    def read_state(self):
        """Reload vertices/edges for the current step from HDFS CSV snapshots.

        Vertices are sorted by id and cached; edges are cached as-is.
        Schemas are inferred (headerless files): vertices -> (id, score,
        health_status), edges -> (src, dst).
        """
        edges_fil, vertices_fil = self._snapshot_paths(self.read_from)
        self.network.vertices = self.spark.read.format("csv").option("delimiter", ',')\
            .option('header', False).option('inferSchema', True).load(vertices_fil)\
            .toDF('id', 'score', 'health_status')\
            .sort('id').cache()
        self.network.edges = self.spark.read.format("csv").option("delimiter", ',')\
            .option('header', False).option('inferSchema', True).load(edges_fil)\
            .toDF('src', 'dst')\
            .cache()