-
Notifications
You must be signed in to change notification settings - Fork 0
/
extract_gene_sequence.py
172 lines (154 loc) · 7.33 KB
/
extract_gene_sequence.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/env python3
import argparse
import logging
import os
import sys
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
# ------------------------------------------------------------------------------------
# Notes
# ------------------------------------------------------------------------------------
# This script extracts the DNA and protein sequences of a gene annotated in a GFF3 file
# Testing notes:
# Script only tested with GFF3 files generated by Bakta v1.5.1
# ------------------------------------------------------------------------------------
# Functions
# ------------------------------------------------------------------------------------
def parse_arguments():
    """Build the command-line parser and return the parsed options.

    All seven options are mandatory and are listed under a
    "required arguments" group in the help output. The returned namespace
    exposes: input_fasta, input_gff, gff_field, gff_value, output_prefix,
    fasta_id and type_sequence.
    """
    description = (
        "This script is used to extract the DNA and protein sequence of a GFF annotated gene/\n"
        "The script will output: gene information, gene DNA and protein sequences."
    )
    parser = argparse.ArgumentParser(description=description)
    required = parser.add_argument_group('required arguments')

    # (short flag, long flag, metavar, help text) -- declared in help-output order
    option_specs = (
        ("-f", "--input_fasta", "GENOME",
         "input genome in FASTA format (assembly or complete genome)"),
        ("-g", "--input_gff", "GFF",
         "input genome in GFF3 format (Bakta GFF3 output)"),
        ("-x", "--gff_field", "FIELD",
         "GFF annotation field (e.g. locus_tag,product,Name)"),
        ("-y", "--gff_value", "VALUE",
         "GFF annotation field value (e.g. name or product of gene to be extracted)"),
        ("-o", "--output_prefix", "OUT",
         "File name prefix to name output files: gene information, gene DNA and protein sequences."),
        ("-i", "--fasta_id", "ID",
         "Id to be used in output FASTA files. Recommended to use sample/isolate id for MSA."),
        ("-t", "--type_sequence", "TYPE",
         "Type of sequence: \"assembly\" or \"complete\""),
    )
    for short_flag, long_flag, meta, help_text in option_specs:
        required.add_argument(
            short_flag, long_flag,
            action="store",
            dest=long_flag[2:],  # attribute name is the long flag without "--"
            help=help_text,
            required=True,
            metavar=meta,
        )

    return parser.parse_args()
# ------------------------------------------------------------------------------------
# Main program
# ------------------------------------------------------------------------------------
def _main():
    """Find a gene in a GFF3 annotation and write its coordinates and sequences.

    Reads the command-line options via ``parse_arguments()``, scans the GFF3
    file for features whose ``--gff_field`` attribute equals ``--gff_value``
    and writes:

    * ``<prefix>.gene_info.txt``: tab-separated header plus one row with the
      field, value, contig, start, end, strand, gene length and the number of
      matching attributes found (``"not_found"`` placeholders when nothing
      matched);
    * ``<prefix>.dna.fa``: the gene DNA sequence, reverse-complemented for
      features on the "-" strand;
    * ``<prefix>.protein.fa``: the translation of that DNA sequence.

    The info row and the DNA sequence are also printed to stdout. When
    several features match, the coordinates of the last one are used.
    The FASTA files are only written when at least one match was found.

    Exits with status -1 when ``--type_sequence`` is neither "assembly" nor
    "complete", or when an input file does not exist.
    """
    # Configure logging
    logging.basicConfig(
        format='%(asctime)s %(levelname)s: %(message)s',
        level=logging.INFO
    )

    # Get arguments
    args = parse_arguments()
    gff_field = args.gff_field
    gff_value = args.gff_value
    type_seq = args.type_sequence

    # Making sure args.type_sequence has expected value: "assembly" or "complete"
    if type_seq not in ("assembly", "complete"):
        logging.error(f'--type_sequence {type_seq} must be either \"assembly\" or \"complete\"!')
        sys.exit(-1)

    # Making sure input files exist
    for input_file in (args.input_fasta, args.input_gff):
        if not os.path.isfile(input_file):
            logging.error(f'Input file {input_file} not found!')
            sys.exit(-1)

    # Parsing GFF3 file to find and extract gene coordinates
    logging.info(f"Parsing GFF3 file {args.input_gff} to find and extract gene coordinates")
    gene_contig, gene_from, gene_to, gene_strand, occurrences = _find_gene_in_gff(
        args.input_gff, gff_field, gff_value
    )

    # Saving extracted gene information
    gene_length = "not_found"
    if occurrences:
        # GFF3 coordinates are 1-based and inclusive on both ends
        gene_length = int(gene_to) - int(gene_from) + 1
    out_cols = ["gff_field", "gff_value", "contig", "from", "to", "strand", "gene_length", "occurrences"]
    out_items = [gff_field, gff_value, gene_contig, gene_from, gene_to, gene_strand,
                 str(gene_length), str(occurrences)]
    output_info = args.output_prefix + ".gene_info.txt"
    logging.info(f"Saving extracted gene into {output_info}")
    with open(output_info, 'w') as output:
        output.write('\t'.join(out_cols) + '\n')
        output.write('\t'.join(out_items) + '\n')
    print('\t'.join(out_items))

    if not occurrences:
        logging.warning(f"No GFF3 feature with {gff_field}={gff_value} found: no sequences written")
        return

    # Extracting and saving gene DNA and protein sequences
    fasta_id = args.fasta_id
    output_seq1 = args.output_prefix + ".dna.fa"
    output_seq2 = args.output_prefix + ".protein.fa"
    gene_start = int(gene_from)
    gene_end = int(gene_to)
    # FASTA record ids may lack the underscore used in the GFF3 contig ids
    # (e.g. contig_1 in the GFF3 vs contig1 in the FASTA), so accept both spellings
    contig_ids = (gene_contig, gene_contig.replace("_", ""))

    logging.info(f"Opening input genome {args.input_fasta}")
    for record_num, record in enumerate(SeqIO.parse(args.input_fasta, "fasta"), start=1):
        if type_seq == "complete":
            # for complete genomes, extract from first record: assumed to be chromosome sequence
            to_extract = record_num == 1
        else:
            # for assemblies, keep the contig where the selected gene is present
            to_extract = record.id in contig_ids
        if not to_extract:
            continue

        gene_seq = _slice_gene(record.seq, gene_start, gene_end, gene_strand)
        print(gene_seq)
        logging.info(f"Saving extracted gene sequence into {output_seq1}")
        SeqIO.write(SeqRecord(gene_seq, id=fasta_id), output_seq1, "fasta")

        logging.info(f"Saving extracted protein sequence into {output_seq2}")
        protein_seq = gene_seq.translate()
        # The gene now includes its complete final codon; drop the terminal
        # stop symbol so the protein FASTA holds residues only
        if str(protein_seq).endswith("*"):
            protein_seq = protein_seq[:-1]
        SeqIO.write(SeqRecord(protein_seq, id=fasta_id), output_seq2, "fasta")
        # the gene lives on a single record: no need to read the rest of the FASTA
        break
    else:
        logging.warning(f"No matching record found in {args.input_fasta}: gene sequences were not written")


def _find_gene_in_gff(gff_path, gff_field, gff_value):
    """Return (contig, start, end, strand, occurrences) for a GFF3 attribute match.

    Scans every non-comment line that has exactly 9 tab-separated columns and
    looks in the 9th (attributes) column for ``gff_field=gff_value``.
    contig/start/end/strand are the raw column strings of the LAST matching
    feature, or "not_found" when nothing matched; occurrences counts every
    matching attribute.
    """
    prefix = gff_field + '='
    contig = start = end = strand = "not_found"
    occurrences = 0
    with open(gff_path, 'r') as gff:
        for line in gff:
            # skipping GFF3 header/comment lines
            if line.startswith('#'):
                continue
            items = line.strip().split('\t')
            # keeping GFF3 annotation lines, expected to contain 9 fields
            if len(items) != 9:
                continue
            for ann_item in items[8].strip().split(';'):
                # strip only the leading "field=" so values that themselves
                # contain "field=" are compared intact
                if ann_item.startswith(prefix) and ann_item[len(prefix):] == gff_value:
                    contig, start, end, strand = items[0], items[3], items[4], items[6]
                    occurrences += 1
    return contig, start, end, strand, occurrences


def _slice_gene(contig_seq, start, end, strand):
    """Return the gene at 1-based inclusive [start, end], reverse-complemented on "-"."""
    # 1-based inclusive [start, end] maps to the 0-based half-open slice [start-1, end)
    gene_seq = contig_seq[start - 1:end]
    if strand == "-":
        gene_seq = gene_seq.reverse_complement()
    return gene_seq
# Run the extraction only when this file is executed as a script, not when imported
if __name__ == "__main__":
    _main()