From e36e4320a70c4c2aa7506d0a2db3255f668c9ef0 Mon Sep 17 00:00:00 2001 From: Luca Venturini Date: Fri, 19 Oct 2018 14:41:55 +0100 Subject: [PATCH] Working on #137 --- Mikado/preparation/checking.py | 9 ++++- Mikado/preparation/prepare.py | 4 ++ Mikado/subprograms/prepare.py | 47 +++++++++++++++-------- Mikado/tests/__init__.py | 2 +- Mikado/tests/prepare_misc_test.py | 51 +++++++++---------------- Mikado/tests/test_system_calls.py | 62 ++++++++++++++++++++++++------- Mikado/tests/test_utils.py | 42 +++++++++++++++++++++ 7 files changed, 151 insertions(+), 66 deletions(-) create mode 100644 Mikado/tests/test_utils.py diff --git a/Mikado/preparation/checking.py b/Mikado/preparation/checking.py index 71a4ca37e..860da4fea 100644 --- a/Mikado/preparation/checking.py +++ b/Mikado/preparation/checking.py @@ -264,7 +264,10 @@ def submission_queue(self): return self.__submission_queue def __set_submission_queue(self, submission): - if not isinstance(submission, (multiprocessing.queues.Queue, queue.Queue)): + if isinstance(submission, multiprocessing.queues.SimpleQueue): + submission.put_nowait = submission.put + elif not isinstance(submission, (multiprocessing.queues.Queue, + queue.Queue)): raise ValueError("Invalid queue object: {}".format(type(submission))) self.__submission_queue = submission @@ -273,7 +276,9 @@ def logging_queue(self): return self.__logging_queue def __set_logging_queue(self, logging_queue): - if not isinstance(logging_queue, (multiprocessing.queues.Queue, queue.Queue)): + if isinstance(logging_queue, multiprocessing.queues.SimpleQueue): + logging_queue.put_nowait = logging_queue.put + elif not isinstance(logging_queue, (multiprocessing.queues.Queue, queue.Queue)): raise ValueError("Invalid queue object: {}".format(type(logging_queue))) self.__logging_queue = logging_queue diff --git a/Mikado/preparation/prepare.py b/Mikado/preparation/prepare.py index ca74716f2..a44def47d 100644 --- a/Mikado/preparation/prepare.py +++ b/Mikado/preparation/prepare.py @@ -419,6 +419,10 @@ def prepare(args, logger): args.json_conf["prepare"]["files"]["output_dir"], args.json_conf["prepare"]["files"]["out"]), 'w') + logger.info("Output dir: %s. Output GTF: %s. Output Fasta: %s", + args.json_conf["prepare"]["files"]["output_dir"], + args.json_conf["prepare"]["files"]["out"].name, + args.json_conf["prepare"]["files"]["out_fasta"].name) logger.info("Loading reference file") args.json_conf["reference"]["genome"] = pyfaidx.Fasta(args.json_conf["reference"]["genome"]) diff --git a/Mikado/subprograms/prepare.py b/Mikado/subprograms/prepare.py index 229dcf49d..76f9090d7 100644 --- a/Mikado/subprograms/prepare.py +++ b/Mikado/subprograms/prepare.py @@ -30,11 +30,12 @@ def setup(args): logger = logging.getLogger("prepare") logger.setLevel(logging.INFO) - if args.start_method is not None: + if args.start_method: args.json_conf["multiprocessing_method"] = args.start_method if args.output_dir is not None: args.json_conf["prepare"]["files"]["output_dir"] = getattr(args, "output_dir") + if not os.path.exists(args.json_conf["prepare"]["files"]["output_dir"]): try: os.makedirs(args.json_conf["prepare"]["files"]["output_dir"]) @@ -44,9 +45,9 @@ def setup(args): raise elif not os.path.isdir(args.json_conf["prepare"]["files"]["output_dir"]): logger.error( - "The specified output directory %s exists and is not a file; aborting", + "The specified output directory %s exists and is not a folder; aborting", args.json_conf["prepare"]["output_dir"]) - raise OSError("The specified output directory %s exists and is not a file; aborting" % + raise OSError("The specified output directory %s exists and is not a folder; aborting" % args.json_conf["prepare"]["output_dir"]) if args.log is not None: @@ -54,18 +55,30 @@ def setup(args): args.json_conf["prepare"]["files"]["log"] = args.log.name if args.json_conf["prepare"]["files"]["log"]: + try: + _ = open(path_join( + args.json_conf["prepare"]["files"]["output_dir"], + os.path.basename(args.json_conf["prepare"]["files"]["log"])), + "wt") + except TypeError: + raise TypeError((args.json_conf["prepare"]["files"]["output_dir"], + args.json_conf["prepare"]["files"]["log"])) + handler = logging.FileHandler( path_join( args.json_conf["prepare"]["files"]["output_dir"], - args.json_conf["prepare"]["files"]["log"]), - "w") + os.path.basename(args.json_conf["prepare"]["files"]["log"])), + mode="wt") else: handler = logging.StreamHandler() handler.setFormatter(formatter) + while logger.handlers: + logger.removeHandler(logger.handlers.pop()) logger.addHandler(handler) - logger.info("Command line: %s", " ".join(sys.argv)) + assert logger.handlers == [handler] logger.propagate = False + logger.info("Command line: %s", " ".join(sys.argv)) if args.verbose is True: args.json_conf["log_settings"]["log_level"] = "DEBUG" @@ -114,7 +127,7 @@ def setup(args): if member not in args.json_conf["prepare"]["files"]["gff"]: raise ValueError("Incorrect assembly file specified as strand-specific") args.json_conf["prepare"]["strand_specific_assemblies"] = args.strand_specific_assemblies - if args.labels != '': + if args.labels: args.labels = args.labels.split(",") # Checks labels are unique assert len(set(args.labels)) == len(args.labels) @@ -127,13 +140,20 @@ def setup(args): args.labels = [""] * len(args.json_conf["prepare"]["files"]["gff"]) args.json_conf["prepare"]["files"]["labels"] = args.labels - for option in ["out", "out_fasta", - "minimum_length", "procs", "single"]: - if getattr(args, option) is None or getattr(args, option) is False: + for option in ["minimum_length", "procs", "single"]: + if getattr(args, option) in (None, False): continue else: args.json_conf["prepare"][option] = getattr(args, option) + for option in ["out", "out_fasta"]: + if getattr(args, option) in (None, False): + args.json_conf["prepare"]["files"][option] = os.path.basename( + args.json_conf["prepare"]["files"][option] + ) + else: + args.json_conf["prepare"]["files"][option] = os.path.basename(getattr(args, option)) + if getattr(args, "fasta"): args.fasta.close() args.json_conf["reference"]["genome"] = args.fasta.name @@ -147,11 +167,6 @@ def setup(args): if args.strip_cds is True: args.json_conf["prepare"]["strip_cds"] = True - if args.out is not None: - args.json_conf["prepare"]["files"]["out"] = args.out - if args.out_fasta is not None: - args.json_conf["prepare"]["files"]["out_fasta"] = args.out_fasta - try: args.json_conf = check_json(args.json_conf) except InvalidJson as exc: @@ -253,7 +268,7 @@ def positive(string): parser.add_argument("--json-conf", dest="json_conf", type=to_json, default="", help="Configuration file.") - parser.add_argument("-k", "--keep-redundant", default=None, type=bool, + parser.add_argument("-k", "--keep-redundant", default=None, dest="keep_redundant", action="store_true", help="Boolean flag. If invoked, Mikado prepare will retain redundant models.") parser.add_argument("gff", help="Input GFF/GTF file(s).", nargs="*") diff --git a/Mikado/tests/__init__.py b/Mikado/tests/__init__.py index b95f4432b..9f865c740 100644 --- a/Mikado/tests/__init__.py +++ b/Mikado/tests/__init__.py @@ -23,4 +23,4 @@ from . import transcript_tester_negative from . import transcript_tester_positive from . import transcript_tester_single -from . import utilities_tester \ No newline at end of file +from . import utilities_tester diff --git a/Mikado/tests/prepare_misc_test.py b/Mikado/tests/prepare_misc_test.py index b953d23fc..608af1de7 100644 --- a/Mikado/tests/prepare_misc_test.py +++ b/Mikado/tests/prepare_misc_test.py @@ -10,33 +10,10 @@ import pickle import os import time -import threading import pyfaidx import re - - -class ProcRunner(threading.Thread): - - def __init__(self, function: [mp.Process], *args, **kwargs): - - self.__function = function - self.args = args - self.kwargs = kwargs - self._func = self.__function(*self.args, **self.kwargs) - super().__init__() - - def run(self): - self._func.run() - - @property - def func(self): - return self._func - - def join(self, *args, **kwargs): - if self.func._popen is not None: - self.func.join() - self.func.terminate() - super().join(timeout=0.1) +from Mikado.tests.test_utils import ProcRunner +from queue import Queue class MiscTest(unittest.TestCase): @@ -51,7 +28,8 @@ def setUpClass(cls): @staticmethod def create_logger(name): - logging_queue = mp.JoinableQueue(-1) + logging_queue = Queue() + logging_queue.put_nowait = logging_queue.put log_queue_handler = logging.handlers.QueueHandler(logging_queue) log_queue_handler.setLevel(logging.DEBUG) @@ -64,11 +42,10 @@ def create_logger(name): def setUp(self): # Create the queues for logging and submission - self.submission_queue = mp.JoinableQueue() + self.submission_queue = mp.SimpleQueue() self.fasta_out = "temporary.fasta" self.gtf_out = "temporary.gtf" - @unittest.skip def test_normal(self): # TODO: this test is creating problems due to threading errors. logger, listener, logging_queue = self.create_logger("test_normal") @@ -94,10 +71,10 @@ def test_normal(self): self.assertTrue(os.path.exists(proc.func.gtf_out), proc.func.gtf_out) self.submission_queue.put(("EXIT", None, None, None)) time.sleep(0.1) - proc.join() - + proc.stop() os.remove(proc.func.fasta_out) os.remove(proc.func.gtf_out) + assert not proc.is_alive() self.maxDiff = 10000 self.assertEqual(cmo.output, [ @@ -197,7 +174,6 @@ def test_example_model(self): self.assertIn("WARNING:null:Transcript AT5G01530.0 has been assigned to the wrong strand, reversing it.", cm.output) - @unittest.skip def test_example_model_through_process(self): logger, listener, logging_queue = self.create_logger("test_example_model_through_process") @@ -238,16 +214,23 @@ def test_example_model_through_process(self): line = re.sub("0/", "", line) fasta_lines.append(line) + self.assertGreater(len(fasta_lines), 1) fasta = pyfaidx.Fasta(self.fasta_temp.name) seq = str(fasta[lines["chrom"]][lines["start"] - 1:lines["end"]]) res = Mikado.preparation.checking.create_transcript(lines, seq, lines["start"], lines["end"]) self.assertTrue(len(res.cdna), (209593 - 208937 + 1) + (210445 - 209881 + 1)) with tempfile.NamedTemporaryFile(suffix="fa", delete=True, mode="wt") as faix: - assert len(fasta_lines) > 0 - print(*fasta_lines, file=faix, end="") + + assert len(fasta_lines) > 1 + assert fasta_lines[0][0] == ">" + for line in fasta_lines: + print(line, file=faix) faix.flush() - fa = pyfaidx.Fasta(faix.name) + try: + fa = pyfaidx.Fasta(faix.name) + except TypeError: + raise TypeError([_ for _ in open(faix.name)]) self.assertEqual(list(fa.keys()), ["AT5G01530.0"]) self.assertEqual(str(fa["AT5G01530.0"]), str(res.cdna)) self.assertEqual(len(str(fa["AT5G01530.0"])), res.cdna_length) diff --git a/Mikado/tests/test_system_calls.py b/Mikado/tests/test_system_calls.py index 0a85d348c..5a9178ddd 100644 --- a/Mikado/tests/test_system_calls.py +++ b/Mikado/tests/test_system_calls.py @@ -11,14 +11,16 @@ import pkg_resources import pyfaidx import yaml +import shutil import Mikado.daijin import Mikado.subprograms.configure from Mikado.configuration import configurator, daijin_configurator -from Mikado.parsers import to_gff from Mikado.picking import picker from Mikado.preparation import prepare from Mikado.scales.compare import compare, load_index from Mikado.subprograms.util.stats import Calculator +from Mikado.subprograms.prepare import prepare_launcher +from Mikado.subprograms.prepare import setup as prepare_setup from Mikado.transcripts.transcript import Namespace from Mikado.utilities.log_utils import create_null_logger, create_default_logger from Mikado.parsers.GFF import GffLine @@ -253,24 +255,49 @@ def test_cdna_redundant_cds_not(self): gtf = pkg_resources.resource_filename("Mikado.tests", "cds_test_1.gtf") self.conf["prepare"]["files"]["gff"] = [gtf] self.conf["prepare"]["files"]["labels"] = [""] - self.conf["prepare"]["files"]["output_dir"] = tempfile.gettempdir() self.conf["prepare"]["files"]["out_fasta"] = "mikado_prepared.fasta" self.conf["prepare"]["files"]["out"] = "mikado_prepared.gtf" + self.conf["prepare"]["files"]["log"] = "prepare.log" self.conf["prepare"]["strip_cds"] = False - args = Namespace() + args = Namespace(default=None) args.strip_cds = False - args.json_conf = self.conf - for b in (False, ): + args.json_conf = self.conf.copy() + del args.json_conf["prepare"]["files"]["output_dir"] + args.log = None + for b in (False, True): with self.subTest(b=b): + folder = tempfile.mktemp() + os.makedirs(folder) + # _ = open(os.path.join(folder, args.json_conf["prepare"]["files"]["log"]), "wt") + args.output_dir = folder + args.list = None + args.gffs = None + args.strand_specific_assemblies = None + args.labels = None args.json_conf = self.conf args.keep_redundant = b - # args.json_conf["prepare"]["keep_redundant"] = b + args.out, args.out_fasta = None, None + args.json_conf["prepare"]["files"]["log"] = "prepare.log" + args.log = None self.logger.setLevel("DEBUG") - prepare.prepare(args, self.logger) - self.assertTrue(os.path.exists(os.path.join(self.conf["prepare"]["files"]["output_dir"], - "mikado_prepared.fasta"))) - fa = pyfaidx.Fasta(os.path.join(self.conf["prepare"]["files"]["output_dir"], + args, _ = prepare_setup(args) + self.assertEqual(args.output_dir, folder) + self.assertEqual(args.json_conf["prepare"]["files"]["output_dir"], folder) + self.assertIn(os.path.dirname(args.json_conf["prepare"]["files"]["out_fasta"]), + (folder, ""), args.json_conf) + self.assertIn(os.path.dirname(args.json_conf["prepare"]["files"]["out"]), + (folder, ""), args.json_conf) + + with self.assertRaises(SystemExit) as exi: + prepare_launcher(args) + self.assertTrue(os.path.exists(folder)) + self.assertTrue(os.path.isdir(folder)) + self.assertEqual(exi.exception.code, 0) + self.assertTrue(os.path.exists(os.path.join(folder, + "mikado_prepared.fasta")), + open(os.path.join(folder, "prepare.log")).read()) + fa = pyfaidx.Fasta(os.path.join(folder, "mikado_prepared.fasta")) if b is True: @@ -282,7 +309,7 @@ def test_cdna_redundant_cds_not(self): self.assertTrue("AT5G01530.1" in fa.keys() or "AT5G01530.2" in fa.keys()) self.assertIn("AT5G01530.3", fa.keys()) self.assertIn("AT5G01530.4", fa.keys()) - gtf_file = os.path.join(self.conf["prepare"]["files"]["output_dir"], "mikado_prepared.gtf") + gtf_file = os.path.join(folder, "mikado_prepared.gtf") fa.close() coding_count = 0 with to_gff(gtf_file) as gtf: @@ -321,6 +348,7 @@ def test_cdna_redundant_cds_not(self): self.assertTrue(a5.is_complete) self.assertGreater(coding_count, 0) + shutil.rmtree(folder) def test_negative_cdna_redundant_cds_not(self): """This test will verify whether the new behaviour of not considering redundant two models with same @@ -329,9 +357,9 @@ def test_negative_cdna_redundant_cds_not(self): gtf = pkg_resources.resource_filename("Mikado.tests", "cds_test_2.gtf") self.conf["prepare"]["files"]["gff"] = [gtf] self.conf["prepare"]["files"]["labels"] = [""] - self.conf["prepare"]["files"]["output_dir"] = tempfile.gettempdir() self.conf["prepare"]["files"]["out_fasta"] = "mikado_prepared.fasta" self.conf["prepare"]["files"]["out"] = "mikado_prepared.gtf" + self.conf["prepare"]["files"]["log"] = "prepare.log" self.conf["prepare"]["strip_cds"] = False self.conf["prepare"]["minimum_length"] = 150 # Necessary for testing A5 @@ -340,9 +368,16 @@ def test_negative_cdna_redundant_cds_not(self): args.json_conf = self.conf for b in (False, ): with self.subTest(b=b): + folder = tempfile.mktemp() + os.makedirs(folder) args.json_conf = self.conf args.keep_redundant = b - # args.json_conf["prepare"]["keep_redundant"] = b + args.output_dir = folder + args.log = None + args.gff = None + args.list = None + args.strand_specific_assemblies = None + args, _ = prepare_setup(args) prepare.prepare(args, self.logger) self.assertTrue(os.path.exists(os.path.join(self.conf["prepare"]["files"]["output_dir"], "mikado_prepared.fasta"))) @@ -422,6 +457,7 @@ def test_negative_cdna_redundant_cds_not(self): self.assertGreater(coding_count, 0) fa.close() + shutil.rmtree(folder) def test_truncated_cds(self): files = ["test_truncated_cds.gff3"] diff --git a/Mikado/tests/test_utils.py b/Mikado/tests/test_utils.py new file mode 100644 index 000000000..942016528 --- /dev/null +++ b/Mikado/tests/test_utils.py @@ -0,0 +1,42 @@ +import threading +import multiprocessing as mp +import multiprocessing.queues as mpqueues + + +class ProcRunner(threading.Thread): + + def __init__(self, function: [mp.Process], *args, **kwargs): + + self.__function = function + self.args = args + self.kwargs = kwargs + self._func = self.__function(*self.args, **self.kwargs) + super().__init__() + + def run(self): + self._func.run() + + def __enter__(self): + return self + + def __exit__(self): + self.stop() + + @property + def func(self): + return self._func + + def join(self, *args, **kwargs): + if self.func._popen is not None: + self.func.join() + self.func.terminate() + super().join(timeout=0.1) + if self._tstate_lock is not None: + assert hasattr(self._tstate_lock, "release") + if self._tstate_lock.locked(): + self._tstate_lock.release() + self._stop() + + def stop(self): + self.join() + self._stop() \ No newline at end of file