From 2ceac8020d2aee72abe0f82daee72185343bc842 Mon Sep 17 00:00:00 2001 From: Clint Valentine Date: Thu, 18 Apr 2024 19:01:02 -0700 Subject: [PATCH] feat: implement BedReader.from_path() (#13) --- bedspec/_bedspec.py | 44 +++++++++++++++++++++---------------- pyproject.toml | 2 +- tests/test_bedspec.py | 50 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 74 insertions(+), 22 deletions(-) diff --git a/bedspec/_bedspec.py b/bedspec/_bedspec.py index d457cec..42dbe5c 100644 --- a/bedspec/_bedspec.py +++ b/bedspec/_bedspec.py @@ -4,12 +4,12 @@ import typing from abc import ABC from abc import abstractmethod -from csv import DictWriter from dataclasses import asdict as as_dict from dataclasses import dataclass from dataclasses import fields from enum import StrEnum from enum import unique +from pathlib import Path from types import FrameType from types import TracebackType from typing import Any @@ -133,6 +133,11 @@ class SimpleBed(BedType, ABC, Locatable): start: int end: int + def __post_init__(self) -> None: + """Validate this dataclass.""" + if self.start >= self.end or self.start < 0: + raise ValueError("start must be greater than 0 and less than end!") + def length(self) -> int: """The length of this record.""" return self.end - self.start @@ -152,6 +157,13 @@ class PairBed(BedType, ABC): start2: int end2: int + def __post_init__(self) -> None: + """Validate this dataclass.""" + if self.start1 >= self.end1 or self.start1 < 0: + raise ValueError("start1 must be greater than 0 and less than end1!") + if self.start2 >= self.end2 or self.start2 < 0: + raise ValueError("start2 must be greater than 0 and less than end2!") + @property def bed1(self) -> SimpleBed: """The first of the two intervals.""" @@ -301,8 +313,7 @@ def __class_getitem__(cls, key: Any) -> type: def __new__(cls, handle: io.TextIOWrapper) -> "BedWriter[BedKind]": """Bind the kind of BED type to this class for later introspection.""" signature = cast(FrameType, cast(FrameType, inspect.currentframe()).f_back) - argvalues = inspect.getargvalues(signature) - typelevel = argvalues.locals.get("self", None) + typelevel = signature.f_locals.get("self", None) bed_kind = None if typelevel is None else typelevel.__args__[0] instance = super().__new__(cls) instance.bed_kind = bed_kind @@ -315,7 +326,6 @@ def __enter__(self) -> "BedWriter[BedKind]": def __init__(self, handle: io.TextIOWrapper) -> None: """Initialize a BED writer wihout knowing yet what BED types we will write.""" self._handle = handle - self._writer: DictWriter | None = None def __exit__( self, @@ -350,14 +360,7 @@ def write(self, bed: BedKind) -> None: else: self.bed_kind = type(bed) - if self._writer is None: - self._writer = DictWriter( - self._handle, - delimiter="\t", - fieldnames=[field.name for field in fields(self.bed_kind)], - ) - - self._writer.writerow(as_dict(bed)) + self._handle.write(f"{"\t".join(map(str, as_dict(bed).values()))}\n") def write_all(self, beds: Iterable[BedKind]) -> None: """Write all the BED records to the BED output.""" @@ -394,8 +397,7 @@ def __class_getitem__(cls, key: Any) -> type: def __new__(cls, handle: io.TextIOWrapper) -> "BedReader[BedKind]": """Bind the kind of BED type to this class for later introspection.""" signature = cast(FrameType, cast(FrameType, inspect.currentframe()).f_back) - argvalues = inspect.getargvalues(signature) - typelevel = argvalues.locals.get("self", None) + typelevel = signature.f_locals.get("self", None) bed_kind = None if typelevel is None else typelevel.__args__[0] instance = super().__new__(cls) instance.bed_kind = bed_kind @@ -411,12 +413,14 @@ def __enter__(self) -> "BedReader[BedKind]": def __iter__(self) -> Iterator[BedKind]: """Iterate through the BED records of this IO handle.""" + if self.bed_kind is None: + raise NotImplementedError("Untyped reading is not yet supported!") for line in self._handle: if line.strip() == "": continue if any(line.startswith(prefix) for prefix in COMMENT_PREFIXES): continue - yield self._decode(line) + yield cast(BedKind, self.bed_kind.decode(line)) def __exit__( self, @@ -428,10 +432,12 @@ def __exit__( self.close() return super().__exit__(__exc_type, __exc_value, __traceback) - def _decode(self, line: str) -> BedKind: - if self.bed_kind is None: - raise NotImplementedError("Untyped reading is not yet supported!") - return cast(BedKind, self.bed_kind.decode(line)) + @classmethod + def from_path(cls, path: Path | str, bed_kind: type[BedKind]) -> "BedReader[BedKind]": + """Open a BED reader from a file path.""" + reader = cls(handle=Path(path).open()) + reader.bed_kind = bed_kind + return reader def close(self) -> None: """Close the underlying IO handle.""" diff --git a/pyproject.toml b/pyproject.toml index cb3be6c..e88883b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "bedspec" -version = "0.1.2" +version = "0.2.0" description = "An HTS-specs compliant BED toolkit." authors = ["Clint Valentine "] license = "MIT" diff --git a/tests/test_bedspec.py b/tests/test_bedspec.py index 2f6d3f8..3ae43b2 100644 --- a/tests/test_bedspec.py +++ b/tests/test_bedspec.py @@ -119,6 +119,34 @@ def test_simple_bed_types_have_a_territory() -> None: assert list(record.territory()) == [record] +def test_simple_bed_validates_start_and_end() -> None: + """Test that a simple BED record validates its start and end.""" + with pytest.raises(ValueError): + Bed3(contig="chr1", start=-1, end=5) + with pytest.raises(ValueError): + Bed3(contig="chr1", start=5, end=5) + with pytest.raises(ValueError): + Bed3(contig="chr1", start=5, end=0) + + +def test_paired_bed_validates_start_and_end() -> None: + """Test that a simple BED record validates its start and end.""" + # fmt: off + with pytest.raises(ValueError): + BedPE(contig1="chr1", start1=-1, end1=5, contig2="chr1", start2=1, end2=2, name="foo", score=5, strand1=BedStrand.POSITIVE, strand2=BedStrand.POSITIVE) # noqa: E501 + with pytest.raises(ValueError): + BedPE(contig1="chr1", start1=5, end1=5, contig2="chr1", start2=1, end2=2, name="foo", score=5, strand1=BedStrand.POSITIVE, strand2=BedStrand.POSITIVE) # noqa: E501 + with pytest.raises(ValueError): + BedPE(contig1="chr1", start1=5, end1=0, contig2="chr1", start2=1, end2=2, name="foo", score=5, strand1=BedStrand.POSITIVE, strand2=BedStrand.POSITIVE) # noqa: E501 + with pytest.raises(ValueError): + BedPE(contig1="chr1", start1=1, end1=2, contig2="chr1", start2=-1, end2=5, name="foo", score=5, strand1=BedStrand.POSITIVE, strand2=BedStrand.POSITIVE) # noqa: E501 + with pytest.raises(ValueError): + BedPE(contig1="chr1", start1=1, end1=2, contig2="chr1", start2=5, end2=5, name="foo", score=5, strand1=BedStrand.POSITIVE, strand2=BedStrand.POSITIVE) # noqa: E501 + with pytest.raises(ValueError): + BedPE(contig1="chr1", start1=1, end1=2, contig2="chr1", start2=5, end2=0, name="foo", score=5, strand1=BedStrand.POSITIVE, strand2=BedStrand.POSITIVE) # noqa: E501 + # fmt: on + + def test_paired_bed_types_have_a_territory() -> None: """Test that simple BEDs are their own territory.""" record = BedPE( @@ -402,6 +430,24 @@ def test_bed_reader_can_read_bed_records_if_typed(tmp_path: Path) -> None: assert list(BedReader[Bed3](handle)) == [bed] +def test_bed_reader_can_read_bed_records_from_a_path(tmp_path: Path) -> None: + """Test that the BED reader can read BED records from a path if it is typed.""" + + bed: Bed3 = Bed3(contig="chr1", start=1, end=2) + + with open(tmp_path / "test.bed", "w") as handle: + writer: BedWriter = BedWriter(handle) + writer.write(bed) + + assert Path(tmp_path / "test.bed").read_text() == "chr1\t1\t2\n" + + reader = BedReader[Bed3].from_path(tmp_path / "test.bed", bed_kind=Bed3) + assert list(reader) == [bed] + + reader = BedReader[Bed3].from_path(str(tmp_path / "test.bed"), bed_kind=Bed3) + assert list(reader) == [bed] + + def test_bed_reader_can_raises_exception_if_not_typed(tmp_path: Path) -> None: """Test that the BED reader raises an exception if it is not typed.""" @@ -431,8 +477,8 @@ def test_bed_reader_can_read_bed_records_with_comments(tmp_path: Path) -> None: writer.write_comment("track this-is-fine") writer.write_comment("browser is mario's enemy?") writer.write_comment("hello mom!") - handle.write("\n") # empty line - handle.write(" \t\n") # empty line + handle.write("\n") # empty line + handle.write(" \t\n") # empty line writer.write(bed) writer.write_comment("hello dad!")