Skip to content

Commit

Permalink
chore: remove complicated OverlapDetector function
Browse files Browse the repository at this point in the history
  • Loading branch information
clintval committed Nov 7, 2024
1 parent 9ac8724 commit 3d4d2d2
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 142 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ To generate a code coverage report after testing locally, run:
poetry run coverage html
```

To check the lock file is up to date:
To check the lock file is up-to-date:

```console
poetry check --lock
Expand Down
9 changes: 5 additions & 4 deletions bedspec/_bedspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any
from typing import ClassVar
from typing import Protocol
from typing import final
from typing import runtime_checkable

from typing_extensions import override
Expand Down Expand Up @@ -77,14 +78,12 @@ def territory(self) -> Iterator[GenomicSpan]:

def header(bed: BedLike | type[BedLike]) -> list[str]:
"""Return the list of field names for this BED record."""
_header = [field.name for field in fields(bed)]
return _header
return [field.name for field in fields(bed)]


def types(bed: BedLike | type[BedLike]) -> list[type | str | Any]:
"""Return the list of field types for this BED record."""
_types = [field.type for field in fields(bed)]
return _types
return [field.type for field in fields(bed)]


class PointBed(BedLike, ABC):
Expand All @@ -93,6 +92,7 @@ class PointBed(BedLike, ABC):
refname: str
start: int

@final
@property
def length(self) -> int:
"""The length of this record."""
Expand All @@ -116,6 +116,7 @@ def __post_init__(self) -> None:
if self.start >= self.end or self.start < 0:
raise ValueError("start must be greater than 0 and less than end!")

@final
@property
def length(self) -> int:
"""The length of this record."""
Expand Down
15 changes: 7 additions & 8 deletions bedspec/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
from bedspec._bedspec import BedColor
from bedspec._bedspec import BedLike
from bedspec._bedspec import BedStrand

from ._bedspec import header
from ._bedspec import types
from bedspec._bedspec import header
from bedspec._bedspec import types

BedType = TypeVar("BedType", bound=BedLike)
"""A type variable for any kind of BED record type."""
Expand Down Expand Up @@ -103,10 +102,10 @@ def bed_type(self) -> type[BedType] | None:
return self._bed_type

@bed_type.setter
def bed_type(self, bed_type: type[BedType]) -> None:
self._bed_type: type[BedType] = bed_type # type: ignore[no-redef]
self._header: list[str] = header(cast(BedLike, bed_type))
self._types: list[type | str | Any] = types(cast(BedLike, bed_type))
def bed_type(self, value: type[BedType]) -> None:
self._bed_type: type[BedType] = value # type: ignore[no-redef]
self._header: list[str] = header(cast(BedLike, value))
self._types: list[type | str | Any] = types(cast(BedLike, value))

@override
def __enter__(self) -> "BedWriter[BedType]":
Expand Down Expand Up @@ -165,7 +164,7 @@ def write_comment(self, comment: str) -> None:
"""Write a comment to the BED output."""
for line in comment.splitlines():
prefix = "" if any(line.startswith(prefix) for prefix in COMMENT_PREFIXES) else "# "
self._handle.write(f"{prefix}{comment}\n")
self._handle.write(f"{prefix}{line}\n")

@classmethod
def from_path(cls, path: Path | str) -> "BedWriter":
Expand Down
2 changes: 1 addition & 1 deletion bedspec/overlap/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# ruff: noqa: F401
from ._overlap import GenomicSpanLike
from ._overlap import OverlapDetector
from ._overlap import ReferenceSpanType
109 changes: 31 additions & 78 deletions bedspec/overlap/_overlap.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


@runtime_checkable
class _Span(Hashable, Protocol):
class Span(Hashable, Protocol):
"""A span with a start and an end. 0-based open-ended."""

@property
Expand All @@ -28,136 +28,89 @@ def end(self) -> int:


@runtime_checkable
class _GenomicSpanWithChrom(_Span, Protocol):
"""A genomic feature where reference sequence is accessed with `chrom`."""

@property
def chrom(self) -> str:
"""A reference sequence name."""
return NotImplemented


@runtime_checkable
class _GenomicSpanWithContig(_Span, Protocol):
"""A genomic feature where reference sequence is accessed with `contig`."""

@property
def contig(self) -> str:
"""A reference sequence name."""
return NotImplemented


@runtime_checkable
class _GenomicSpanWithRefName(_Span, Protocol):
"""A genomic feature where reference sequence is accessed with `refname`."""
class ReferenceSpan(Span, Protocol):
"""A feature on a reference sequence."""

@property
def refname(self) -> str:
"""A reference sequence name."""
return NotImplemented


GenomicSpanLike = TypeVar(
"GenomicSpanLike",
bound=_GenomicSpanWithChrom | _GenomicSpanWithContig | _GenomicSpanWithRefName,
)
"""
A 0-based end-exclusive genomic feature where the reference sequence name is accessed with any of
the 3 most common property names ("chrom", "contig", "refname").
"""

GenericGenomicSpanLike = TypeVar(
"GenericGenomicSpanLike",
bound=_GenomicSpanWithChrom | _GenomicSpanWithContig | _GenomicSpanWithRefName,
)
"""
A generic 0-based end-exclusive genomic feature where the reference sequence name is accessed with
any of the most common property names ("chrom", "contig", "refname"). This type variable is used for
describing the generic type contained within the :class:`~bedspec.overlap.OverlapDetector`.
"""
ReferenceSpanType = TypeVar("ReferenceSpanType", bound=ReferenceSpan)
"""A 0-based end-exclusive feature on a reference sequence."""

GenericReferenceSpanType = TypeVar("GenericReferenceSpanType", bound=ReferenceSpan)
"""A generic 0-based end-exclusive feature on a reference sequence."""

Refname: TypeAlias = str
"""A type alias for a reference sequence name string."""


class OverlapDetector(Generic[GenericGenomicSpanLike], Iterable[GenericGenomicSpanLike]):
class OverlapDetector(Iterable[GenericReferenceSpanType], Generic[GenericReferenceSpanType]):
"""
Detects and returns overlaps between a collection of genomic features and an interval.
Detects and returns overlaps between a collection of reference features and a query feature.
The overlap detector may be built with any genomic feature-like Python object that has the
following properties:
The overlap detector may be built with any feature-like Python object that has the following
properties:
* `chrom` or `contig` or `refname`: The reference sequence name
* `refname`: The reference sequence name
* `start`: A 0-based start position
* `end`: A 0-based end-exclusive end position
* `end`: A 0-based half-open end position
This detector is most efficiently used when all features to be queried are added ahead of time.
"""

def __init__(self, features: Iterable[GenericGenomicSpanLike] | None = None) -> None:
self._refname_to_features: dict[Refname, list[GenericGenomicSpanLike]] = defaultdict(list)
def __init__(self, features: Iterable[GenericReferenceSpanType] | None = None) -> None:
self._refname_to_features: dict[Refname, list[GenericReferenceSpanType]] = defaultdict(list)
self._refname_to_tree: dict[Refname, cr.cgranges] = defaultdict(cr.cgranges) # type: ignore[attr-defined,name-defined]
self._refname_to_is_indexed: dict[Refname, bool] = defaultdict(lambda: False)
if features is not None:
self.add_all(features)

def __iter__(self) -> Iterator[GenericGenomicSpanLike]:
def __iter__(self) -> Iterator[GenericReferenceSpanType]:
"""Iterate over the features in the overlap detector."""
return chain(*self._refname_to_features.values())

@staticmethod
def _reference_sequence_name(feature: GenomicSpanLike) -> Refname:
"""Return the reference name of a given genomic feature."""
if hasattr(feature, "refname"):
return feature.refname
if hasattr(feature, "contig"):
return feature.contig
if hasattr(feature, "chrom"):
return feature.chrom
raise ValueError(
f"Genomic feature is missing a reference sequence name property: {feature}"
)

def add(self, feature: GenericGenomicSpanLike) -> None:
"""Add a genomic feature to this overlap detector."""
def add(self, feature: GenericReferenceSpanType) -> None:
"""Add a feature to this overlap detector."""
if not isinstance(feature, Hashable):
raise ValueError(f"Genomic feature is not hashable but should be: {feature}")

refname: Refname = self._reference_sequence_name(feature)
refname: Refname = feature.refname
feature_idx: int = len(self._refname_to_features[refname])

self._refname_to_features[refname].append(feature)
self._refname_to_tree[refname].add(refname, feature.start, feature.end, feature_idx)
self._refname_to_is_indexed[refname] = False # mark that this tree needs re-indexing

def add_all(self, features: Iterable[GenericGenomicSpanLike]) -> None:
"""Adds one or more genomic features to this overlap detector."""
def add_all(self, features: Iterable[GenericReferenceSpanType]) -> None:
"""Adds one or more features to this overlap detector."""
for feature in features:
self.add(feature)

def overlapping(self, feature: GenomicSpanLike) -> Iterator[GenericGenomicSpanLike]:
"""Yields all the overlapping features for a given genomic span."""
refname: Refname = self._reference_sequence_name(feature)
def overlapping(self, feature: ReferenceSpanType) -> Iterator[GenericReferenceSpanType]:
"""Yields all the overlapping features for a given query feature."""
refname: Refname = feature.refname

if refname in self._refname_to_tree and not self._refname_to_is_indexed[refname]:
self._refname_to_tree[refname].index() # index the tree if we find it is not indexed

for *_, idx in self._refname_to_tree[refname].overlap(refname, feature.start, feature.end):
yield self._refname_to_features[refname][idx]

def overlaps(self, feature: GenomicSpanLike) -> bool:
"""Determine if a given genomic span overlaps any features."""
def overlaps(self, feature: ReferenceSpanType) -> bool:
"""Determine if a query feature overlaps any other features."""
return next(self.overlapping(feature), None) is not None

def enclosing(self, feature: GenomicSpanLike) -> Iterator[GenericGenomicSpanLike]:
"""Yields all the overlapping features that completely enclose the given genomic span."""
def enclosing(self, feature: ReferenceSpanType) -> Iterator[GenericReferenceSpanType]:
"""Yields all the overlapping features that completely enclose the given query feature."""
for overlap in self.overlapping(feature):
if feature.start >= overlap.start and feature.end <= overlap.end:
yield overlap

def enclosed_by(self, feature: GenomicSpanLike) -> Iterator[GenericGenomicSpanLike]:
"""Yields all the overlapping features that are enclosed by the given genomic span."""
def enclosed_by(self, feature: ReferenceSpanType) -> Iterator[GenericReferenceSpanType]:
"""Yields all the overlapping features that are enclosed by the given query feature."""
for overlap in self.overlapping(feature):
if feature.start <= overlap.start and feature.end >= overlap.end:
yield overlap
52 changes: 2 additions & 50 deletions tests/test_overlap.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dataclasses import dataclass
from typing import TypeAlias

import pytest

Expand Down Expand Up @@ -50,21 +49,6 @@ class MissingHashFeature:
detector.add(feature)


def test_structural_type_reference_name_raises_if_not_found() -> None:
"""Test that an invalid reference name property will raise an exception."""

@dataclass(eq=True, frozen=True)
class BadInterval:
chromosome_name: str
start: int
end: int

feature: BadInterval = BadInterval("chr1", 1, 2)

with pytest.raises(ValueError):
OverlapDetector._reference_sequence_name(feature) # type: ignore[type-var]


def test_we_can_add_all_features_to_the_overlap_detector() -> None:
"""Test we can add all features to the overlap detector."""
bed1 = Bed3(refname="chr1", start=1, end=2)
Expand All @@ -83,7 +67,7 @@ def test_we_can_query_with_different_type_in_the_overlap_detector() -> None:


def test_we_can_those_enclosing_intervals() -> None:
"""Test that we can get intervals enclosing a given genomic feature."""
"""Test that we can get intervals enclosing a given query feature."""
bed1 = Bed3(refname="chr1", start=1, end=5)
bed2 = Bed3(refname="chr1", start=3, end=9)
detector: OverlapDetector[Bed3] = OverlapDetector([bed1, bed2])
Expand All @@ -96,7 +80,7 @@ def test_we_can_those_enclosing_intervals() -> None:


def test_we_can_those_enclosed_by_intervals() -> None:
"""Test that we can get intervals enclosed by a given genomic feature."""
"""Test that we can get intervals enclosed by a given query feature."""
bed1 = Bed3(refname="chr1", start=1, end=5)
bed2 = Bed3(refname="chr1", start=3, end=9)
detector: OverlapDetector[Bed3] = OverlapDetector([bed1, bed2])
Expand Down Expand Up @@ -140,35 +124,3 @@ def test_we_can_query_if_at_least_one_feature_overlaps() -> None:
assert detector.overlaps(Bed3("chr1", start=5, end=6))
assert not detector.overlaps(Bed3("chr2", start=0, end=1))
assert detector.overlaps(Bed3("chr2", start=4, end=5))


def test_we_support_features_with_all_three_common_reference_sequence_name_properties() -> None:
"""Test that we can store features with either of 3 reference sequence name properties."""

@dataclass(eq=True, frozen=True)
class FeatureWithChrom:
chrom: str
start: int
end: int

@dataclass(eq=True, frozen=True)
class FeatureWithContig:
contig: str
start: int
end: int

@dataclass(eq=True, frozen=True)
class FeatureWithRefname:
refname: str
start: int
end: int

feature_with_chrom: FeatureWithChrom = FeatureWithChrom("chr1", 1, 3)
feature_with_contig: FeatureWithContig = FeatureWithContig("chr1", 1, 3)
feature_with_refname: FeatureWithRefname = FeatureWithRefname("chr1", 1, 3)

AllKinds: TypeAlias = FeatureWithChrom | FeatureWithContig | FeatureWithRefname
features: list[AllKinds] = [feature_with_chrom, feature_with_contig, feature_with_refname]
detector: OverlapDetector[AllKinds] = OverlapDetector(features)

assert list(detector) == features

0 comments on commit 3d4d2d2

Please sign in to comment.