#!/usr/bin/env python
# Author: LinXing Chen, UC Berkeley
# cobra v1.2.3
# Contig Overlap Based Re-Assembly
# Modification date: Sep 3, 2023
import argparse
import concurrent.futures
import itertools
import os
from collections import defaultdict
from pathlib import Path
import sys
from time import strftime
from typing import Callable, Iterable, Literal, NamedTuple, TextIO, TypeVar
import pandas as pd
import pysam
from Bio import SeqIO
from Bio.Seq import Seq
T = TypeVar("T")
try:
from tqdm import tqdm
except ImportError:
def tqdm(__object: Iterable[T], *nargs, **kwargs) -> Iterable[T]:
return iter(__object)
try:
from Bio.SeqUtils import gc_fraction as GC
except ImportError:
from Bio.SeqUtils import GC # type: ignore
def parse_args(args=None):
parser = argparse.ArgumentParser(
description="This script is used to get higher quality (including circular) virus genomes "
"by joining assembled contigs based on their end overlaps."
)
requiredNamed = parser.add_argument_group("required named arguments")
requiredNamed.add_argument(
"-q",
"--query",
type=str,
help="the query contigs file (fasta format), or the query name "
"list (text file, one column).",
required=True,
)
requiredNamed.add_argument(
"-f",
"--fasta",
type=str,
help="the whole set of assembled contigs (fasta format).",
required=True,
)
requiredNamed.add_argument(
"-a",
"--assembler",
type=str,
choices=["idba", "megahit", "metaspades"],
help="de novo assembler used, COBRA not tested for others.",
required=True,
)
requiredNamed.add_argument(
"-mink",
"--mink",
type=int,
help="the min kmer size used in de novo assembly.",
required=True,
)
requiredNamed.add_argument(
"-maxk",
"--maxk",
type=int,
help="the max kmer size used in de novo assembly.",
required=True,
)
requiredNamed.add_argument(
"-m",
"--mapping",
type=str,
help="the reads mapping file in sam or bam format.",
required=True,
)
requiredNamed.add_argument(
"-c",
"--coverage",
type=str,
help="the contig coverage file (two columns divided by tab).",
required=True,
)
parser.add_argument(
"-lm",
"--linkage_mismatch",
type=int,
default=2,
help="the max read mapping mismatches for "
"determining if two contigs are spanned by "
"paired reads. [2]",
)
parser.add_argument(
"-tr",
"--trim_readno",
type=str,
choices=["no", "trim", "auto"],
default="no",
        help='whether to trim the suffixes `/1` and `/2` that distinguish paired reads ["no"]',
)
parser.add_argument(
"--skip_joining",
action="store_true",
help="whether skip joining contigs output to disk",
)
parser.add_argument(
"-o",
"--output",
type=str,
help="the name of output folder (default = '<query>_COBRA').",
)
parser.add_argument(
"-t",
"--threads",
type=int,
default=16,
help="the number of threads for blastn. [16]",
)
parser.add_argument("-v", "--version", action="version", version="cobra v1.2.3")
return parser.parse_args(args)
def get_log_info(steps: int, desc="COBRA Steps", log_file: TextIO | None = None):
def steps_range():
step = [0]
for i in tqdm(range(steps), desc=desc):
if i:
yield step[0]
step[0] += 1
while True:
yield step[0]
step[0] += 1
def _log_info(description: str, end="\n"):
print(
step_format_str.format(next(stepin)),
strftime("[%Y/%m/%d %H:%M:%S]"),
description,
end=end,
file=log_file,
flush=True,
)
step_format_str = "[{0:0>" + f"{len(str(steps))}" + "}/" + f"{steps}]"
stepin = steps_range()
print(desc, file=log_file, flush=True)
return _log_info
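# Minimal usage sketch (hypothetical step count and messages), assuming stdout
# as the log target:
#   log_info = get_log_info(steps=5)
#   log_info("Reading fasta file")  # -> e.g. "[1/5] [2023/09/03 12:00:00] Reading fasta file"
#   log_info("Building linkage")    # each call advances the step counter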
# define functions for analyses
def compre_sets(p: int, li: set, lj: set):
if p < 0:
return ""
if p == 0:
return f"{len(li)}"
i, j, ij = len(li), len(lj), len(li & lj)
if ij:
if i == ij:
if j == ij:
return f"=="
return ">"
if j == ij:
return "<"
return f"{ij}"
def end2contig(end: str):
"""
get contig name from end name
"""
base_name, suffix = end.rsplit("_", 1)
return base_name if suffix in ("L", "R", "Lrc", "Rrc") else end
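# Illustrative behavior (hypothetical contig names):
#   end2contig("k141_123_L")   -> "k141_123"
#   end2contig("k141_123_Rrc") -> "k141_123"
#   end2contig("k141_123")     -> "k141_123"  (no end suffix, returned unchanged)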
def end2end2(end: str):
"""
    get the opposite end of the same contig, the target for the next round of joining
"""
base_name, suffix = end.rsplit("_", 1)
if suffix == "Lrc":
return base_name + "_Rrc"
elif suffix == "Rrc":
return base_name + "_Lrc"
elif suffix == "R":
return base_name + "_L"
elif suffix == "L":
return base_name + "_R"
raise ValueError(f"Unexpected {suffix=}")
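# Illustrative behavior (hypothetical contig name):
#   end2end2("k141_1_L")   -> "k141_1_R"    (opposite end, forward strand)
#   end2end2("k141_1_Lrc") -> "k141_1_Rrc"  (opposite end, reverse strand)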
def detect_self_circular(
contig: str,
one_path_end: frozenset[str],
two_paths_end: frozenset[str],
link_pair: dict[str, list[str]],
):
"""
to determine the input sequences if they are self_circular genomes or not
"""
end = contig + "_L"
if end in one_path_end:
if link_pair[end][0] == contig + "_R":
# the other end could be joined with the current working end
# >contig
# |----->...|----->
# |- L ->
# |- R ->
# |----->...|----->
# >contig
# |----->.*.*.*
# >other contigs ends with `|----->` <
#
return "one_path_end"
elif end in two_paths_end:
if contig + "_R" in link_pair[end]:
# the other end could be joined with the current working end
# >contig
# |----->...|----->
# |- L ->
# |- R ->
# |----->...|----->
# >contig
# |----->...|----->
# >the other contig ends with `|----->` <
#
if (
link_pair[end][0].rsplit("_", 1)[0]
== link_pair[end][1].rsplit("_", 1)[0]
):
# >contig
# |----->...|----->
# |- L ->
# |- R ->
# |----->...|----->
# contig< | FIXME: Why this happened?
# |~Rrc~>...|~Lrc~> |
# |~Rrc~>...|~Lrc~> |
print(
"Warning: Unexpected self-multipile circular: ", contig, flush=True
)
print(" : from", end, "to", link_pair[end], flush=True)
return ""
return "two_paths_end"
return ""
def check_self_circular_soft(sequence: Seq, min_over_len: int):
"""check overlap shorter than maxK, while not shorter than mink"""
end_part = sequence[-min_over_len:]
if sequence.count(end_part) > 1:
expected_end = sequence.split(end_part, 1)[0] + end_part
if sequence.endswith(expected_end):
return len(expected_end)
return -1
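# Illustrative sketch (hypothetical sequence): with min_over_len=4, the sequence
# "ATGC" + "TTTT" + "ATGC" ends with its own leading 4-mer, so the flexible
# terminal overlap is 4; -1 means no such overlap was found.
#   check_self_circular_soft(Seq("ATGCTTTTATGC"), 4) -> 4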
def get_contig2join(
query_set: frozenset[str],
orphan_query: frozenset[str],
contig_pe_links: frozenset[tuple[str, str]],
contig2cov: dict[str, float],
link_pair: dict[str, list[str]],
one_path_end: frozenset[str],
two_paths_end: frozenset[str],
self_circular: frozenset[str],
):
contig2join: dict[str, list[str]] = {}
contig_checked: dict[str, list[str]] = {}
contig2join_reason: dict[str, dict[str, str]] = {}
for contig in query_set:
contig2join[contig + "_L"] = []
contig2join[contig + "_R"] = []
contig_checked[contig + "_L"] = []
contig_checked[contig + "_R"] = []
contig2join_reason[contig] = {contig: "query"}
path_circular_end: set[str] = set()
def other_end_is_extendable(end: str):
"""True if the other end is extendable"""
return (
end2contig(end) not in self_circular and len(link_pair[end2end2(end)]) > 0
)
def check_2path_to_add(end1: str, end2: str, contig: str):
if link2_1contig(end1, end2):
# >contig2<
# |----->.*. |=====>
# >contig< |=====>.*.*.*
# .*.*.*|-----> >contig4<
# |-end-|
# |-----> *.*|=====>
# >contig3<
# no more contigs starts with `|----->` or ends with `|=====>`
#
checked_reason = "are_equal_paths"
link_pair_do = the_dominant_one(end1, end2)
elif (
contig2cov[end2contig(end1)] + contig2cov[end2contig(end2)]
>= contig2cov[contig] * 0.5
):
            # 0.5 works well; a larger threshold yields far fewer "Extended circular" results.
            # choose the one with the more similar abundance
checked_reason = "the_better_one"
link_pair_do = the_better_one((end1, end2), contig)
else:
return "", ""
if end2contig(link_pair_do) in self_circular:
return "", ""
return checked_reason, link_pair_do
def check_1path_to_add(end: str, contig: str):
"""
if the other end of a potential joining contig cannot be extended,
add only when it has a very similar coverage to that of the query contig
"""
        # 2.1. link_pair1 can be extended in the next step
        if other_end_is_extendable(end):
            return "other_end_is_extendable"
        # however, it CANNOT be extended in the next step
        # 1. link_pair1 does not point to a self-circularized contig
if end2contig(end) in self_circular:
return ""
# 2.2. link_pair1 has similar coverage with contig
if (
contig2cov[contig] # don't div zero
and 0.9 <= contig2cov[end2contig(end)] / contig2cov[contig] <= 1.11
):
return "final_similar_cov"
return ""
def link2_1contig(end1: str, end2: str):
"""
        True if both paths link to one and the same contig
.. code-block::
|-end1-| .*.*.*|-----> end1_2pair
|-end2-| *.*.*.|-----> end2_2pair
|----->.*.*.*
* it means only one end matched by both pairs
"""
end1_2pairs = link_pair[end2end2(end1)]
end2_2pairs = link_pair[end2end2(end2)]
return (
len(end1_2pairs) == 1
and len(end2_2pairs) == 1
and end2contig(end1_2pairs[0]) == end2contig(end2_2pairs[0])
)
def the_dominant_one(path1: str, path2: str):
"""the paths with higher abundance"""
return max(path1, path2, key=lambda x: contig2cov[end2contig(x)])
def the_better_one(paths: Iterable[str], contig: str):
"""the path with more similar coverage"""
return min(
paths, key=lambda x: abs(contig2cov[contig] - contig2cov[end2contig(x)])
)
def could_circulate(
end: str,
contig: str,
direction: Literal["L", "R"],
):
"""True if the path is circular with the current contig included"""
contig_pair = ""
other_direction = {"L": "_R", "R": "_L"}[direction]
if end in two_paths_end: # point is the same as "target" in join_walker
link_pair1, link_pair2 = link_pair[end]
if end2contig(link_pair1) == contig:
contig_pair = link_pair1
elif end2contig(link_pair2) == contig:
contig_pair = link_pair2
            # 2x coverage would indicate a repeat, but that is too risky; use 1.5 instead (same below)
if contig_pair and 1.5 * contig2cov[contig] <= contig2cov[end2contig(end)]:
contig_pair = ""
elif end in one_path_end:
(link_pair1,) = link_pair[end]
if end2contig(link_pair1) == contig:
contig_pair = link_pair1
return (
contig_pair
and contig_pair.endswith(other_direction)
and (contig + other_direction, end) in contig_pe_links
)
def not_checked(end_list: Iterable[str], checked: list[str]):
"""True if all contig in end_list has been checked for adding or not"""
return not any(end2contig(end) in checked for end in end_list)
def join_walker(contig: str, direction: Literal["L", "R"]):
"""
get potential joins for a given query
"""
end0 = f"{contig}_{direction}"
len_before_walk = len(contig2join[end0])
if len_before_walk == 0:
contig_checked[end0].append(contig)
if end0 in one_path_end:
end1 = link_pair[end0][0]
if end2contig(end1) != contig:
checked_reason = check_1path_to_add(end1, contig)
if checked_reason and (end0, end1) in contig_pe_links:
# 3. linkage between link_pair1 and end is supported by reads linkage
contig2join[end0].append(end1)
contig_checked[end0].append(end2contig(end1))
contig2join_reason[contig][end2contig(end1)] = checked_reason
elif end0 in two_paths_end:
# >contig<
# |-end-|
# .*.*.*|----->
# |----->.*.*.*
# >contig2<
# |----->.*.*.*
# >contig3<
# no more contigs starts with `|----->`
#
end1, end2 = link_pair[end0]
if end2contig(end1) != end2contig(end2):
checked_reason, end2add = check_2path_to_add(end1, end2, contig)
if checked_reason:
contig2join[end0].append(end2add)
contig_checked[end0].append(end2contig(end1))
contig_checked[end0].append(end2contig(end2))
contig2join_reason[contig][end2contig(end2add)] = checked_reason
# TODO: also record the discarded one
else:
end = end2end2(contig2join[end0][-1])
if end in one_path_end:
end1 = link_pair[end][0]
if checked := not_checked([end1], contig_checked[end0]):
                    # inherently, end2contig(end1) != contig here
                    # the same as above
checked_reason = check_1path_to_add(end1, contig)
if checked_reason and (end, end1) in contig_pe_links:
contig2join[end0].append(end1)
contig_checked[end0].append(end2contig(end1))
contig2join_reason[contig][end2contig(end1)] = checked_reason
elif end in two_paths_end:
end1, end2 = link_pair[end]
if checked := end2contig(end1) != end2contig(end2):
# >contig<
# .*.|----->
# >contig of target<
# |----->*.*.*.|----->
# |-----| target
# |----->.*.*.*
# >contig2<
# |----->.*.*.*
# >contig3<
# no more contigs starts with `|----->`
#
if (
not_checked([end1, end2], contig_checked[end0])
and contig2cov[end2contig(end)] < 1.9 * contig2cov[contig]
):
# given a contig
# |-query contig->[repeat region]|-contig2->[repeat region]|-contig3->
# where len(repeat region) > maxk, then the linkage looks like:
# |-query contig->[repea> | 1* cov
# [repeat region] | 2* cov
# |egion]|-contig2->[repea> | 1* cov
# |egion]|-contig3-> | 1* cov
# where we can only confirm:
# |-query contig->[repeat region]
# If we query the [repeat region], then never mind.
#
checked_reason, end2add = check_2path_to_add(end1, end2, contig)
if checked_reason:
contig2join[end0].append(end2add)
contig_checked[end0].append(end2contig(end1))
contig_checked[end0].append(end2contig(end2))
contig2join_reason[contig][
end2contig(end2add)
] = checked_reason
else:
checked = False
if checked and could_circulate(end, contig, direction):
# 1. target is extendable (the only link_pair here)
path_circular_end.add(end0)
contig2join_reason[contig][end2contig(end)] = "could_circulate"
        # In the first walk, a path will never add itself;
        # in the following walks, paths will never be duplicated.
return len_before_walk < len(contig2join[end0])
for contig in tqdm(
query_set - (orphan_query | self_circular),
desc="Detecting joins of contigs. ",
):
# extend each contig from both directions
while join_walker(contig, "L"):
pass
while join_walker(contig, "R"):
pass
return contig2join, contig2join_reason, path_circular_end
def get_contig2assembly(contig2join: dict[str, list[str]], path_circular_end: set[str]):
contig2assembly: dict[str, set[str]] = {}
for item in contig2join:
if len(contig2join[item]) == 0:
continue
contig = end2contig(item)
        # detect extended query contigs
if contig not in contig2assembly:
contig2assembly[contig] = {contig}
if (
contig + "_L" in path_circular_end
and contig + "_R" not in path_circular_end
):
# here, only one path can be found to be circulated.
contig2assembly[contig].update(
(end2contig(i) for i in contig2join[contig + "_L"])
)
elif (
contig + "_L" not in path_circular_end
and contig + "_R" in path_circular_end
):
contig2assembly[contig].update(
(end2contig(i) for i in contig2join[contig + "_R"])
)
else:
contig2assembly[contig].update((end2contig(i) for i in contig2join[item]))
return contig2assembly
def join_seqs(
contig: str,
contig2join: dict[str, list[str]],
path_circular_end: set[str],
):
"""
    get the join order of the sequences in a given path
"""
left, right = contig + "_L", contig + "_R"
order_all_ = [*reversed(contig2join[left]), contig]
if left not in path_circular_end:
# check if path is circular
# then we can add right to it
# TODO+FIXME: Compared with contig2assembly,
        # 1. what's the problem if path_circular_end is in the middle of some path?
        # 2. what's the problem if some contig exists at both ends of a query?
if contig2join[left] and contig2join[right]:
# only when we add those in both contig2join[left] and contig2join[right]
# and drop duplicates
added_to_ = {end2contig(item) for item in order_all_}
order_all_ += [
item for item in contig2join[right] if end2contig(item) not in added_to_
]
else:
            # either contig2join[left] or contig2join[right] is empty; adding just one side is enough
order_all_ += contig2join[right]
    # elif left in path_circular_end:
    #     do not extend right into order_all_; just return
return order_all_
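# Illustrative example (hypothetical ends): if contig2join["q_1_L"] == ["a_1_R"]
# and contig2join["q_1_R"] == ["b_1_L"], and neither end closes a circle, then
#   join_seqs("q_1", contig2join, path_circular_end) -> ["a_1_R", "q_1", "b_1_L"]
# i.e. left extensions reversed, the query itself, then the right extensions.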
def seqjoin2contig(seqjoin: Iterable[str]):
used_queries = set()
for item in seqjoin:
contig = end2contig(item)
if contig not in used_queries:
yield item
used_queries.add(contig)
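# Illustrative behavior (hypothetical ends): only the first end seen per contig
# is kept, so a contig reached from both sides is reported once.
#   list(seqjoin2contig(["k141_1_R", "k141_2", "k141_1_Lrc", "k141_3_L"]))
#     -> ["k141_1_R", "k141_2", "k141_3_L"]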
def summary_fasta(
fasta_file: str,
length: int,
contig2cov: dict[str, float],
self_circular: frozenset[str],
self_circular_flexible_overlap: dict[str, int],
):
"""
    summarize basic information of a fasta file
"""
summary_file_headers = ["SeqID", "Length", "Coverage", "GC", "Ns"]
if "self_circular" in fasta_file:
summary_file_headers.append("DTR_length")
with open(f"{fasta_file}.summary.tsv", "w") as so:
print(*summary_file_headers, sep="\t", file=so)
record: SeqIO.SeqRecord
for record in SeqIO.parse(f"{fasta_file}", "fasta"):
header = str(record.id).strip()
seq: Seq = record.seq
sequence_stats = [
header,
str(len(seq)),
"",
str(round(GC(seq), 3)),
str(seq.count("N")),
]
if "self_circular" in fasta_file:
sequence_stats[2] = str(contig2cov[header.split("_self")[0]])
if header.split("_self")[0] in self_circular:
sequence_stats.append(str(length))
else:
# header.split("_self")[0] in self_circular_flexible_overlap:
sequence_stats.append(
str(self_circular_flexible_overlap[header.split("_self")[0]])
)
else:
sequence_stats[2] = str(contig2cov[header.split("_extended")[0]])
print(*sequence_stats, sep="\t", file=so)
def get_direction(item: str):
"""
get the direction in a joining path
"""
return "reverse" if item.endswith("rc") else "forward"
def total_length(contig_list: list[str], contig2len: dict[str, int]):
"""
    get the total length of all sequences in a joining path before overlap removal
"""
return sum(contig2len[end2contig(item)] for item in contig_list)
def get_link_pair(contig2seq: dict[str, Seq], maxk: int):
d_L: dict[Seq, set[str]] = defaultdict(set)
d_Lrc: dict[Seq, set[str]] = defaultdict(set)
d_R: dict[Seq, set[str]] = defaultdict(set)
d_Rrc: dict[Seq, set[str]] = defaultdict(set)
#
for header, seq in tqdm(
contig2seq.items(), desc=f"Buiding graph based on {maxk}-mer end"
):
# >contig
# >>>>>>>...>>>>>>>
# |- L -> |- R ->
# <~Lrc~| <~Rrc~|
# <<<<<<<***<<<<<<<
d_L[seq[:maxk]].add(header + "_L")
d_Lrc[seq[:maxk].reverse_complement()].add(header + "_Lrc")
d_R[seq[-maxk:]].add(header + "_R")
d_Rrc[seq[-maxk:].reverse_complement()].add(header + "_Rrc")
# get the shared seqs between direction pairs (L/Lrc, Lrc/L, L/R, R/L, R/Rrc, Rrc/R, Lrc/Rrc, Rrc/Lrc)
link_pair: dict[str, list[str]] = defaultdict(list)
# >contig1
# >>>>>>>...>>>>>>>
# |- L ->
# |~Lrc~>
# >>>>>>>***>>>>>>>
# rc_contig2<
# contig1_L - contig2_Lrc == contig1_Lrc - contig2_L
for end in set(d_L) & set(d_Lrc):
for left in d_L[end]: # left is a seq name
link_pair[left].extend(d_Lrc[end])
for left_rc in d_Lrc[end]:
link_pair[left_rc].extend(d_L[end])
# >contig1
# >>>>>>>...>>>>>>>
# |- L ->
# |- R ->
# >>>>>>>...>>>>>>>
# >contig2
# contig1_L - contig2_R == contig1_Lrc - contig2_Rrc
for end in set(d_L) & set(d_R):
for left in d_L[end]:
link_pair[left].extend(d_R[end])
for right in d_R[end]:
link_pair[right].extend(d_L[end])
    # the R-to-L shared ends are already covered by the L/R loop above
# >contig1
# >>>>>>>...>>>>>>>
# |- R ->
# |~Rrc~>
# >>>>>>>***>>>>>>>
# rc_contig2<
# contig1_R - contig2_Rrc == contig1_Rrc - contig2_R
for end in set(d_R) & set(d_Rrc):
for right in d_R[end]:
link_pair[right].extend(d_Rrc[end])
for right_rc in d_Rrc[end]:
link_pair[right_rc].extend(d_R[end])
# equals to | >contig1
# rc_contig1< | >>>>>>>...>>>>>>>
# >>>>>>>***>>>>>>> | |- L ->
# |~Lrc~> | |- R ->
# |~Rrc~> | >>>>>>>...>>>>>>>
# >>>>>>>***>>>>>>> | >contig2
# rc_contig2< | the reverse_complement one
for end in set(d_Rrc) & set(d_Lrc):
for right_rc in d_Rrc[end]:
link_pair[right_rc].extend(d_Lrc[end])
for left_rc in d_Lrc[end]:
link_pair[left_rc].extend(d_Rrc[end])
return link_pair
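# Illustrative outcome (hypothetical contigs): if contig "k141_1" ends with the
# same maxk-mer that contig "k141_2" starts with, then link_pair["k141_1_R"]
# contains "k141_2_L", link_pair["k141_2_L"] contains "k141_1_R", and the
# reverse-complement ends are paired the same way ("k141_1_Rrc" <-> "k141_2_Lrc").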
def check_Y_paths(
link_pair: dict[str, list[str]],
logfile: Path | None,
):
one_path_end: set[str] = set() # the end of contigs with one potential join
two_paths_end: set[str] = set() # the end of contigs with two potential joins
if logfile:
p = open(logfile, "w")
    for item in tqdm(sorted(link_pair), desc="Collecting one_path_end and two_paths_end"):
if len(link_pair[item]) == 1: # and len(link_pair[link_pair[item][0]]) > 1:
# | >contig3 or more contigs ends with `|----->` < |
# | .*.*.*|-----> |
# >contig1<
# .*.*.*|----->
# |----->.*.*.*
# >contig2<
# | no more contigs starts with `|----->` < |
#
# add one joining end to a list, its pair may have one or more joins
one_path_end.add(item)
elif (
len(link_pair[item]) == 2
and len(link_pair[link_pair[item][0]]) == 1
and len(link_pair[link_pair[item][1]]) == 1
):
# | no more contigs ends with `|----->` < |
# >contig1<
# .*.*.*|----->
# |----->.*.*.*
# >contig2<
# |----->.*.*.*
# >contig3<
# | no more contigs starts with `|----->` < |
#
# add two joining end to a list, each of its pairs should only have one join
two_paths_end.add(item)
        # write link pairs to a file for inspection, if interested
if logfile:
print(item, *sorted(link_pair[item]), sep="\t", file=p)
    if logfile:
        p.close()
    return frozenset(one_path_end), frozenset(two_paths_end)
def get_cov(coverage_file: Path):
contig2cov: dict[str, float] = {} # the coverage of contigs
with open(coverage_file) as coverage:
        # Some coverage files have a header line; parse the first line and skip it if the value is not numeric
for line in coverage:
header_cov, cov_value = line.strip().split("\t")[:2]
try:
contig2cov[header_cov] = float(cov_value)
except ValueError:
pass
break
for line in coverage:
header_cov, cov_value = line.strip().split("\t")[:2]
contig2cov[header_cov] = float(cov_value)
return contig2cov
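# Expected coverage-file layout (tab-separated, extra columns ignored; a header
# line is tolerated once), e.g.:
#   k141_1<TAB>12.5
#   k141_2<TAB>3.0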
def get_query_set(query_fa: Path, uniset: dict[str, T]):
query_set: set[str] = set()
with open(query_fa) as f:
first_line = next(f)
if first_line.startswith(">"):
# if the query file is in fasta format
query_header = lambda x: (
str(i.id).strip() for i in SeqIO.parse(x, "fasta")
)
else:
# if the query file is in text format
query_header = lambda x: (line.strip().split(" ")[0] for line in x)
with open(query_fa) as query_file:
for header in query_header(query_file):
            # some queries may not be in the whole assembly; should be rare though.
if header in uniset:
query_set.add(header)
else:
print(
f"Query {header} is not in your whole contig fasta file, please check!",
flush=True,
)
return frozenset(query_set)
def get_pe_links(
mapping_file: Path,
trim_readno: Literal["no", "trim", "auto"],
contig2len: dict[str, int],
orphan_set: frozenset[str],
linkage_mismatch: int = 2,
):
"""
contig_pe_links: paired linkage supported by bam file
orphan_pe_spanned:
        contigs sharing no end kmer with any other contig, whose ends may still be spanned by paired-end reads
"""
if trim_readno == "auto":
with pysam.AlignmentFile(f"{mapping_file}", "rb") as map_file:
for rmap in map_file:
if rmap.query_name is not None:
if rmap.query_name[-2:] in ("/1", "/2"):
trim_readno = "trim"
else:
trim_readno = "no"
break
parse_readid: Callable[[str], str] = (
(lambda x: x) if trim_readno == "no" else lambda x: x[:-2]
)
linkage: dict[str, set[str]] = defaultdict(set)
orphan2pe_span: dict[str, dict[str, list[int]]] = {
contig: defaultdict(list) for contig in orphan_set
}
with pysam.AlignmentFile(f"{mapping_file}", "rb") as map_file:
for rmap in tqdm(
map_file,
desc="Getting contig linkage based on sam/bam. Be patient, this may take long",
):
if rmap.is_unmapped:
continue
assert rmap.query_name is not None
assert rmap.reference_name is not None
if linkage_mismatch < int(rmap.get_tag("NM")):
continue
if rmap.reference_name != rmap.next_reference_name:
# Check if the read and its mate map to different contigs
# get name just before use it to speed up
rmap_readid = parse_readid(rmap.query_name)
if contig2len[rmap.reference_name] > 1000:
# determine if the read maps to the left or right end
# >contig1
# >>>>>>>>>>>>>>>>>>>>>>>>...>
# | | |
# ^- 0 ^- 500 ^- 1000
# +++++++ ... | . | linkage[read_i].add(contig1_L)
# ... +++++++ . | linkage[read_i].add(contig1_L)
# |+++++++ ... | | linkage[read_i].add(contig1_R)
# ... +++++++ | linkage[read_i].add(contig1_R)
# @read_i
#
linkage[rmap_readid].add(
rmap.reference_name
+ ("_L" if rmap.reference_start <= 500 else "_R")
)
else:
# >contig1
# >>>>>>>>>>>>>>>>>>>>...>
# | | |
# ^- 0 ^- 500 ^- 1000
# +++++++ ... | linkage[read_i].add(contig1_L, contig1_R)
# ... +++++++ | linkage[read_i].add(contig1_L, contig1_R)
# @read_i
#
# add both the left and right ends to the linkage
linkage[rmap_readid].add(rmap.reference_name + "_L")
linkage[rmap_readid].add(rmap.reference_name + "_R")
else:
# If the read and its mate map to the same contig, store the read mapped position (start)
if rmap.reference_name in orphan_set:
# >contig1, normally > 1000 bp
# >>>>>>>>>>>>>>......>>>>>>>>>>>>>
# | | ...... | |
# ^- 0 ^- 500 ^- -500 ^- -0
# +++++++ ... | . | orphan2pe_span[contig1][read_i].append(0)
# ... +++++++ . | orphan2pe_span[contig1][read_i].append(499)
# |+++++++ | |
# +++++++ |
# +++++++ | orphan2pe_span[contig1][read_i].append(-500)
# @read_i/1
# +++++++
# @read_i/2
#
# add both the left and right ends to the linkage
# only care about those in query
if (
rmap.reference_start <= 500
or contig2len[rmap.reference_name] - rmap.reference_start <= 500
):
orphan2pe_span[rmap.reference_name][
parse_readid(rmap.query_name)
].append(rmap.reference_start)
contig_pe_links: set[tuple[str, str]] = set()
for read in tqdm(linkage, desc="Parsing the linkage information"):
# len(linkage[read]) in (1, 2, 3, 4)
if len(linkage[read]) >= 2: # Process only reads linked to at least two contigs
for item, item_1 in itertools.combinations(linkage[read], 2):
# Generate unique pairs of linked contigs for the current read using itertools.combinations
if item.rsplit("_", 1)[1] != item_1.rsplit("_", 1)[1]:
                    # If the ends are on different sides (_L vs _R), record both orderings in contig_pe_links
contig_pe_links.add((item, item_1))
contig_pe_links.add((item_1, item))
contig_pe_links.add((item + "rc", item_1 + "rc"))
contig_pe_links.add((item_1 + "rc", item + "rc"))
else:
                    # If the ends are on the same side, record the pair via the reverse-complement (rc) ends
                    # Warning: contigs <= 1000 bp will be linked to themselves: (contig1_L, contig1_Rrc) etc.
contig_pe_links.add((item, item_1 + "rc"))
contig_pe_links.add((item_1 + "rc", item))
contig_pe_links.add((item + "rc", item_1))
contig_pe_links.add((item_1, item + "rc"))
    # Initialize a set to store the contigs spanned by paired-end reads
orphan_pe_spanned: set[str] = set()
for contig in orphan_set:
        # Check whether a single read pair spans nearly the whole contig (one mate near each end)
for PE in orphan2pe_span[contig].values():
if len(PE) == 2 and abs(PE[0] - PE[1]) >= contig2len[contig] - 1000:
# >contig1, normally > 1000 bp
# >>>>>>>>>>>>>>......>>>>>>>>>>>>>
# | | ...... | |
# ^- 0 ^- 500 ^- -500 ^- -0
# +++++++ ... | |
# ... +++++++ |
# @read_i/1 . |
# . +++++++
# . . +++++++
# . . @read_i/2
# |--------| <- contig2len[contig1] - 1000
#
orphan_pe_spanned.add(contig)
break
return frozenset(contig_pe_links), frozenset(orphan_pe_spanned)
class QueryCovVar(NamedTuple):
query_count: int
cov_mean: float
cov_std: float
seq_len: float
cov_adj_sum: float
@classmethod
def query(
cls,
contigs: Iterable[str],
        contig2cov: dict[str, float],
contig2len: dict[str, int],
):
"""
return stat of query coverage
stat of queries:
- query count
- cov mean
- cov std
- len sum
- len*cov sum
"""
        cov_lens = pd.Series({i: (contig2cov[i], contig2len[i]) for i in contigs})
return cls(
len(cov_lens),
cov_lens.apply(lambda x: x[0]).mean(),
cov_lens.apply(lambda x: x[0]).std(),
cov_lens.apply(lambda x: x[1]).sum(),
cov_lens.apply(lambda x: x[0] * x[1]).sum(),
)
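# Illustrative usage (hypothetical values; note the std of a single value is NaN):
#   QueryCovVar.query(["k141_1"], {"k141_1": 10.0}, {"k141_1": 2000})
#     -> QueryCovVar(query_count=1, cov_mean=10.0, cov_std=nan, seq_len=2000, cov_adj_sum=20000.0)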
def extend_query(contigs: Iterable[str], contig2seq: dict[str, Seq], maxk: int) -> Seq:
extend_seq = Seq("")
last = Seq("")
    # concatenate the sequences with the maxk-length overlaps removed
for end in contigs:
contig = end2contig(end)
if end.endswith("rc"):
seq = contig2seq[contig].reverse_complement()
else:
seq = contig2seq[contig]
if last == "" or seq[:maxk] == last:
extend_seq += seq[:-maxk]
last = seq[-maxk:]
return extend_seq + last
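# Sketch of the overlap trimming (hypothetical contigs, maxk=3): joining ends
# whose sequences are "AAATTTCCC" and "CCCGGGAAA" (shared 3-mer "CCC") yields
# "AAATTT" + "CCCGGG" + "AAA" == "AAATTTCCCGGGAAA".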
def write_and_run_blast(
data_chunk: Iterable[tuple[str, Seq, int]],
working_dir: Path,
prec_identity=70,
retries=3,
):
cobra_seq2lenblast: dict[str, tuple[int, list[list[str]]]] = {}
os.makedirs(working_dir)
outlog = working_dir / "log"
outfile = working_dir / "blastdb_2.vs.1.tsv"
for i in range(1, retries + 1):
try:
with (
open(working_dir / "blastdb_1.fa", "w") as blastdb_1,
open(working_dir / "blastdb_2.fa", "w") as blastdb_2,
):
for header, seq, overlap in data_chunk: