diff --git a/Mikado/parsers/bed12.py b/Mikado/parsers/bed12.py index 9df9074f3..32ab3359a 100644 --- a/Mikado/parsers/bed12.py +++ b/Mikado/parsers/bed12.py @@ -1695,7 +1695,6 @@ def gff_next(self, line, sequence): return bed12 def run(self, *args, **kwargs): - print("Started", self.__identifier) self.handler = logging_handlers.QueueHandler(self.logging_queue) self.logger = logging.getLogger(self.name) self.logger.addHandler(self.handler) diff --git a/Mikado/serializers/blast_serializer/tabular_utils.py b/Mikado/serializers/blast_serializer/tabular_utils.py index 894b13e27..eb927c478 100644 --- a/Mikado/serializers/blast_serializer/tabular_utils.py +++ b/Mikado/serializers/blast_serializer/tabular_utils.py @@ -298,9 +298,6 @@ def run(self): with open(self.params_file, "rb") as pfile: params = msgpack.loads(pfile.read(), raw=False, strict_map_key=False) self.columns = params["columns"] - with open(self.index_file, "rb") as index_handle: - self.indexes = msgpack.loads(index_handle.read(), raw=False, strict_map_key=False) - os.remove(self.index_file) # Clean it up prep_hit = partial(prepare_tab_hit, columns=self.columns, qmult=self.qmult, tmult=self.tmult, matrix_name=self.matrix_name) @@ -316,12 +313,15 @@ def run(self): session = Session(bind=self.engine) self.session = session hits, hsps = [], [] - for key, rows in self.indexes: - curr_hit, curr_hsps = prep_hit(key, rows) - hits.append(curr_hit) - hsps += curr_hsps - hits, hsps = load_into_db(self, hits, hsps, force=False, raw=True) + with open(self.index_file, "rb") as index_handle: + for key, rows in msgpack.Unpacker(index_handle, raw=False, strict_map_key=False): + curr_hit, curr_hsps = prep_hit(key, rows) + hits.append(curr_hit) + hsps += curr_hsps + hits, hsps = load_into_db(self, hits, hsps, force=False, raw=True) _, _ = load_into_db(self, hits, hsps, force=True, raw=True) + self.logger.debug("Finished %s", self.identifier) + os.remove(self.index_file) # Clean it up return @@ -349,7 +349,7 @@ def parse_tab_blast(self, index_files = dict((idx, tempfile.mktemp(suffix=".csv")) for idx in range(procs)) kwargs = {"conf": conf, - "maxobjects": int(self.maxobjects), + "maxobjects": max(int(self.maxobjects / procs), 1), "lock": lock, "matrix_name": matrix_name, "qmult": qmult, @@ -392,8 +392,9 @@ def parse_tab_blast(self, # Split the indices for idx, split in enumerate(np.array_split(np.array(list(groups.items())), procs)): with open(index_files[idx], "wb") as index: - vals = [(tuple(item[0]), values[item[1], :].tolist()) for item in split] - index.write(msgpack.dumps(vals)) + for item in split: + vals = (tuple(item[0]), values[item[1], :].tolist()) + msgpack.dump(vals, index) assert os.path.exists(index_files[idx]) processes[idx].start()