diff --git a/conda_lock/conda_lock.py b/conda_lock/conda_lock.py index 66ef8d34a..4390ef092 100644 --- a/conda_lock/conda_lock.py +++ b/conda_lock/conda_lock.py @@ -62,22 +62,22 @@ PIP_SUPPORT = True except ImportError: PIP_SUPPORT = False -from conda_lock.lookup import set_lookup_location -from conda_lock.src_parser import ( +from conda_lock.lockfile import ( Dependency, GitMeta, InputMeta, LockedDependency, Lockfile, LockMeta, - LockSpecification, MetadataOption, TimeMeta, UpdateSpecification, - aggregate_lock_specs, + parse_conda_lock_file, + write_conda_lock_file, ) +from conda_lock.lookup import set_lookup_location +from conda_lock.src_parser import LockSpecification, aggregate_lock_specs from conda_lock.src_parser.environment_yaml import parse_environment_file -from conda_lock.src_parser.lockfile import parse_conda_lock_file, write_conda_lock_file from conda_lock.src_parser.meta_yaml import parse_meta_yaml_file from conda_lock.src_parser.pyproject_toml import parse_pyproject_toml from conda_lock.virtual_package import ( diff --git a/conda_lock/conda_solver.py b/conda_lock/conda_solver.py index c8903ef99..adfe12426 100644 --- a/conda_lock/conda_solver.py +++ b/conda_lock/conda_solver.py @@ -31,14 +31,9 @@ conda_pkgs_dir, is_micromamba, ) +from conda_lock.lockfile import HashModel, LockedDependency, _apply_categories from conda_lock.models.channel import Channel -from conda_lock.src_parser import ( - Dependency, - HashModel, - LockedDependency, - VersionedDependency, - _apply_categories, -) +from conda_lock.src_parser import Dependency, VersionedDependency logger = logging.getLogger(__name__) diff --git a/conda_lock/src_parser/lockfile.py b/conda_lock/lockfile/__init__.py similarity index 56% rename from conda_lock/src_parser/lockfile.py rename to conda_lock/lockfile/__init__.py index aa37be00e..e725ab56b 100644 --- a/conda_lock/src_parser/lockfile.py +++ b/conda_lock/lockfile/__init__.py @@ -1,14 +1,87 @@ import json import pathlib +from collections import defaultdict from textwrap import dedent -from typing import Collection, Optional +from typing import Collection, Dict, List, Optional, Sequence, Set import yaml -from conda_lock.src_parser import MetadataOption +from conda_lock.src_parser import Dependency -from . import Lockfile +from .models import DependencySource as DependencySource +from .models import GitMeta as GitMeta +from .models import HashModel as HashModel +from .models import InputMeta as InputMeta +from .models import LockedDependency, Lockfile +from .models import LockKey as LockKey +from .models import LockMeta as LockMeta +from .models import MetadataOption +from .models import TimeMeta as TimeMeta +from .models import UpdateSpecification as UpdateSpecification + + +def _apply_categories( + requested: Dict[str, Dependency], + planned: Dict[str, LockedDependency], + categories: Sequence[str] = ("main", "dev"), +) -> None: + """map each package onto the root request the with the highest-priority category""" + # walk dependency tree to assemble all transitive dependencies by request + dependents: Dict[str, Set[str]] = {} + by_category = defaultdict(list) + + def seperator_munge_get( + d: Dict[str, LockedDependency], key: str + ) -> LockedDependency: + # since separators are not consistent across managers (or even within) we need to do some double attempts here + try: + return d[key] + except KeyError: + try: + return d[key.replace("-", "_")] + except KeyError: + return d[key.replace("_", "-")] + + for name, request in requested.items(): + todo: List[str] = list() + deps: Set[str] = set() + item = name + while True: + todo.extend( + dep + for dep in seperator_munge_get(planned, item).dependencies + # exclude virtual packages + if not (dep in deps or dep.startswith("__")) + ) + if todo: + item = todo.pop(0) + deps.add(item) + else: + break + + dependents[name] = deps + + by_category[request.category].append(request.name) + + # now, map each package to its root request + categories = [*categories, *(k for k in by_category if k not in categories)] + root_requests = {} + for category in categories: + for root in by_category.get(category, []): + for transitive_dep in dependents[root]: + if transitive_dep not in root_requests: + root_requests[transitive_dep] = root + # include root requests themselves + for name in requested: + root_requests[name] = name + + for dep, root in root_requests.items(): + source = requested[root] + # try a conda target first + target = seperator_munge_get(planned, dep) + target.category = source.category + target.optional = source.optional def parse_conda_lock_file( diff --git a/conda_lock/lockfile/models.py b/conda_lock/lockfile/models.py new file mode 100644 index 000000000..4a2de963e --- /dev/null +++ b/conda_lock/lockfile/models.py @@ -0,0 +1,373 @@ +import datetime +import enum +import hashlib +import logging +import pathlib +import typing + +from collections import defaultdict, namedtuple +from typing import TYPE_CHECKING, AbstractSet, ClassVar, Dict, List, Optional, Union + + +if TYPE_CHECKING: + from hashlib import _Hash + +from pydantic import Field, validator +from typing_extensions import Literal + +from conda_lock.common import ordered_union, relative_path +from conda_lock.models import StrictModel +from conda_lock.models.channel import Channel + + +logger = logging.getLogger(__name__) + + +class DependencySource(StrictModel): + type: Literal["url"] + url: str + + +LockKey = namedtuple("LockKey", ["manager", "name", "platform"]) + + +class HashModel(StrictModel): + md5: Optional[str] = None + sha256: Optional[str] = None + + +class LockedDependency(StrictModel): + name: str + version: str + manager: Literal["conda", "pip"] + platform: str + dependencies: Dict[str, str] = {} + url: str + hash: HashModel + optional: bool = False + category: str = "main" + source: Optional[DependencySource] = None + build: Optional[str] = None + + def key(self) -> LockKey: + return LockKey(self.manager, self.name, self.platform) + + @validator("hash") + def validate_hash(cls, v: HashModel, values: Dict[str, typing.Any]) -> HashModel: + if (values["manager"] == "conda") and (v.md5 is None): + raise ValueError("conda package hashes must use MD5") + return v + + +class MetadataOption(enum.Enum): + TimeStamp = "timestamp" + GitSha = "git_sha" + GitUserName = "git_user_name" + GitUserEmail = "git_user_email" + InputMd5 = "input_md5" + InputSha = "input_sha" + + +class TimeMeta(StrictModel): + """Stores information about when the lockfile was generated.""" + + created_at: str = Field(..., description="Time stamp of lock-file creation time") + + @classmethod + def create(cls) -> "TimeMeta": + return cls( + created_at=datetime.datetime.utcnow().isoformat(timespec="seconds") + "Z" + ) + + +class GitMeta(StrictModel): + """ + Stores information about the git repo the lockfile is being generated in (if applicable) and + the git user generating the file. + """ + + git_user_name: Optional[str] = Field( + default=None, description="Git user.name field of global config" + ) + git_user_email: Optional[str] = Field( + default=None, description="Git user.email field of global config" + ) + git_sha: Optional[str] = Field( + default=None, + description=( + "sha256 hash of the most recent git commit that modified one of the input files for " + + "this lockfile" + ), + ) + + @classmethod + def create( + cls, + metadata_choices: AbstractSet[MetadataOption], + src_files: List[pathlib.Path], + ) -> "GitMeta | None": + try: + import git + except ImportError: + return None + + git_sha: "str | None" = None + git_user_name: "str | None" = None + git_user_email: "str | None" = None + + try: + repo = git.Repo(search_parent_directories=True) # type: ignore + if MetadataOption.GitSha in metadata_choices: + most_recent_datetime: Optional[datetime.datetime] = None + for src_file in src_files: + relative_src_file_path = relative_path( + pathlib.Path(repo.working_tree_dir), src_file # type: ignore + ) + commit = list( + repo.iter_commits(paths=relative_src_file_path, max_count=1) + )[0] + if repo.is_dirty(path=relative_src_file_path): + logger.warning( + "One of the inputs to conda-lock is dirty, using commit hash of head +" + ' "dirty"' + ) + git_sha = f"{repo.head.object.hexsha}-dirty" + break + else: + if ( + most_recent_datetime is None + or most_recent_datetime < commit.committed_datetime + ): + most_recent_datetime = commit.committed_datetime + git_sha = commit.hexsha + if MetadataOption.GitUserName in metadata_choices: + git_user_name = repo.config_reader().get_value("user", "name", None) # type: ignore + if MetadataOption.GitUserEmail in metadata_choices: + git_user_email = repo.config_reader().get_value("user", "email", None) # type: ignore + except git.exc.InvalidGitRepositoryError: # type: ignore + pass + + if any([git_sha, git_user_name, git_user_email]): + return cls( + git_sha=git_sha, + git_user_name=git_user_name, + git_user_email=git_user_email, + ) + else: + return None + + +class InputMeta(StrictModel): + """Stores information about an input provided to generate the lockfile.""" + + md5: Optional[str] = Field(..., description="md5 checksum for an input file") + sha256: Optional[str] = Field(..., description="md5 checksum for an input file") + + @classmethod + def create( + cls, metadata_choices: AbstractSet[MetadataOption], src_file: pathlib.Path + ) -> "InputMeta": + if MetadataOption.InputSha in metadata_choices: + sha256 = cls.get_input_sha256(src_file=src_file) + else: + sha256 = None + if MetadataOption.InputMd5 in metadata_choices: + md5 = cls.get_input_md5(src_file=src_file) + else: + md5 = None + return cls( + md5=md5, + sha256=sha256, + ) + + @classmethod + def get_input_md5(cls, src_file: pathlib.Path) -> str: + hasher = hashlib.md5() + return cls.hash_file(src_file=src_file, hasher=hasher) + + @classmethod + def get_input_sha256(cls, src_file: pathlib.Path) -> str: + hasher = hashlib.sha256() + return cls.hash_file(src_file=src_file, hasher=hasher) + + @staticmethod + def hash_file(src_file: pathlib.Path, hasher: "_Hash") -> str: + with src_file.open("r") as infile: + hasher.update(infile.read().encode("utf-8")) + return hasher.hexdigest() + + +class LockMeta(StrictModel): + content_hash: Dict[str, str] = Field( + ..., description="Hash of dependencies for each target platform" + ) + channels: List[Channel] = Field( + ..., description="Channels used to resolve dependencies" + ) + platforms: List[str] = Field(..., description="Target platforms") + sources: List[str] = Field( + ..., + description="paths to source files, relative to the parent directory of the lockfile", + ) + time_metadata: Optional[TimeMeta] = Field( + default=None, description="Metadata dealing with the time lockfile was created" + ) + git_metadata: Optional[GitMeta] = Field( + default=None, + description=( + "Metadata dealing with the git repo the lockfile was created in and the user that created it" + ), + ) + inputs_metadata: Optional[Dict[str, InputMeta]] = Field( + default=None, + description="Metadata dealing with the input files used to create the lockfile", + ) + custom_metadata: Optional[Dict[str, str]] = Field( + default=None, + description="Custom metadata provided by the user to be added to the lockfile", + ) + + def __or__(self, other: "LockMeta") -> "LockMeta": + """merge other into self""" + if other is None: + return self + elif not isinstance(other, LockMeta): + raise TypeError + + if self.inputs_metadata is None: + new_inputs_metadata = other.inputs_metadata + elif other.inputs_metadata is None: + new_inputs_metadata = self.inputs_metadata + else: + new_inputs_metadata = self.inputs_metadata + new_inputs_metadata.update(other.inputs_metadata) + + if self.custom_metadata is None: + new_custom_metadata = other.custom_metadata + elif other.custom_metadata is None: + new_custom_metadata = self.custom_metadata + else: + new_custom_metadata = self.custom_metadata + for key in other.custom_metadata: + if key in new_custom_metadata: + logger.warning( + f"Custom metadata key {key} provided twice, overwriting original value" + + f"({new_custom_metadata[key]}) with new value " + + f"({other.custom_metadata[key]})" + ) + new_custom_metadata.update(other.custom_metadata) + return LockMeta( + content_hash={**self.content_hash, **other.content_hash}, + channels=self.channels, + platforms=sorted(set(self.platforms).union(other.platforms)), + sources=ordered_union([self.sources, other.sources]), + time_metadata=other.time_metadata, + git_metadata=other.git_metadata, + inputs_metadata=new_inputs_metadata, + custom_metadata=new_custom_metadata, + ) + + @validator("channels", pre=True, always=True) + def ensure_channels(cls, v: List[Union[str, Channel]]) -> List[Channel]: + res = [] + for e in v: + if isinstance(e, str): + res.append(Channel.from_string(e)) + else: + res.append(e) + return typing.cast(List[Channel], res) + + +class Lockfile(StrictModel): + + version: ClassVar[int] = 1 + + package: List[LockedDependency] + metadata: LockMeta + + def __or__(self, other: "Lockfile") -> "Lockfile": + return other.__ror__(self) + + def __ror__(self, other: "Optional[Lockfile]") -> "Lockfile": + """ + merge self into other + """ + if other is None: + return self + elif not isinstance(other, Lockfile): + raise TypeError + + assert self.metadata.channels == other.metadata.channels + + ours = {d.key(): d for d in self.package} + theirs = {d.key(): d for d in other.package} + + # Pick ours preferentially + package: List[LockedDependency] = [] + for key in sorted(set(ours.keys()).union(theirs.keys())): + if key not in ours or key[-1] not in self.metadata.platforms: + package.append(theirs[key]) + else: + package.append(ours[key]) + + # Resort the conda packages topologically + final_package = self._toposort(package) + return Lockfile(package=final_package, metadata=other.metadata | self.metadata) + + def toposort_inplace(self) -> None: + self.package = self._toposort(self.package) + + @staticmethod + def _toposort( + package: List[LockedDependency], update: bool = False + ) -> List[LockedDependency]: + platforms = {d.platform for d in package} + + # Resort the conda packages topologically + final_package: List[LockedDependency] = [] + for platform in sorted(platforms): + from .._vendor.conda.common.toposort import toposort + + # Add the remaining non-conda packages in the order in which they appeared. + # Order the pip packages topologically ordered (might be not 100% perfect if they depend on + # other conda packages, but good enough + for manager in ["conda", "pip"]: + lookup = defaultdict(set) + packages: Dict[str, LockedDependency] = {} + + for d in package: + if d.platform != platform: + continue + + if d.manager != manager: + continue + + lookup[d.name] = set(d.dependencies) + packages[d.name] = d + + ordered = toposort(lookup) + for package_name in ordered: + # since we could have a pure dep in here, that does not have a package + # eg a pip package that depends on a conda package (the conda package will not be in this list) + dep = packages.get(package_name) + if dep is None: + continue + if dep.manager != manager: + continue + # skip virtual packages + if dep.manager == "conda" and dep.name.startswith("__"): + continue + + final_package.append(dep) + + return final_package + + +class UpdateSpecification: + def __init__( + self, + locked: Optional[List[LockedDependency]] = None, + update: Optional[List[str]] = None, + ): + self.locked = locked or [] + self.update = update or [] diff --git a/conda_lock/models/__init__.py b/conda_lock/models/__init__.py index e69de29bb..20251e462 100644 --- a/conda_lock/models/__init__.py +++ b/conda_lock/models/__init__.py @@ -0,0 +1,9 @@ +from pydantic import BaseModel + + +class StrictModel(BaseModel): + class Config: + extra = "forbid" + json_encoders = { + frozenset: list, + } diff --git a/conda_lock/pypi_solver.py b/conda_lock/pypi_solver.py index ca0cd27a3..78c405f00 100644 --- a/conda_lock/pypi_solver.py +++ b/conda_lock/pypi_solver.py @@ -1,16 +1,15 @@ import re import sys -import typing from pathlib import Path -from typing import Dict, List, Optional +from typing import TYPE_CHECKING, Dict, List, Optional from urllib.parse import urldefrag from clikit.api.io.flags import VERY_VERBOSE from clikit.io import ConsoleIO, NullIO from packaging.tags import compatible_tags, cpython_tags -from conda_lock import src_parser +from conda_lock import lockfile, src_parser from conda_lock._vendor.poetry.core.packages import Dependency as PoetryDependency from conda_lock._vendor.poetry.core.packages import Package as PoetryPackage from conda_lock._vendor.poetry.core.packages import ( @@ -28,9 +27,10 @@ from conda_lock.lookup import conda_name_to_pypi_name -if typing.TYPE_CHECKING: +if TYPE_CHECKING: from packaging.tags import Tag + # NB: in principle these depend on the glibc in the conda env MANYLINUX_TAGS = ["1", "2010", "2014", "_2_17"] @@ -159,7 +159,7 @@ def get_dependency(dep: src_parser.Dependency) -> PoetryDependency: raise ValueError(f"Unknown requirement {dep}") -def get_package(locked: src_parser.LockedDependency) -> PoetryPackage: +def get_package(locked: lockfile.LockedDependency) -> PoetryPackage: if locked.source is not None: return PoetryPackage( locked.name, @@ -174,12 +174,12 @@ def get_package(locked: src_parser.LockedDependency) -> PoetryPackage: def solve_pypi( pip_specs: Dict[str, src_parser.Dependency], use_latest: List[str], - pip_locked: Dict[str, src_parser.LockedDependency], - conda_locked: Dict[str, src_parser.LockedDependency], + pip_locked: Dict[str, lockfile.LockedDependency], + conda_locked: Dict[str, lockfile.LockedDependency], python_version: str, platform: str, verbose: bool = False, -) -> Dict[str, src_parser.LockedDependency]: +) -> Dict[str, lockfile.LockedDependency]: """ Solve pip dependencies for the given platform @@ -226,7 +226,7 @@ def solve_pypi( locked = Repository() python_packages = dict() - locked_dep: src_parser.LockedDependency + locked_dep: lockfile.LockedDependency for locked_dep in conda_locked.values(): if locked_dep.name.startswith("__"): continue @@ -273,16 +273,16 @@ def solve_pypi( # Extract distributions from Poetry package plan, ignoring uninstalls # (usually: conda package with no pypi equivalent) and skipped ops # (already installed) - requirements: List[src_parser.LockedDependency] = [] + requirements: List[lockfile.LockedDependency] = [] for op in result: if not isinstance(op, Uninstall) and not op.skipped: # Take direct references verbatim - source: Optional[src_parser.DependencySource] = None + source: Optional[lockfile.DependencySource] = None if op.package.source_type == "url": url, fragment = urldefrag(op.package.source_url) hash_type, hash = fragment.split("=") - hash = src_parser.HashModel(**{hash_type: hash}) - source = src_parser.DependencySource( + hash = lockfile.HashModel(**{hash_type: hash}) + source = lockfile.DependencySource( type="url", url=op.package.source_url ) # Choose the most specific distribution for the target @@ -292,10 +292,10 @@ def solve_pypi( hashes: Dict[str, str] = {} if link.hash_name is not None and link.hash is not None: hashes[link.hash_name] = link.hash - hash = src_parser.HashModel.parse_obj(hashes) + hash = lockfile.HashModel.parse_obj(hashes) requirements.append( - src_parser.LockedDependency( + lockfile.LockedDependency( name=op.package.name, version=str(op.package.version), manager="pip", @@ -324,6 +324,6 @@ def solve_pypi( continue planned[pypi_name] = locked_dep - src_parser._apply_categories(requested=pip_specs, planned=planned) + lockfile._apply_categories(requested=pip_specs, planned=planned) return {dep.name: dep for dep in requirements} diff --git a/conda_lock/src_parser/__init__.py b/conda_lock/src_parser/__init__.py index 8c9965e65..102262f76 100644 --- a/conda_lock/src_parser/__init__.py +++ b/conda_lock/src_parser/__init__.py @@ -1,35 +1,18 @@ -import datetime -import enum import hashlib import json import logging import pathlib import typing -from collections import defaultdict, namedtuple from itertools import chain -from typing import ( - TYPE_CHECKING, - AbstractSet, - ClassVar, - Dict, - List, - Optional, - Sequence, - Set, - Tuple, - Union, -) +from typing import Dict, List, Optional, Tuple, Union - -if TYPE_CHECKING: - from hashlib import _Hash - -from pydantic import BaseModel, Field, validator +from pydantic import BaseModel, validator from typing_extensions import Literal -from conda_lock.common import ordered_union, relative_path, suffix_union +from conda_lock.common import ordered_union, suffix_union from conda_lock.errors import ChannelAggregationError +from conda_lock.models import StrictModel from conda_lock.models.channel import Channel from conda_lock.virtual_package import FakeRepoData @@ -37,14 +20,6 @@ logger = logging.getLogger(__name__) -class StrictModel(BaseModel): - class Config: - extra = "forbid" - json_encoders = { - frozenset: list, - } - - class Selectors(StrictModel): platform: Optional[List[str]] = None @@ -89,346 +64,6 @@ class Package(StrictModel): hash: str -class DependencySource(StrictModel): - type: Literal["url"] - url: str - - -LockKey = namedtuple("LockKey", ["manager", "name", "platform"]) - - -class HashModel(StrictModel): - md5: Optional[str] = None - sha256: Optional[str] = None - - -class LockedDependency(StrictModel): - name: str - version: str - manager: Literal["conda", "pip"] - platform: str - dependencies: Dict[str, str] = {} - url: str - hash: HashModel - optional: bool = False - category: str = "main" - source: Optional[DependencySource] = None - build: Optional[str] = None - - def key(self) -> LockKey: - return LockKey(self.manager, self.name, self.platform) - - @validator("hash") - def validate_hash(cls, v: HashModel, values: Dict[str, typing.Any]) -> HashModel: - if (values["manager"] == "conda") and (v.md5 is None): - raise ValueError("conda package hashes must use MD5") - return v - - -class MetadataOption(enum.Enum): - TimeStamp = "timestamp" - GitSha = "git_sha" - GitUserName = "git_user_name" - GitUserEmail = "git_user_email" - InputMd5 = "input_md5" - InputSha = "input_sha" - - -class TimeMeta(StrictModel): - """Stores information about when the lockfile was generated.""" - - created_at: str = Field(..., description="Time stamp of lock-file creation time") - - @classmethod - def create(cls) -> "TimeMeta": - return cls( - created_at=datetime.datetime.utcnow().isoformat(timespec="seconds") + "Z" - ) - - -class GitMeta(StrictModel): - """ - Stores information about the git repo the lockfile is being generated in (if applicable) and - the git user generating the file. - """ - - git_user_name: Optional[str] = Field( - default=None, description="Git user.name field of global config" - ) - git_user_email: Optional[str] = Field( - default=None, description="Git user.email field of global config" - ) - git_sha: Optional[str] = Field( - default=None, - description=( - "sha256 hash of the most recent git commit that modified one of the input files for " - + "this lockfile" - ), - ) - - @classmethod - def create( - cls, - metadata_choices: AbstractSet[MetadataOption], - src_files: List[pathlib.Path], - ) -> "GitMeta | None": - try: - import git - except ImportError: - return None - - git_sha: "str | None" = None - git_user_name: "str | None" = None - git_user_email: "str | None" = None - - try: - repo = git.Repo(search_parent_directories=True) # type: ignore - if MetadataOption.GitSha in metadata_choices: - most_recent_datetime: Optional[datetime.datetime] = None - for src_file in src_files: - relative_src_file_path = relative_path( - pathlib.Path(repo.working_tree_dir), src_file # type: ignore - ) - commit = list( - repo.iter_commits(paths=relative_src_file_path, max_count=1) - )[0] - if repo.is_dirty(path=relative_src_file_path): - logger.warning( - "One of the inputs to conda-lock is dirty, using commit hash of head +" - ' "dirty"' - ) - git_sha = f"{repo.head.object.hexsha}-dirty" - break - else: - if ( - most_recent_datetime is None - or most_recent_datetime < commit.committed_datetime - ): - most_recent_datetime = commit.committed_datetime - git_sha = commit.hexsha - if MetadataOption.GitUserName in metadata_choices: - git_user_name = repo.config_reader().get_value("user", "name", None) # type: ignore - if MetadataOption.GitUserEmail in metadata_choices: - git_user_email = repo.config_reader().get_value("user", "email", None) # type: ignore - except git.exc.InvalidGitRepositoryError: # type: ignore - pass - - if any([git_sha, git_user_name, git_user_email]): - return cls( - git_sha=git_sha, - git_user_name=git_user_name, - git_user_email=git_user_email, - ) - else: - return None - - -class InputMeta(StrictModel): - """Stores information about an input provided to generate the lockfile.""" - - md5: Optional[str] = Field(..., description="md5 checksum for an input file") - sha256: Optional[str] = Field(..., description="md5 checksum for an input file") - - @classmethod - def create( - cls, metadata_choices: AbstractSet[MetadataOption], src_file: pathlib.Path - ) -> "InputMeta": - if MetadataOption.InputSha in metadata_choices: - sha256 = cls.get_input_sha256(src_file=src_file) - else: - sha256 = None - if MetadataOption.InputMd5 in metadata_choices: - md5 = cls.get_input_md5(src_file=src_file) - else: - md5 = None - return cls( - md5=md5, - sha256=sha256, - ) - - @classmethod - def get_input_md5(cls, src_file: pathlib.Path) -> str: - hasher = hashlib.md5() - return cls.hash_file(src_file=src_file, hasher=hasher) - - @classmethod - def get_input_sha256(cls, src_file: pathlib.Path) -> str: - hasher = hashlib.sha256() - return cls.hash_file(src_file=src_file, hasher=hasher) - - @staticmethod - def hash_file(src_file: pathlib.Path, hasher: "_Hash") -> str: - with src_file.open("r") as infile: - hasher.update(infile.read().encode("utf-8")) - return hasher.hexdigest() - - -class LockMeta(StrictModel): - content_hash: Dict[str, str] = Field( - ..., description="Hash of dependencies for each target platform" - ) - channels: List[Channel] = Field( - ..., description="Channels used to resolve dependencies" - ) - platforms: List[str] = Field(..., description="Target platforms") - sources: List[str] = Field( - ..., - description="paths to source files, relative to the parent directory of the lockfile", - ) - time_metadata: Optional[TimeMeta] = Field( - default=None, description="Metadata dealing with the time lockfile was created" - ) - git_metadata: Optional[GitMeta] = Field( - default=None, - description=( - "Metadata dealing with the git repo the lockfile was created in and the user that created it" - ), - ) - inputs_metadata: Optional[Dict[str, InputMeta]] = Field( - default=None, - description="Metadata dealing with the input files used to create the lockfile", - ) - custom_metadata: Optional[Dict[str, str]] = Field( - default=None, - description="Custom metadata provided by the user to be added to the lockfile", - ) - - def __or__(self, other: "LockMeta") -> "LockMeta": - """merge other into self""" - if other is None: - return self - elif not isinstance(other, LockMeta): - raise TypeError - - if self.inputs_metadata is None: - new_inputs_metadata = other.inputs_metadata - elif other.inputs_metadata is None: - new_inputs_metadata = self.inputs_metadata - else: - new_inputs_metadata = self.inputs_metadata - new_inputs_metadata.update(other.inputs_metadata) - - if self.custom_metadata is None: - new_custom_metadata = other.custom_metadata - elif other.custom_metadata is None: - new_custom_metadata = self.custom_metadata - else: - new_custom_metadata = self.custom_metadata - for key in other.custom_metadata: - if key in new_custom_metadata: - logger.warning( - f"Custom metadata key {key} provided twice, overwriting original value" - + f"({new_custom_metadata[key]}) with new value " - + f"({other.custom_metadata[key]})" - ) - new_custom_metadata.update(other.custom_metadata) - return LockMeta( - content_hash={**self.content_hash, **other.content_hash}, - channels=self.channels, - platforms=sorted(set(self.platforms).union(other.platforms)), - sources=ordered_union([self.sources, other.sources]), - time_metadata=other.time_metadata, - git_metadata=other.git_metadata, - inputs_metadata=new_inputs_metadata, - custom_metadata=new_custom_metadata, - ) - - @validator("channels", pre=True, always=True) - def ensure_channels(cls, v: List[Union[str, Channel]]) -> List[Channel]: - res = [] - for e in v: - if isinstance(e, str): - res.append(Channel.from_string(e)) - else: - res.append(e) - return typing.cast(List[Channel], res) - - -class Lockfile(StrictModel): - - version: ClassVar[int] = 1 - - package: List[LockedDependency] - metadata: LockMeta - - def __or__(self, other: "Lockfile") -> "Lockfile": - return other.__ror__(self) - - def __ror__(self, other: "Optional[Lockfile]") -> "Lockfile": - """ - merge self into other - """ - if other is None: - return self - elif not isinstance(other, Lockfile): - raise TypeError - - assert self.metadata.channels == other.metadata.channels - - ours = {d.key(): d for d in self.package} - theirs = {d.key(): d for d in other.package} - - # Pick ours preferentially - package: List[LockedDependency] = [] - for key in sorted(set(ours.keys()).union(theirs.keys())): - if key not in ours or key[-1] not in self.metadata.platforms: - package.append(theirs[key]) - else: - package.append(ours[key]) - - # Resort the conda packages topologically - final_package = self._toposort(package) - return Lockfile(package=final_package, metadata=other.metadata | self.metadata) - - def toposort_inplace(self) -> None: - self.package = self._toposort(self.package) - - @staticmethod - def _toposort( - package: List[LockedDependency], update: bool = False - ) -> List[LockedDependency]: - platforms = {d.platform for d in package} - - # Resort the conda packages topologically - final_package: List[LockedDependency] = [] - for platform in sorted(platforms): - from .._vendor.conda.common.toposort import toposort - - # Add the remaining non-conda packages in the order in which they appeared. - # Order the pip packages topologically ordered (might be not 100% perfect if they depend on - # other conda packages, but good enough - for manager in ["conda", "pip"]: - lookup = defaultdict(set) - packages: Dict[str, LockedDependency] = {} - - for d in package: - if d.platform != platform: - continue - - if d.manager != manager: - continue - - lookup[d.name] = set(d.dependencies) - packages[d.name] = d - - ordered = toposort(lookup) - for package_name in ordered: - # since we could have a pure dep in here, that does not have a package - # eg a pip package that depends on a conda package (the conda package will not be in this list) - dep = packages.get(package_name) - if dep is None: - continue - if dep.manager != manager: - continue - # skip virtual packages - if dep.manager == "conda" and dep.name.startswith("__"): - continue - - final_package.append(dep) - - return final_package - - class LockSpecification(BaseModel): dependencies: List[Dependency] # TODO: Should we store the auth info in here? @@ -470,69 +105,6 @@ def validate_channels(cls, v: List[Union[Channel, str]]) -> List[Channel]: return typing.cast(List[Channel], v) -def _apply_categories( - requested: Dict[str, Dependency], - planned: Dict[str, LockedDependency], - categories: Sequence[str] = ("main", "dev"), -) -> None: - """map each package onto the root request the with the highest-priority category""" - # walk dependency tree to assemble all transitive dependencies by request - dependents: Dict[str, Set[str]] = {} - by_category = defaultdict(list) - - def seperator_munge_get( - d: Dict[str, LockedDependency], key: str - ) -> LockedDependency: - # since separators are not consistent across managers (or even within) we need to do some double attempts here - try: - return d[key] - except KeyError: - try: - return d[key.replace("-", "_")] - except KeyError: - return d[key.replace("_", "-")] - - for name, request in requested.items(): - todo: List[str] = list() - deps: Set[str] = set() - item = name - while True: - todo.extend( - dep - for dep in seperator_munge_get(planned, item).dependencies - # exclude virtual packages - if not (dep in deps or dep.startswith("__")) - ) - if todo: - item = todo.pop(0) - deps.add(item) - else: - break - - dependents[name] = deps - - by_category[request.category].append(request.name) - - # now, map each package to its root request - categories = [*categories, *(k for k in by_category if k not in categories)] - root_requests = {} - for category in categories: - for root in by_category.get(category, []): - for transitive_dep in dependents[root]: - if transitive_dep not in root_requests: - root_requests[transitive_dep] = root - # include root requests themselves - for name in requested: - root_requests[name] = name - - for dep, root in root_requests.items(): - source = requested[root] - # try a conda target first - target = seperator_munge_get(planned, dep) - target.category = source.category - target.optional = source.optional - - def aggregate_lock_specs( lock_specs: List[LockSpecification], ) -> LockSpecification: @@ -564,13 +136,3 @@ def aggregate_lock_specs( platforms=ordered_union(lock_spec.platforms or [] for lock_spec in lock_specs), sources=ordered_union(lock_spec.sources or [] for lock_spec in lock_specs), ) - - -class UpdateSpecification: - def __init__( - self, - locked: Optional[List[LockedDependency]] = None, - update: Optional[List[str]] = None, - ): - self.locked = locked or [] - self.update = update or [] diff --git a/tests/test_conda_lock.py b/tests/test_conda_lock.py index dd85464c3..ed8530945 100644 --- a/tests/test_conda_lock.py +++ b/tests/test_conda_lock.py @@ -58,18 +58,16 @@ is_micromamba, reset_conda_pkgs_dir, ) -from conda_lock.models.channel import Channel -from conda_lock.pypi_solver import parse_pip_requirement, solve_pypi -from conda_lock.src_parser import ( +from conda_lock.lockfile import ( HashModel, LockedDependency, - LockSpecification, MetadataOption, - Selectors, - VersionedDependency, + parse_conda_lock_file, ) +from conda_lock.models.channel import Channel +from conda_lock.pypi_solver import parse_pip_requirement, solve_pypi +from conda_lock.src_parser import LockSpecification, Selectors, VersionedDependency from conda_lock.src_parser.environment_yaml import parse_environment_file -from conda_lock.src_parser.lockfile import parse_conda_lock_file from conda_lock.src_parser.pyproject_toml import ( parse_pyproject_toml, poetry_version_to_conda_version,