Skip to content

Commit

Permalink
dal: process in reverse chronological order, add error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
karlicoss committed Nov 6, 2023
1 parent 939aee6 commit cdd0b34
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 35 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
dynamic = ["version"] # version is managed by setuptools_scm
name = "pinbexport"
dependencies = [
"orjson", # faster json parsing
"more-itertools", # unique_everseen, used for deduplication in dal.py
]
# TODO maybe split out DAL deps and export deps? might be nice

Expand Down
95 changes: 60 additions & 35 deletions src/pinbexport/dal.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#!/usr/bin/env python3
from datetime import datetime
import json
from pathlib import Path
from typing import NamedTuple, Sequence, Iterator, Set, Iterable, NewType
from typing import Iterator, NamedTuple, NewType, Sequence

import orjson
from more_itertools import unique_everseen

from .exporthelpers import dal_helper
from .exporthelpers.dal_helper import PathIsh, Json, datetime_aware
from .exporthelpers.dal_helper import Json, PathIsh, Res, datetime_aware, pathify
from .exporthelpers.logging_helper import make_logger


Expand Down Expand Up @@ -48,37 +49,61 @@ def tags(self) -> Sequence[Tag]:

class DAL:
def __init__(self, sources: Sequence[PathIsh]) -> None:
self.sources = [p if isinstance(p, Path) else Path(p) for p in sources]

def raw(self) -> Json:
# TODO merge them carefully
last = max(self.sources)
try:
return json.loads(last.read_text())
except Exception as e:
raise RuntimeError(f'While processing {last}') from e

def _bookmarks_raw(self) -> Iterable[Json]:
data = self.raw()
if isinstance(data, list):
return data # old format
else:
return data['posts']

def bookmarks(self) -> Iterator[Bookmark]:
    """
    Yield a Bookmark for each raw entry, dropping repeats.

    Two bookmarks count as the same when their (created, url) pairs are equal;
    only the first one encountered is emitted.
    """
    seen: Set = set()
    for raw_item in self._bookmarks_raw():
        bm = Bookmark(raw_item)
        # TODO could also detect that by hash?
        ident = (bm.created, bm.url)
        if ident not in seen:
            seen.add(ident)
            yield bm
        else:
            logger.debug(f'skipping duplicate item {bm}')
# each source is passed through the pathify helper (imported from exporthelpers.dal_helper)
self.sources = [pathify(src) for src in sources]

def raw(self) -> Iterator[Res[Json]]:
    """
    Read every source file, last-to-first, yielding one result per file.

    Yields:
        The parsed JSON of the file, or -- if reading/parsing failed -- a
        RuntimeError naming the file with the original exception attached as
        ``__cause__``. Errors are yielded rather than raised so that one broken
        file does not stop the remaining files from being processed.
    """
    total = len(self.sources)
    width = len(str(total))
    # TODO: perhaps reversing this should be configurable?
    # a bit of a problem that if we process in chronological order, we never emit updates for bookmarks
    # note that for pinboard it's hard to guarantee sort order when we emit items anyway
    # because API is flaky and sometimes bookmarks disappear from some exports for no reason
    # however in some cases it may be useful to emit everything with minimal uniquification
    # e.g. if we want some sort of database that actually contains updates for all entities
    # NOTE(review): this is only "newest first" if self.sources is ordered oldest-first -- confirm with callers
    newest_first = reversed(self.sources)
    # start=1 so the progress counter runs 1..total instead of stopping at total-1
    for idx, path in enumerate(newest_first, start=1):
        logger.info(f'processing [{idx:>{width}}/{total:>{width}}] {path}')
        res: Res[Json]
        try:
            res = orjson.loads(path.read_bytes())
        except Exception as e:
            res = RuntimeError(f'While processing {path}')
            res.__cause__ = e
        # yield outside the try block: an exception thrown into the generator by the
        # consumer must not be mistaken for a failure to process this file
        yield res

def _bookmarks_raw(self) -> Iterator[Res[Json]]:
for j in self.raw():
if isinstance(j, Exception):
yield j
else:
if isinstance(j, list):
yield from j # old format
else:
yield from j['posts']

def bookmarks(self) -> Iterator[Res[Bookmark]]:
    """
    Lazily produce deduplicated bookmarks (and any errors) from all sources.

    Deduplication happens twice: first on the raw JSON entries, then on the
    resulting bookmarks by their (created, url) pair. Exceptions are used as
    their own key in both passes and are passed through rather than raised.
    """

    def json_key(j):
        # without it, dict isn't hashable, so unique_everseen takes quadratic time
        # ugh. it's a bit wasteful to parse first and then dump again though?
        # might be much nicer if we can do some sort of partial parsing of the json
        return j if isinstance(j, Exception) else orjson.dumps(j)

    def bookmark_key(b):
        # sadly pinboard doesn't have unique ids for bookmarks
        return b if isinstance(b, Exception) else (b.created, b.url)

    # first step -- deduplicate raw jsons
    unique_jsons: Iterator[Res[Json]] = unique_everseen(self._bookmarks_raw(), key=json_key)

    parsed: Iterator[Res[Bookmark]] = (
        item if isinstance(item, Exception) else Bookmark(item) for item in unique_jsons
    )

    # second step -- deduplicate bookmarks with same dt/url
    return unique_everseen(parsed, key=bookmark_key)


def demo(dal: DAL) -> None:
Expand Down

0 comments on commit cdd0b34

Please sign in to comment.