diff --git a/.github/workflows/linting.yml b/.github/workflows/linting.yml index 6c195a15..575b11ab 100644 --- a/.github/workflows/linting.yml +++ b/.github/workflows/linting.yml @@ -30,6 +30,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip + pip install "numpy<2.0.0" pip install mypy mypy . || exit_code=$? mypy --install-types --non-interactive diff --git a/molpipeline/abstract_pipeline_elements/mol2any/mol2bitvector.py b/molpipeline/abstract_pipeline_elements/mol2any/mol2bitvector.py index 4e4533ab..66317966 100644 --- a/molpipeline/abstract_pipeline_elements/mol2any/mol2bitvector.py +++ b/molpipeline/abstract_pipeline_elements/mol2any/mol2bitvector.py @@ -13,6 +13,7 @@ import numpy as np import numpy.typing as npt +from rdkit.Chem import rdFingerprintGenerator from rdkit.DataStructs import ExplicitBitVect from scipy import sparse @@ -196,7 +197,131 @@ def pretransform_single( """ -class ABCMorganFingerprintPipelineElement(MolToFingerprintPipelineElement, abc.ABC): +class MolToRDKitGenFPElement(MolToFingerprintPipelineElement, abc.ABC): + """Abstract class for PipelineElements using the FingeprintGenerator64.""" + + def __init__( + self, + counted: bool = False, + return_as: OutputDatatype = "sparse", + name: str = "MolToRDKitGenFin", + n_jobs: int = 1, + uuid: Optional[str] = None, + ): + """Initialize abstract class. + + Parameters + ---------- + counted: bool + Whether to count the bits or not. + return_as: Literal["sparse", "dense", "explicit_bit_vect"] + Type of output. When "sparse" the fingerprints will be returned as a scipy.sparse.csr_matrix + name: str + Name of PipelineElement. + n_jobs: + Number of jobs. + uuid: Optional[str] + Unique identifier. + """ + super().__init__( + return_as=return_as, + name=name, + n_jobs=n_jobs, + uuid=uuid, + ) + self.counted = counted + + @abc.abstractmethod + def _get_fp_generator(self) -> rdFingerprintGenerator.FingeprintGenerator64: + """Get fingerprint generator. 
+ + Returns + ------- + rdFingerprintGenerator.FingeprintGenerator64 + Fingerprint generator. + """ + + def pretransform_single( + self, value: RDKitMol + ) -> ExplicitBitVect | npt.NDArray[np.int_] | dict[int, int]: + """Transform a single compound to a dictionary. + + Keys denote the feature position, values the count. Here always 1. + + Parameters + ---------- + value: RDKitMol + Molecule for which the fingerprint is generated. + + Returns + ------- + ExplicitBitVect | npt.NDArray[np.int_] | dict[int, int] + If return_as is "explicit_bit_vect" return ExplicitBitVect. + If return_as is "dense" return numpy array. + If return_as is "sparse" return dictionary with feature-position as key and count as value. + """ + fingerprint_generator = self._get_fp_generator() + if self._return_as == "dense": + if self.counted: + return fingerprint_generator.GetCountFingerprintAsNumPy(value) + return fingerprint_generator.GetFingerprintAsNumPy(value) + + if self.counted: + fingerprint = fingerprint_generator.GetCountFingerprint(value) + else: + fingerprint = fingerprint_generator.GetFingerprint(value) + + if self._return_as == "explicit_bit_vect": + return fingerprint + + if self.counted: + return fingerprint.GetNonzeroElements() + + return {pos: 1 for pos in fingerprint.GetOnBits()} + + def get_params(self, deep: bool = True) -> dict[str, Any]: + """Get object parameters relevant for copying the class. + + Parameters + ---------- + deep: bool + If True get a deep copy of the parameters. + + Returns + ------- + dict[str, Any] + Dictionary of parameter names and values. + """ + parameters = super().get_params(deep) + if deep: + parameters["counted"] = bool(self.counted) + else: + parameters["counted"] = self.counted + + return parameters + + def set_params(self, **parameters: dict[str, Any]) -> Self: + """Set object parameters relevant for copying the class. + + Parameters + ---------- + parameters: dict[str, Any] + Dictionary of parameter names and values. 
+ + Returns + ------- + Self + Copied object with updated parameters. + """ + parameter_dict_copy = dict(parameters) + counted = parameter_dict_copy.pop("counted", None) + if counted is not None: + self.counted = bool(counted) + super().set_params(**parameter_dict_copy) + return self + + +class ABCMorganFingerprintPipelineElement(MolToRDKitGenFPElement, abc.ABC): """Abstract Class for Morgan fingerprints.""" # pylint: disable=R0913 @@ -204,6 +329,7 @@ def __init__( self, radius: int = 2, use_features: bool = False, + counted: bool = False, return_as: Literal["sparse", "dense", "explicit_bit_vect"] = "sparse", name: str = "AbstractMorgan", n_jobs: int = 1, @@ -217,6 +343,8 @@ def __init__( Radius of fingerprint. use_features: bool Whether to represent atoms by element or category (donor, acceptor. etc.) + counted: bool + Whether to count the bits or not. return_as: Literal["sparse", "dense", "explicit_bit_vect"] Type of output. When "sparse" the fingerprints will be returned as a scipy.sparse.csr_matrix holding a sparse representation of the bit vectors. With "dense" a numpy matrix will be returned. 
@@ -232,6 +360,7 @@ def __init__( # pylint: disable=R0801 super().__init__( return_as=return_as, + counted=counted, name=name, n_jobs=n_jobs, uuid=uuid, diff --git a/molpipeline/mol2any/__init__.py b/molpipeline/mol2any/__init__.py index da7bb760..cf87f151 100644 --- a/molpipeline/mol2any/__init__.py +++ b/molpipeline/mol2any/__init__.py @@ -3,8 +3,10 @@ from molpipeline.mol2any.mol2bin import MolToBinary from molpipeline.mol2any.mol2concatinated_vector import MolToConcatenatedVector from molpipeline.mol2any.mol2inchi import MolToInchi, MolToInchiKey +from molpipeline.mol2any.mol2maccs_key_fingerprint import MolToMACCSFP from molpipeline.mol2any.mol2morgan_fingerprint import MolToMorganFP from molpipeline.mol2any.mol2net_charge import MolToNetCharge +from molpipeline.mol2any.mol2path_fingerprint import Mol2PathFP from molpipeline.mol2any.mol2rdkit_phys_chem import MolToRDKitPhysChem from molpipeline.mol2any.mol2smiles import MolToSmiles @@ -12,8 +14,10 @@ "MolToBinary", "MolToConcatenatedVector", "MolToSmiles", + "MolToMACCSFP", "MolToMorganFP", "MolToNetCharge", + "Mol2PathFP", "MolToInchi", "MolToInchiKey", "MolToRDKitPhysChem", diff --git a/molpipeline/mol2any/mol2maccs_key_fingerprint.py b/molpipeline/mol2any/mol2maccs_key_fingerprint.py new file mode 100644 index 00000000..701b5ef9 --- /dev/null +++ b/molpipeline/mol2any/mol2maccs_key_fingerprint.py @@ -0,0 +1,77 @@ +"""Implementation of MACCS key fingerprint.""" + +from typing import Literal + +import numpy as np +from numpy import typing as npt +from rdkit.Chem import MACCSkeys +from rdkit.DataStructs import ExplicitBitVect + +from molpipeline.abstract_pipeline_elements.mol2any.mol2bitvector import ( + MolToFingerprintPipelineElement, +) +from molpipeline.utils.molpipeline_types import RDKitMol + + +class MolToMACCSFP(MolToFingerprintPipelineElement): + """MACCS key fingerprint. + + The MACCS keys are a set of 166 keys that encode the presence or absence of + particular substructures in a molecule. 
The MACCS keys are a subset of the + PubChem substructure keys. + + """ + + _n_bits = 167 # MACCS keys have 166 bits + 1 bit for an all-zero vector (bit 0) + + def __init__( + self, + return_as: Literal["sparse", "dense", "explicit_bit_vect"] = "sparse", + name: str = "MolToMACCS", + n_jobs: int = 1, + uuid: str | None = None, + ) -> None: + """Initialize MolToMACCS. + + Parameters + ---------- + return_as: Literal["sparse", "dense", "explicit_bit_vect"], optional (default="sparse") + Type of output. When "sparse" the fingerprints will be returned as a + scipy.sparse.csr_matrix holding a sparse representation of the bit vectors. + With "dense" a numpy matrix will be returned. + With "explicit_bit_vect" the fingerprints will be returned as a list of RDKit's + rdkit.DataStructs.cDataStructs.ExplicitBitVect. + name: str, optional (default="MolToMACCS") + Name of PipelineElement + n_jobs: int, optional (default=1) + Number of cores to use. + uuid: str | None, optional (default=None) + UUID of the PipelineElement. + + """ + super().__init__(return_as=return_as, name=name, n_jobs=n_jobs, uuid=uuid) + + def pretransform_single( + self, value: RDKitMol + ) -> dict[int, int] | npt.NDArray[np.int_] | ExplicitBitVect: + """Transform a single molecule to MACCS key fingerprint. + + Parameters + ---------- + value : RDKitMol + RDKit molecule. + + Returns + ------- + dict[int, int] | npt.NDArray[np.int_] | ExplicitBitVect + MACCS key fingerprint. 
+ + """ + fingerprint = MACCSkeys.GenMACCSKeys(value) + if self._return_as == "explicit_bit_vect": + return fingerprint + if self._return_as == "dense": + return np.array(fingerprint) + if self._return_as == "sparse": + return {idx: 1 for idx in fingerprint.GetOnBits()} + raise ValueError(f"Unknown return_as value: {self._return_as}") diff --git a/molpipeline/mol2any/mol2morgan_fingerprint.py b/molpipeline/mol2any/mol2morgan_fingerprint.py index dd3d20c3..2f079e38 100644 --- a/molpipeline/mol2any/mol2morgan_fingerprint.py +++ b/molpipeline/mol2any/mol2morgan_fingerprint.py @@ -11,10 +11,7 @@ import copy -import numpy as np -import numpy.typing as npt from rdkit.Chem import AllChem, rdFingerprintGenerator -from rdkit.DataStructs import ExplicitBitVect from molpipeline.abstract_pipeline_elements.mol2any.mol2bitvector import ( ABCMorganFingerprintPipelineElement, @@ -35,6 +32,7 @@ def __init__( radius: int = 2, use_features: bool = False, n_bits: int = 2048, + counted: bool = False, return_as: Literal["sparse", "dense", "explicit_bit_vect"] = "sparse", name: str = "MolToMorganFP", n_jobs: int = 1, @@ -50,6 +48,9 @@ def __init__( Instead of atoms, features are encoded in the fingerprint. [2] n_bits: int, optional (default=2048) Size of fingerprint. + counted: bool, optional (default=False) + If True, the fingerprint will be counted. + If False, the fingerprint will be binary. return_as: Literal["sparse", "dense", "explicit_bit_vect"] Type of output. When "sparse" the fingerprints will be returned as a scipy.sparse.csr_matrix holding a sparse representation of the bit vectors. With "dense" a numpy matrix will be returned. 
@@ -71,17 +72,17 @@ def __init__( super().__init__( radius=radius, use_features=use_features, + counted=counted, return_as=return_as, name=name, n_jobs=n_jobs, uuid=uuid, ) - if isinstance(n_bits, int) and n_bits >= 0: - self._n_bits = n_bits - else: + if not isinstance(n_bits, int) or n_bits < 1: raise ValueError( - f"Number of bits has to be a positive integer! (Received: {n_bits})" + f"Number of bits has to be a positve integer, which is > 0! (Received: {n_bits})" ) + self._n_bits = n_bits def get_params(self, deep: bool = True) -> dict[str, Any]: """Return all parameters defining the object. @@ -124,38 +125,19 @@ def set_params(self, **parameters: dict[str, Any]) -> Self: return self - def pretransform_single( - self, value: RDKitMol - ) -> ExplicitBitVect | npt.NDArray[np.int_] | dict[int, int]: - """Transform a single compound to a dictionary. - - Keys denote the feature position, values the count. Here always 1. - - Parameters - ---------- - value: RDKitMol - Molecule for which the fingerprint is generated. + def _get_fp_generator(self) -> rdFingerprintGenerator.FingerprintGenerator: + """Get the fingerprint generator. Returns ------- - dict[int, int] - Dictionary with feature-position as key and count as value. + rdFingerprintGenerator.FingerprintGenerator + RDKit fingerprint generator. """ - fingerprint_generator = rdFingerprintGenerator.GetMorganGenerator( + return rdFingerprintGenerator.GetMorganGenerator( radius=self.radius, fpSize=self._n_bits, ) - if self._return_as == "explicit_bit_vect": - return fingerprint_generator.GetFingerprint(value) - if self._return_as == "dense": - return fingerprint_generator.GetFingerprintAsNumPy(value) - # sparse return type - return { - bit_idx: 1 - for bit_idx in fingerprint_generator.GetFingerprint(value).GetOnBits() - } - def _explain_rdmol(self, mol_obj: RDKitMol) -> dict[int, list[tuple[int, int]]]: """Get central atom and radius of all features in molecule. 
diff --git a/molpipeline/mol2any/mol2path_fingerprint.py b/molpipeline/mol2any/mol2path_fingerprint.py new file mode 100644 index 00000000..368e8cb7 --- /dev/null +++ b/molpipeline/mol2any/mol2path_fingerprint.py @@ -0,0 +1,223 @@ +"""Implementations for the RDKit Path Fingerprint.""" + +from __future__ import annotations # for all the python 3.8 users out there. + +from typing import Any, Literal, Optional + +try: + from typing import Self # type: ignore[attr-defined] +except ImportError: + from typing_extensions import Self + +import copy + +from rdkit.Chem import rdFingerprintGenerator + +from molpipeline.abstract_pipeline_elements.mol2any.mol2bitvector import ( + MolToRDKitGenFPElement, +) + + +class Mol2PathFP( + MolToRDKitGenFPElement +): # pylint: disable=too-many-instance-attributes + """Folded RDKit Path Fingerprint. + + Feature-mapping to vector-positions is arbitrary. + + """ + + # pylint: disable=too-many-arguments,too-many-locals + def __init__( + self, + min_path: int = 1, + max_path: int = 7, + use_hs: bool = True, + branched_paths: bool = True, + use_bond_order: bool = True, + count_simulation: bool = False, + count_bounds: Any = None, + n_bits: int = 2048, + num_bits_per_feature: int = 2, + atom_invariants_generator: Any = None, + counted: bool = False, + return_as: Literal["sparse", "dense", "explicit_bit_vect"] = "sparse", + name: str = "Mol2PathFP", + n_jobs: int = 1, + uuid: Optional[str] = None, + ) -> None: + """Initialize Mol2PathFP. + + Parameters + ---------- + min_path: int, optional (default=1) + Minimum path length. + max_path: int, optional (default=7) + Maximum path length. + use_hs: bool, optional (default=True) + Include hydrogens (If explicit hydrogens are present in the molecule). + branched_paths: bool, optional (default=True) + Include branched paths. + use_bond_order: bool, optional (default=True) + Include bond order in path. + count_simulation: bool, optional (default=False) + Count simulation. 
+ count_bounds: Any, optional (default=None) + Set the bins for the bond count. + n_bits: int, optional (default=2048) + Size of fingerprint. + num_bits_per_feature: int, optional (default=2) + Number of bits per feature. + atom_invariants_generator: Any, optional (default=None) + Atom invariants generator. + counted: bool, optional (default=False) + If True, the fingerprint will be counted. + If False, the fingerprint will be binary. + return_as: Literal["sparse", "dense", "explicit_bit_vect"] + Type of output. When "sparse" the fingerprints will be returned as a scipy.sparse.csr_matrix + holding a sparse representation of the bit vectors. With "dense" a numpy matrix will be returned. + With "explicit_bit_vect" the fingerprints will be returned as a list of RDKit's + rdkit.DataStructs.cDataStructs.ExplicitBitVect. + name: str, optional (default="Mol2PathFP") + Name of PipelineElement + n_jobs: int, optional (default=1) + Number of cores to use. + uuid: str | None, optional (default=None) + UUID of the PipelineElement. + + References + ---------- + [1] https://www.rdkit.org/docs/source/rdkit.Chem.rdFingerprintGenerator.html#rdkit.Chem.rdFingerprintGenerator.GetRDKitFPGenerator + """ + # pylint: disable=R0801 + super().__init__( + counted=counted, + return_as=return_as, + name=name, + n_jobs=n_jobs, + uuid=uuid, + ) + if not isinstance(n_bits, int) or n_bits < 1: + raise ValueError( + f"Number of bits has to be a positve integer, which is > 0! (Received: {n_bits})" + ) + self._n_bits = n_bits + self._min_path = min_path + self._max_path = max_path + self._use_hs = use_hs + self._branched_paths = branched_paths + self._use_bond_order = use_bond_order + self._count_simulation = count_simulation + self._count_bounds = count_bounds + self._num_bits_per_feature = num_bits_per_feature + self._atom_invariants_generator = atom_invariants_generator + + def get_params(self, deep: bool = True) -> dict[str, Any]: + """Return all parameters defining the object. 
+ + Parameters + ---------- + deep: bool + If True get a deep copy of the parameters. + + Returns + ------- + dict[str, Any] + Dictionary of parameters. + """ + parameters = super().get_params(deep) + if deep: + parameters["min_path"] = int(self._min_path) + parameters["max_path"] = int(self._max_path) + parameters["use_hs"] = bool(self._use_hs) + parameters["branched_paths"] = bool(self._branched_paths) + parameters["use_bond_order"] = bool(self._use_bond_order) + parameters["count_simulation"] = bool(self._count_simulation) + parameters["count_bounds"] = copy.copy(self._count_bounds) + parameters["num_bits_per_feature"] = int(self._num_bits_per_feature) + parameters["atom_invariants_generator"] = copy.copy( + self._atom_invariants_generator + ) + parameters["n_bits"] = int(self._n_bits) + else: + parameters["min_path"] = self._min_path + parameters["max_path"] = self._max_path + parameters["use_hs"] = self._use_hs + parameters["branched_paths"] = self._branched_paths + parameters["use_bond_order"] = self._use_bond_order + parameters["count_simulation"] = self._count_simulation + parameters["count_bounds"] = self._count_bounds + parameters["num_bits_per_feature"] = self._num_bits_per_feature + parameters["atom_invariants_generator"] = self._atom_invariants_generator + parameters["n_bits"] = self._n_bits + return parameters + + def set_params(self, **parameters: dict[str, Any]) -> Self: + """Set parameters. + + Parameters + ---------- + parameters: dict[str, Any] + Dictionary of parameter names and values. + + Returns + ------- + Self + Mol2PathFP pipeline element with updated parameters. 
+ """ + parameter_copy = dict(parameters) + min_path = parameter_copy.pop("min_path", None) + if min_path is not None: + self._min_path = min_path # type: ignore + max_path = parameter_copy.pop("max_path", None) + if max_path is not None: + self._max_path = max_path # type: ignore + use_hs = parameter_copy.pop("use_hs", None) + if use_hs is not None: + self._use_hs = use_hs # type: ignore + branched_paths = parameter_copy.pop("branched_paths", None) + if branched_paths is not None: + self._branched_paths = branched_paths # type: ignore + use_bond_order = parameter_copy.pop("use_bond_order", None) + if use_bond_order is not None: + self._use_bond_order = use_bond_order # type: ignore + count_simulation = parameter_copy.pop("count_simulation", None) + if count_simulation is not None: + self._count_simulation = count_simulation # type: ignore + count_bounds = parameter_copy.pop("count_bounds", None) + if count_bounds is not None: + self._count_bounds = count_bounds # type: ignore + num_bits_per_feature = parameter_copy.pop("num_bits_per_feature", None) + if num_bits_per_feature is not None: + self._num_bits_per_feature = num_bits_per_feature # type: ignore + atom_invariants_generator = parameter_copy.pop( + "atom_invariants_generator", None + ) + if atom_invariants_generator is not None: + self._atom_invariants_generator = atom_invariants_generator + n_bits = parameter_copy.pop("n_bits", None) # pylint: disable=duplicate-code + if n_bits is not None: + self._n_bits = n_bits # type: ignore + super().set_params(**parameter_copy) + return self + + def _get_fp_generator(self) -> rdFingerprintGenerator.GetRDKitFPGenerator: + """Get the fingerprint generator for the RDKit path fingerprint. + + Returns + ------- + rdFingerprintGenerator.GetRDKitFPGenerator + RDKit Path fingerprint generator. 
+ """ + return rdFingerprintGenerator.GetRDKitFPGenerator( + minPath=self._min_path, + maxPath=self._max_path, + fpSize=self._n_bits, + useHs=self._use_hs, + branchedPaths=self._branched_paths, + useBondOrder=self._use_bond_order, + countSimulation=self._count_simulation, + countBounds=self._count_bounds, + numBitsPerFeature=self._num_bits_per_feature, + atomInvariantsGenerator=self._atom_invariants_generator, + ) diff --git a/requirements.txt b/requirements.txt index c6fab9f9..9b597696 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ joblib >= 1.3.0 loguru -numpy +numpy < 2.0.0 pandas rdkit >= 2023.9.1 scipy diff --git a/tests/test_elements/test_mol2any/test_mol2maccs_key_fingerprint.py b/tests/test_elements/test_mol2any/test_mol2maccs_key_fingerprint.py new file mode 100644 index 00000000..553c28ec --- /dev/null +++ b/tests/test_elements/test_mol2any/test_mol2maccs_key_fingerprint.py @@ -0,0 +1,104 @@ +"""Tests for the MolToMACCSFP pipeline element.""" + +from __future__ import annotations + +import unittest +from typing import Any + +import numpy as np + +from molpipeline import Pipeline +from molpipeline.any2mol import SmilesToMol +from molpipeline.mol2any import MolToMACCSFP + +# pylint: disable=duplicate-code +# Similar to test_mol2morgan_fingerprint.py and test_mol2path_fingerprint.py + +test_smiles = [ + "c1ccccc1", + "c1ccccc1C", + "NCCOCCCC(=O)O", +] + + +class TestMolToMACCSFP(unittest.TestCase): + """Unittest for MolToMACCSFP, which calculates MACCS Key Fingerprints.""" + + def test_can_be_constructed(self) -> None: + """Test if the MolToMACCSFP pipeline element can be constructed. 
+ + Returns + ------- + None + """ + mol_fp = MolToMACCSFP() + mol_fp_copy = mol_fp.copy() + self.assertTrue(mol_fp_copy is not mol_fp) + for key, value in mol_fp.get_params().items(): + self.assertEqual(value, mol_fp_copy.get_params()[key]) + mol_fp_recreated = MolToMACCSFP(**mol_fp.get_params()) + for key, value in mol_fp.get_params().items(): + self.assertEqual(value, mol_fp_recreated.get_params()[key]) + + def test_output_types(self) -> None: + """Test equality of different output_types.""" + + smi2mol = SmilesToMol() + sparse_maccs = MolToMACCSFP(return_as="sparse") + dense_maccs = MolToMACCSFP(return_as="dense") + explicit_bit_vect_maccs = MolToMACCSFP(return_as="explicit_bit_vect") + sparse_pipeline = Pipeline( + [ + ("smi2mol", smi2mol), + ("sparse_maccs", sparse_maccs), + ], + ) + dense_pipeline = Pipeline( + [ + ("smi2mol", smi2mol), + ("dense_maccs", dense_maccs), + ], + ) + explicit_bit_vect_pipeline = Pipeline( + [ + ("smi2mol", smi2mol), + ("explicit_bit_vect_maccs", explicit_bit_vect_maccs), + ], + ) + + sparse_output = sparse_pipeline.fit_transform(test_smiles) + dense_output = dense_pipeline.fit_transform(test_smiles) + explicit_bit_vect_maccs_output = explicit_bit_vect_pipeline.fit_transform( + test_smiles + ) + + self.assertTrue(np.all(sparse_output.toarray() == dense_output)) + + self.assertTrue( + np.equal( + dense_output, + np.array(explicit_bit_vect_maccs_output), + ).all() + ) + + def test_setter_getter(self) -> None: + """Test if the setters and getters work as expected.""" + mol_fp = MolToMACCSFP() + params: dict[str, Any] = { + "return_as": "dense", + } + mol_fp.set_params(**params) + self.assertEqual(mol_fp.get_params()["return_as"], "dense") + + def test_setter_getter_error_handling(self) -> None: + """Test if the setters and getters work as expected when errors are encountered.""" + + mol_fp = MolToMACCSFP() + params: dict[str, Any] = { + "return_as": "invalid-option", + } + self.assertRaises(ValueError, mol_fp.set_params, **params) + 
+ +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_elements/test_mol2any/test_mol2morgan_fingerprint.py b/tests/test_elements/test_mol2any/test_mol2morgan_fingerprint.py index ce95b6c2..3a5e94a9 100644 --- a/tests/test_elements/test_mol2any/test_mol2morgan_fingerprint.py +++ b/tests/test_elements/test_mol2any/test_mol2morgan_fingerprint.py @@ -37,35 +37,29 @@ def test_can_be_constructed(self) -> None: for key, value in mol_fp.get_params().items(): self.assertEqual(value, mol_fp_recreated.get_params()[key]) - def test_sparse_dense_accordance(self) -> None: - """Test if the calculation of Morgan fingprints in dense and sparse are equal. - - Compared to precalculated values. + def test_counted_bits(self) -> None: + """Test if the option counted bits works as expected. Returns ------- None """ + mol_fp = MolToMorganFP(radius=2, n_bits=1024, return_as="dense") smi2mol = SmilesToMol() - sparse_morgan = MolToMorganFP(radius=2, n_bits=1024, return_as="sparse") - dense_morgan = MolToMorganFP(radius=2, n_bits=1024, return_as="dense") - sparse_pipeline = Pipeline( + pipeline = Pipeline( [ ("smi2mol", smi2mol), - ("sparse_morgan", sparse_morgan), + ("mol_fp", mol_fp), ], ) - dense_pipeline = Pipeline( - [ - ("smi2mol", smi2mol), - ("dense_morgan", dense_morgan), - ], + output_binary = pipeline.fit_transform(test_smiles) + pipeline.set_params(mol_fp__counted=True) + output_counted = pipeline.fit_transform(test_smiles) + self.assertTrue( + np.all(np.flatnonzero(output_counted) == np.flatnonzero(output_binary)) ) - - sparse_output = sparse_pipeline.fit_transform(test_smiles) - dense_output = dense_pipeline.fit_transform(test_smiles) - - self.assertTrue(np.all(sparse_output.toarray() == dense_output)) + self.assertTrue(np.all(output_counted >= output_binary)) + self.assertTrue(np.any(output_counted > output_binary)) def test_output_types(self) -> None: """Test equality of different output_types.""" diff --git 
a/tests/test_elements/test_mol2any/test_mol2path_fingerprint.py b/tests/test_elements/test_mol2any/test_mol2path_fingerprint.py new file mode 100644 index 00000000..691abfb9 --- /dev/null +++ b/tests/test_elements/test_mol2any/test_mol2path_fingerprint.py @@ -0,0 +1,147 @@ +"""Tests for the MolToPathFingerprint pipeline element.""" + +from __future__ import annotations + +import unittest +from typing import Any + +import numpy as np + +from molpipeline import Pipeline +from molpipeline.any2mol import SmilesToMol +from molpipeline.mol2any import Mol2PathFP + +# pylint: disable=duplicate-code + +test_smiles = [ + "c1ccccc1", + "c1ccccc1C", + "NCCOCCCC(=O)O", +] + + +class TestMol2PathFingerprint(unittest.TestCase): + """Unittest for Mol2PathFP, which calculates the RDKit Path Fingerprint.""" + + def test_can_be_constructed(self) -> None: + """Test if the Mol2PathFP pipeline element can be constructed. + + Returns + ------- + None + """ + mol_fp = Mol2PathFP() + mol_fp_copy = mol_fp.copy() + self.assertTrue(mol_fp_copy is not mol_fp) + for key, value in mol_fp.get_params().items(): + self.assertEqual(value, mol_fp_copy.get_params()[key]) + mol_fp_recreated = Mol2PathFP(**mol_fp.get_params()) + for key, value in mol_fp.get_params().items(): + self.assertEqual(value, mol_fp_recreated.get_params()[key]) + + def test_output_types(self) -> None: + """Test equality of different output_types.""" + + smi2mol = SmilesToMol() + sparse_path_fp = Mol2PathFP(n_bits=1024, return_as="sparse") + dense_path_fp = Mol2PathFP(n_bits=1024, return_as="dense") + explicit_bit_vect_path_fp = Mol2PathFP( + n_bits=1024, return_as="explicit_bit_vect" + ) + sparse_pipeline = Pipeline( + [ + ("smi2mol", smi2mol), + ("sparse_path_fp", sparse_path_fp), + ], + ) + dense_pipeline = Pipeline( + [ + ("smi2mol", smi2mol), + ("dense_path_fp", dense_path_fp), + ], + ) + explicit_bit_vect_pipeline = Pipeline( + [ + ("smi2mol", smi2mol), + ("explicit_bit_vect_path_fp", explicit_bit_vect_path_fp), + ], + ) + 
+ sparse_output = sparse_pipeline.fit_transform(test_smiles) + dense_output = dense_pipeline.fit_transform(test_smiles) + explicit_bit_vect_path_fp_output = explicit_bit_vect_pipeline.fit_transform( + test_smiles + ) + + self.assertTrue(np.all(sparse_output.toarray() == dense_output)) + + self.assertTrue( + np.equal( + dense_output, + np.array(explicit_bit_vect_path_fp_output), + ).all() + ) + + def test_counted_bits(self) -> None: + """Test if the option counted bits works as expected. + + Returns + ------- + None + """ + mol_fp = Mol2PathFP(n_bits=1024, return_as="dense") + smi2mol = SmilesToMol() + pipeline = Pipeline( + [ + ("smi2mol", smi2mol), + ("mol_fp", mol_fp), + ], + ) + output_binary = pipeline.fit_transform(test_smiles) + pipeline.set_params(mol_fp__counted=True) + output_counted = pipeline.fit_transform(test_smiles) + self.assertTrue( + np.all(np.flatnonzero(output_counted) == np.flatnonzero(output_binary)) + ) + self.assertTrue(np.all(output_counted >= output_binary)) + self.assertTrue(np.any(output_counted > output_binary)) + + def test_setter_getter(self) -> None: + """Test if the setters and getters work as expected.""" + mol_fp = Mol2PathFP() + params: dict[str, Any] = { + "min_path": 10, + "max_path": 12, + "use_hs": False, + "branched_paths": False, + "use_bond_order": False, + "count_simulation": True, + "num_bits_per_feature": 4, + "counted": True, + "n_bits": 1024, + } + mol_fp.set_params(**params) + self.assertEqual(mol_fp.get_params()["min_path"], 10) + self.assertEqual(mol_fp.get_params()["max_path"], 12) + self.assertEqual(mol_fp.get_params()["use_hs"], False) + self.assertEqual(mol_fp.get_params()["branched_paths"], False) + self.assertEqual(mol_fp.get_params()["use_bond_order"], False) + self.assertEqual(mol_fp.get_params()["count_simulation"], True) + self.assertEqual(mol_fp.get_params()["num_bits_per_feature"], 4) + self.assertEqual(mol_fp.get_params()["counted"], True) + self.assertEqual(mol_fp.get_params()["n_bits"], 1024) + + def 
test_setter_getter_error_handling(self) -> None: + """Test if the setters and getters work as expected when errors are encountered.""" + + mol_fp = Mol2PathFP() + params: dict[str, Any] = { + "min_path": 2, + "n_bits": 1024, + "return_as": "invalid-option", + } + self.assertRaises(ValueError, mol_fp.set_params, **params) + + +if __name__ == "__main__": + unittest.main()