Skip to content

Commit

Permalink
Remove thread-unsafe event logging code (#95)
Browse files Browse the repository at this point in the history
This would occasionally cause errors because of dask/distributed#5552, and wasn't very useful anyway.
  • Loading branch information
gjoseph92 authored Dec 1, 2021
1 parent 2f3e348 commit cc0598b
Showing 1 changed file with 0 additions and 37 deletions.
37 changes: 0 additions & 37 deletions stackstac/rio_reader.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from __future__ import annotations

import functools
import logging
import threading
import warnings
import weakref
from typing import TYPE_CHECKING, Optional, Protocol, Tuple, Type, TypedDict, Union

import numpy as np
Expand Down Expand Up @@ -32,16 +30,6 @@ def _curthread():
return threading.current_thread().name


def log_event(topic: str, msg: dict) -> None:
try:
import distributed

worker = distributed.get_worker()
except (ImportError, ValueError):
return
worker.log_event(topic, dict(msg, thread=_curthread()))


# /TODO


Expand Down Expand Up @@ -203,8 +191,6 @@ def __init__(
# but because `close` closes datasets across all threads by simply deleting the current threadlocal
# and replacing it with an empty one, we have to synchronize all access to `self._threadlocal`.

log_event("create_ThreadLocalRioDataset", dict(url=self._url, vrt=bool(vrt)))

def _open(self) -> Union[SelfCleaningDatasetReader, WarpedVRT]:
with self._env.open:
with time(f"Reopen {self._url!r} in {_curthread()}: {{t}}"):
Expand All @@ -214,23 +200,16 @@ def _open(self) -> Union[SelfCleaningDatasetReader, WarpedVRT]:
driver=self._driver,
**self._open_options,
)
log_event("open_dataset", dict(url=self._url))
if self._vrt_params:
with self._env.open_vrt:
result = vrt = WarpedVRT(ds, sharing=False, **self._vrt_params)
log_event("open_vrt", dict(url=self._url))
else:
vrt = None

with self._lock:
self._threadlocal.ds = ds
self._threadlocal.vrt = vrt

weakref.ref(
ds, functools.partial(log_event, "close_dataset", dict(url=self._url))
)
weakref.ref(vrt, functools.partial(log_event, "close_vrt", dict(url=self._url)))
# NOTE: functools.partial to hopefully avoid taking a closure over `self`
return result

@property
Expand Down Expand Up @@ -277,7 +256,6 @@ def close(self) -> None:
# datasets.
# NOTE: we're assuming here that closing a GDAL dataset from a thread other than the one that created
# it is safe to do, which, knowing GDAL, is quite possibly untrue.
log_event("close_ThreadLocalRioDataset", dict(url=self._url))
with self._lock:
self._threadlocal = threading.local()

Expand Down Expand Up @@ -365,14 +343,6 @@ def _open(self) -> ThreadsafeRioDataset:
"a separate STAC asset), so you'll need to exclude this asset from your analysis."
)

log_event("open_dataset_initial", dict(url=self.url))
weakref.ref(
ds,
functools.partial(
log_event, "close_dataset_initial", dict(url=self.url)
),
)

# Only make a VRT if the dataset doesn't match the spatial spec we want
if self.spec.vrt_params != {
"crs": ds.crs.to_epsg(),
Expand All @@ -387,13 +357,6 @@ def _open(self) -> ThreadsafeRioDataset:
resampling=self.resampling,
**self.spec.vrt_params,
)
log_event("open_vrt_initial", dict(url=self.url))
weakref.ref(
vrt,
functools.partial(
log_event, "close_vrt_initial", dict(url=self.url)
),
)
else:
logger.info(f"Skipping VRT for {self.url!r}")
vrt = None
Expand Down

0 comments on commit cc0598b

Please sign in to comment.