-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPredictOri.py
455 lines (423 loc) · 26 KB
/
PredictOri.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
__author__ = "Joseph ARIAS, Arthur BORGES"
__copyright__ = "Copyright 2024, PredictOri"
__license__ = "GPL"
__maintainer__ = "Joseph ARIAS"
__email__ = "joseph.arias@ens.uvsq.fr"
__status__ = "Released"
import flet as ft
import matplotlib.lines
import matplotlib.pyplot as plt
from flet.matplotlib_chart import MatplotlibChart
import regex as re
import threading
# PARTIE INTERFACE GRAPHIQUE
class Interface:
"""Classe de l'interface."""
def __init__(self, page):
self.file_picker = ft.Ref[ft.FilePicker]() # Sélecteur de fichier
self.progress_ring = ft.Ref[ft.ProgressRing]() # Anneau de progression pour visualiser le temps d'attente
# restant
self.page = page # Page de l'interface qui contiendra les différentes fenêtres (views)
self.page.title = "PredictOri" # Titre de la page
self.page.window_maximized = True # Maximise la fenêtre
self.window_ori = [] # Fenêtre contenant le point d'inversion
self.ori_start = [] # Nucléotide du début de la fenêtre contenant l'origine de réplication
self.ori_end = [] # Nucléotide de la fin de la fenêtre contenant de l'origine de réplication
# Ajout du sélecteur de fichier
self.page.overlay.append(
ft.FilePicker(on_result=lambda e: self.file_selected(e, self.file_picker), ref=self.file_picker))
self.chart1 = MatplotlibChart() # Graphique pour afficher le ratio (G-C) / (G+C) en fonction de la fenêtre
self.chart2 = MatplotlibChart() # Graphique pour afficher le chemin de la séquence d'ADN
self.wait_graph = threading.Condition() # Condition pour attendre la fin de la création des graphiques
# Création de la vue d'accueil
self.change_view("accueil")
def change_view(self, view: str, err_mess: str = None):
"""Fonction qui change la vue de l'interface."""
file_picker = self.file_picker
self.page.views.clear() # On supprime les vues précédentes
appbar = None # Barre d'application qui apparait en haut de la page
scroll = None # Mode de défilement
controls = [] # Contrôles de la page
on_scroll_interval = None # Intervalle de défilement
vertical_alignment = ft.MainAxisAlignment.CENTER # Alignement vertical
horizontal_alignment = ft.CrossAxisAlignment.CENTER # Alignement horizontal
# Vérification de la vue à afficher
if view == "analyse": # Vue d'analyse
if len(self.window_ori) == 0:
text = "Aucun point d'inversion trouvé."
elif len(self.window_ori) == 1:
text = (f"L'origine de réplication se trouve entre les nucléotides {format(self.ori_start[0], ',')} et"
f" {format(self.ori_end[0], ',')} dans la fenêtre {self.window_ori[0]}.")
else:
text = "Plusieurs points d'inversion ont été trouvés :"
for i in range(len(self.window_ori)):
text += (f"\n- Entre les nucléotides {format(self.ori_start[i], ',')} "
f"et {format(self.ori_end[i], ',')} dans la fenêtre "
f"{self.window_ori[i]}{';' if i < len(self.window_ori) - 1 else '.'}")
scroll = ft.ScrollMode.HIDDEN # On cache la barre de défilement
on_scroll_interval = 0
controls = [
ft.Column(
controls=[
# On affiche le graphique du ratio (G-C) / (G+C) en fonction de la fenêtre
self.chart1,
# On affiche où se trouve l'origine de réplication
ft.Text(text,
size=20),
# On affiche le graphique du chemin de la séquence d'ADN
self.chart2,
],
horizontal_alignment=ft.CrossAxisAlignment.CENTER,
spacing=0.01 * self.page.width), ]
vertical_alignment = ft.MainAxisAlignment.CENTER
appbar = ft.AppBar(leading=ft.IconButton(icon=ft.icons.ARROW_CIRCLE_LEFT,
on_click=lambda _: self.change_view("accueil"),
tooltip="Retour à l'accueil"))
elif view == "explication": # Vue d'explication
controls = [ft.Row(
controls=[
ft.Text(
value="Qu'est-ce qu'une origine de réplication ?",
size=30,
weight=ft.FontWeight.BOLD)],
vertical_alignment=ft.CrossAxisAlignment.CENTER,
),
# On affiche le texte d'explication qui est contenu dans le fichier explications_ori.txt
ft.Column(
scroll=ft.ScrollMode.AUTO,
controls=[
ft.Text(
value=open("./assets/text/explications_ori.txt", "r", encoding="UTF-8").read(),
size=20, ), ],
horizontal_alignment=ft.CrossAxisAlignment.CENTER,
alignment=ft.MainAxisAlignment.CENTER,
)]
appbar = ft.AppBar(leading=ft.IconButton(icon=ft.icons.ARROW_CIRCLE_LEFT,
on_click=lambda _: self.change_view("accueil"),
tooltip="Retour à l'accueil"),
title=ft.Text("Qu'est ce que l'origine de réplication ?", size=20),
title_spacing=0.0)
vertical_alignment = ft.MainAxisAlignment.CENTER
scroll = ft.ScrollMode.AUTO
on_scroll_interval = 0
elif view == "aide":
controls = [
ft.Column(
controls=[
ft.Text(
value="Comment utiliser PredictOri ?",
size=30,
weight=ft.FontWeight.BOLD),
ft.Text(
value="Pour utiliser PredictOri, il vous suffit de sélectionner le fichier FASTA "
"contenant le génome de la bactérie dont vous souhaitez prédire l'origine "
"de réplication.",
size=20),
],
horizontal_alignment=ft.CrossAxisAlignment.CENTER,
spacing=0.1 * self.page.width), ]
vertical_alignment = ft.MainAxisAlignment.CENTER
horizontal_alignment = ft.CrossAxisAlignment.CENTER
appbar = ft.AppBar(leading=ft.IconButton(icon=ft.icons.ARROW_CIRCLE_LEFT,
on_click=lambda _: self.change_view("accueil"),
tooltip="Retour à l'accueil"))
elif view == "accueil":
# On réinitialise les listes
self.window_ori = []
self.ori_start = []
self.ori_end = []
controls = [ft.Column(
controls=[
ft.Text(
value="Bienvenue sur PredictOri !",
size=30,
weight=ft.FontWeight.BOLD),
ft.Text(
value="Application pour prédire l'origine de réplication d'une bactérie à partir de son"
" génome.",
size=20),
# Bouton pour commencer l'analyse et qui permet la sélection du fichier FASTA
ft.FilledButton(
text="Commencer",
on_click=lambda _: file_picker.current.pick_files(
dialog_title="Sélectionnez le fichier FASTA contenant le génome de la bactérie",
file_type=ft.FilePickerFileType.CUSTOM,
allowed_extensions=["fasta"],
allow_multiple=False), ),
ft.FilledButton(
text="Qu'est-ce qu'une origine de réplication ?",
on_click=lambda _: self.change_view("explication"), ),
ft.FilledButton(
text="Comment utiliser PredictOri ?",
on_click=lambda _: self.change_view("aide"), ),
],
horizontal_alignment=ft.CrossAxisAlignment.CENTER,
spacing=0.1 * self.page.width), ]
elif view == "attente": # Vue d'attente
controls = [ft.Column(
controls=[
ft.Text(
value="Analyse en cours...",
size=30,
weight=ft.FontWeight.BOLD),
ft.ProgressRing(ref=self.progress_ring),
],
horizontal_alignment=ft.CrossAxisAlignment.CENTER,
spacing=0.1 * self.page.width), ]
vertical_alignment = ft.MainAxisAlignment.CENTER
horizontal_alignment = ft.CrossAxisAlignment.CENTER
elif view == "erreur": # Vue d'erreur
button = ft.FilledButton(
text="Voir les résultats malgré tout",
on_click=lambda _: self.change_view("analyse"), )
controls = [ft.Column(
controls=[
ft.Text(
value="Erreur",
size=30,
weight=ft.FontWeight.BOLD,
color=ft.colors.RED),
ft.Text(
value=err_mess,
size=20),
ft.FilledButton(
text="Revenir à l'accueil",
on_click=lambda _: self.change_view("accueil"), ),
button,
],
horizontal_alignment=ft.CrossAxisAlignment.CENTER,
spacing=0.1 * self.page.width), ]
vertical_alignment = ft.MainAxisAlignment.CENTER
horizontal_alignment = ft.CrossAxisAlignment.CENTER
# Ajout de la vue à la page principale
self.page.views.append(ft.View(
route=view,
appbar=appbar,
controls=controls,
horizontal_alignment=horizontal_alignment,
vertical_alignment=vertical_alignment,
scroll=scroll,
on_scroll_interval=on_scroll_interval,
))
self.page.go(view)
def file_selected(self, e, file_picker: ft.Ref[ft.FilePicker]):
"""Fonction appelée lorsqu'un fichier est sélectionné."""
if e.files is not None:
file = file_picker.current.result.files[0].path # On récupère le chemin du fichier
self.change_view("attente") # On affiche la vue d'attente
thread = threading.Thread(target=self.analyze_genome, args=(e, file)) # On crée un thread pour l'analyse
thread.start() # On démarre le thread
with self.wait_graph:
self.wait_graph.wait() # On attend la fin de la création des graphiques
thread.join()
if self.page.views[-1].route != "erreur": # Si aucune erreur n'est survenue
self.change_view("analyse") # On affiche la vue d'analyse
# PARTIE BIOLOGIE
def analyze_genome(self, e, file):
"""Fonction qui analyse le génome pour déterminer l'origine de réplication."""
if e.files is not None:
with open(file, 'r') as f:
lines = f.read().strip() # On lit le fichier FASTA en enlevant les espaces et les retours à la ligne
genome = "".join(lines.splitlines()[1:]).upper() # On récupère la séquence d'ADN à partir de la 2ème ligne
len_window = len(genome) // 175 # on crée 175 fenêtres
overlap = (len_window // 10) * 9 # on chevauche les fenêtres de 90%
non_dna = re.compile(r'[^ATCGU]') # on crée une regex pour trouver les caractères non nucléotidiques
search_non_nucl = non_dna.search(genome) # on cherche les caractères non nucléotidiques
if search_non_nucl: # si on trouve des caractères non nucléotidiques
# on affiche un message d'erreur
self.change_view("erreur", "Le fichier FASTA contient des caractères non nucléotidiques"
f" (le premier est {genome[search_non_nucl.span()[0]]} au nucléotide "
f"{search_non_nucl.span()[0]}).")
with self.wait_graph:
self.wait_graph.notify() # on notifie la fin de la création des graphiques
return # on arrête la fonction
elif "U" in genome: # si on trouve des uraciles dans la séquence
self.change_view("erreur", "Le fichier FASTA est une séquence d'ARN. Une séquence d'ADN est requise.")
with self.wait_graph:
self.wait_graph.notify() # on notifie la fin de la création des graphiques
return # on arrête la fonction
i = 0 # on initialise l'indice de la fenêtre
ratio = [] # on crée une liste pour stocker les ratios (G-C) / (G+C)
self.progress_ring.current.value = 0
self.page.update()
while True:
end_window = min(i + len_window, len(genome)) # on détermine la fin de la fenêtre
window = genome[i:end_window] # on récupère la fenêtre
nb_g = window.count('G') # on compte le nombre de G
nb_c = window.count('C') # on compte le nombre de C
try:
ratio.append((nb_g - nb_c) / (nb_g + nb_c)) # on ajoute le ratio (G-C)/(G+C) à la liste
except ZeroDivisionError: # s'il y a une division par zéro
self.change_view("erreur", "Division par zéro. Vérifiez que le fichier FASTA contient des G et C. "
"Si c'est le cas, veuillez modifier la taille de la fenêtre ou du "
"chevauchement.")
with self.wait_graph:
self.wait_graph.notify() # on notifie la fin de la création des graphiques
return # on arrête la fonction
if min(i + len_window, len(genome)) == len(genome): # si on est à la fin de la séquence
break # on arrête la boucle
i += overlap # on avance la fenêtre
self.progress_ring.current.value = (i / len(genome)) / 2 # on met à jour la barre de progression
self.page.update() # on met à jour la page
self.progress_ring.current.value = None
self.page.update()
fig1, ax1 = plt.subplots(figsize=(15, 6)) # on crée une figure pour le graphique affichant les ratios en
# fonction de la fenêtre
ax1.plot(ratio, linewidth=0.5) # on affiche les ratios
list_invert_point = self.find_inversion_point(ratio) # on cherche le point d'inversion
for invert_point in self.find_inversion_point(ratio): # pout chaque point d'inversion trouvé
self.window_ori.append(invert_point) # on ajoute la fenêtre contenant le point d'inversion
self.ori_start.append(overlap * invert_point) # on ajoute le nucléotide du début de la fenêtre
self.ori_end.append(overlap * invert_point + len_window) # on ajoute le nucléotide de la fin de la
# fenêtre
ax1.axhline(y=0, color='r') # on affiche une ligne rouge pour mieux visualiser le point d'inversion
ax1.scatter(invert_point, 0, color='red', zorder=5) # on affiche le point d'inversion
ax1.axvline(invert_point, linestyle='--', color='r', ymax=0.5) # on affiche une ligne allant de
# l'axe x au point d'inversion
# on ajoute une annotation pour indiquer le point d'inversion
ax1.annotate(f'Fenêtre du point d\'inversion: {invert_point}', xy=(invert_point, 0),
xytext=(invert_point, -0.25),
color='red', ha='center', va='top')
val_max_y = max(abs(min(ratio)), abs(max(ratio))) # on détermine la valeur absolue maximale de y
ax1.set_ylim(-val_max_y, val_max_y) # on définit les limites de y
ax1.set_title('Ratio (G-C) / (G+C) en fonction de la fenêtre') # on ajoute un titre
ax1.set_xlabel('Window') # on ajoute un titre à l'axe x
ax1.set_ylabel('Ratio (G-C) / (G+C)') # on ajoute un titre à l'axe y
self.chart1.figure = fig1
seq_length = len(genome) # on récupère la longueur de la séquence
window = 0 # on initialise la fenêtre
x_values = [0] # La position initiale sur l'axe x
y_values = [0] # La position initiale sur l'axe y
self.progress_ring.current.value = 0.5
self.page.update()
while window <= seq_length: # tant que la fenêtre est inférieure à la longueur de la séquence
nba = nbc = nbg = nbt = 0 # on initialise les compteurs pour les nucléotides
nb = 0 # on initialise le compteur pour les nucléotides
for i in range(window, min(window + len_window, seq_length)): # on parcourt la fenêtre
nb += 1 # on incrémente le compteur de nucléotides
base = genome[i]
if base == 'A': # si le nucléotide est A
nba += 1
elif base == 'C': # si le nucléotide est C
nbc += 1
elif base == 'G': # si le nucléotide est G
nbg += 1
elif base == 'T': # si le nucléotide est T
nbt += 1
nb_steps_x = nbc - nbg # on détermine le nombre de pas sur l'axe x
nb_steps_y = nba - nbt # on détermine le nombre de pas sur l'axe y
x_end_segment = nb_steps_x * nb # on détermine la longueur du segment sur l'axe x
y_end_segment = nb_steps_y * nb # on détermine la longueur du segment sur l'axe y
x_values.append(x_values[-1] + x_end_segment) # on ajoute la nouvelle coordonnée x
y_values.append(y_values[-1] + y_end_segment) # on ajoute la nouvelle coordonnée y
window += len_window # on avance la fenêtre
self.progress_ring.current.value = 0.5 + (window / seq_length) / 2
self.page.update()
self.progress_ring.current.value = None
self.page.update()
fig2, ax2 = plt.subplots(figsize=(15, 6)) # on crée une figure pour le graphique du chemin de la séquence
cusps = self.find_cusp(x_values, y_values) # on cherche les points de rebroussement
for cusp in cusps: # pour chaque point de rebroussement
ax2.scatter(cusp[0], cusp[1], color='red', zorder=5, label="point de rebroussement") # on affiche le
# point de rebroussement
custom_legend = [
matplotlib.lines.Line2D([0], [0], marker='o', color='w', markerfacecolor='red', markersize=10,
label='Point de rebroussement')]
# Ajouter la légende personnalisée
plt.legend(handles=custom_legend)
ax2.plot(x_values, y_values, linestyle='-') # on affiche le chemin de la séquence
ax2.set_xlabel('Horizontal Direction') # on ajoute un titre à l'axe x
ax2.set_ylabel('Vertical Direction') # on ajoute un titre à l'axe y
ax2.set_title('DNA Sequence Graph') # on ajoute un titre
ax2.grid() # on affiche la grille
self.chart2.figure = fig2
if len(list_invert_point) > 1: # si on trouve plusieurs points d'inversion
# on affiche un message d'erreur
self.change_view("erreur", "Plusieurs points d'inversion trouvés. Veuillez modifier la taille "
"de la fenêtre ou du chevauchement. Si le problème persiste, "
"le point d'inversion ne peut être trouvé par l'algorithme utilisé.")
elif len(list_invert_point) == 0: # si on ne trouve pas de point d'inversion
# on affiche un message d'erreur
self.change_view("erreur", "Aucun point d'inversion trouvé. Veuillez modifier la taille de la fenêtre "
"ou du chevauchement. Si le problème persiste, le point d'inversion ne peut "
"être trouvé par l'algorithme utilisé.", )
elif len(cusps) == 0: # si on ne trouve pas de point de rebroussement
self.change_view("erreur", "Aucun point d'inversion trouvé. Veuillez modifier la taille "
"de la fenêtre ou du chevauchement. Si le problème persiste, "
"le point d'inversion ne peut être trouvé par l'algorithme utilisé.")
elif len(cusps) > 1: # si on trouve plusieurs points de rebroussement
self.change_view("erreur", "Plusieurs points d'inversions trouvés. Veuillez modifier la taille "
"de la fenêtre ou du chevauchement. Si le problème persiste, "
"le point d'inversion ne peut être trouvé par l'algorithme utilisé.")
with self.wait_graph:
self.wait_graph.notify() # on notifie la fin de la création des graphiques
@staticmethod
def find_inversion_point(ratios):
"""Fonction qui trouve le point d'inversion dans une liste de ratios (G-C) / (G+C)."""
invert_point = []
if ratios[0] < 0: # si le premier ratio est négatif
sens = 1 # on veut que le ratio soit positif
else: # sinon
sens = -1 # on veut que le ratio soit négatif
for i in range(1, len(ratios)):
nb_ratio_inversions = 0 # nombre de ratios qui respectent la condition d'être de signe opposé au ratio
# initial
if sens == 1 and ratios[i] > 0: # si on veut que le ratio soit positif et que le ratio est positif
for j in range(i + 1, min(i + 11, len(ratios))): # on regarde les 10 ratios suivants
if ratios[j] > 0: # si le ratio est positif
nb_ratio_inversions += 1 # on incrémente le nombre de ratios positifs
if nb_ratio_inversions >= 8: # si on a trouvé au moins 8 ratios positifs
invert_point.append(i) # on a trouvé le point d'inversion
sens = -1 # on veut que le ratio soit négatif
elif sens == -1 and ratios[i] < 0: # si on veut que le ratio soit négatif et que le ratio est négatif
for j in range(i + 1, min(i + 11, len(ratios))): # on regarde les 10 ratios suivants
if ratios[j] < 0: # si le ratio est négatif
nb_ratio_inversions += 1 # on incrémente le nombre de ratios négatifs
if nb_ratio_inversions >= 8: # si on a trouvé au moins 8 ratios négatifs
invert_point.append(i) # on a trouvé le point d'inversion
sens = 1 # on veut que le ratio soit positif
return invert_point
@staticmethod
def find_cusp(x_values, y_values):
"""Fonction qui trouve le point de rebroussement dans une liste de coordonnées."""
cusp = []
# on se base sur les 100 premiers nucléotides pour déterminer la direction initiale
if x_values[100] < 0 and y_values[100] < 0:
direction = "sud-ouest"
elif x_values[100] < 0 < y_values[100]:
direction = "nord-ouest"
elif x_values[100] > 0 > y_values[100]:
direction = "sud-est"
else:
direction = "nord-est"
for i in range(1, len(x_values)): # on parcourt les coordonnées
count = 0 # compteur pour les 10 prochaines coordonnées
for j in range(i + 1, min(i + 11, len(x_values))): # on regarde les 10 coordonnées suivantes
if direction == "sud-ouest": # si la direction est sud-ouest
if x_values[j] > x_values[i] and y_values[j] > y_values[i]: # si les coordonnées suivantes sont
# dans le sens opposé à la direction
count += 1 # on incrémente le compteur
elif direction == "nord-ouest":
if x_values[j] > x_values[i] and y_values[j] < y_values[i]:
count += 1
elif direction == "sud-est":
if x_values[j] < x_values[i] and y_values[j] > y_values[i]:
count += 1
else:
if x_values[j] < x_values[i] and y_values[j] < y_values[i]:
count += 1
if count == 10: # si les 10 coordonnées suivantes sont dans le sens opposé à la direction
cusp.append((x_values[i], y_values[i])) # on a trouvé un point de rebroussement
try: # on détermine la nouvelle direction à partir des 10 prochaines coordonnées
if x_values[i + 10] < x_values[i] and y_values[i + 10] < y_values[i]:
direction = "sud-ouest"
elif x_values[i + 10] < x_values[i] and y_values[i + 10] > y_values[i]:
direction = "nord-ouest"
elif x_values[i + 10] > x_values[i] and y_values[i + 10] < y_values[i]:
direction = "sud-est"
else:
direction = "nord-est"
except IndexError:
break
return cusp # on retourne la liste des points de rebroussement
ft.app(Interface)