
Transform flow test failing when running on k8s on Azure. #4

Closed · sharkinsspatial opened this issue May 11, 2021 · 26 comments

@sharkinsspatial
Contributor

While trying to reproduce the test Prefect flow used during development of pangeo-forge-aws-bakery, we are encountering an error when running ds.to_zarr: https://github.com/pangeo-forge/pangeo-forge-azure-bakery/blob/add-k8s-cluster/flow_test/transform_tasks/xarray.py#L30.

Traceback (most recent call last):
  File "/Users/ciaran/dev/pangeo-forge-azure-bakery/flow_test/transform_tasks/xarray.py", line 30, in combine_and_write
  File "/usr/local/lib/python3.8/site-packages/xarray/core/dataset.py", line 1907, in to_zarr
    return to_zarr(
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/api.py", line 1454, in to_zarr
    dump_to_store(dataset, zstore, writer, encoding=encoding)
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/api.py", line 1130, in dump_to_store
    store.store(variables, attrs, check_encoding, writer, unlimited_dims=unlimited_dims)
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/zarr.py", line 465, in store
    ds = open_zarr(self.ds.store, group=self.ds.path, chunks=None)
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/zarr.py", line 688, in open_zarr
    ds = open_dataset(
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/api.py", line 501, in open_dataset
    backend_ds = backend.open_dataset(
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/zarr.py", line 749, in open_dataset
    ds = store_entrypoint.open_dataset(
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/store.py", line 37, in open_dataset
    ds = Dataset(vars, attrs=attrs)
  File "/usr/local/lib/python3.8/site-packages/xarray/core/dataset.py", line 741, in __init__
    variables, coord_names, dims, indexes, _ = merge_data_and_coords(
  File "/usr/local/lib/python3.8/site-packages/xarray/core/merge.py", line 477, in merge_data_and_coords
    return merge_core(
  File "/usr/local/lib/python3.8/site-packages/xarray/core/merge.py", line 631, in merge_core
    dims = calculate_dimensions(variables)
  File "/usr/local/lib/python3.8/site-packages/xarray/core/dataset.py", line 203, in calculate_dimensions
    raise ValueError(
ValueError: conflicting sizes for dimension 'time': length 20 on 'time' and length 15 on 'anom'

@TomAugspurger any suggestions you might have for diagnosing if this might be related to our configuration of Azure Blob storage would be super appreciated 🙇 .
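For reference, here is the write/append pattern the failing task follows, as a minimal local sketch against an in-memory store rather than Azure (variable names and sizes are illustrative, not taken from the real flow):

import fsspec
import numpy as np
import xarray as xr

target = "memory://target/demo.zarr"  # stand-in for the abfs:// target
mapper = fsspec.get_mapper(target)

for i in range(2):
    # Each iteration mimics one mapped combine_and_write call.
    ds = xr.Dataset(
        {"anom": ("time", np.random.rand(5))},
        coords={"time": np.arange(i * 5, (i + 1) * 5)},
    )
    kwargs = dict(mode="w") if not len(mapper) else dict(mode="a", append_dim="time")
    ds.to_zarr(mapper, **kwargs)

# A healthy store reports one consistent 'time' length across all arrays;
# the traceback above shows 'time' (20) and 'anom' (15) disagreeing after appends.
print(xr.open_zarr(mapper).sizes)  # expect {'time': 10}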

@TomAugspurger

Reproducing locally

Here are some steps I took to try to reproduce this locally:

  1. Modify transform_flow.py to remove assumptions about the bakery and insert a breakpoint in the recipe:
diff --git a/flow_test/transform_flow.py b/flow_test/transform_flow.py
index f742a7f..2ffd899 100755
--- a/flow_test/transform_flow.py
+++ b/flow_test/transform_flow.py
@@ -11,7 +11,8 @@ from flow_test.transform_tasks.http import download
 from flow_test.transform_tasks.xarray import chunk, combine_and_write
 from flow_test.transform_tasks.zarr import consolidate_metadata
 
-project = os.environ["PREFECT_PROJECT"]
+# project = os.environ["PREFECT_PROJECT"]
+project = "debug"
 flow_name = "dask-transform-flow"
 
 
@@ -28,33 +29,32 @@ def source_url(day: str) -> str:
 
 with Flow(
     flow_name,
-    storage=storage.Azure(
-        container=os.environ["FLOW_STORAGE_CONTAINER"],
-        connection_string=os.environ["FLOW_STORAGE_CONNECTION_STRING"],
-    ),
-    run_config=KubernetesRun(
-        image=os.environ["AZURE_BAKERY_IMAGE"],
-        env={"AZURE_STORAGE_CONNECTION_STRING": os.environ["FLOW_STORAGE_CONNECTION_STRING"], "AZURE_BAKERY_IMAGE": os.environ["AZURE_BAKERY_IMAGE"]},
-        labels=json.loads(os.environ["PREFECT__CLOUD__AGENT__LABELS"]),
-    ),
-    executor=DaskExecutor(
-        cluster_class="dask_kubernetes.KubeCluster",
-        cluster_kwargs={
-            "pod_template": make_pod_spec(
-                image=os.environ["AZURE_BAKERY_IMAGE"],
-                labels={
-                    "flow": flow_name
-                },
-                memory_limit=None,
-                memory_request=None,
-                env={
-                    "AZURE_STORAGE_CONNECTION_STRING": os.environ["FLOW_STORAGE_CONNECTION_STRING"]
-                }
-            )
-        },
-        adapt_kwargs={"maximum": 10},
-
-    ),
+    # storage=storage.Azure(
+    #     container=os.environ["FLOW_STORAGE_CONTAINER"],
+    #     connection_string=os.environ["FLOW_STORAGE_CONNECTION_STRING"],
+    # ),
+    # run_config=KubernetesRun(
+    #     image=os.environ["AZURE_BAKERY_IMAGE"],
+    #     env={"AZURE_STORAGE_CONNECTION_STRING": os.environ["FLOW_STORAGE_CONNECTION_STRING"], "AZURE_BAKERY_IMAGE": os.environ["AZURE_BAKERY_IMAGE"]},
+    #     labels=json.loads(os.environ["PREFECT__CLOUD__AGENT__LABELS"]),
+    # ),
+    # executor=DaskExecutor(
+    #     cluster_class="dask_kubernetes.KubeCluster",
+    #     cluster_kwargs={
+    #         "pod_template": make_pod_spec(
+    #             image=os.environ["AZURE_BAKERY_IMAGE"],
+    #             labels={
+    #                 "flow": flow_name
+    #             },
+    #             memory_limit=None,
+    #             memory_request=None,
+    #             env={
+    #                 "AZURE_STORAGE_CONNECTION_STRING": os.environ["FLOW_STORAGE_CONNECTION_STRING"]
+    #             }
+    #         )
+    #     },
+    #     adapt_kwargs={"maximum": 10},
+    # ),
 ) as flow:
     days = Parameter(
         "days",
@@ -64,10 +64,12 @@ with Flow(
     zarr_output = "dask_transform_flow.zarr"
     nc_sources = download.map(
         sources,
-        cache_location=unmapped(f"abfs://{os.environ['FLOW_STORAGE_CONTAINER']}/cache/{zarr_output}"),
+        # cache_location=unmapped(f"abfs://{os.environ['FLOW_STORAGE_CONTAINER']}/cache/{zarr_output}"),
+        cache_location=unmapped(f"memory://cache/{zarr_output}")
     )
     chunked = chunk(nc_sources, size=5)
-    target = f"abfs://{os.environ['FLOW_STORAGE_CONTAINER']}/target/{zarr_output}"
+    # target = f"abfs://{os.environ['FLOW_STORAGE_CONTAINER']}/target/{zarr_output}"
+    target = f"memory://target/{zarr_output}"
     writes = combine_and_write.map(
         chunked,
         unmapped(target),
@@ -77,4 +79,4 @@ with Flow(
 
     consolidate_metadata(target, writes=writes)
 
-flow.register(project_name=project)
+# flow.register(project_name=project)
diff --git a/flow_test/transform_tasks/xarray.py b/flow_test/transform_tasks/xarray.py
index 131d55b..a6c52f9 100644
--- a/flow_test/transform_tasks/xarray.py
+++ b/flow_test/transform_tasks/xarray.py
@@ -19,6 +19,8 @@ def combine_and_write(
 ) -> List[str]:
     try:
         double_open_files = [fsspec.open(url).open() for url in sources]
+        breakpoint()
+        print(sources)
         ds = xr.open_mfdataset(double_open_files, combine="nested", concat_dim=concat_dim)
         ds = ds.chunk({append_dim: len(sources)})
         mapper = fsspec.get_mapper(target)
  2. Run the flow:
ipython -i flow_test/transform_flow.py
Python 3.9.4 | packaged by conda-forge | (default, May 10 2021, 22:13:33)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.23.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: flow.run()
[2021-05-11 10:28:12-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-transform-flow'

Unfortunately, everything went fine.

Possible issues

I think the next issues to consider are

  1. An issue with the software environment. Is there an easy way to reproduce the software environment Prefect is using (a docker image maybe?)
  2. An issue with Azure Blob Storage / adlfs. Can you inspect the bucket where files are cached? Does anything look off? Maybe disable caching entirely (see the sketch after this list).
  3. Verify that the failure is at ds.load().
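For point 2, a minimal sketch of inspecting the cache with adlfs (the container name and prefix here are hypothetical, and this assumes AZURE_STORAGE_CONNECTION_STRING is set):

import os

import fsspec

fs = fsspec.filesystem(
    "abfs",
    connection_string=os.environ["AZURE_STORAGE_CONNECTION_STRING"],
)
# List the cached blobs and their sizes; a truncated or zero-byte blob
# would point at a partial write.
for path in fs.ls("flow-storage-container/cache"):
    print(path, fs.info(path)["size"])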

@ciaransweet
Contributor

Thanks for tackling this @TomAugspurger

You can find the image the Dask workers are running here: https://github.com/pangeo-forge/pangeo-forge-azure-bakery/tree/add-k8s-cluster/images. You should be able to run the flow in there.

@ciaransweet
Contributor

From what I could see, the buckets looked fine. Here's a screenshot:

[screenshot: storage container contents]

This is from a past convo with Sean, but I'll run it again and send a current look

@TomAugspurger

Thanks, I ran it from the docker image and it again ran fine.

Can you make sure that the cache is cleared prior to running through prefect?

@ciaransweet
Contributor

The mystery continues 🕵️‍♀️

Images of storage before:

[screenshots: storage contents before the run]

Images of storage after:

[screenshots: storage contents after the run]

@ciaransweet
Contributor

@TomAugspurger I'm going to spend tomorrow fully fleshing out a run-through of exactly what I'm doing, running, and seeing. I'll be dumping all the logs I can too.

@ciaransweet
Contributor

@TomAugspurger @martindurant @sharkinsspatial

Hey folks, here's a full rundown of what I'm doing to get this flow to break, what is left in Azure after each flow run, and what logs I can get out of Prefect & K8s.

High level overview

Whilst trying to confirm that the Azure Bakery is deployed/set up correctly, we've been trying to run flow_test/transform_flow.py, which is a copy of the same flow from the AWS Bakery here. The modifications made are:

  • Removal of all the ECS/Dask setup
  • Switch from s3:// to abfs://
  • Addition of a try/except block in flow_tests/transform_tasks/xarray.py::combine_and_write

Combine and Write try/except

In the AWS Bakery, this function looks like:

from typing import List

import fsspec
import xarray as xr
from prefect import task


@task
def combine_and_write(
    sources: List[str], target: str, append_dim: str, concat_dim: str
) -> List[str]:
    double_open_files = [fsspec.open(url).open() for url in sources]
    ds = xr.open_mfdataset(double_open_files, combine="nested", concat_dim=concat_dim)
    ds = ds.chunk({append_dim: len(sources)})
    mapper = fsspec.get_mapper(target)

    if not len(mapper):
        kwargs = dict(mode="w")
    else:
        kwargs = dict(mode="a", append_dim=append_dim)
    ds.to_zarr(mapper, **kwargs)
    return target

This function currently fails in the Azure Bakery. Because of this, and because of a current 'bug' in Prefect, we don't get a chance for the Dask logs to be collected by Prefect. To get around this, I've wrapped the functionality in a try/except block: when an exception is raised, I log the error and grossly whack in a 60s time.sleep, which gives Prefect a chance to log the errors out to the UI.

The modified function in the Azure Bakery looks like:

import traceback
from typing import List

import fsspec
import prefect
import xarray as xr
from prefect import task


@task
def combine_and_write(
    sources: List[str], target: str, append_dim: str, concat_dim: str
) -> List[str]:
    try:
        double_open_files = [fsspec.open(url).open() for url in sources]
        ds = xr.open_mfdataset(double_open_files, combine="nested", concat_dim=concat_dim)
        ds = ds.chunk({append_dim: len(sources)})
        mapper = fsspec.get_mapper(target)

        if not len(mapper):
            kwargs = dict(mode="w")
        else:
            kwargs = dict(mode="a", append_dim=append_dim)
        ds.to_zarr(mapper, **kwargs)
        return target
    except Exception as ex:
        logger = prefect.context.get("logger")
        logger.info(ex)
        logger.info(traceback.format_exc())
        import time
        time.sleep(60)
        raise ex

Methods of displaying the failure

My method of displaying the failures we're seeing comes in 2 flavours:

  • Method One, which registers the flow, invokes it, then deletes all generated data (Prefect Results, Flow artifacts) - this will be done 3 times (i.e. 3 clean runs in succession)
  • Method Two, which registers the flow and invokes it but doesn't delete any generated data - this will be done 3 times (i.e. 3 over-the-top runs in succession)

I'll deploy and destroy the Bakery for each Method, so I know one method isn't tainting the other.

For each method, I'll provide the container logs of the Dask Workers, the Prefect logs, and a .zip of the Storage Container per run.

For each Flow run, I click the 🚀 Run button on the Prefect UI on the homepage of the current Flow version; on the Run configuration page, I click the Run 🚀 button without modifying the parameters.

To get the Dask Worker logs, I run the following query in Azure Logs Analytics:

let container_ids = toscalar(KubePodInventory
| where ContainerCreationTimeStamp >= datetime(<time just before the flow run>)
| extend labels = todynamic(PodLabel)[0]
| where labels.flow contains "dask-transform-flow" and labels["dask.org/component"] contains "worker"
| project PodUid, ContainerID
| distinct PodUid, ContainerID
| summarize make_set(ContainerID));
ContainerLog
| where (container_ids) contains ContainerID
| order by ContainerID, TimeGenerated
| project ContainerID, TimeGenerated, LogEntry

This provides me with results for all Dask worker containers from when I ran the Flow; the results are rows of ContainerID, TimeGenerated, and the actual LogEntry. There should be 2 containers in each results dump - this is just what I've observed gets spun up.

Method One

Bakery Deployment

I deploy the Bakery with:

$ make deploy-bakery

Flow registration

I registered the Flow with:

$ poetry run dotenv run python3 flow_test/transform_flow.py
[2021-05-13 11:04:12+0100] INFO - prefect.Azure | Uploading dask-transform-flow/2021-05-13t10-04-12-300682-00-00 to ciarandev-bakery-flow-storage-container
Flow URL: https://cloud.prefect.io/ciaran-developmentseed-account/flow/a4d1db1c-fd98-4e5e-a7c1-6dab7ce2adac
 └── ID: 1b5ef992-c210-4634-8cd0-81e977f181c8
 └── Project: CiaransTestProject
 └── Labels: ['ciarandev']

Apologies for the verbose command; one downside of Poetry is that it has no native .env support, hence the dotenv run.

Post Registration Storage Container contents

The zip of the Storage Container contents can be found here

Flow Run 1

The zip of the Dask worker logs, Prefect logs, and Storage Container contents can be found here

Flow Run 2

The zip of the Dask worker logs, Prefect logs, and Storage Container contents can be found here

Flow Run 3

The zip of the Dask worker logs, Prefect logs, and Storage Container contents can be found here

Method Two

Bakery Deployment

I deploy the Bakery with:

$ make deploy-bakery

Flow registration

I registered the Flow with:

$ poetry run dotenv run python3 flow_test/transform_flow.py
[2021-05-13 12:37:34+0100] INFO - prefect.Azure | Uploading dask-transform-flow/2021-05-13t11-37-34-683803-00-00 to ciarandev-bakery-flow-storage-container
Flow URL: https://cloud.prefect.io/ciaran-developmentseed-account/flow/a4d1db1c-fd98-4e5e-a7c1-6dab7ce2adac
 └── ID: c6ae8bca-2a96-40b6-976b-f0531af10c40
 └── Project: CiaransTestProject
 └── Labels: ['ciarandev']

Apologies for the verbose command; one downside of Poetry is that it has no native .env support, hence the dotenv run.

Post Registration Storage Container contents

The zip of the Storage Container contents can be found here

Flow Run 1

The zip of the Dask worker logs, Prefect logs, and Storage Container contents can be found here

Flow Run 2

The zip of the Dask worker logs, Prefect logs, and Storage Container contents can be found here

Flow Run 3

The zip of the Dask worker logs, Prefect logs, and Storage Container contents can be found here

Note: the Pod didn't seem to log anything out this time around, though nothing changed, so I can't include the logs. For this reason, I'll do a 4th run and see what happens.

Flow Run 4

The zip of the Dask worker logs, Prefect Flow container logs, Prefect logs, and Storage Container contents can be found here

Summary

It's really hard to pin down what goes wrong here, mainly because an error causes the containers to get shut down, so most of the error messages are actually to do with the shutdown rather than with what went wrong.

Hopefully everything I've included will help... Really appreciate any insight folks have as I'm pretty lost at this point!

@TomAugspurger

Glancing at the worker logs, this stands out:

distributed.comm.utils - ERROR - ('Could not serialize object of type ImplicitToExplicitIndexingAdapter.', "ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyIndexedArray(array=_ElementwiseFunctionArray(_ElementwiseFunctionArray(LazilyIndexedArray(array=<xarray.backends.h5netcdf_.H5NetCDFArrayWrapper object at 0x7f2dd4b575e0>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None), slice(None, None, None)))), func=functools.partial(<function _apply_mask at 0x7f2dddc2f790>, encoded_fill_values={-999}, decoded_fill_value=nan, dtype=dtype('float32')), dtype=dtype('float32')), func=functools.partial(<function _scale_offset_decoding at 0x7f2dddc2f820>, scale_factor=0.009999999776482582, add_offset=0.0, dtype=<class 'numpy.float32'>), dtype=dtype('float32')), key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None), slice(None, None, None))))))")

which looks like pydata/xarray#4591, supposedly fixed by fsspec/filesystem_spec#477.

It's possible that my attempt in #4 (comment) didn't create a Client with multiple processes, which requires calling cloudpickle.dumps on the dataset to move it between processes.
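A quick way to test that hypothesis locally is to open a dataset the same way the task does and round-trip it through cloudpickle, which is what distributed does when shipping it between worker processes (the URL here is a placeholder, not a real source file):

import cloudpickle
import fsspec
import xarray as xr

url = "https://example.com/oisst-day.nc"  # hypothetical source file
double_open = fsspec.open(url).open()
ds = xr.open_mfdataset([double_open], combine="nested", concat_dim="time")

# Raises the "Could not serialize object ..." error above if the backing
# file objects aren't picklable.
ds2 = cloudpickle.loads(cloudpickle.dumps(ds))
print(ds2)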

@ciaransweet
Contributor

@TomAugspurger could you confirm what versions of packages you've got locally? I'm just going to actually pin pangeo-forge-recipes now that there's a recent release, and pin xarray and see what happens.

@TomAugspurger

Running pip list in the docker image I built from the Dockerfile in this repo:

Package                              Version
------------------------------------ ---------------------
adal                                 1.2.7
adlfs                                0.7.4
aiohttp                              3.7.4.post0
applicationinsights                  0.11.10
asciitree                            0.3.3
async-timeout                        3.0.1
atlassian-python-api                 3.8.0
attrs                                20.3.0
azure-common                         1.1.27
azure-core                           1.13.0
azure-cosmos                         3.1.2
azure-datalake-store                 0.0.52
azure-graphrbac                      0.61.1
azure-identity                       1.5.0
azure-mgmt-authorization             1.0.0
azure-mgmt-containerregistry         2.8.0
azure-mgmt-core                      1.2.2
azure-mgmt-keyvault                  9.0.0
azure-mgmt-resource                  8.0.1
azure-mgmt-storage                   17.1.0
azure-storage-blob                   12.8.1
azureml-automl-core                  1.0.85.5
azureml-core                         1.0.85.6
azureml-dataprep                     1.1.38
azureml-dataprep-native              13.2.0
azureml-pipeline                     1.0.85
azureml-pipeline-core                1.0.85.1
azureml-pipeline-steps               1.0.85
azureml-sdk                          1.0.85
azureml-telemetry                    1.0.85.2
azureml-train                        1.0.85
azureml-train-automl-client          1.0.85.4
azureml-train-core                   1.0.85
azureml-train-restclients-hyperdrive 1.0.85
backports.tempfile                   1.0
backports.weakref                    1.0.post1
boto3                                1.17.59
botocore                             1.20.59
cachetools                           4.2.2
certifi                              2020.12.5
cffi                                 1.14.5
chardet                              4.0.0
click                                7.1.2
cloudpickle                          1.6.0
contextlib2                          0.6.0.post1
croniter                             0.3.37
cryptography                         3.4.7
dask                                 2021.4.1
dask-kubernetes                      2021.3.0
Deprecated                           1.2.12
distributed                          2021.4.1
distro                               1.5.0
docker                               5.0.0
dotnetcore2                          2.1.20
dulwich                              0.20.21
entrypoints                          0.3
fasteners                            0.16
flake8                               3.7.9
fsspec                               2021.4.0
fusepy                               3.0.1
google-api-core                      1.26.3
google-auth                          1.30.0
google-cloud-core                    1.6.0
google-cloud-storage                 1.38.0
google-crc32c                        1.1.2
google-resumable-media               1.2.0
googleapis-common-protos             1.53.0
h5netcdf                             0.11.0
h5py                                 3.2.1
HeapDict                             1.0.1
idna                                 2.10
isodate                              0.6.0
jeepney                              0.6.0
jmespath                             0.10.0
jsonpickle                           2.0.0
kubernetes                           12.0.1
kubernetes-asyncio                   12.1.0
locket                               0.2.1
marshmallow                          3.11.1
marshmallow-oneofschema              2.1.0
mccabe                               0.6.1
msal                                 1.11.0
msal-extensions                      0.3.0
msgpack                              1.0.2
msrest                               0.6.21
msrestazure                          0.6.4
multidict                            5.1.0
mypy-extensions                      0.4.3
natsort                              7.1.1
ndg-httpsclient                      0.5.1
numcodecs                            0.7.3
numpy                                1.20.2
oauthlib                             3.1.0
packaging                            20.9
pandas                               1.2.4
pangeo-forge-recipes                 0.3.3
partd                                1.2.0
pathspec                             0.8.1
pendulum                             2.1.2
pip                                  21.1.1
portalocker                          1.7.1
prefect                              0.14.17
protobuf                             3.15.8
psutil                               5.8.0
pyasn1                               0.4.8
pyasn1-modules                       0.2.8
pycodestyle                          2.5.0
pycparser                            2.20
pyflakes                             2.1.1
PyGithub                             1.55
PyJWT                                2.0.1
PyNaCl                               1.4.0
pyOpenSSL                            20.0.1
pyparsing                            2.4.7
python-box                           5.3.0
python-dateutil                      2.8.1
python-dotenv                        0.17.0
python-gitlab                        2.7.1
python-slugify                       4.0.1
pytz                                 2021.1
pytzdata                             2020.1
PyYAML                               5.4.1
rechunker                            0.4.2
requests                             2.25.1
requests-oauthlib                    1.3.0
requests-toolbelt                    0.9.1
rsa                                  4.7.2
ruamel.yaml                          0.15.89
s3transfer                           0.4.2
SecretStorage                        3.3.1
setuptools                           56.0.0
six                                  1.15.0
sortedcontainers                     2.3.0
tabulate                             0.8.9
tblib                                1.7.0
text-unidecode                       1.3
toml                                 0.10.2
toolz                                0.11.1
tornado                              6.1
typing-extensions                    3.7.4.3
urllib3                              1.26.4
websocket-client                     0.58.0
wheel                                0.36.2
wrapt                                1.12.1
xarray                               0.18.1.dev6+g6e14df62
yarl                                 1.6.3
zarr                                 2.7.0
zict                                 2.0.0

@ciaransweet
Contributor

Hmm, interesting. Maybe I was running with the same. I've pinned them specifically now to pangeo-forge-recipes==0.3.3. Also, I've just picked up prefect==0.14.19, which has seemingly just been released. We'll see what this does.

@ciaransweet
Contributor

Great. Bumping to 0.14.19 breaks because of click 🙃

@ciaransweet
Contributor

So with the following versions:

adal                                 1.2.7
adlfs                                0.7.5
aiohttp                              3.7.4.post0
applicationinsights                  0.11.10
asciitree                            0.3.3
async-timeout                        3.0.1
attrs                                21.2.0
azure-common                         1.1.27
azure-core                           1.13.0
azure-cosmos                         3.1.2
azure-datalake-store                 0.0.52
azure-graphrbac                      0.61.1
azure-identity                       1.5.0
azure-mgmt-authorization             1.0.0
azure-mgmt-containerregistry         2.8.0
azure-mgmt-core                      1.2.2
azure-mgmt-keyvault                  9.0.0
azure-mgmt-resource                  8.0.1
azure-mgmt-storage                   17.1.0
azure-storage-blob                   12.8.1
azureml-automl-core                  1.0.85.5
azureml-core                         1.0.85.6
azureml-dataprep                     1.1.38
azureml-dataprep-native              13.2.0
azureml-pipeline                     1.0.85
azureml-pipeline-core                1.0.85.1
azureml-pipeline-steps               1.0.85
azureml-sdk                          1.0.85
azureml-telemetry                    1.0.85.2
azureml-train                        1.0.85
azureml-train-automl-client          1.0.85.4
azureml-train-core                   1.0.85
azureml-train-restclients-hyperdrive 1.0.85
backports.tempfile                   1.0
backports.weakref                    1.0.post1
cachetools                           4.2.2
certifi                              2020.12.5
cffi                                 1.14.5
chardet                              4.0.0
click                                7.1.2
cloudpickle                          1.6.0
contextlib2                          0.6.0.post1
croniter                             0.3.37
cryptography                         3.4.7
dask                                 2021.4.1
dask-kubernetes                      2021.3.1
distributed                          2021.4.1
distro                               1.5.0
docker                               5.0.0
dotnetcore2                          2.1.20
entrypoints                          0.3
fasteners                            0.16
flake8                               3.7.9
fsspec                               2021.4.0
fusepy                               3.0.1
google-auth                          1.30.0
h5netcdf                             0.11.0
h5py                                 3.2.1
HeapDict                             1.0.1
idna                                 2.10
isodate                              0.6.0
jeepney                              0.6.0
jmespath                             0.10.0
jsonpickle                           2.0.0
kubernetes                           12.0.1
kubernetes-asyncio                   12.1.0
locket                               0.2.1
marshmallow                          3.12.1
marshmallow-oneofschema              2.1.0
mccabe                               0.6.1
msal                                 1.11.0
msal-extensions                      0.3.0
msgpack                              1.0.2
msrest                               0.6.21
msrestazure                          0.6.4
multidict                            5.1.0
mypy-extensions                      0.4.3
natsort                              7.1.1
ndg-httpsclient                      0.5.1
numcodecs                            0.7.3
numpy                                1.20.3
oauthlib                             3.1.0
pandas                               1.2.4
pangeo-forge-recipes                 0.3.3
partd                                1.2.0
pathspec                             0.8.1
pendulum                             2.1.2
pip                                  21.1.1
portalocker                          1.7.1
prefect                              0.14.19
psutil                               5.8.0
pyasn1                               0.4.8
pyasn1-modules                       0.2.8
pycodestyle                          2.5.0
pycparser                            2.20
pyflakes                             2.1.1
PyJWT                                2.1.0
pyOpenSSL                            20.0.1
python-box                           5.3.0
python-dateutil                      2.8.1
python-dotenv                        0.17.1
python-slugify                       5.0.2
pytz                                 2021.1
pytzdata                             2020.1
PyYAML                               5.4.1
rechunker                            0.4.2
requests                             2.25.1
requests-oauthlib                    1.3.0
rsa                                  4.7.2
ruamel.yaml                          0.15.89
SecretStorage                        3.3.1
setuptools                           56.0.0
six                                  1.16.0
sortedcontainers                     2.3.0
tabulate                             0.8.9
tblib                                1.7.0
text-unidecode                       1.3
toml                                 0.10.2
toolz                                0.11.1
tornado                              6.1
typing-extensions                    3.10.0.0
urllib3                              1.26.4
websocket-client                     0.59.0
wheel                                0.36.2
xarray                               0.18.0
yarl                                 1.6.3
zarr                                 2.7.0
zict                                 2.0.0

I get the following logs here

@ciaransweet
Contributor

@TomAugspurger Looks like even with these updated dependencies, we're still hitting similar issues

@sharkinsspatial
Contributor Author

sharkinsspatial commented May 20, 2021

@martindurant We are still experiencing issues trying to use adlfs with pangeo-forge-recipes. Is it possible for you to create a minimal functioning example of writing chunks to a zarr archive using AzureBlobFileSystem? We are currently seeing this in our logging:

fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","json.decoder.JSONDecodeError: Expecting ',' delimiter: line 20 column 1 (char 314)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    obj, end = self.scan_once(s, idx)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/json/decoder.py"", line 353, in raw_decode"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    obj, end = self.raw_decode(s, idx=_w(s, 0).end())"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/json/decoder.py"", line 337, in decode"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    return _default_decoder.decode(s)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/json/__init__.py"", line 357, in loads"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    return json.loads(ensure_text(s, 'ascii'))"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/util.py"", line 35, in json_loads"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    meta = json_loads(s)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/meta.py"", line 27, in parse_metadata"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    meta = parse_metadata(s)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/meta.py"", line 33, in decode_array_metadata"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    meta = decode_array_metadata(meta_bytes)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/core.py"", line 190, in _load_metadata_nosync"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    self._load_metadata_nosync()"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/core.py"", line 175, in _load_metadata"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    self._load_metadata()"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/core.py"", line 158, in __init__"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","json.decoder.JSONDecodeError: Expecting ',' delimiter: line 20 column 1 (char 314)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    obj, end = self.scan_once(s, idx)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/json/decoder.py"", line 353, in raw_decode"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    obj, end = self.raw_decode(s, idx=_w(s, 0).end())"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/json/decoder.py"", line 337, in decode"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    return _default_decoder.decode(s)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/json/__init__.py"", line 357, in loads"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/util.py"", line 35, in json_loads"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    meta = json_loads(s)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/meta.py"", line 27, in parse_metadata"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    meta = parse_metadata(s)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/meta.py"", line 33, in decode_array_metadata"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    meta = decode_array_metadata(meta_bytes)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/core.py"", line 190, in _load_metadata_nosync"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    self._load_metadata_nosync()"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/core.py"", line 175, in _load_metadata"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    self._load_metadata()"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/core.py"", line 158, in __init__"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    return Array(self._store, read_only=self._read_only, path=path,"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py"", line 341, in __getitem__"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    zgroup[dim][:] = 0"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py"", line 483, in expand_target_dim"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/Users/ciaran/Library/Caches/pypoetry/virtualenvs/pangeo-forge-azure-bakery-IMqFot_V-py3.8/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py"", line 307, in prepare_target"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    return self.stage.func()"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    return Array(self._store, read_only=self._read_only, path=path,"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py"", line 341, in __getitem__"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    zgroup[dim][:] = 0"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py"", line 483, in expand_target_dim"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/Users/ciaran/Library/Caches/pypoetry/virtualenvs/pangeo-forge-azure-bakery-IMqFot_V-py3.8/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py"", line 307, in prepare_target"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    return self.stage.func()"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/rechunker/executors/prefect.py"", line 39, in run"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""flow_test/new_recipe.py"", line 26, in wrapper"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    return task.run(*args, **kwargs)  # type: ignore"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py"", line 323, in run_task_with_timeout"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py"", line 865, in get_task_run_state"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    new_state = method(self, state, *args, **kwargs)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py"", line 48, in inner"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","Traceback (most recent call last):"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","[2021-05-20 14:52:13+0000] ERROR - prefect.CloudTaskRunner | Unexpected error: JSONDecodeError(""Expecting ',' delimiter: line 20 column 1 (char 314)"")"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/rechunker/executors/prefect.py"", line 39, in run"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""flow_test/new_recipe.py"", line 26, in wrapper"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    return task.run(*args, **kwargs)  # type: ignore"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py"", line 323, in run_task_with_timeout"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    value = prefect.utilities.executors.run_task_with_timeout("
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py"", line 865, in get_task_run_state"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    new_state = method(self, state, *args, **kwargs)"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","  File ""/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py"", line 48, in inner"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","Traceback (most recent call last):"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    return json.loads(ensure_text(s, 'ascii'))"
fe325106c9e4fb41db9498c43e5ac4520279d8f1ee9bbeaba8ca56171d88f81c,"5/20/2021, 2:52:13.725 PM","    value = prefect.utilities.executors.run_task_with_timeout("

But it may be worthwhile producing a minimal reproducible example (which I am probably not the best to do correctly 😄 ). If necessary you can coordinate with @ciaranevans and he can provide you with the blob storage connection information we are using.

@martindurant

I see a couple of JSON errors followed by a timeout? I don't have much of an idea of what this might mean. I recently gained Azure credentials, but I've never actually dealt with abfs in any way - I wrote an early version of the adlfs ("legacy") implementation. If you can make a concrete reproducer, I'm sure adlfs would fix things quickly.

@TomAugspurger

TomAugspurger commented May 20, 2021

This JSONDecodeError is one of the issues I was hitting last weekend. Somewhere in adlfs's caches there's a partially written blob (the JSON Zarr metadata). The actual bytes in BlobStorage are fully written, so it probably isn't a flushing issue, just a caching thing.

I spent a couple hours trying to simplify and debug it, but haven't been able to come up with a minimal reproducer.

In general, I think pangeo-forge bakeries should disable all of fsspec's caching on the write side. I'll try to write that up as an issue.
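As a hedged sketch, disabling the relevant caches when building the target mapper might look like this (the flags are real fsspec options, but whether they are honoured end-to-end depends on adlfs - see the issue linked further down):

import fsspec

mapper = fsspec.get_mapper(
    "abfs://flow-storage-container/target/demo.zarr",  # illustrative target
    connection_string="<connection string>",           # elided credentials
    skip_instance_cache=True,   # don't reuse a cached filesystem instance
    use_listings_cache=False,   # don't cache directory listings
)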

@sharkinsspatial
Contributor Author

@TomAugspurger We made a quick attempt with your streamlined AzureBlobStorageFS implementation but ran into the fact that it didn't implement exists https://github.com/pangeo-forge/pangeo-forge-recipes/blob/master/pangeo_forge_recipes/storage.py#L94. Not sure if you hit this in your recipe test?

@TomAugspurger

Nope, I just implemented the stuff needed to get my workflow running.

You could implement it as something like

return path in self.get_mapper()

@martindurant

^ won't that trigger a full listing?

@TomAugspurger

Ah, I guess since I didn't implement __contains__ it would. With AzureBlobStorageStore.__contains__ as something like self.container_client.exists(path/to/blob), we can use azure.storage.blob's fast prefix matching.
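A hypothetical sketch of that idea, assuming the store wraps azure.storage.blob's ContainerClient (class and attribute names here are illustrative, not the actual implementation):

from azure.storage.blob import ContainerClient


class AzureBlobStorageStore:
    def __init__(self, container_client: ContainerClient, prefix: str = ""):
        self.container_client = container_client
        self.prefix = prefix

    def __contains__(self, key: str) -> bool:
        # BlobClient.exists() issues a single HEAD request, avoiding a
        # full container listing.
        blob = f"{self.prefix}/{key}" if self.prefix else key
        return self.container_client.get_blob_client(blob).exists()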

@ciaransweet
Contributor

@TomAugspurger @martindurant @sharkinsspatial so in the spirit of sorting this once and for all - what would our plan of action look like?

I don't really feel comfortable just whacking in our own implementation for the sake of pushing this forwards, especially if it's something adlfs could fix quickly.

In general, I think pangeo-forge bakeries should disable all of fsspec's caching on the write side. I'll try to write that up as an issue.

Is this something we can do easily? How would I do that?

@TomAugspurger

TomAugspurger commented May 21, 2021 via email

@ciaransweet
Contributor

@TomAugspurger is that:

use_listings_cache=False,
skip_instance_cache=True,

These? Should I add these in and try again?

@TomAugspurger

Perhaps, though fsspec/adlfs#230 would likely need to be fixed first.

@ciaransweet
Contributor

Just updating to point to this flow: https://github.com/pangeo-forge/pangeo-forge-azure-bakery/blob/main/flow_test/oisst_recipe.py - this is the 'new' Recipe -> Flow conversion format, and we're still getting the failures.

@ciaransweet ciaransweet removed their assignment Aug 6, 2021