…ioinformatics#305)

* Issue EI-CoreBioinformatics#280: `mikado serialise` has been refactored so that:
   * loading of BLAST XMLs should be faster, thanks to using Cython for the most time-consuming function
   * Mikado now also accepts *tabular* BLAST data (a custom format: the `ppos` and `btop` extra fields are required)
   * `daijin` now automatically generates *tabular* rather than XML BLAST results
   * `mikado` will now use `spawn` as the default multiprocessing method. This avoids memory-accounting problems on e.g. SLURM, where `fork` can lead the HPC management system to believe that shared memory is duplicated, massively and falsely inflating the reported memory usage (see the first sketch below).
* Issue EI-CoreBioinformatics#270: Mikado now removes redundancy based on intron chains:
  * `mikado prepare` will remove redundant transcripts based on their *intron chains* (or, for monoexonic transcripts, their *span overlap*), rather than on their start/end coordinates. The exact-CDS-match criterion still applies (see the second sketch below).
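A minimal sketch of what the `spawn` default means in practice (the worker function and pool size are illustrative, not Mikado's actual code): each worker starts as a fresh interpreter instead of inheriting the parent's copy-on-write pages, so schedulers cannot double-count shared memory.

```python
# Illustrative only -- not Mikado's code. Shows the start method Mikado now
# selects by default, and why it must be paired with a __main__ guard.
import multiprocessing as mp

def square(x):   # hypothetical worker; must be importable under "spawn"
    return x * x

if __name__ == "__main__":
    ctx = mp.get_context("spawn")          # the new Mikado default
    with ctx.Pool(processes=4) as pool:    # 4 workers, chosen arbitrarily
        print(pool.map(square, range(8)))
```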
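And a toy illustration of the new redundancy criterion (the helper and coordinates below are made up for the example, not Mikado's implementation): two multiexonic transcripts count as redundant when their intron chains match, even if their terminal exons differ.

```python
# Illustrative only -- not Mikado's implementation.
def intron_chain(exons):
    """Introns implied by a sorted list of (start, end) exon pairs."""
    return tuple((exons[i][1] + 1, exons[i + 1][0] - 1)
                 for i in range(len(exons) - 1))

t1 = [(100, 200), (300, 400), (500, 650)]
t2 = [(150, 200), (300, 400), (500, 600)]    # different ends, same introns
assert intron_chain(t1) == intron_chain(t2)  # redundant under the new rule
```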
lucventurini authored Apr 7, 2020
1 parent 796c0e3 commit d4dae39
Showing 42 changed files with 44,062 additions and 709 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -57,6 +57,8 @@ sample_data/1.fasta
 sample_data/2.fasta
 sample_data/configuration2.yaml
 sample_data/reference.gff3.midx
+Mikado/*/*/*html
+Mikado/*/*html
 Mikado/utilities/overlap.cpython-3*.so
 Mikado/scales/contrast.c
 Mikado/scales/contrast.cpython-3*.so
@@ -82,6 +84,9 @@ Mikado/tests/sample_data/diamond.xml.gz
 Mikado/tests/fusion_test/fusion_test.log
 sample_data/diamond.xml.gz
 Mikado/*/*.c
+Mikado/*/*.so
+Mikado/*/*/*.c
+Mikado/*/*/*.so
 Mikado/utilities/overlap.c
 sample_data/Daijin
 .eggs/
14 changes: 12 additions & 2 deletions Mikado/configuration/configuration_blueprint.json
@@ -36,7 +36,7 @@
   },
   "multiprocessing_method": {
     "type": "string",
-    "default": "",
+    "default": "spawn",
     "enum": ["fork", "spawn", "forkserver", ""]
   },
   "log_settings": {
@@ -196,7 +196,16 @@
       "default": "."}
     }
   },
-  "max_objects": {"type": "integer", "default": 100000, "minimum": 1},
+  "substitution_matrix": {
+    "enum": ["blosum45", "blosum50", "blosum62", null,
+             "blosum80", "blosum90", "pam250", "pam30", "pam70"],
+    "default": "blosum62"
+  },
+  "blast_flavour": {
+    "enum": ["blastx", "blastp"],
+    "default": "blastx"
+  },
+  "max_objects": {"type": "integer", "default": 10000000, "minimum": 1},
   "max_regression": {"type": "number",
                      "minimum": 0,
                      "maximum": 1,
@@ -281,6 +290,7 @@
   "labels": {"type": "array", "default": []},
   "strand_specific_assemblies": {"type": "array", "default": []},
   "reference": {"type": "array", "default": []},
+  "keep_redundant": {"type": "array", "default": []},
   "source_score":{
     "type": "object",
     "default": {},
58 changes: 49 additions & 9 deletions Mikado/daijin/mikado.smk
@@ -2,6 +2,36 @@ import os
 from shutil import which
 import pkg_resources
 from Bio.Data import CodonTable
+from Mikado.serializers.blast_serializer.tabular_utils import blast_keys
+import functools
+import subprocess
+import re
+from fastnumbers import fast_int
+
+
+diamond_pat = re.compile("^diamond version (\S*)[$|\s]*")
+
+@functools.lru_cache(maxsize=4, typed=True)
+def diamond_to_correct(command):
+    """This will always return False until https://github.com/bbuchfink/diamond/issues/334 is fixed."""
+    if workflow.use_conda is True:
+        return False
+    cmd = "{} diamond --version && set -u".format(command)
+    output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout.read().decode()
+    version = None
+    for line in output.split("\n"):
+        m = diamond_pat.search(line)
+        if m:
+            version = m.groups()[0]
+            break
+    if version is None:
+        return False
+    else:
+        major, minor, micro = [fast_int(_) for _ in version.split(".")]
+        if major > 0 or minor > 9 or micro > 100:
+            return True
+        else:
+            return False
 
 
 CodonTable.ambiguous_dna_by_id[0] = CodonTable.ambiguous_dna_by_id[1]
@@ -187,17 +217,21 @@ if config["mikado"]["use_diamond"] is False:
         input:
             db=rules.make_blast.output,
             split=rules.split_fa.output
-        output: os.path.join(BLAST_DIR, "xmls" , "chunk-{chunk_id}-proteins.xml.gz")
+        output: os.path.join(BLAST_DIR, "tsv" , "chunk-{chunk_id}-proteins.tsv.gz")
         params:
             tr=os.path.join(BLAST_DIR, "fastas", "chunk_{chunk_id}.fasta"),
             db=os.path.join(BLAST_DIR, "index", "blastdb-proteins"),
             load=loadPre(config, "blast"),
-            uncompressed=os.path.join(BLAST_DIR, "xmls", "chunk-{chunk_id}-proteins.xml"),
+            uncompressed=os.path.join(BLAST_DIR, "tsv", "chunk-{chunk_id}-proteins.tsv"),
+            blast_keys=" ".join(blast_keys)
         log: os.path.join(BLAST_DIR, "logs", "chunk-{chunk_id}.blastx.log")
         threads: THREADS
         conda: os.path.join(envdir, "blast.yaml")
         message: "Running BLASTX for mikado transcripts against: {params.tr}"
-        shell: "{params.load} if [ -s {params.tr} ]; then blastx -num_threads {threads} -outfmt 5 -query {params.tr} -db {params.db} -evalue {BLASTX_EVALUE} -max_target_seqs {BLASTX_MAX_TARGET_SEQS} > {params.uncompressed} 2> {log}; else touch {params.uncompressed}; fi && gzip {params.uncompressed}"
+        shell: "{params.load} if [ -s {params.tr} ]; then blastx -num_threads {threads} "\
+               "-outfmt \"6 {params.blast_keys}\" "\
+               " -query {params.tr} -db {params.db} -evalue {BLASTX_EVALUE} -max_target_seqs {BLASTX_MAX_TARGET_SEQS} "\
+               "> {params.uncompressed} 2> {log}; else touch {params.uncompressed}; fi && gzip {params.uncompressed}"
 
 else:
     rule diamond_index:
@@ -217,17 +251,23 @@
         input:
             db=rules.diamond_index.output,
             split=rules.split_fa.output
-        output: os.path.join(BLAST_DIR, "xmls", "chunk-{chunk_id}-proteins.xml.gz")
+        output: os.path.join(BLAST_DIR, "tsv", "chunk-{chunk_id}-proteins.tsv.gz")
         params:
             load=loadPre(config, "diamond"),
-            tr=os.path.join(BLAST_DIR, "fastas", "chunk_{chunk_id}.fasta")
+            tr=os.path.join(BLAST_DIR, "fastas", "chunk_{chunk_id}.fasta"),
+            blast_keys=" ".join(blast_keys),
+            matrix=config["serialise"]["substitution_matrix"],
         threads: THREADS
         log: os.path.join(BLAST_DIR, "logs", "chunk-{chunk_id}.blastx.log")
         conda: os.path.join(envdir, "diamond.yaml")
-        shell: "{params.load} if [ -s {params.tr} ]; then diamond blastx --threads {threads} --outfmt xml --compress 1 --out {output} --max-target-seqs {BLASTX_MAX_TARGET_SEQS} --evalue {BLASTX_EVALUE} --db {input.db} --salltitles --query {params.tr} --sensitive > {log} 2> {log}; else touch {output}; fi"
+        shell: "{params.load} if [ -s {params.tr} ]; then diamond blastx --threads {threads} "\
+               "--outfmt 6 {params.blast_keys} "\
+               "--max-target-seqs {BLASTX_MAX_TARGET_SEQS} --matrix {params.matrix} "\
+               "--evalue {BLASTX_EVALUE} --db {input.db} --salltitles --query {params.tr} --sensitive "\
+               " --compress 1 --out {output} 2> {log} > {log}; else touch {output}; fi"
 
 rule blast_all:
-    input: expand(os.path.join(BLAST_DIR, "xmls", "chunk-{chunk_id}-proteins.xml.gz"), chunk_id=CHUNK_ARRAY)
+    input: expand(os.path.join(BLAST_DIR, "tsv", "chunk-{chunk_id}-proteins.tsv.gz"), chunk_id=CHUNK_ARRAY)
     output: os.path.join(BLAST_DIR, "blastx.all.done")
     shell: "touch {output}"

@@ -336,9 +376,9 @@ rule mikado_serialise:
     log: os.path.join(MIKADO_DIR, "mikado_serialise.log")
     params:
         cfg=CFG,
-        blast="--xml={}".format(os.path.join(BLAST_DIR, "xmls")) if len(BLASTX_TARGET) > 0 else "",
+        blast="--tsv={}".format(os.path.join(BLAST_DIR, "tsv")) if len(BLASTX_TARGET) > 0 else "",
         load=loadPre(config, "mikado"),
-        blast_target="--blast_targets={}".format(os.path.join(BLAST_DIR, "index", "blastdb-proteins.fa")) if len(BLASTX_TARGET) > 0 else "",
+        blast_target="--blast-targets={}".format(os.path.join(BLAST_DIR, "index", "blastdb-proteins.fa")) if len(BLASTX_TARGET) > 0 else "",
         orfs=orfs,
         no_start_adj="-nsa" if config.get("mikado", dict()).get("use_prodigal", False) is False else ""
     threads: THREADS
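For reference, the tabular format string used in the rules above is assembled from `blast_keys`. A minimal sketch of that assembly (the exact column names are defined in `Mikado.serializers.blast_serializer.tabular_utils`; only `ppos` and `btop` are confirmed by the commit message, the rest is unspecified here):

```python
# Sketch: how the custom "-outfmt 6 ..." argument is built from blast_keys.
from Mikado.serializers.blast_serializer.tabular_utils import blast_keys

# blast_keys is a list of BLAST tabular column names; per the commit message
# it must include the non-default "ppos" and "btop" columns.
outfmt = "6 " + " ".join(blast_keys)
print(outfmt)   # e.g. passed to blastx as: -outfmt "6 <columns>"
```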
45 changes: 39 additions & 6 deletions Mikado/parsers/__init__.py
@@ -25,6 +25,13 @@ class Parser(metaclass=abc.ABCMeta):
 
     def __init__(self, handle):
         self.__closed = False
+        self._handle = self.__get_handle(handle)
+        self.closed = False
+
+    def __iter__(self):
+        return self
+
+    def __get_handle(self, handle, position=None):
         if not isinstance(handle, io.IOBase):
             if handle.endswith(".gz") or filetype(handle) == b"application/gzip":
                 opener = gzip.open
@@ -36,12 +43,9 @@ def __init__(self, handle):
                 handle = opener(handle, "rt")
             except FileNotFoundError:
                 raise FileNotFoundError("File not found: {0}".format(handle))
-
-        self._handle = handle
-        self.closed = False
-
-    def __iter__(self):
-        return self
+        if position is not None:
+            handle.seek(position)
+        return handle
 
     def __next__(self):
         line = self._handle.readline()
@@ -92,6 +96,35 @@ def closed(self, *args):
 
         self.__closed = args[0]
 
+    def __getstate__(self):
+        try:
+            position = self._handle.tell()
+        except:
+            position = None
+        state = dict()
+        state.update(self.__dict__)
+        state["position"] = position
+        if hasattr(self._handle, "filename"):
+            _handle = self._handle.filename
+            if isinstance(_handle, bytes):
+                _handle = _handle.decode()
+            state["_handle"] = _handle
+        elif hasattr(self._handle, "name"):
+            _handle = self._handle.name
+            if isinstance(_handle, bytes):
+                _handle = _handle.decode()
+            state["_handle"] = _handle
+        else:
+            raise TypeError("Unknown handle: {}".format(self._handle))
+        state.pop("logger", None)
+        return state
+
+    def __setstate__(self, state):
+        position = state.get("position")
+        del state["position"]
+        self.__dict__.update(state)
+        self._handle = self.__get_handle(state["_handle"], position=position)


# noinspection PyPep8
from . import GFF
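The `__getstate__`/`__setstate__` pair above is what makes parsers safe to ship to `spawn`ed worker processes: the open handle is replaced by its file name plus the current offset, and reopened on unpickling. A minimal round-trip sketch, assuming the `GFF3` parser class exported by `Mikado.parsers.GFF` (the input file name is hypothetical):

```python
# Illustrative only. Pickling a parser now survives process boundaries.
import pickle
from Mikado.parsers import GFF

parser = GFF.GFF3("annotation.gff3")        # hypothetical GFF3 file
first = next(parser)                        # advance the handle
clone = pickle.loads(pickle.dumps(parser))  # stores file name + tell() offset
second = next(clone)                        # resumes from the saved position
```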
51 changes: 37 additions & 14 deletions Mikado/parsers/bed12.py
@@ -5,7 +5,7 @@
 but at the same time more pythonic.
 """
 
-
+from time import sleep
 import numpy
 import os
 from fastnumbers import fast_int, fast_float, isint
@@ -1468,7 +1468,14 @@ def __init__(self, handle,
         self.coding = coding
         self.start_adjustment = start_adjustment
         self.logger = logger
+        self.fasta_index = self.__set_fasta_index(fasta_index)
+        self.__closed = False
+        self.header = False
+        self.__table = table
+        self._is_bed12 = (not is_gff)
 
+    @staticmethod
+    def __set_fasta_index(fasta_index):
         if isinstance(fasta_index, dict):
             # check that this is a bona fide dictionary ...
             assert isinstance(
@@ -1482,12 +1489,7 @@ def __init__(self, handle,
             fasta_index = pysam.FastaFile(fasta_index)
         else:
             assert isinstance(fasta_index, pysam.FastaFile), type(fasta_index)
-
-        self.fasta_index = fasta_index
-        self.__closed = False
-        self.header = False
-        self.__table = table
-        self._is_bed12 = (not is_gff)
+        return fasta_index
 
     def __iter__(self):
         return self
@@ -1499,6 +1501,20 @@ def __next__(self, seq=None):
         else:
             return self.gff_next()
 
+    def __getstate__(self):
+        state = super().__getstate__()
+        # Now let's remove the fasta index
+        if state["fasta_index"] is not None:
+            if isinstance(state["fasta_index"], pysam.FastaFile):
+                state["fasta_index"] = state["fasta_index"].filename
+        return state
+
+    def __setstate__(self, state):
+        fasta_index = state.pop("fasta_index", None)
+        super().__setstate__(state)
+        self.logger = create_null_logger()
+        self.__set_fasta_index(fasta_index)
+
     def bed_next(self):
         """
@@ -1614,17 +1630,11 @@ def __init__(self,
         self.rec_queue = rec_queue
         self.return_queue = return_queue
         self.logging_queue = log_queue
-        self.handler = logging_handlers.QueueHandler(self.logging_queue)
-        self.logger = logging.getLogger(self.name)
-        self.logger.addHandler(self.handler)
-        self.logger.setLevel(level)
-        self.logger.propagate = False
         self.transcriptomic = transcriptomic
         self.__max_regression = 0
         self._max_regression = max_regression
         self.coding = coding
         self.start_adjustment = start_adjustment
-        # self.cache = cache
 
         if isinstance(fasta_index, dict):
             # check that this is a bona fide dictionary ...
@@ -1640,7 +1650,7 @@
             fasta_index = pyfaidx.Fasta(fasta_index)
         else:
             assert isinstance(fasta_index, pysam.FastaFile), type(fasta_index)
-
+        self._level = level
         self.fasta_index = fasta_index
         self.__closed = False
         self.header = False
@@ -1685,7 +1695,20 @@ def gff_next(self, line, sequence):
         return bed12
 
     def run(self, *args, **kwargs):
+        self.handler = logging_handlers.QueueHandler(self.logging_queue)
+        self.logger = logging.getLogger(self.name)
+        self.logger.addHandler(self.handler)
+        self.logger.setLevel(self._level)
+        self.logger.propagate = False
+
+        self.logger.info("Started %s", self.__identifier)
+        if self.rec_queue is None:
+            self.return_queue.put(b"FINISHED")
+            raise ValueError
         while True:
+            if self.rec_queue.empty():
+                sleep(0.1)
+                continue
             line = self.rec_queue.get()
             if line in ("EXIT", b"EXIT"):
                 self.rec_queue.put(b"EXIT")
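The `run()` changes above follow the standard pattern for `spawn`-compatible workers: a `multiprocessing.Process` subclass is pickled before it starts, so unpicklable members such as logging handlers must be created inside `run()` rather than in `__init__`. A minimal sketch of that pattern (the class and worker logic are illustrative, not Mikado's code):

```python
# Illustrative only -- the pattern used by the parser wrapper above.
import logging
import logging.handlers
import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self, log_queue, level=logging.INFO):
        super().__init__()
        self.logging_queue = log_queue   # plain, picklable attributes only
        self._level = level

    def run(self):
        # Created here, in the child process, because QueueHandler/Logger
        # instances would not survive pickling under "spawn".
        handler = logging.handlers.QueueHandler(self.logging_queue)
        logger = logging.getLogger(self.name)
        logger.addHandler(handler)
        logger.setLevel(self._level)
        logger.info("Started %s", self.name)
```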
*(Diff truncated here: the remaining hunks of this file and the other 37 changed files are not shown.)*
