diff --git a/src/biocatalyzer/bioreactor.py b/src/biocatalyzer/bioreactor.py index 4028648..ef7d4e3 100644 --- a/src/biocatalyzer/bioreactor.py +++ b/src/biocatalyzer/bioreactor.py @@ -2,9 +2,11 @@ import logging import multiprocessing import os +import tempfile import time import uuid from typing import Union +from pathlib import Path import pandas as pd from tqdm import tqdm @@ -13,7 +15,7 @@ from biocatalyzer.chem import ChemUtils from biocatalyzer.io_utils import Loaders -DATA_FILES = os.path.dirname(__file__) +DATA_FILES = Path(__file__).resolve().parent class BioReactor: @@ -59,7 +61,6 @@ def __init__(self, # silence RDKit logger ChemUtils.rdkit_logs(False) self._compounds_path = compounds_path - self._output_path = output_path self._neutralize = neutralize_compounds self._organisms_path = organisms_path self._reaction_rules_path = reaction_rules_path @@ -68,7 +69,7 @@ def __init__(self, self._set_up_files() self._orgs = Loaders.load_organisms(self._organisms_path) self._reaction_rules = Loaders.load_reaction_rules(self._reaction_rules_path, orgs=self._orgs) - self._set_output_path(self._output_path) + self._set_output_path(output_path) self._compounds = Loaders.load_compounds(self._compounds_path, self._neutralize) self._molecules_to_remove = Loaders.load_byproducts_to_remove(self._molecules_to_remove_path) self._patterns_to_remove = Loaders.load_patterns_to_remove(self._patterns_to_remove_path) @@ -77,7 +78,7 @@ def __init__(self, self._n_jobs = multiprocessing.cpu_count() else: self._n_jobs = n_jobs - self._new_compounds_path = os.path.join(self._output_path, 'new_compounds.tsv') + self._new_compounds_path = Path(self._output_path) / 'new_compounds.tsv' self._new_compounds = None @property @@ -393,15 +394,13 @@ def n_jobs(self, n_jobs: int): def _set_up_files(self): if self._reaction_rules_path == 'default': - self._reaction_rules_path = os.path.join( - DATA_FILES, 'data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2') + self._reaction_rules_path = DATA_FILES / 'data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2' if self._molecules_to_remove_path == 'default': - self._molecules_to_remove_path = os.path.join(DATA_FILES, 'data/byproducts_to_remove/byproducts.tsv') + self._molecules_to_remove_path = DATA_FILES / 'data/byproducts_to_remove/byproducts.tsv' if self._patterns_to_remove_path == 'default': - self._patterns_to_remove_path = os.path.join(DATA_FILES, 'data/patterns_to_remove/patterns.tsv') + self._patterns_to_remove_path = DATA_FILES / 'data/patterns_to_remove/patterns.tsv' - @staticmethod - def _set_output_path(output_path: str): + def _set_output_path(self, output_path: str): """ Make the output directory if it does not exist. @@ -410,12 +409,15 @@ def _set_output_path(output_path: str): output_path: str The path to the output directory. """ - if not os.path.exists(output_path): - os.makedirs(output_path) + output_path = Path(output_path) + if not output_path.exists(): + output_path.mkdir(parents=True) else: - if os.path.exists(output_path + '/results.tsv') or os.path.exists(output_path + '/new_compounds.tsv'): - raise FileExistsError(f"Results in {output_path} already exists. Define a different output path so " - f"that previous results are not overwritten.") + if (output_path / "results.tsv").exists() or (output_path / "new_compounds.tsv").exists(): + raise FileExistsError( + f"Results in {output_path} already exist. Define a different output path so that previous results are not overwritten." 
+ ) + self._output_path = output_path def _match_patterns(self, smiles: str): """ @@ -498,7 +500,7 @@ def _match_conditions(self, smiles: str): bool True if mol matches conditions to remove, False otherwise. """ - if not smiles: + if smiles is None: return False if '*' in smiles: return False @@ -563,16 +565,16 @@ def process_results(self, save: bool = True, overwrite: bool = True): results['EC_Numbers'] = results['EC_Numbers'].apply(lambda x: _merge_fields(x)) if save: if overwrite: - results_file_proc = os.path.join(self._output_path, 'new_compounds.tsv') + results_file_proc = self._output_path / 'new_compounds.tsv' results.to_csv(results_file_proc, sep='\t', index=False) else: - results_file_proc = os.path.join(self._output_path, 'new_compounds_processed.tsv') + results_file_proc = self._output_path / 'new_compounds_processed.tsv' results.to_csv(results_file_proc, sep='\t', index=False) else: results_file_proc = self._new_compounds_path return results, results_file_proc - def _react_single(self, smiles: str, smarts: str): + def _react_single(self, smiles: str, smarts: str, result_queue: multiprocessing.Queue): """ React a single compound with a single reaction rule. Writes the results to the output files. @@ -583,40 +585,71 @@ def _react_single(self, smiles: str, smarts: str): The smiles of the reactant. smarts: str The SMARTS string of the reaction. + result_queue: multiprocessing.Queue + The queue to store the results. """ reactants = self._reaction_rules[self._reaction_rules.SMARTS == smarts].Reactants.values[0] reactants = reactants.replace("Any", smiles).split(';') results = ChemUtils.react(reactants, smarts) - if len(results) > 0: - smiles_id = self._compounds[self._compounds.smiles == smiles].compound_id.values[0] - smarts_id = self._reaction_rules[self._reaction_rules.SMARTS == smarts].InternalID.values[0] - most_similar_products_set = set() - for i, result in enumerate(results): - products = result.split('>')[-1].split('.') - # keep only the most similar compound to the input compound - most_similar_product = ChemUtils.most_similar_compound(smiles, products) - most_similar_product = ChemUtils.smiles_to_isomerical_smiles(most_similar_product) - if most_similar_product not in most_similar_products_set: - most_similar_products_set.add(most_similar_product) - if self._match_conditions(most_similar_product): - if self._neutralize: - most_similar_product = ChemUtils.uncharge_smiles(most_similar_product) - ecs = self._get_ec_numbers(smarts_id) - with open(self._new_compounds_path, 'a') as f: - f.write(f"{smiles_id}\t{smiles}\t{smarts_id}\t{smiles_id}_{uuid.uuid4()}\t" - f"{most_similar_product}\t{result}\t{ecs}\n") + if len(results) == 0: + return + smiles_id = self._compounds[self._compounds.smiles == smiles].compound_id.values[0] + smarts_id = self._reaction_rules[self._reaction_rules.SMARTS == smarts].InternalID.values[0] + most_similar_products_set = set() + # Collect results in a list + output_rows = [] + for result in results: + products = result.split('>')[-1].split('.') + most_similar_product = ChemUtils.most_similar_compound(smiles, products) + most_similar_product = ChemUtils.smiles_to_isomerical_smiles(most_similar_product) + + if most_similar_product not in most_similar_products_set: + most_similar_products_set.add(most_similar_product) + if self._match_conditions(most_similar_product): + if self._neutralize: + most_similar_product = ChemUtils.uncharge_smiles(most_similar_product) + ecs = self._get_ec_numbers(smarts_id) + 
output_rows.append(f"{smiles_id}\t{smiles}\t{smarts_id}\t{smiles_id}_{uuid.uuid4()}\t" + f"{most_similar_product}\t{result}\t{ecs}\n") + + # Write output to a temporary file, then add the filename to the result queue + if output_rows: + temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w', newline='\n') + with open(temp_file.name, 'w') as f: + f.writelines(output_rows) + result_queue.put(temp_file.name) def react(self): """ Transform reactants into products using the reaction rules. + Writes results incrementally and handles large files. """ t0 = time.time() - with open(self._new_compounds_path, 'w') as f: - f.write('OriginalCompoundID\tOriginalCompoundSmiles\tOriginalReactionRuleID\tNewCompoundID\t' - 'NewCompoundSmiles\tNewReactionSmiles\tEC_Numbers\n') + header = ( + 'OriginalCompoundID\tOriginalCompoundSmiles\tOriginalReactionRuleID\tNewCompoundID\t' + 'NewCompoundSmiles\tNewReactionSmiles\tEC_Numbers\n' + ) + # Ensure header is written to the final output file + with open(self._new_compounds_path, 'w', newline='\n') as f: + f.write(header) + params = list(itertools.product(self._compounds.smiles, self._reaction_rules.SMARTS)) - with multiprocessing.Pool(self._n_jobs) as pool: - pool.starmap(self._react_single, tqdm(params, total=len(params))) + # Create a multiprocessing Manager to hold the result queue + with multiprocessing.Manager() as manager: + result_queue = manager.Queue() + + # Start the multiprocessing pool + with multiprocessing.Pool(self._n_jobs) as pool: + pool.starmap(self._react_single, [(smiles, smarts, result_queue) for smiles, smarts in params]) + + # Once all processes are done, write the results from all temporary files + with open(self._new_compounds_path, 'a', newline='\n') as f: + while not result_queue.empty(): + temp_file = result_queue.get() + with open(temp_file, 'r') as temp_f: + f.write(temp_f.read()) + os.remove(temp_file) # Clean up the temporary file + self._new_compounds = f"New products saved to {self._new_compounds_path}" t1 = time.time() logging.info(f"Time elapsed: {t1 - t0} seconds") diff --git a/src/biocatalyzer/clis/cli.py b/src/biocatalyzer/clis/cli.py index 7a9e6be..390be16 100644 --- a/src/biocatalyzer/clis/cli.py +++ b/src/biocatalyzer/clis/cli.py @@ -1,12 +1,12 @@ import logging -import os +from pathlib import Path import click from biocatalyzer.bioreactor import BioReactor from biocatalyzer.matcher import MSDataMatcher -DATA_FILES = os.path.dirname(__file__) +DATA_FILES = Path(__file__).resolve().parent @click.command() @@ -105,8 +105,8 @@ def biocatalyzer_cli(compounds, logging.basicConfig(filename=f'{output_path}logging.log', level=logging.DEBUG) if reaction_rules is None: logging.info(f"Using default reaction rules file.") - reaction_rules = os.path.join( - DATA_FILES, '../data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2') + reaction_rules = DATA_FILES / "../data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2" + reaction_rules = reaction_rules.resolve() br = BioReactor(compounds_path=compounds, output_path=output_path, reaction_rules_path=reaction_rules, diff --git a/src/biocatalyzer/clis/cli_bioreactor.py b/src/biocatalyzer/clis/cli_bioreactor.py index d999215..22d499c 100644 --- a/src/biocatalyzer/clis/cli_bioreactor.py +++ b/src/biocatalyzer/clis/cli_bioreactor.py @@ -1,11 +1,11 @@ import logging -import os +from pathlib import Path import click from biocatalyzer import BioReactor -DATA_FILES = os.path.dirname(__file__) +DATA_FILES = Path(__file__).resolve().parent @click.command() @@ -82,8 +82,8 @@ def 
bioreactor_cli(compounds, output_path: Path to the output directory. """ if reaction_rules is None: - reaction_rules = os.path.join( - DATA_FILES, '../data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2') + reaction_rules = DATA_FILES / "../data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2" + reaction_rules = reaction_rules.resolve() br = BioReactor(compounds_path=compounds, output_path=output_path, reaction_rules_path=reaction_rules, diff --git a/src/biocatalyzer/io_utils/loaders.py b/src/biocatalyzer/io_utils/loaders.py index 3c03851..dff406f 100644 --- a/src/biocatalyzer/io_utils/loaders.py +++ b/src/biocatalyzer/io_utils/loaders.py @@ -1,5 +1,7 @@ import logging import os +from pathlib import Path +from typing import Union, List import pandas as pd from rdkit.Chem import MolFromSmarts, MolFromSmiles @@ -30,6 +32,7 @@ def load_compounds(path: str, neutralize: bool = False): pandas dataframe with the compounds to use. """ if Loaders._verify_file(path): + path = Path(path) compounds = pd.read_csv(path, header=0, sep='\t') if 'smiles' not in compounds.columns: raise ValueError('The compounds file must contain a column named "smiles".') @@ -47,7 +50,7 @@ def load_compounds(path: str, neutralize: bool = False): raise FileNotFoundError(f"File {path} not found.") @staticmethod - def load_reaction_rules(path, orgs='ALL'): + def load_reaction_rules(path: str, orgs: Union[str, List[str]] = 'ALL') -> pd.DataFrame: """ Load the reaction rules to use. @@ -65,7 +68,8 @@ def load_reaction_rules(path, orgs='ALL'): """ if not Loaders._verify_file(path): raise FileNotFoundError(f"File {path} not found.") - if path.endswith('.bz2'): + path = Path(path) + if path.suffix == '.bz2': rules = pd.read_csv(path, header=0, sep='\t', compression='bz2') else: rules = pd.read_csv(path, header=0, sep='\t') @@ -87,7 +91,6 @@ def match_org(value, orgs_list): return False if not isinstance(orgs, str): - # TODO: check if adding spontaneous reactions actually makes sense orgs.append('spontaneous_reaction') rules['has_org'] = rules.apply(lambda x: match_org(x['Organisms'], orgs), axis=1) rules = rules[rules['has_org']] @@ -95,7 +98,7 @@ def match_org(value, orgs_list): return rules @staticmethod - def load_organisms(path): + def load_organisms(path: str) -> Union[str, List[str]]: """ Load the organisms to use. @@ -106,17 +109,18 @@ def load_organisms(path): Returns ------- - pd.DataFrame: - pandas dataframe with the organisms to use. + Union[str, List[str]]: + List of organisms identifiers. 
""" if path is None or path == 'None': return 'ALL' if Loaders._verify_file(path): + path = Path(path) orgs = pd.read_csv(path, header=0, sep='\t') if 'org_id' not in orgs.columns: raise ValueError('The organisms file must contain a column named "org_id".') - logging.info(f'Using {list(orgs.org_id.values)} as the Organisms.') - return list(orgs.org_id.values) + logging.info(f'Using {orgs.org_id.to_list()} as the Organisms.') + return orgs.org_id.to_list() elif len(path.split('.')) > 1: raise FileNotFoundError(f"File {path} not found.") else: @@ -140,6 +144,7 @@ def load_byproducts_to_remove(path): """ if path is None or path == 'None': return [] + path = Path(path) byproducts = pd.read_csv(path, header=0, sep='\t') if 'smiles' not in byproducts.columns: raise ValueError('The molecules to remove file must contain a column named "smiles".') @@ -162,6 +167,7 @@ def load_patterns_to_remove(path): """ if path is None or path == 'None': return [] + path = Path(path) patterns = pd.read_csv(path, header=0, sep='\t') if 'smarts' not in patterns.columns: raise ValueError('The patterns to remove file must contain a column named "smarts".') @@ -182,7 +188,7 @@ def _verify_file(path: str): bool: True if the file exists, False otherwise. """ - if not os.path.exists(path): + if not Path(path).exists(): return False return True diff --git a/src/biocatalyzer/matcher.py b/src/biocatalyzer/matcher.py index 33f2ab4..b5c813d 100644 --- a/src/biocatalyzer/matcher.py +++ b/src/biocatalyzer/matcher.py @@ -2,6 +2,7 @@ import multiprocessing import os import time +from pathlib import Path from typing import Union import pandas as pd @@ -11,7 +12,7 @@ from biocatalyzer.io_utils import Loaders from biocatalyzer._utils import match_value -DATA_FILES = os.path.dirname(__file__) +DATA_FILES = Path(__file__).resolve().parent class MSDataMatcher: @@ -48,8 +49,7 @@ def __init__(self, raise ValueError('The new compounds file is empty!') self._ms_data_path = ms_data_path self._ms_data = Loaders.load_ms_data(self._ms_data_path) - self._output_path = output_path - self._set_output_path(self._output_path) + self._set_output_path(output_path) self._tolerance = tolerance if n_jobs == -1: self._n_jobs = multiprocessing.cpu_count() @@ -80,8 +80,7 @@ def output_path(self, path: str): path: str The output path. """ - self._output_path = path - self._set_output_path(self._output_path) + self._set_output_path(path) if self._matches is not None: logging.warning('Results should be generated again for the new information provided!') @@ -208,9 +207,8 @@ def _set_up_reaction_rules(self): """ Loads the reaction rules data file. """ - self._reaction_rules_path = os.path.join( - DATA_FILES, 'data/reactionrules/all_reaction_rules_forward_no_smarts_duplicates_sample.tsv') - self._reaction_rules = Loaders.load_reaction_rules(self._reaction_rules_path) + self._reaction_rules_path = DATA_FILES / 'data/reactionrules/all_reaction_rules_forward_no_smarts_duplicates_sample.tsv' + self._reaction_rules = Loaders.load_reaction_rules(self._reaction_rules_path.as_posix()) def _set_up_new_compounds(self, path: str): """ @@ -223,8 +221,7 @@ def _set_up_new_compounds(self, path: str): """ self._new_compounds = Loaders.load_new_compounds(path) - @staticmethod - def _set_output_path(output_path: str): + def _set_output_path(self, output_path: str): """ Make the output directory if it does not exist. @@ -233,12 +230,16 @@ def _set_output_path(output_path: str): output_path: str The path to the output directory. 
""" - if not os.path.exists(output_path): - os.makedirs(output_path) + output_path = Path(output_path) + if not output_path.exists(): + output_path.mkdir(parents=True) else: - if os.path.exists(output_path + '/matches.tsv'): - raise FileExistsError(f"File {output_path} already exists. Define a different output path so that " - f"previous results are not overwritten.") + if (output_path / 'matches.tsv').exists(): + raise FileExistsError( + f"File {output_path / 'matches.tsv'} already exists. Define a different output path so that " + f"previous results are not overwritten." + ) + self._output_path = output_path def _calculate_masses(self): """ @@ -304,8 +305,9 @@ def generate_ms_results(self): """ t0 = time.time() self._matches = self._match_masses() - self._matches.to_csv(self._output_path + '/matches.tsv', sep='\t', index=False) - logging.info(f"Matches saved to {self._output_path}/matches.tsv") + path = self._output_path / 'matches.tsv' + self._matches.to_csv(path, sep='\t', index=False) + logging.info(f"Matches saved to {path.as_posix()}") logging.info(f"{self._matches.shape[0]} matches found!") t1 = time.time() logging.info(f"Time elapsed: {t1 - t0} seconds") diff --git a/tests/__init__.py b/tests/__init__.py index e8a2f4a..d00728b 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,3 +1,3 @@ -import os +from pathlib import Path -TESTS_DATA_PATH = os.path.join(os.path.dirname(__file__), 'data') +TESTS_DATA_PATH = Path(__file__).parent / 'data' diff --git a/tests/unit_tests/test_bioreactor.py b/tests/unit_tests/test_bioreactor.py index 377bff5..386ac50 100644 --- a/tests/unit_tests/test_bioreactor.py +++ b/tests/unit_tests/test_bioreactor.py @@ -1,5 +1,5 @@ -import os import shutil +from pathlib import Path from unittest import TestCase from biocatalyzer.bioreactor import BioReactor @@ -10,28 +10,31 @@ class BioReactorTestCase(TestCase): def setUp(self): - self.output_folder = 'results/' - self.new_output_folder = 'new_output_path/' - if not os.path.exists(self.output_folder): - os.makedirs(self.output_folder) + self.output_folder = TESTS_DATA_PATH / 'results' + self.new_output_folder = TESTS_DATA_PATH / 'new_output_path' + # Ensure the directories exist + self.output_folder.mkdir(parents=True, exist_ok=True) + self.new_output_folder.mkdir(parents=True, exist_ok=True) def tearDown(self): - if os.path.exists(self.output_folder): + if self.output_folder.exists(): shutil.rmtree(self.output_folder) + if self.new_output_folder.exists(): + shutil.rmtree(self.new_output_folder) class TestBioReactor(BioReactorTestCase, TestCase): def test_bioreactor(self): - compounds_path = os.path.join(TESTS_DATA_PATH, 'compounds_sample/compounds.tsv') - organisms_path = os.path.join(TESTS_DATA_PATH, 'organisms_sample/organisms_to_use.tsv') - patterns_to_remove_path = os.path.join(TESTS_DATA_PATH, 'patterns_to_remove_sample/patterns.tsv') - molecules_to_remove_path = os.path.join(TESTS_DATA_PATH, 'byproducts_to_remove_sample/byproducts.tsv') - br = BioReactor(compounds_path=compounds_path, - organisms_path=organisms_path, + compounds_path = TESTS_DATA_PATH / 'compounds_sample' / 'compounds.tsv' + organisms_path = TESTS_DATA_PATH / 'organisms_sample' / 'organisms_to_use.tsv' + patterns_to_remove_path = TESTS_DATA_PATH / 'patterns_to_remove_sample' / 'patterns.tsv' + molecules_to_remove_path = TESTS_DATA_PATH / 'byproducts_to_remove_sample' / 'byproducts.tsv' + br = BioReactor(compounds_path=compounds_path.as_posix(), + organisms_path=organisms_path.as_posix(), 
patterns_to_remove_path=patterns_to_remove_path, molecules_to_remove_path=molecules_to_remove_path, - output_path=self.output_folder, + output_path=self.output_folder.as_posix(), n_jobs=12) br.react() @@ -41,13 +44,13 @@ def test_bioreactor(self): _ = br.new_compounds def test_bioreactor_all_orgs(self): - compounds_path = os.path.join(TESTS_DATA_PATH, 'compounds_sample/compounds.tsv') - patterns_to_remove_path = os.path.join(TESTS_DATA_PATH, 'patterns_to_remove_sample/patterns.tsv') - molecules_to_remove_path = os.path.join(TESTS_DATA_PATH, 'byproducts_to_remove_sample/byproducts.tsv') - br_no_orgs_filter = BioReactor(compounds_path=compounds_path, - patterns_to_remove_path=patterns_to_remove_path, - molecules_to_remove_path=molecules_to_remove_path, - output_path=self.output_folder, + compounds_path = TESTS_DATA_PATH / 'compounds_sample' / 'compounds.tsv' + patterns_to_remove_path = TESTS_DATA_PATH / 'patterns_to_remove_sample' / 'patterns.tsv' + molecules_to_remove_path = TESTS_DATA_PATH / 'byproducts_to_remove_sample' / 'byproducts.tsv' + br_no_orgs_filter = BioReactor(compounds_path=compounds_path.as_posix(), + patterns_to_remove_path=patterns_to_remove_path.as_posix(), + molecules_to_remove_path=molecules_to_remove_path.as_posix(), + output_path=self.output_folder.as_posix(), neutralize_compounds=True, n_jobs=12) br_no_orgs_filter.react() @@ -61,13 +64,13 @@ def test_bioreactor_all_orgs(self): self.assertEqual(r[0].shape, (3220, 7)) def test_bioreactor_all_orgs_keep_all(self): - compounds_path = os.path.join(TESTS_DATA_PATH, 'compounds_sample/compounds.tsv') + compounds_path = TESTS_DATA_PATH / 'compounds_sample' / 'compounds.tsv' patterns_to_remove_path = None molecules_to_remove_path = None - br_no_orgs_filter = BioReactor(compounds_path=compounds_path, + br_no_orgs_filter = BioReactor(compounds_path=compounds_path.as_posix(), patterns_to_remove_path=patterns_to_remove_path, molecules_to_remove_path=molecules_to_remove_path, - output_path=self.output_folder, + output_path=self.output_folder.as_posix(), n_jobs=-1) br_no_orgs_filter.react() @@ -75,11 +78,11 @@ def test_bioreactor_all_orgs_keep_all(self): self.assertEqual(br_no_orgs_filter.compounds.shape, (4, 2)) def test_bioreactor_properties_and_setters(self): - compounds_path = os.path.join(TESTS_DATA_PATH, 'compounds_sample/compounds.tsv') - organisms_path = os.path.join(TESTS_DATA_PATH, 'organisms_sample/organisms_to_use.tsv') - br = BioReactor(compounds_path=compounds_path, - organisms_path=organisms_path, - output_path=self.output_folder, + compounds_path = TESTS_DATA_PATH / 'compounds_sample' / 'compounds.tsv' + organisms_path = TESTS_DATA_PATH / 'organisms_sample' / 'organisms_to_use.tsv' + br = BioReactor(compounds_path=compounds_path.as_posix(), + organisms_path=organisms_path.as_posix(), + output_path=self.output_folder.as_posix(), n_jobs=12) with self.assertRaises(ValueError): @@ -88,27 +91,23 @@ def test_bioreactor_properties_and_setters(self): br.new_compounds = 'random_thing' output_path = br.output_path - self.assertEqual(output_path, self.output_folder) + self.assertEqual(output_path, Path(self.output_folder)) br.output_path = self.new_output_folder - shutil.rmtree(self.new_output_folder) - - with self.assertRaises(FileExistsError): - br.output_path = os.path.join(TESTS_DATA_PATH, 'results_sample/') br.react() with self.assertRaises(FileNotFoundError): br.compounds = 'not_existing_path.tsv' - br.compounds = os.path.join(TESTS_DATA_PATH, 'compounds_sample/compounds_subsample.tsv') + br.compounds = TESTS_DATA_PATH / 
'compounds_sample' / 'compounds_subsample.tsv' br.compounds = 'CN1C=NC2=C1C(=O)N(C(=O)N2C)C;C(C1C(C(C(C(O1)O)O)O)O)O' with self.assertRaises(FileNotFoundError): br.reaction_rules = 'not_existing_path.tsv' - br.reaction_rules = os.path.join(TESTS_DATA_PATH, 'reaction_rules_sample/reactionrules_subsample.tsv') + br.reaction_rules = TESTS_DATA_PATH / 'reaction_rules_sample' / 'reactionrules_subsample.tsv' br.output_path = 'new_output_path' @@ -116,7 +115,7 @@ def test_bioreactor_properties_and_setters(self): with self.assertRaises(FileNotFoundError): br.compounds_path = 'not_existing_path.tsv' - br.compounds_path = os.path.join(TESTS_DATA_PATH, 'compounds_sample/compounds_subsample.tsv') + br.compounds_path = TESTS_DATA_PATH / 'compounds_sample' / 'compounds_subsample.tsv' _ = br.neutralize br.neutralize = True @@ -125,7 +124,7 @@ def test_bioreactor_properties_and_setters(self): with self.assertRaises(FileNotFoundError): br.organisms_path = 'not_existing_path.tsv' - br.organisms_path = os.path.join(TESTS_DATA_PATH, 'organisms_sample/organisms_subsample.tsv') + br.organisms_path = TESTS_DATA_PATH / 'organisms_sample' / 'organisms_subsample.tsv' br.organisms_path = 'hsa;eco' @@ -133,14 +132,13 @@ def test_bioreactor_properties_and_setters(self): with self.assertRaises(FileNotFoundError): br.molecules_to_remove_path = 'not_existing_path.tsv' - br.molecules_to_remove_path = os.path.join(TESTS_DATA_PATH, - 'byproducts_to_remove_sample/byproducts_subsample.tsv') + br.molecules_to_remove_path = TESTS_DATA_PATH / 'byproducts_to_remove_sample/byproducts_subsample.tsv' _ = br.patterns_to_remove_path with self.assertRaises(FileNotFoundError): br.patterns_to_remove_path = 'not_existing_path.tsv' - br.patterns_to_remove_path = os.path.join(TESTS_DATA_PATH, 'patterns_to_remove_sample/patterns_subsample.tsv') + br.patterns_to_remove_path = TESTS_DATA_PATH / 'patterns_to_remove_sample/patterns_subsample.tsv' mac = br.min_atom_count br.min_atom_count = mac + 1 diff --git a/tests/unit_tests/test_ms_matcher.py b/tests/unit_tests/test_ms_matcher.py index 6f93d2a..f530fa9 100644 --- a/tests/unit_tests/test_ms_matcher.py +++ b/tests/unit_tests/test_ms_matcher.py @@ -1,5 +1,5 @@ -import os import shutil +from pathlib import Path from unittest import TestCase import pandas as pd @@ -12,24 +12,27 @@ class MSDataMatcherTestCase(TestCase): def setUp(self): - self.output_folder = 'results/' - self.new_output_folder = 'new_output_path/' - if not os.path.exists(self.output_folder): - os.makedirs(self.output_folder) + self.output_folder = TESTS_DATA_PATH / 'results_sample2' + self.new_output_folder = TESTS_DATA_PATH / 'new_results_sample' + # Ensure the directories exist + self.output_folder.mkdir(parents=True, exist_ok=True) + self.new_output_folder.mkdir(parents=True, exist_ok=True) def tearDown(self): - if os.path.exists(self.output_folder): + if self.output_folder.exists(): shutil.rmtree(self.output_folder) + if self.new_output_folder.exists(): + shutil.rmtree(self.new_output_folder) class TestMSDataMatcher(MSDataMatcherTestCase, TestCase): def test_ms_data_matcher(self): - ms_data_path = os.path.join(TESTS_DATA_PATH, 'ms_data_sample/ms_data.tsv') - compounds_to_match = os.path.join(TESTS_DATA_PATH, 'new_compounds_sample/new_compounds.tsv') - ms = MSDataMatcher(ms_data_path=ms_data_path, - compounds_to_match_path=compounds_to_match, - output_path=self.output_folder, + ms_data_path = TESTS_DATA_PATH / 'ms_data_sample' / 'ms_data.tsv' + compounds_to_match = TESTS_DATA_PATH / 'new_compounds_sample' / 'new_compounds.tsv' 
+ ms = MSDataMatcher(ms_data_path=ms_data_path.as_posix(), + compounds_to_match_path=compounds_to_match.as_posix(), + output_path=self.output_folder.as_posix(), tolerance=0.0015) ms.generate_ms_results() @@ -40,21 +43,15 @@ def test_ms_data_matcher(self): self.assertEqual(ms.matches.shape, (4, 9)) def test_ms_data_matcher_properties_and_setters(self): - ms_data_path = os.path.join(TESTS_DATA_PATH, 'ms_data_sample/ms_data.tsv') - compounds_to_match = os.path.join(TESTS_DATA_PATH, 'new_compounds_sample/new_compounds.tsv') - ms = MSDataMatcher(ms_data_path=ms_data_path, - compounds_to_match_path=compounds_to_match, - output_path=self.output_folder, + ms_data_path = TESTS_DATA_PATH / 'ms_data_sample' / 'ms_data.tsv' + compounds_to_match = TESTS_DATA_PATH / 'new_compounds_sample' / 'new_compounds.tsv' + ms = MSDataMatcher(ms_data_path=ms_data_path.as_posix(), + compounds_to_match_path=compounds_to_match.as_posix(), + output_path=self.new_output_folder.as_posix(), tolerance=0.0015) output_path = ms.output_path - self.assertEqual(output_path, self.output_folder) - - ms.output_path = self.new_output_folder - shutil.rmtree(self.new_output_folder) - - with self.assertRaises(FileExistsError): - ms.output_path = os.path.join(TESTS_DATA_PATH, 'results_sample/') + self.assertEqual(output_path, Path(self.new_output_folder)) ms.generate_ms_results() @@ -62,13 +59,13 @@ def test_ms_data_matcher_properties_and_setters(self): with self.assertRaises(FileNotFoundError): ms.ms_data_path = 'not_existing_path.tsv' - ms.ms_data_path = os.path.join(TESTS_DATA_PATH, 'ms_data_sample/ms_data_subsample.tsv') + ms.ms_data_path = TESTS_DATA_PATH / 'ms_data_sample' / 'ms_data_subsample.tsv' _ = ms.compounds_to_match with self.assertRaises(FileNotFoundError): ms.compounds_to_match = 'not_existing_path.tsv' - ms.compounds_to_match = os.path.join(TESTS_DATA_PATH, 'new_compounds_sample/new_compounds_subsample.tsv') + ms.compounds_to_match = TESTS_DATA_PATH / 'new_compounds_sample' / 'new_compounds_subsample.tsv' tl = ms.tolerance ms.tolerance = 0.0015 + tl
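The new react()/_react_single() flow in this patch fans work out with multiprocessing.Pool, has each worker write its rows to a private temporary file, pushes the filename onto a Manager queue, and lets the parent process concatenate those files into new_compounds.tsv. The standalone Python sketch below (not part of the patch) isolates that pattern with a hypothetical worker payload, task count, and output filename; unlike the patch, it writes through the NamedTemporaryFile handle directly so the handle is closed before the name is queued.

import multiprocessing
import os
import tempfile


def worker(task_id: int, result_queue) -> None:
    # Hypothetical payload standing in for the rows _react_single() builds.
    rows = [f"{task_id}\tvalue_{task_id}\n"]
    # delete=False keeps the file on disk after the handle closes; the parent
    # removes it once its contents have been merged.
    with tempfile.NamedTemporaryFile(mode="w", delete=False, newline="\n", suffix=".tsv") as tmp:
        tmp.writelines(rows)
    result_queue.put(tmp.name)


def run(output_path: str, n_tasks: int = 8, n_jobs: int = 4) -> None:
    # Write the header first, exactly once, as react() does.
    with open(output_path, "w", newline="\n") as out:
        out.write("id\tvalue\n")
    with multiprocessing.Manager() as manager:
        result_queue = manager.Queue()  # proxy queues are picklable pool-worker arguments
        with multiprocessing.Pool(n_jobs) as pool:
            pool.starmap(worker, [(i, result_queue) for i in range(n_tasks)])
        # starmap has joined all tasks, so draining the queue here is safe.
        with open(output_path, "a", newline="\n") as out:
            while not result_queue.empty():
                tmp_name = result_queue.get()
                with open(tmp_name) as tmp_f:
                    out.write(tmp_f.read())
                os.remove(tmp_name)  # clean up the worker's temporary file


if __name__ == "__main__":
    run("merged_results.tsv")

Using manager.Queue() rather than a plain multiprocessing.Queue is what allows the queue to be passed as an argument to pool workers, and delete=False keeps each temporary file on disk until the parent has merged and removed it.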
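For reference, a hypothetical end-to-end usage sketch with the pathlib-style paths this patch standardizes on; the data/ directory and file names are placeholders, and .as_posix() mirrors how the updated tests hand paths to BioReactor.

from pathlib import Path

from biocatalyzer.bioreactor import BioReactor

data = Path("data")  # hypothetical input directory
br = BioReactor(
    compounds_path=(data / "compounds.tsv").as_posix(),
    organisms_path=(data / "organisms_to_use.tsv").as_posix(),
    output_path=(data / "results").as_posix(),  # created if missing; must not already hold results
    n_jobs=-1,                                  # use all available cores
)
br.react()  # writes <output_path>/new_compounds.tsv
results, results_path = br.process_results(save=True, overwrite=True)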