Switch to shared Lock (SerializableLock if possible) for reading/writing (#1179)

* Switch to shared Lock (SerializableLock if possible) for reading and writing

Fixes #1172

The serializable lock will be useful for dask.distributed or multi-processing
(xref #798, #1173, among others); a brief sketch of why appears after these notes.

* Test serializable lock

* Use conda-forge for builds

* Remove broken/fragile test_lock
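
A minimal sketch (not part of the commit message) of why serializability matters: dask.distributed has to pickle the task graph, locks included, before shipping work to its workers.

    import pickle
    from threading import Lock

    from dask.utils import SerializableLock

    # SerializableLock pickles by token; within a process, copies sharing
    # a token share one underlying threading.Lock.
    lock = pickle.loads(pickle.dumps(SerializableLock()))

    # A plain threading.Lock cannot be pickled at all.
    try:
        pickle.dumps(Lock())
    except TypeError as err:
        print('threading.Lock is not picklable:', err)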
shoyer authored Jan 4, 2017
1 parent 3001ee1 commit 21a792d
Showing 9 changed files with 27 additions and 29 deletions.
2 changes: 2 additions & 0 deletions ci/requirements-py27-netcdf4-dev.yml
@@ -1,4 +1,6 @@
 name: test_env
+channels:
+- conda-forge
 dependencies:
 - python=2.7
 - cython
2 changes: 2 additions & 0 deletions ci/requirements-py27-pydap.yml
@@ -1,4 +1,6 @@
 name: test_env
+channels:
+- conda-forge
 dependencies:
 - python=2.7
 - dask
2 changes: 2 additions & 0 deletions ci/requirements-py35.yml
@@ -1,4 +1,6 @@
 name: test_env
+channels:
+- conda-forge
 dependencies:
 - python=3.5
 - cython
2 changes: 1 addition & 1 deletion doc/dask.rst
@@ -225,7 +225,7 @@ larger chunksizes.
     import os
     os.remove('example-data.nc')

 Optimization Tips
 -----------------
6 changes: 5 additions & 1 deletion doc/whats-new.rst
@@ -62,7 +62,7 @@ Breaking changes
   By `Guido Imperiale <https://github.com/crusaderky>`_ and
   `Stephan Hoyer <https://github.com/shoyer>`_.
 - Pickling a ``Dataset`` or ``DataArray`` linked to a file on disk no longer
-  caches its values into memory before pickling :issue:`1128`. Instead, pickle
+  caches its values into memory before pickling (:issue:`1128`). Instead, pickle
   stores file paths and restores objects by reopening file references. This
   enables preliminary, experimental use of xarray for opening files with
   `dask.distributed <https://distributed.readthedocs.io>`_.

@@ -227,6 +227,10 @@ Bug fixes
 - Fixed a bug with facetgrid (the ``norm`` keyword was ignored, :issue:`1159`).
   By `Fabien Maussion <https://github.com/fmaussion>`_.

+- Resolved a concurrency bug that could cause Python to crash when
+  simultaneously reading and writing netCDF4 files with dask (:issue:`1172`).
+  By `Stephan Hoyer <https://github.com/shoyer>`_.
+
 - Fix to make ``.copy()`` actually copy dask arrays, which will be relevant for
   future releases of dask in which dask arrays will be mutable (:issue:`1180`).
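
To make the fix concrete, here is a hedged sketch (file names and the chunked dimension are placeholders) of the simultaneous read/write pattern from :issue:`1172` that previously could crash Python:

    import xarray as xr

    # Lazily read one netCDF4 file with dask while streaming a computed
    # result into another; both code paths now share GLOBAL_LOCK, so the
    # thread-unsafe HDF5 library is never entered concurrently.
    ds = xr.open_dataset('input.nc', chunks={'x': 100})
    (ds * 2).to_netcdf('output.nc')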
10 changes: 3 additions & 7 deletions xarray/backends/api.py
@@ -3,7 +3,6 @@
 from __future__ import print_function
 import gzip
 import os.path
-import threading
 from distutils.version import StrictVersion
 from glob import glob
 from io import BytesIO

@@ -12,7 +11,7 @@
 import numpy as np

 from .. import backends, conventions
-from .common import ArrayWriter
+from .common import ArrayWriter, GLOBAL_LOCK
 from ..core import indexing
 from ..core.combine import auto_combine
 from ..core.utils import close_on_error, is_remote_uri

@@ -55,9 +54,6 @@ def _normalize_path(path):
     return os.path.abspath(os.path.expanduser(path))


-_global_lock = threading.Lock()
-
-
 def _default_lock(filename, engine):
     if filename.endswith('.gz'):
         lock = False

@@ -71,9 +67,9 @@ def _default_lock(filename, engine):
             else:
                 # TODO: identify netcdf3 files and don't use the global lock
                 # for them
-                lock = _global_lock
+                lock = GLOBAL_LOCK
         elif engine in {'h5netcdf', 'pynio'}:
-            lock = _global_lock
+            lock = GLOBAL_LOCK
         else:
             lock = False
     return lock
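
For callers the visible effect is on ``open_dataset``'s ``lock`` argument, whose default now comes from ``_default_lock``; a usage sketch (the file name and chunking are placeholders) showing that the default can still be overridden:

    import xarray as xr

    try:
        from dask.utils import SerializableLock as Lock
    except ImportError:
        from threading import Lock

    # The netcdf4, h5netcdf and pynio engines now default to the shared
    # GLOBAL_LOCK; passing lock=... (or lock=False) still overrides it.
    ds = xr.open_dataset('example.nc', chunks={'time': 10}, lock=Lock())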
14 changes: 11 additions & 3 deletions xarray/backends/common.py
@@ -2,7 +2,6 @@
 from __future__ import division
 from __future__ import print_function
 import numpy as np
-import itertools
 import logging
 import time
 import traceback

@@ -12,7 +11,12 @@

 from ..conventions import cf_encoder
 from ..core.utils import FrozenOrderedDict
-from ..core.pycompat import iteritems, dask_array_type, OrderedDict
+from ..core.pycompat import iteritems, dask_array_type
+
+try:
+    from dask.utils import SerializableLock as Lock
+except ImportError:
+    from threading import Lock

 # Create a logger object, but don't add any handlers. Leave that to user code.
 logger = logging.getLogger(__name__)

@@ -21,6 +25,10 @@
 NONE_VAR_NAME = '__values__'


+# dask.utils.SerializableLock if available, otherwise just a threading.Lock
+GLOBAL_LOCK = Lock()
+
+
 def _encode_variable_name(name):
     if name is None:
         name = NONE_VAR_NAME

@@ -150,7 +158,7 @@ def sync(self):
             import dask.array as da
             import dask
             if StrictVersion(dask.__version__) > StrictVersion('0.8.1'):
-                da.store(self.sources, self.targets, lock=threading.Lock())
+                da.store(self.sources, self.targets, lock=GLOBAL_LOCK)
             else:
                 da.store(self.sources, self.targets)
             self.sources = []
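
A self-contained sketch of the ``da.store(..., lock=...)`` pattern that ``sync()`` now uses, with numpy arrays standing in for netCDF variables:

    import dask.array as da
    import numpy as np

    try:
        from dask.utils import SerializableLock as Lock
    except ImportError:
        from threading import Lock

    GLOBAL_LOCK = Lock()

    # One shared lock serializes every chunk write across both stores,
    # which is what protects thread-unsafe backends like netCDF4/HDF5.
    sources = [da.ones((4, 4), chunks=2), da.zeros((4, 4), chunks=2)]
    targets = [np.empty((4, 4)), np.empty((4, 4))]
    da.store(sources, targets, lock=GLOBAL_LOCK)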
14 changes: 0 additions & 14 deletions xarray/test/test_backends.py
@@ -1034,20 +1034,6 @@ def preprocess(ds):
         with open_mfdataset(tmp, preprocess=preprocess) as actual:
             self.assertDatasetIdentical(expected, actual)

-    def test_lock(self):
-        original = Dataset({'foo': ('x', np.random.randn(10))})
-        with create_tmp_file() as tmp:
-            original.to_netcdf(tmp, format='NETCDF3_CLASSIC')
-            with open_dataset(tmp, chunks=10) as ds:
-                task = ds.foo.data.dask[ds.foo.data.name, 0]
-                self.assertIsInstance(task[-1], type(Lock()))
-            with open_mfdataset(tmp) as ds:
-                task = ds.foo.data.dask[ds.foo.data.name, 0]
-                self.assertIsInstance(task[-1], type(Lock()))
-            with open_mfdataset(tmp, engine='scipy') as ds:
-                task = ds.foo.data.dask[ds.foo.data.name, 0]
-                self.assertNotIsInstance(task[-1], type(Lock()))
-
     def test_save_mfdataset_roundtrip(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
         datasets = [original.isel(x=slice(5)),
4 changes: 1 addition & 3 deletions xarray/test/test_distributed.py
@@ -28,9 +28,7 @@ def test_dask_distributed_integration_test(loop, engine):
     original = create_test_data()
     with create_tmp_file() as filename:
         original.to_netcdf(filename, engine=engine)
-        # TODO: should be able to serialize locks
-        restored = xr.open_dataset(filename, chunks=3, lock=False,
-                                   engine=engine)
+        restored = xr.open_dataset(filename, chunks=3, engine=engine)
         assert isinstance(restored.var1.data, da.Array)
         computed = restored.compute()
         assert_dataset_allclose(original, computed)
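
Outside the test suite the same round trip looks roughly like this (a sketch assuming dask.distributed is installed; the file name is a placeholder):

    import xarray as xr
    from distributed import Client

    client = Client()  # local scheduler plus worker processes

    # Because the default lock is now serializable, the task graph (lock
    # included) can be pickled and shipped to workers; the old lock=False
    # workaround is no longer needed.
    ds = xr.open_dataset('example.nc', chunks=3)
    print(ds.compute())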
