Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable resolution of indirect object references when using worker processes #51

Merged
merged 10 commits into from
Jan 22, 2025
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,57 @@ possible piece of information, PLAYA gives you some options here.
Wherever possible this information can be computed lazily, but this
involves some more work on the user's part.

## Using multiple CPUs

You may be wondering, what does "Parallel and Lazy" really mean?
PLAYA allows you to take advantage of multiple CPUs, which can greatly
speed up some operations on large documents. This parallelism
currently operates at the page level since this is the most logical
way to split up a PDF. To enable it, pass the `max_workers` argument
to `playa.open` with the number of cores you wish to use (you can also
explicitly pass `None` to use the maximum):

```python
with playa.open(path, max_workers=4) as pdf:
...
```

Now, you can apply a function across the pages of the PDF in parallel
using the `map` method of `pdf.pages`, for example:

```python
def get_page_size(page: Page) -> Tuple[int, int]:
return page.width, page.height

page_sizes = pdf.pages.map(get_page_size)
```

You could also just do this for certain pages by subscripting
`pdf.pages`:

```python
some_page_sizes = pdf.pages[2:5].map(get_page_size)
```

There are some limitations to this, because it uses `multiprocessing`.
The function you pass to `map` must be serializable by `pickle`, which
in practice means that an inner function or lambda generally doesn't
work. You can get around this in a very Java-like way by passing a
callable object that encapsulates the necessary state. If you wish to
avoid traumatising readers of your code, then use `functools.partial`
instead:

```python
pdf.pages.map(partial(myfunc, arg1=value1, arg2=value2))
```

Any value returned by your function must also be serializable.
There is a bit of magic that enables this to work for PDF objects
containing indirect object references, so you should be able to, for
instance, get the `dests` or `annots` from every page without any
trouble. But if you return your own complex objects, you may
encounter problems (or slowness).

## Dictionary-based API

There used to be a "dictionary-based" API here. You can now find it
Expand Down
10 changes: 2 additions & 8 deletions playa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,16 @@
from concurrent.futures import ProcessPoolExecutor
from os import PathLike
from multiprocessing.context import BaseContext
from pathlib import Path
from typing import Union

from playa.worker import _set_document
from playa.worker import _init_worker
from playa.document import Document, LayoutDict, schema as schema # noqa: F401
from playa.page import DeviceSpace
from playa._version import __version__ # noqa: F401

fieldnames = LayoutDict.__annotations__.keys()


def _init_worker(path: Path, password: str = "", space: DeviceSpace = "screen") -> None:
fp = builtins.open(path, "rb")
_set_document(Document(fp, password=password, space=space, init_worker=True))


def open(
path: Union[PathLike, str],
*,
Expand Down Expand Up @@ -63,6 +57,6 @@ def open(
max_workers=max_workers,
mp_context=mp_context,
initializer=_init_worker, # type: ignore[arg-type]
initargs=(path, password, space), # type: ignore[arg-type]
initargs=(id(pdf), path, password, space), # type: ignore[arg-type]
)
return pdf
49 changes: 36 additions & 13 deletions playa/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,14 @@
nunpack,
)
from playa.structtree import StructTree
from playa.worker import _set_document, _ref_document, _deref_document, _deref_page
from playa.worker import (
_set_document,
_ref_document,
_deref_document,
_deref_page,
in_worker,
PageRef,
)

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -830,12 +837,13 @@ def __init__(
fp: BinaryIO,
password: str = "",
space: DeviceSpace = "screen",
init_worker: bool = False,
_boss_id: int = 0,
) -> None:
if init_worker:
if _boss_id:
# Set this **right away** because it is needed to get
# indirect object references right.
_set_document(self)
_set_document(self, _boss_id)
assert in_worker()
self.xrefs: List[XRef] = []
self.space = space
self.info = []
Expand Down Expand Up @@ -1385,22 +1393,33 @@ def _read_xref_from(
self._read_xref_from(pos + self.offset, xrefs)


def call_page(func: Callable[[Page], Any], pageref: PageRef) -> Any:
    """Call a function on a page in a worker process.

    Args:
        func: Callable to apply to the page.
        pageref: Reference to the page (a document id and a page index),
            resolved with `_deref_page`.

    Returns:
        Whatever `func` returns for the resolved page.
    """
    return func(_deref_page(pageref))


class PageList:
"""List of pages indexable by 0-based index or string label."""

def __init__(
    self, doc: Document, pages: Union[Iterable[Page], None] = None
) -> None:
    """Create the page list of a document, or a sub-list of given pages.

    Args:
        doc: Document the pages belong to.  Only the reference returned
            by `_ref_document` is stored.
        pages: Pages to hold.  When omitted, the list is populated by
            `_init_pages`.
    """
    self.docref = _ref_document(doc)
    if pages is None:
        self._init_pages(doc)
        return
    # Materialize first: `pages` may be a one-shot iterator (for example
    # a generator expression), so the label index must be built from the
    # stored list, not from `pages`, which is exhausted by now.
    self._pages = list(pages)
    self._labels: Dict[str, Page] = {
        page.label: page for page in self._pages if page.label is not None
    }

def _init_pages(self, doc: Document) -> None:
try:
page_labels: Iterable[Optional[str]] = doc.page_labels
page_labels: Iterable[Union[str, None]] = doc.page_labels
except (KeyError, ValueError):
page_labels = (str(idx) for idx in itertools.count(1))
self._pages = []
self._labels: Dict[str, Page] = {}
self._labels = {}
try:
page_objects = list(doc._get_page_objects())
except (KeyError, IndexError, TypeError):
Expand All @@ -1423,10 +1442,12 @@ def __iter__(self) -> Iterator[Page]:
return iter(self._pages)

def __getitem__(self, key: Union[int, str]) -> Page:
    """Look up a page, or build a sub-list of pages.

    - An `int` returns the page at that position.
    - A `slice` returns a new `PageList` with the selected pages.
    - A `tuple` or `list` returns a new `PageList` holding the result of
      looking up each element in turn.
    - Anything else is looked up as a page label.

    Raises:
        IndexError: If an integer position is out of range.
        KeyError: If a label is not known.
    """
    # NOTE(review): the annotations only describe the int/str -> Page
    # case; slices, tuples and lists are accepted and yield a PageList.
    if isinstance(key, int):
        return self._pages[key]
    if isinstance(key, slice):
        return PageList(_deref_document(self.docref), self._pages[key])
    if isinstance(key, (tuple, list)):
        # Pass a real list rather than a generator so the new PageList
        # can iterate it more than once when indexing pages and labels.
        return PageList(_deref_document(self.docref), [self[k] for k in key])
    return self._labels[key]

Expand All @@ -1444,7 +1465,9 @@ def map(self, func: Callable[[Page], Any]) -> Iterator:
doc = _deref_document(self.docref)
if doc._pool is not None:
return doc._pool.map(
call_page, itertools.repeat(func), (page.page_idx for page in self)
call_page,
itertools.repeat(func),
((id(doc), page.page_idx) for page in self),
)
else:
return (func(page) for page in self)
Expand Down
66 changes: 44 additions & 22 deletions playa/worker.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,48 @@
"""Worker subprocess related functions and data."""

import weakref
from typing import Union, TYPE_CHECKING
from pathlib import Path
from typing import Tuple, Union, TYPE_CHECKING

if TYPE_CHECKING:
from playa.document import Document
from playa.document import Document, DeviceSpace
from playa.page import Page

# Type signature of document reference
DocumentRef = Union[weakref.ReferenceType["Document"], str]
DocumentRef = int
# Type signature of page reference
PageRef = Union[weakref.ReferenceType["Page"], int]
PageRef = Tuple[DocumentRef, int]

# A global PDF object used in worker processes
__pdf: Union["Document", None] = None
# Flag used to signal that we should look at the global document
GLOBAL_DOC = "[citation needed]"
# Registry of documents which have workers
__bosses: weakref.WeakValueDictionary[int, "Document"] = weakref.WeakValueDictionary()
# Numeric id of the document in the boss process (will show up instead
# of weak references when serialized, gets looked up in __bosses)
GLOBAL_DOC: int = 0


def in_worker() -> bool:
    """Tell whether this process is a worker.

    A process counts as a worker once the module-level `__pdf` holds a
    document, i.e. it is no longer `None`.
    """
    worker_doc = __pdf
    return worker_doc is not None


def _set_document(doc: "Document") -> None:
global __pdf
def _init_worker(
    boss: int, path: Path, password: str = "", space: "DeviceSpace" = "screen"
) -> None:
    """Open the PDF at `path` and install it as this process's document.

    Args:
        boss: Numeric id of the document in the boss process.
        path: Location of the PDF file to open.
        password: Password handed to `Document`.
        space: Device space handed to `Document`.
    """
    global __pdf, GLOBAL_DOC
    # Imported at call time; at module level this name is only imported
    # under TYPE_CHECKING.
    from playa.document import Document

    # The file object is handed to the Document and is not closed in
    # this function.
    stream = open(path, "rb")
    __pdf = Document(stream, password=password, space=space, _boss_id=boss)
    GLOBAL_DOC = boss


def _set_document(doc: "Document", boss: int) -> None:
    """Install `doc` as the worker's document (call in the worker process).

    Args:
        doc: Document to store in the module-level `__pdf`.
        boss: Numeric id of the document in the boss process, stored in
            `GLOBAL_DOC`.
    """
    global __pdf, GLOBAL_DOC
    __pdf, GLOBAL_DOC = doc, boss


def _get_document() -> Union["Document", None]:
Expand All @@ -34,29 +51,34 @@ def _get_document() -> Union["Document", None]:


def _ref_document(doc: "Document") -> DocumentRef:
    """Return an integer reference to a document.

    In a worker process this is `GLOBAL_DOC`, the id the document has in
    the boss process.  Otherwise it is `id(doc)`, and the document is
    entered in the `__bosses` registry so that `_deref_document` can find
    it again.

    Args:
        doc: Document to reference.

    Returns:
        The integer id standing in for the document.
    """
    if in_worker():
        # Set together with the worker document, so it must be non-zero.
        assert GLOBAL_DOC != 0
        return GLOBAL_DOC
    docid = id(doc)
    if docid not in __bosses:
        __bosses[docid] = doc
    return docid


def _deref_document(ref: DocumentRef) -> "Document":
    """Resolve a document reference to the document itself.

    In a worker process the worker's own document is returned and `ref`
    is not consulted.  Otherwise `ref` is looked up in the `__bosses`
    registry.

    Args:
        ref: Integer reference produced by `_ref_document`.

    Returns:
        The referenced document.

    Raises:
        RuntimeError: If the document is unknown or no longer exists.
    """
    if in_worker():
        doc = __pdf
        if doc is None:
            raise RuntimeError("Document no longer exists (or never existed)!")
        return doc
    # A single `.get` instead of "check, then index": entries of a
    # WeakValueDictionary can vanish between the two steps, which would
    # surface as KeyError rather than the RuntimeError callers expect.
    doc = __bosses.get(ref)
    if doc is None:
        raise RuntimeError(f"Unknown or deleted document with ID {ref}!")
    return doc


def _ref_page(page: "Page") -> PageRef:
    """Return a reference to a page.

    Args:
        page: Page to reference.

    Returns:
        A `(document reference, page index)` pair taken from the page's
        `docref` and `page_idx` attributes.
    """
    return page.docref, page.page_idx


def _deref_page(ref: PageRef) -> "Page":
    """Resolve a page reference to the page itself.

    Args:
        ref: `(document reference, page index)` pair as produced by
            `_ref_page`.

    Returns:
        The page at that index in the referenced document's `pages`.

    Raises:
        RuntimeError: Propagated from `_deref_document` when the document
            cannot be resolved.
    """
    docref, idx = ref
    return _deref_document(docref).pages[idx]
2 changes: 2 additions & 0 deletions tests/test_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def test_pages():
assert [p.label for p in twopages] == ["3", "4"]
threepages = doc.pages["2", 2, 3]
assert [p.label for p in threepages] == ["2", "3", "4"]
threepages = doc.pages[["2", 2, 3]]
assert [p.label for p in threepages] == ["2", "3", "4"]


@pytest.mark.skipif(not CONTRIB.exists(), reason="contrib samples not present")
Expand Down
1 change: 0 additions & 1 deletion tests/test_open.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def test_weakrefs() -> None:
with playa.open(TESTDIR / "simple5.pdf") as doc:
ref = doc.catalog["Pages"]
del doc
assert ref.doc() is None
with pytest.raises(RuntimeError):
_ = ref.resolve()

Expand Down
44 changes: 42 additions & 2 deletions tests/test_parallel.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
"""Test parallel analysis."""

import operator
from typing import List

import pytest

import playa
import playa.document
from playa.page import Page
from playa.page import Page, XObjectObject
from playa.worker import in_worker, _get_document
from tests.data import TESTDIR, CONTRIB


def has_one_true_pdf() -> int:
    """Check the worker's document and return its page count.

    Asserts that the current process is a worker, that it has a document,
    and that the document's device space is "default".
    """
    assert in_worker()
    doc = _get_document()
    assert doc is not None
    assert doc.space == "default"
    return len(doc.pages)
Expand All @@ -28,6 +33,33 @@ def test_open_parallel():
assert future.result() == 1


def test_parallel_references():
    """Indirect references in values returned via `map` can be resolved."""
    pdf_path = TESTDIR / "pdf_structure.pdf"
    with playa.open(pdf_path, space="default", max_workers=2) as pdf:
        per_page = pdf.pages.map(operator.attrgetter("resources"))
        (resources,) = per_page
        fonts = resources["Font"].resolve()  # should succeed!
        for font_name in ("F1", "F2"):
            assert font_name in fonts  # should exist!
        first_font = fonts["F1"].resolve()
        assert first_font["LastChar"] == 17


def get_xobjs(page: Page) -> List[XObjectObject]:
    """Collect the XObjects of a page into a list."""
    return [*page.xobjects]


@pytest.mark.skipif(not CONTRIB.exists(), reason="contrib samples not present")
def test_parallel_xobjects():
    """Verify that page references (used in XObjects) also work."""
    with playa.open(CONTRIB / "basicapi.pdf", space="default", max_workers=2) as pdf:
        # XObjects read directly from each page point back at that page.
        assert all(
            xobj.page.page_idx == page.page_idx
            for page in pdf.pages
            for xobj in page.xobjects
        )
        # The same must hold for XObjects returned through `map`.
        for expected_idx, xobjs in enumerate(pdf.pages.map(get_xobjs)):
            assert all(xobj.page.page_idx == expected_idx for xobj in xobjs)


def get_text(page: Page) -> str:
    """Join the `chars` of every text object on a page with single spaces."""
    pieces = (text.chars for text in page.texts)
    return " ".join(pieces)

Expand All @@ -39,3 +71,11 @@ def test_map_parallel():
with playa.open(CONTRIB / "PSC_Station.pdf", space="default") as pdf:
texts = list(pdf.pages.map(get_text))
assert texts == parallel_texts
with playa.open(CONTRIB / "PSC_Station.pdf", space="default", max_workers=2) as pdf:
parallel_texts = list(pdf.pages[3:8].map(get_text))
print(parallel_texts)
assert parallel_texts != texts


if __name__ == "__main__":
test_parallel_xobjects()
Loading
Loading