Commit 8724ee6
Refactored the Daijin code a bit, excluding the assemble/mikado pipeline launchers from coverage analysis. Added tests for the 'is_intersecting' and 'define_graph' sections of superlocus.
lucventurini committed Mar 12, 2021
1 parent 3385b7a commit 8724ee6
Showing 10 changed files with 515 additions and 362 deletions.
2 changes: 2 additions & 0 deletions .coveragerc
@@ -5,3 +5,5 @@ omit =
Mikado/subprograms/util/awk_gtf.py
Mikado/subprograms/util/class_codes.py
Mikado/subprograms/util/metrics.py
Mikado/daijin/mikado.py
Mikado/daijin/assemble.py
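
The commit message's reference to coverage analysis corresponds to this change: the two Daijin launcher modules are appended to the omit list, so coverage measurement skips them. A minimal sketch of how the rc file takes effect through the coverage.py API (the test-running step is left abstract and is not part of this commit):

import coverage

# Point coverage.py at the project's .coveragerc so the omit patterns above apply.
cov = coverage.Coverage(config_file=".coveragerc")
cov.start()
# ... run the Mikado test suite here (e.g. via unittest) ...
cov.stop()
cov.save()
cov.report()  # the omitted launchers do not appear in the report
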
6 changes: 5 additions & 1 deletion Mikado/configuration/picking_config.py
@@ -267,7 +267,11 @@ def __call__(self, value):
class ClusteringConfiguration:
cds_only: bool = field(default=False, metadata={
"metadata": {
"description": "Boolean, it specifies whether to cluster transcripts only according to their CDS (if present)."},
"description": "Boolean, it specifies whether to cluster transcripts only according to their CDS (if "
"present). Please note that this applies *only* when comparing pairs of coding "
"transcripts. If *either* transcript under consideration is non-coding, Mikado will "
"consider both coding and non-coding parts of the transcript. for assessing the "
"clustering."},
})
min_cds_overlap: float = field(default=0.2, metadata={
"metadata": {
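
The new description above clarifies that cds_only restricts clustering to the CDS only when both transcripts in a pair are coding. A minimal illustrative sketch of toggling the flag (direct instantiation of ClusteringConfiguration is assumed here; in practice the value normally comes from the Mikado configuration file):

from Mikado.configuration.picking_config import ClusteringConfiguration

# Illustration only, not part of this commit.
clustering = ClusteringConfiguration(cds_only=True)
print(clustering.cds_only)         # True: coding/coding pairs are clustered on their CDS alone
print(clustering.min_cds_overlap)  # 0.2 by default, per the field shown above
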
344 changes: 0 additions & 344 deletions Mikado/daijin/__init__.py
@@ -11,18 +11,9 @@
import argparse
import datetime
import time
import json
from dataclasses import asdict
import yaml
import snakemake
from snakemake.utils import min_version
from ..configuration import DaijinConfiguration
from ..configuration.daijin_configurator import create_daijin_config
import shutil
import pkg_resources
import functools
import inspect
import toml
try:
from yaml import CSafeLoader as yLoader
except ImportError:
@@ -238,338 +229,3 @@ def create_config_parser():
help="Flag. If set, remove all file fields.")
parser.set_defaults(func=create_daijin_config)
return parser


# pylint: disable=too-many-locals
def assemble_transcripts_pipeline(args):

"""
This section of Daijin is focused on creating the necessary configuration for
driving the pipeline.
:param args: CLI arguments from argparse
:return:
"""
if args.config.endswith("json"):
loader = json.load
elif args.config.endswith("yaml"):
loader = functools.partial(yaml.load, Loader=yLoader)
else:
loader = functools.partial(toml.load)

with open(args.config, 'r') as _:
doc = loader(_)

if args.exe and os.path.exists(args.exe):
if args.exe.endswith("json"):
loader = json.load
else:
loader = functools.partial(yaml.load, Loader=yLoader)
with open(args.exe) as _:
doc["load"] = loader(_)

# print(doc["load"])
# Check the configuration
_ = DaijinConfiguration.Schema().load(doc)

# pylint: disable=invalid-name

if not "short_reads" in doc and not "long_reads" in doc:
print("No short reads section or long reads sections was present in the configuration. Please include your samples and try again")
exit(1)

LABELS = []
R1 = []
R2 = []
LR_LABELS = []
LR_FILES = []

if "short_reads" in doc:
LABELS = doc["short_reads"]["samples"]
R1 = doc["short_reads"]["r1"]
R2 = doc["short_reads"]["r2"]

if "long_reads" in doc:
LR_LABELS = doc["long_reads"]["samples"]
LR_FILES = doc["long_reads"]["files"]
READS_DIR = doc["out_dir"] + "/1-reads"
SCHEDULER = doc["scheduler"] if doc["scheduler"] else ""
CWD = os.path.abspath(".")
# pylint: enable=invalid-name

res_cmd, sub_cmd = get_sub_commands(SCHEDULER, args.prefix, args.additional_drmaa)

# Create log folder
if not os.path.exists("daijin_logs"):
os.makedirs("daijin_logs")
elif not os.path.isdir("daijin_logs"):
raise OSError("{} is not a directory!".format("daijin_logs"))

if (len(R1) != len(R2)) and (len(R1) != len(LABELS)):
print("R1, R2 and LABELS lists are not the same length. Please check and try again")
exit(1)

if len(LR_LABELS) != len(LR_FILES):
print("long read samples and file arrays in the configuration file are not the same length. Please check and try again")
exit(1)

if not os.path.exists(READS_DIR):
os.makedirs(READS_DIR)

for read1, read2, label in zip(R1, R2, LABELS):
suffix = read1.split(".")[-1]
if suffix not in ("gz", "bz2"):
suffix = ""
else:
suffix = ".{}".format(suffix)

r1out = READS_DIR + "/" + label + ".R1.fq{}".format(suffix)
r2out = READS_DIR + "/" + label + ".R2.fq{}".format(suffix)
if not os.path.islink(r1out):
os.symlink(os.path.abspath(read1), r1out)

if not os.path.islink(r2out):
os.symlink(os.path.abspath(read2), r2out)

for lr_file, label in zip(LR_FILES, LR_LABELS):
suffix = lr_file.split(".")[-1]
compress = ""
if suffix in ("gz", "bz2"):
compress = "." + suffix[:]
suffix = lr_file.split(".")[-2]

if suffix in ("fa", "fna", "fasta"):
suffix = ".fa" + compress
elif suffix in ("fq", "fastq"):
suffix = ".fq" + compress
else:
suffix = ".{}".format(suffix)

out = READS_DIR + "/" + label + ".long{}".format(suffix)
if not os.path.islink(out):
os.symlink(os.path.abspath(lr_file), out)


# Launch using SnakeMake
assert pkg_resources.resource_exists("Mikado",
os.path.join("daijin", "assemble.smk"))

additional_config = {}
if args.threads is not None:
additional_config["threads"] = args.threads

cluster_var = None
if args.no_drmaa is True and sub_cmd:
cluster_var = sub_cmd + res_cmd

drmaa_var = None
if args.no_drmaa is False and res_cmd:
try:
import drmaa
_ = drmaa.Session()
except (RuntimeError,ImportError,AttributeError):
print("WARNING: DRMAA not installed or not configured properly. Switching to local/cluster mode. Please use the \"-nd\" flag to run Daijin if you do not plan to use DRMAA.", file=sys.stderr)
drmaa_var = None
args.no_drmaa = True
else:
drmaa_var = res_cmd

if SCHEDULER == "local":
hpc_conf = None
drmaa_var = None
cluster_var = None
elif drmaa_var or cluster_var:
if os.path.exists(args.hpc_conf):
hpc_conf = args.hpc_conf
else:
hpc_conf = system_hpc_yaml
else:
hpc_conf = None

yaml_file = open(os.path.join(doc["out_dir"], "daijin.{}.yaml".format(NOW)), "wt")
yaml.dump(doc, yaml_file)
yaml_file.flush()
shutil.copystat(args.config, yaml_file.name)

if args.latency_wait is not None:
latency = abs(args.latency_wait)
elif SCHEDULER not in ('', "local"):
latency = 60
else:
latency = 1

kwds = {
"dryrun": args.dryrun,
"cores": args.cores,
"nodes": args.jobs,
"config": additional_config,
"workdir": CWD,
"cluster_config": hpc_conf,
"cluster": cluster_var,
"drmaa": drmaa_var,
"printshellcmds": True,
"snakemakepath": shutil.which("snakemake"),
"use_conda": args.use_conda,
"stats": "daijin_tr_" + NOW + ".stats",
"force_incomplete": args.rerun_incomplete,
"detailed_summary": args.detailed_summary,
"list_resources": args.list,
"latency_wait": latency,
"printdag": args.dag,
"forceall": args.dag,
"forcerun": args.forcerun,
"lock": (not args.nolock),
"printreason": True
}

if "configfile" in inspect.getfullargspec(snakemake.snakemake).args:
kwds["configfile"] = yaml_file.name
elif "configfiles" in inspect.getfullargspec(snakemake.snakemake).args:
kwds["configfiles"] = [yaml_file.name]
else:
raise KeyError("No configfile key found")

snakemake.snakemake(
pkg_resources.resource_filename("Mikado",
os.path.join("daijin", "assemble.smk")),
**kwds
)
# pylint: enable=too-many-locals
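
Both launchers finish by probing which configuration keyword the installed snakemake.snakemake function accepts, because the accepted argument differs between Snakemake releases. A standalone sketch of that compatibility check, mirroring the code above (the helper name is illustrative and not part of Mikado):

import inspect
import snakemake

def snakemake_config_kwarg(config_path):
    # Return the keyword argument understood by the installed Snakemake version.
    accepted = inspect.getfullargspec(snakemake.snakemake).args
    if "configfile" in accepted:
        return {"configfile": config_path}
    if "configfiles" in accepted:
        return {"configfiles": [config_path]}
    raise KeyError("No configfile key found")
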


def mikado_pipeline(args):

"""
This function launches the sub-section dedicated to the Mikado pipeline.
:param args: argparse Namespace
:return:
"""

if args.config.endswith("json"):
loader = json.load
elif args.config.endswith("yaml"):
loader = functools.partial(yaml.load, Loader=yLoader)
else:
loader = functools.partial(toml.load)
with open(args.config, 'r') as _:
daijin_config = loader(_)

additional_config = {}
if args.threads is not None:
additional_config["threads"] = args.threads

if args.exe and os.path.exists(args.exe):
if args.exe.endswith("json"):
loader = json.load
else:
loader = functools.partial(yaml.load, Loader=yLoader)
with open(args.exe) as _:
daijin_config["load"] = loader(_)

daijin_config = DaijinConfiguration.Schema().load(daijin_config)

# pylint: disable=invalid-name
SCHEDULER = daijin_config.scheduler if daijin_config.scheduler else ""
CWD = os.path.abspath(".")
# pylint: enable=invalid-name

res_cmd, sub_cmd = get_sub_commands(SCHEDULER, args.prefix, args.additional_drmaa)

if not os.path.exists("daijin_logs"):
os.makedirs("daijin_logs")
elif not os.path.isdir("daijin_logs"):
raise OSError("{} is not a directory!".format("daijin_logs"))

# Launch using SnakeMake
assert pkg_resources.resource_exists("Mikado",
os.path.join("daijin", "mikado.smk"))

cluster_var = None
if args.no_drmaa is True and sub_cmd:
cluster_var = sub_cmd + res_cmd

drmaa_var = None
if args.no_drmaa is False and res_cmd:
try:
import drmaa
_ = drmaa.Session()
except (RuntimeError,ImportError,AttributeError):
print("WARNING: DRMAA not installed or not configured properly. Switching to local/cluster mode. Please use the \"-nd\" flag to run Daijin if you do not plan to use DRMAA.", file=sys.stderr)
drmaa_var = None
args.no_drmaa = True
else:
drmaa_var = res_cmd

if drmaa_var or cluster_var:
if os.path.exists(args.hpc_conf):
hpc_conf = args.hpc_conf
else:
hpc_conf = system_hpc_yaml
else:
hpc_conf = None

yaml_file = open(os.path.join(daijin_config.out_dir, "daijin.{}.yaml".format(NOW)), "wt")
yaml.dump(asdict(daijin_config), yaml_file)
yaml_file.flush()
shutil.copystat(args.config, yaml_file.name)

if SCHEDULER == "local":
hpc_conf = None
drmaa_var = None
cluster_var = None
elif drmaa_var or cluster_var:
if os.path.exists(args.hpc_conf):
hpc_conf = args.hpc_conf
else:
hpc_conf = system_hpc_yaml
else:
hpc_conf = None

if args.latency_wait:
latency = abs(args.latency_wait)
elif SCHEDULER not in ('', "local"):
latency = 60
else:
latency = 1

BLASTX_CHUNKS = max(int(daijin_config.blastx.chunks), daijin_config.threads)
if BLASTX_CHUNKS > daijin_config.blastx.chunks:
print("INFO: Increasing the number of chunks for DIAMOND/BLASTX to match the requested threads, \
as Mikado serialise relies on having a number of chunks equal or greater than the number of requested threads.")

kwds = {
"ignore_ambiguity": False,
"cores": args.cores,
"dryrun": args.dryrun,
"nodes": args.jobs,
"config": additional_config,
"workdir": CWD,
"cluster_config": hpc_conf,
"cluster": cluster_var,
"drmaa": drmaa_var,
"latency_wait": latency,
"printshellcmds": True,
"use_conda": args.use_conda,
"snakemakepath": shutil.which("snakemake"),
"stats": "daijin_tr_" + NOW + ".stats",
"force_incomplete": args.rerun_incomplete,
"detailed_summary": args.detailed_summary,
"list_resources": args.list,
"printdag": args.dag,
"forceall": args.dag,
"forcerun": args.forcerun,
"lock": (not args.nolock),
"printreason": True
}

if "configfile" in inspect.getfullargspec(snakemake.snakemake).args:
kwds["configfile"] = yaml_file.name
elif "configfiles" in inspect.getfullargspec(snakemake.snakemake).args:
kwds["configfiles"] = [yaml_file.name]
else:
raise KeyError("No configfile key found")

snakemake.snakemake(
pkg_resources.resource_filename("Mikado",
os.path.join("daijin", "mikado.smk")),
**kwds
)
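
Both assemble_transcripts_pipeline and mikado_pipeline open the Daijin configuration with a parser chosen from the file extension, falling back to TOML when the file is neither JSON nor YAML. A self-contained sketch of that dispatch (the helper name and the plain SafeLoader fallback are assumptions, not part of this file):

import functools
import json
import toml
import yaml

try:
    from yaml import CSafeLoader as yLoader
except ImportError:
    from yaml import SafeLoader as yLoader  # assumed fallback when libyaml is unavailable

def load_daijin_config(path):
    # Choose a parser from the file extension, defaulting to TOML.
    if path.endswith("json"):
        loader = json.load
    elif path.endswith("yaml"):
        loader = functools.partial(yaml.load, Loader=yLoader)
    else:
        loader = toml.load
    with open(path) as handle:
        return loader(handle)
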
7 changes: 4 additions & 3 deletions Mikado/daijin/__main__.py
@@ -1,9 +1,10 @@
import argparse
import sys

from ..import create_default_logger
from ..utilities.log_utils import create_default_logger
from ..configuration.daijin_configurator import create_daijin_config
from ..daijin import mikado_pipeline, create_parser, assemble_transcripts_pipeline, create_config_parser
from .mikado import mikado_pipeline
from .assemble import assemble_transcripts_pipeline
from . import create_parser, create_config_parser


def main(call_args=None):
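
After the refactor, the two launchers live in dedicated submodules; the relative imports in __main__.py above resolve to these absolute paths when used from outside the package (shown here purely for orientation):

from Mikado.daijin.assemble import assemble_transcripts_pipeline
from Mikado.daijin.mikado import mikado_pipeline
from Mikado.daijin import create_parser, create_config_parser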