-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmaxpar.py
214 lines (159 loc) · 6.53 KB
/
maxpar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import timeit
import random
import threading
from graphviz import Digraph
class Task:
name = ""
reads = []
writes = []
run = None
def __init__(self, name, function, reads=[], writes=[]):
self.name = name
self.reads = reads
self.writes = writes
self.run = function
class TaskSystem:
tasks = []
dependencies = dict()
print("test")
def __init__(self, tasks, dependencies):
self.tasks = tasks
self.dependencies = {}
def addTask(self, task):
# Vérification de la validité de la tâche
if task.name == "":
raise ValueError("Le nom de la tâche ne peut pas être vide.")
for t in self.tasks:
if t.name == task.name:
raise ValueError("Le nom de la tâche existe déjà.")
for r in task.reads:
if r == "":
raise ValueError("Le nom d'une lecture ne peut pas être vide.")
if r not in [t.name for t in self.tasks]:
raise ValueError("La tâche de lecture n'existe pas.")
for w in task.writes:
if w == "":
raise ValueError("Le nom d'une écriture ne peut pas être vide.")
if w not in [t.name for t in self.tasks]:
raise ValueError("La tâche d'écriture n'existe pas.")
self.tasks.append(task)
self.dependencies[task.name] = set(task.reads)
def TestBernstein(self,task):
print("Lancement TestBernstein")
# On test toutes les conditions de Bernstein
# s'il y a une interference, on retourne True
for read in task.reads:
if read in task.writes:
return True
for write in task.writes:
if write in task.reads:
return True
for write in task.writes:
if write in task.writes:
return True
# si on retourne False, il n'y a pas d'interférence
return False
def getDependencies(self, nomTache):
# stocke les noms de tâches dépendants
nameTask = []
dependencies = self.dependencies[nomTache]
# parcourt toutes les tâches et vérifie si leur nom correspond à une dépendance donnée
for dep_name in dependencies:
for task in self.tasks:
# si oui, on l'ajoute à la liste des tâches et on sort de la boucle avec break
if task.name == dep_name:
nameTask.append(task.name)
break
return nameTask
# L'exécution séquentielle
def runSeq(self):
print("Lancement runSeq")
"""
Exécute les tâches du système de façon séquentielle en respectant l'ordre imposé
par la relation de précédence.
"""
# On initialise la liste des tâches exécutées
tasksExecuted = []
# Boucle 'while' tant qu'il reste des tâches à exécuter
while len(tasksExecuted) < len(self.tasks):
# Recherche de la première tâche non exécutée qui satisfait les conditions de précédence
for task in self.tasks:
if task.name not in tasksExecuted:
dependencies = self.getDependencies(task.name)
if all(dep in tasksExecuted for dep in dependencies):
task.run()
tasksExecuted.append(task.name)
break
return True
def run(self):
executedTask = []
dependencies = self.getDependencies()
while len(executedTask)< len(self.tasks):
taskInWait = []
for tasks in self.tasks:
if task.name in executedTask:
continue
threads = []
for task in taskInWait:
thread = threading.Thread(target=task.run)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
for task in taskInWait:
executedTask.append(task.name)
taskInWait.clear()
'''
def run(self):
print("lancement séquentiel")
self.runSeq()
print("Fin runSeq")
print("lancement parallèle")
self.run()
print("Fin run")
'''
def detTestRnd(self, num_tests):
"""
Teste la déterminisme du système de tâches en exécutant le système avec des jeux de valeurs
aléatoires pour les variables de façon simple.
"""
for i in range(num_tests):
# Initialiser les valeurs aléatoires pour les tâches
for task in self.tasks:
for var in task.reads + task.writes:
var.value = random.randint(0, 100)
# Exécuter le système en mode séquentiel et en mode parallèle
results_seq = []
results_par = []
for task in self.tasks:
result = task.run()
results_seq.append(result)
for task in self.tasks:
result = task.runPar(self.max_parallelism)
results_par.append(result)
# Comparer les résultats des deux modes d'exécution
if results_seq != results_par:
return False
return True
# on fait le test 5x car les premières exécutions peuvent être plus lentes que les suivantes
def parCost(self, number=5):
print("Lancement parCost")
# On lance l'exécution séquentielle et on calcule le temps de son exécution
runSeq_time = timeit.timeit(lambda: self.runSeq(), number)
# On lance l'exécution parallèle et on calcule le temps de son exécution
run_time = timeit.timeit(lambda: self.run(), number)
# On affiche les temps d'exécution puis on les comparent
print("Temps d'exécution de runSeq moyens :", runSeq_time, "sec")
print("Temps d'exécution de run moyens :", run_time, "sec")
print("Différence de temps d'exécution :",
abs(run_time - runSeq_time), "sec")
# Ne fonctionne pas
def draw(self):
graphe = Digraph()
# La déclaration des noeuds
for task in self.tasks:
graphe.node(task.name)
# La création des arrêtes
for task in self.tasks:
graphe.edge(task.name)
graphe.view()