Skip to content

Commit

Permalink
feat: implement BedReader.from_path() (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
clintval authored Apr 19, 2024
1 parent 1365f60 commit 2ceac80
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 22 deletions.
44 changes: 25 additions & 19 deletions bedspec/_bedspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import typing
from abc import ABC
from abc import abstractmethod
from csv import DictWriter
from dataclasses import asdict as as_dict
from dataclasses import dataclass
from dataclasses import fields
from enum import StrEnum
from enum import unique
from pathlib import Path
from types import FrameType
from types import TracebackType
from typing import Any
Expand Down Expand Up @@ -133,6 +133,11 @@ class SimpleBed(BedType, ABC, Locatable):
start: int
end: int

def __post_init__(self) -> None:
    """Validate the interval coordinates of this record.

    A start of 0 is accepted; only negative starts and intervals whose start
    is not strictly less than their end are rejected.

    Raises:
        ValueError: If ``start`` is negative or ``start`` is not less than ``end``.
    """
    if self.start < 0 or self.start >= self.end:
        # The check allows start == 0, so the message must say so.
        raise ValueError("start must be greater than or equal to 0 and less than end!")

def length(self) -> int:
    """Return the distance between the start and the end of this record."""
    span = self.end - self.start
    return span
Expand All @@ -152,6 +157,13 @@ class PairBed(BedType, ABC):
start2: int
end2: int

def __post_init__(self) -> None:
    """Validate the coordinates of both intervals of this record.

    A start of 0 is accepted; for each interval only a negative start or a
    start that is not strictly less than its end is rejected. The first
    interval is checked before the second.

    Raises:
        ValueError: If ``start1``/``end1`` or ``start2``/``end2`` do not
            describe a valid interval.
    """
    if self.start1 < 0 or self.start1 >= self.end1:
        # The check allows start1 == 0, so the message must say so.
        raise ValueError("start1 must be greater than or equal to 0 and less than end1!")
    if self.start2 < 0 or self.start2 >= self.end2:
        raise ValueError("start2 must be greater than or equal to 0 and less than end2!")

@property
def bed1(self) -> SimpleBed:
"""The first of the two intervals."""
Expand Down Expand Up @@ -301,8 +313,7 @@ def __class_getitem__(cls, key: Any) -> type:
def __new__(cls, handle: io.TextIOWrapper) -> "BedWriter[BedKind]":
"""Bind the kind of BED type to this class for later introspection."""
signature = cast(FrameType, cast(FrameType, inspect.currentframe()).f_back)
argvalues = inspect.getargvalues(signature)
typelevel = argvalues.locals.get("self", None)
typelevel = signature.f_locals.get("self", None)
bed_kind = None if typelevel is None else typelevel.__args__[0]
instance = super().__new__(cls)
instance.bed_kind = bed_kind
Expand All @@ -315,7 +326,6 @@ def __enter__(self) -> "BedWriter[BedKind]":
def __init__(self, handle: io.TextIOWrapper) -> None:
"""Initialize a BED writer without knowing yet what BED types we will write."""
self._handle = handle
self._writer: DictWriter | None = None

def __exit__(
self,
Expand Down Expand Up @@ -350,14 +360,7 @@ def write(self, bed: BedKind) -> None:
else:
self.bed_kind = type(bed)

if self._writer is None:
self._writer = DictWriter(
self._handle,
delimiter="\t",
fieldnames=[field.name for field in fields(self.bed_kind)],
)

self._writer.writerow(as_dict(bed))
self._handle.write(f"{"\t".join(map(str, as_dict(bed).values()))}\n")

def write_all(self, beds: Iterable[BedKind]) -> None:
"""Write all the BED records to the BED output."""
Expand Down Expand Up @@ -394,8 +397,7 @@ def __class_getitem__(cls, key: Any) -> type:
def __new__(cls, handle: io.TextIOWrapper) -> "BedReader[BedKind]":
"""Bind the kind of BED type to this class for later introspection."""
signature = cast(FrameType, cast(FrameType, inspect.currentframe()).f_back)
argvalues = inspect.getargvalues(signature)
typelevel = argvalues.locals.get("self", None)
typelevel = signature.f_locals.get("self", None)
bed_kind = None if typelevel is None else typelevel.__args__[0]
instance = super().__new__(cls)
instance.bed_kind = bed_kind
Expand All @@ -411,12 +413,14 @@ def __enter__(self) -> "BedReader[BedKind]":

def __iter__(self) -> Iterator[BedKind]:
"""Iterate through the BED records of this IO handle."""
if self.bed_kind is None:
raise NotImplementedError("Untyped reading is not yet supported!")
for line in self._handle:
if line.strip() == "":
continue
if any(line.startswith(prefix) for prefix in COMMENT_PREFIXES):
continue
yield self._decode(line)
yield cast(BedKind, self.bed_kind.decode(line))

def __exit__(
self,
Expand All @@ -428,10 +432,12 @@ def __exit__(
self.close()
return super().__exit__(__exc_type, __exc_value, __traceback)

def _decode(self, line: str) -> BedKind:
if self.bed_kind is None:
raise NotImplementedError("Untyped reading is not yet supported!")
return cast(BedKind, self.bed_kind.decode(line))
@classmethod
def from_path(cls, path: Path | str, bed_kind: type[BedKind]) -> "BedReader[BedKind]":
    """Build a reader over the BED file located at ``path``.

    Args:
        path: Location of the BED file, as a ``Path`` or a string.
        bed_kind: The BED record type that the reader will decode lines into.

    Returns:
        A reader bound to ``bed_kind`` that wraps a newly opened handle on ``path``.
    """
    handle = Path(path).open()
    instance = cls(handle=handle)
    # The kind cannot be inferred when constructing through this classmethod,
    # so bind it explicitly after construction.
    instance.bed_kind = bed_kind
    return instance

def close(self) -> None:
"""Close the underlying IO handle."""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "bedspec"
version = "0.1.2"
version = "0.2.0"
description = "An HTS-specs compliant BED toolkit."
authors = ["Clint Valentine <valentine.clint@gmail.com>"]
license = "MIT"
Expand Down
50 changes: 48 additions & 2 deletions tests/test_bedspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,34 @@ def test_simple_bed_types_have_a_territory() -> None:
assert list(record.territory()) == [record]


def test_simple_bed_validates_start_and_end() -> None:
    """Test that a simple BED record rejects invalid start and end coordinates."""
    # Negative start, empty interval, and start past end, in that order.
    invalid_spans = ((-1, 5), (5, 5), (5, 0))
    for start, end in invalid_spans:
        with pytest.raises(ValueError):
            Bed3(contig="chr1", start=start, end=end)


def test_paired_bed_validates_start_and_end() -> None:
    """Test that a paired BED record validates the start and end of both intervals."""
    # Negative start, empty interval, and start past end, in that order.
    invalid_spans = ((-1, 5), (5, 5), (5, 0))
    shared = dict(
        contig1="chr1",
        contig2="chr1",
        name="foo",
        score=5,
        strand1=BedStrand.POSITIVE,
        strand2=BedStrand.POSITIVE,
    )

    # Invalid first interval paired with a valid second interval.
    for start1, end1 in invalid_spans:
        with pytest.raises(ValueError):
            BedPE(start1=start1, end1=end1, start2=1, end2=2, **shared)

    # Valid first interval paired with an invalid second interval.
    for start2, end2 in invalid_spans:
        with pytest.raises(ValueError):
            BedPE(start1=1, end1=2, start2=start2, end2=end2, **shared)


def test_paired_bed_types_have_a_territory() -> None:
"""Test that simple BEDs are their own territory."""
record = BedPE(
Expand Down Expand Up @@ -402,6 +430,24 @@ def test_bed_reader_can_read_bed_records_if_typed(tmp_path: Path) -> None:
assert list(BedReader[Bed3](handle)) == [bed]


def test_bed_reader_can_read_bed_records_from_a_path(tmp_path: Path) -> None:
    """Test that the BED reader can read BED records from a path if it is typed."""
    bed: Bed3 = Bed3(contig="chr1", start=1, end=2)
    bed_path: Path = tmp_path / "test.bed"

    with open(bed_path, "w") as handle:
        writer: BedWriter = BedWriter(handle)
        writer.write(bed)

    assert bed_path.read_text() == "chr1\t1\t2\n"

    # from_path() opens the file itself, so use the reader as a context manager
    # to make sure the underlying handle is closed once the records are read.
    with BedReader[Bed3].from_path(bed_path, bed_kind=Bed3) as reader:
        assert list(reader) == [bed]

    with BedReader[Bed3].from_path(str(bed_path), bed_kind=Bed3) as reader:
        assert list(reader) == [bed]


def test_bed_reader_can_raises_exception_if_not_typed(tmp_path: Path) -> None:
"""Test that the BED reader raises an exception if it is not typed."""

Expand Down Expand Up @@ -431,8 +477,8 @@ def test_bed_reader_can_read_bed_records_with_comments(tmp_path: Path) -> None:
writer.write_comment("track this-is-fine")
writer.write_comment("browser is mario's enemy?")
writer.write_comment("hello mom!")
handle.write("\n") # empty line
handle.write(" \t\n") # empty line
handle.write("\n") # empty line
handle.write(" \t\n") # empty line
writer.write(bed)
writer.write_comment("hello dad!")

Expand Down

0 comments on commit 2ceac80

Please sign in to comment.