# Transform flow test failing when running on k8s on Azure. #4

When trying to reproduce the testing Prefect flow used while developing pangeo-forge-aws-bakery, we are encountering an error when running `ds.to_zarr` (https://github.com/pangeo-forge/pangeo-forge-azure-bakery/blob/add-k8s-cluster/flow_test/transform_tasks/xarray.py#L30). @TomAugspurger any suggestions you might have for diagnosing whether this might be related to our configuration of Azure Blob storage would be super appreciated 🙇.

---
### Reproducing locally

Here are some steps I took to try to reproduce this locally:
```diff
diff --git a/flow_test/transform_flow.py b/flow_test/transform_flow.py
index f742a7f..2ffd899 100755
--- a/flow_test/transform_flow.py
+++ b/flow_test/transform_flow.py
@@ -11,7 +11,8 @@ from flow_test.transform_tasks.http import download
from flow_test.transform_tasks.xarray import chunk, combine_and_write
from flow_test.transform_tasks.zarr import consolidate_metadata
-project = os.environ["PREFECT_PROJECT"]
+# project = os.environ["PREFECT_PROJECT"]
+project = "debug"
flow_name = "dask-transform-flow"
@@ -28,33 +29,32 @@ def source_url(day: str) -> str:
with Flow(
flow_name,
- storage=storage.Azure(
- container=os.environ["FLOW_STORAGE_CONTAINER"],
- connection_string=os.environ["FLOW_STORAGE_CONNECTION_STRING"],
- ),
- run_config=KubernetesRun(
- image=os.environ["AZURE_BAKERY_IMAGE"],
- env={"AZURE_STORAGE_CONNECTION_STRING": os.environ["FLOW_STORAGE_CONNECTION_STRING"], "AZURE_BAKERY_IMAGE": os.environ["AZURE_BAKERY_IMAGE"]},
- labels=json.loads(os.environ["PREFECT__CLOUD__AGENT__LABELS"]),
- ),
- executor=DaskExecutor(
- cluster_class="dask_kubernetes.KubeCluster",
- cluster_kwargs={
- "pod_template": make_pod_spec(
- image=os.environ["AZURE_BAKERY_IMAGE"],
- labels={
- "flow": flow_name
- },
- memory_limit=None,
- memory_request=None,
- env={
- "AZURE_STORAGE_CONNECTION_STRING": os.environ["FLOW_STORAGE_CONNECTION_STRING"]
- }
- )
- },
- adapt_kwargs={"maximum": 10},
-
- ),
+ # storage=storage.Azure(
+ # container=os.environ["FLOW_STORAGE_CONTAINER"],
+ # connection_string=os.environ["FLOW_STORAGE_CONNECTION_STRING"],
+ # ),
+ # run_config=KubernetesRun(
+ # image=os.environ["AZURE_BAKERY_IMAGE"],
+ # env={"AZURE_STORAGE_CONNECTION_STRING": os.environ["FLOW_STORAGE_CONNECTION_STRING"], "AZURE_BAKERY_IMAGE": os.environ["AZURE_BAKERY_IMAGE"]},
+ # labels=json.loads(os.environ["PREFECT__CLOUD__AGENT__LABELS"]),
+ # ),
+ # executor=DaskExecutor(
+ # cluster_class="dask_kubernetes.KubeCluster",
+ # cluster_kwargs={
+ # "pod_template": make_pod_spec(
+ # image=os.environ["AZURE_BAKERY_IMAGE"],
+ # labels={
+ # "flow": flow_name
+ # },
+ # memory_limit=None,
+ # memory_request=None,
+ # env={
+ # "AZURE_STORAGE_CONNECTION_STRING": os.environ["FLOW_STORAGE_CONNECTION_STRING"]
+ # }
+ # )
+ # },
+ # adapt_kwargs={"maximum": 10},
+ # ),
) as flow:
days = Parameter(
"days",
@@ -64,10 +64,12 @@ with Flow(
zarr_output = "dask_transform_flow.zarr"
nc_sources = download.map(
sources,
- cache_location=unmapped(f"abfs://{os.environ['FLOW_STORAGE_CONTAINER']}/cache/{zarr_output}"),
+ # cache_location=unmapped(f"abfs://{os.environ['FLOW_STORAGE_CONTAINER']}/cache/{zarr_output}"),
+ cache_location=unmapped(f"memory://cache/{zarr_output}")
)
chunked = chunk(nc_sources, size=5)
- target = f"abfs://{os.environ['FLOW_STORAGE_CONTAINER']}/target/{zarr_output}"
+ # target = f"abfs://{os.environ['FLOW_STORAGE_CONTAINER']}/target/{zarr_output}"
+ target = f"memory://target/{zarr_output}"
writes = combine_and_write.map(
chunked,
unmapped(target),
@@ -77,4 +79,4 @@ with Flow(
consolidate_metadata(target, writes=writes)
-flow.register(project_name=project)
+# flow.register(project_name=project)
diff --git a/flow_test/transform_tasks/xarray.py b/flow_test/transform_tasks/xarray.py
index 131d55b..a6c52f9 100644
--- a/flow_test/transform_tasks/xarray.py
+++ b/flow_test/transform_tasks/xarray.py
@@ -19,6 +19,8 @@ def combine_and_write(
) -> List[str]:
try:
double_open_files = [fsspec.open(url).open() for url in sources]
+ breakpoint()
+ print(sources)
ds = xr.open_mfdataset(double_open_files, combine="nested", concat_dim=concat_dim)
ds = ds.chunk({append_dim: len(sources)})
mapper = fsspec.get_mapper(target)
```
```
ipython -i flow_test/transform_flow.py
Python 3.9.4 | packaged by conda-forge | (default, May 10 2021, 22:13:33)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.23.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: flow.run()
[2021-05-11 10:28:12-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-transform-flow'
```

Unfortunately, everything went fine.

### Possible issues

I think the next two issues to consider are:
---
Thanks for tackling this @TomAugspurger. You can find the image the Dask workers are running here: https://github.com/pangeo-forge/pangeo-forge-azure-bakery/tree/add-k8s-cluster/images. You should be able to run the flow in there.
---

Thanks, I ran it from the Docker image and it again ran fine. Can you make sure that the cache is cleared prior to running through Prefect?
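A minimal sketch of what clearing that cache could look like with fsspec, assuming the flow's env vars and the `cache/dask_transform_flow.zarr` layout from the diff above:

```python
import os

import fsspec

# Minimal sketch: wipe the flow's cache prefix before a run.
# Assumes the same env vars the flow itself uses.
fs = fsspec.filesystem(
    "abfs",
    connection_string=os.environ["FLOW_STORAGE_CONNECTION_STRING"],
)
cache_path = f"{os.environ['FLOW_STORAGE_CONTAINER']}/cache/dask_transform_flow.zarr"
if fs.exists(cache_path):
    fs.rm(cache_path, recursive=True)
```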
---

@TomAugspurger I'm going to spend tomorrow fully fleshing out a run-through of exactly what I'm doing, running, and seeing. I'll be dumping all the logs I can too.
---

@TomAugspurger @martindurant @sharkinsspatial Hey folks, here's a full rundown of what I'm doing to get this flow to break, what is left in Azure after each flow run, and what logs I can get out of Prefect & K8s.

### High level overview

Whilst trying to confirm that the Azure Bakery is deployed/set up correctly, we've been trying to run the `dask-transform-flow` Flow.
### Combine and Write try/except

In the AWS Bakery, this function looks like:

```python
# imports required by this excerpt
from typing import List

import fsspec
import xarray as xr
from prefect import task


@task
def combine_and_write(
    sources: List[str], target: str, append_dim: str, concat_dim: str
) -> List[str]:
    double_open_files = [fsspec.open(url).open() for url in sources]
    ds = xr.open_mfdataset(double_open_files, combine="nested", concat_dim=concat_dim)
    ds = ds.chunk({append_dim: len(sources)})
    mapper = fsspec.get_mapper(target)
    # write on the first call, append on subsequent calls
    if not len(mapper):
        kwargs = dict(mode="w")
    else:
        kwargs = dict(mode="a", append_dim=append_dim)
    ds.to_zarr(mapper, **kwargs)
    return target
```

This function currently fails in the Azure Bakery. Because of this, and because of a current 'bug' in Prefect, we don't get a chance for the Dask logs to be collected by Prefect. To get around this, I've wrapped the functionality in a try/except.

The modified function in the Azure Bakery looks like:

```python
# imports required by this excerpt
import traceback
from typing import List

import fsspec
import prefect
import xarray as xr
from prefect import task


@task
def combine_and_write(
    sources: List[str], target: str, append_dim: str, concat_dim: str
) -> List[str]:
    try:
        double_open_files = [fsspec.open(url).open() for url in sources]
        ds = xr.open_mfdataset(double_open_files, combine="nested", concat_dim=concat_dim)
        ds = ds.chunk({append_dim: len(sources)})
        mapper = fsspec.get_mapper(target)
        if not len(mapper):
            kwargs = dict(mode="w")
        else:
            kwargs = dict(mode="a", append_dim=append_dim)
        ds.to_zarr(mapper, **kwargs)
        return target
    except Exception as ex:
        # log the exception ourselves, then give the logs time to flush
        # before the worker is torn down
        logger = prefect.context.get("logger")
        logger.info(ex)
        logger.info(traceback.format_exc())
        import time

        time.sleep(60)
        raise ex
```

### Methods of displaying the failure

My method of displaying the failures we're seeing will be 2 flavours, detailed as Method One and Method Two below.
I'll deploy and destroy the Bakery for each Method, so I know it's not tainting the methods. For each method, I'll provide the container logs of the Dask Workers, the Prefect logs, and a zip of the Storage Container contents.

For each Flow run, I click the 🚀 Run button on the Prefect UI on the homepage of the current Flow version, then on the Run configuration page, I click the Run 🚀 button without modifying the parameters.

To get the Dask Worker logs, I run the following query in Azure Logs Analytics:

```
let container_ids = toscalar(KubePodInventory
| where ContainerCreationTimeStamp >= datetime(<time just before the flow run>)
| extend labels = todynamic(PodLabel)[0]
| where labels.flow contains "dask-transform-flow" and labels["dask.org/component"] contains "worker"
| project PodUid, ContainerID
| distinct PodUid, ContainerID
| summarize make_set(ContainerID));
ContainerLog
| where (container_ids) contains ContainerID
| order by ContainerID, TimeGenerated
| project ContainerID, TimeGenerated, LogEntry
```

This provides me with results for all Dask Workers.

### Method One

#### Bakery Deployment

I deploy the Bakery with:

```
$ make deploy-bakery
```

#### Flow registration

I registered the Flow with:

```
$ poetry run dotenv run python3 flow_test/transform_flow.py
[2021-05-13 11:04:12+0100] INFO - prefect.Azure | Uploading dask-transform-flow/2021-05-13t10-04-12-300682-00-00 to ciarandev-bakery-flow-storage-container
Flow URL: https://cloud.prefect.io/ciaran-developmentseed-account/flow/a4d1db1c-fd98-4e5e-a7c1-6dab7ce2adac
└── ID: 1b5ef992-c210-4634-8cd0-81e977f181c8
└── Project: CiaransTestProject
└── Labels: ['ciarandev']
```

Apologies for the verbose command, one downside of Poetry is no native .env support.

#### Post Registration Storage Container contents

The zip of the Storage Container contents can be found here.

#### Flow Run 1

The zip of the Dask worker logs, Prefect logs, and Storage Container contents can be found here.

#### Flow Run 2

The zip of the Dask worker logs, Prefect logs, and Storage Container contents can be found here.

#### Flow Run 3

The zip of the Dask worker logs, Prefect logs, and Storage Container contents can be found here.

### Method Two

#### Bakery Deployment

I deploy the Bakery with:

```
$ make deploy-bakery
```

#### Flow registration

I registered the Flow with:

```
$ poetry run dotenv run python3 flow_test/transform_flow.py
[2021-05-13 12:37:34+0100] INFO - prefect.Azure | Uploading dask-transform-flow/2021-05-13t11-37-34-683803-00-00 to ciarandev-bakery-flow-storage-container
Flow URL: https://cloud.prefect.io/ciaran-developmentseed-account/flow/a4d1db1c-fd98-4e5e-a7c1-6dab7ce2adac
└── ID: c6ae8bca-2a96-40b6-976b-f0531af10c40
└── Project: CiaransTestProject
└── Labels: ['ciarandev']
```

Apologies for the verbose command, one downside of Poetry is no native .env support.

#### Post Registration Storage Container contents

The zip of the Storage Container contents can be found here.

#### Flow Run 1

The zip of the Dask worker logs, Prefect logs, and Storage Container contents can be found here.

#### Flow Run 2

The zip of the Dask worker logs, Prefect logs, and Storage Container contents can be found here.

#### Flow Run 3

The zip of the logs and Storage Container contents can be found here.

**Note:** The Pod didn't seem to log out this time around, though nothing changed, so I can't include the logs. For this reason, I'll do a 4th run and see what happens.

#### Flow Run 4

The zip of the Dask worker logs, Prefect Flow container logs, Prefect logs, and Storage Container contents can be found here.

### Summary

It's really hard to pin down what goes wrong here, mainly because an error causes the containers to get shut down, so most of the error messages are actually to do with the shutdown rather than with what went wrong. Hopefully everything I've included will help... Really appreciate any insight folks have as I'm pretty lost at this point!
---

Glancing at the worker logs, this stands out:

which looks like pydata/xarray#4591, which was supposed to be fixed by fsspec/filesystem_spec#477. It's possible that my attempt in #4 (comment) didn't create a
---

@TomAugspurger could you confirm what versions of packages you've got locally? I'm just going to actually pin
---

Running
---

Hmm interesting. Maybe I was running with the same. I've pinned them specifically now to
---

Great. Bumping to
---

So with the following versions:

I get the following logs here.
---

@TomAugspurger Looks like even with these updated dependencies, we're still hitting similar issues.
---

@martindurant We are still experiencing issues trying to use

But it may be worthwhile producing a minimal reproducible example (which I am probably not the best placed to do correctly 😄). If necessary you can coordinate with @ciaranevans and he can provide you the blob storage connection information we are using.
---

I see a couple of JSON errors followed by a timeout? I don't have much of an idea of what this might mean. I recently gained Azure credentials, but I've never actually dealt with abfs in any way - I wrote an early version of the adlfs ("legacy") implementation. If you can make a concrete reproducer, I'm sure adlfs would fix things quickly.
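For concreteness, a skeleton of the kind of reproducer being asked for might look like the following; the container name is a placeholder, and the loop just hammers a small JSON blob (standing in for Zarr metadata) through an abfs mapper to see whether it ever reads back as partial or invalid JSON:

```python
import json
import os

import fsspec

# Hypothetical reproducer skeleton, not a confirmed trigger for the bug:
# repeatedly rewrite a small JSON blob through an abfs mapper and check
# that it always round-trips as valid JSON.
mapper = fsspec.get_mapper(
    "abfs://my-container/repro",  # placeholder container/prefix
    connection_string=os.environ["AZURE_STORAGE_CONNECTION_STRING"],
)

for i in range(100):
    mapper[".zmetadata"] = json.dumps({"attempt": i}).encode()
    # A partially written or stale-cached blob would raise JSONDecodeError here.
    assert json.loads(mapper[".zmetadata"]) == {"attempt": i}
```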
---

This JSONDecodeError is one of the issues I was hitting last weekend. Somewhere in adlfs's caches there's a partially written blob (the JSON Zarr metadata). The actual bytes in Blob Storage are fully written, so it probably isn't a flushing issue, just a caching thing. I spent a couple of hours trying to simplify and debug it, but haven't been able to come up with a minimal reproducer. In general, I think pangeo-forge bakeries should disable all of fsspec's caching on the write side. I'll try to write that up as an issue.
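As a sketch of what disabling the caches could look like for the flow's write-side mapper, using generic fsspec options rather than anything adlfs-specific (whether this alone avoids the JSONDecodeError is an open question):

```python
import os

import fsspec

# Sketch: build the write-side mapper with fsspec's caches turned off.
# The target URL mirrors the flow's abfs target; env vars as in the flow.
target = f"abfs://{os.environ['FLOW_STORAGE_CONTAINER']}/target/dask_transform_flow.zarr"

mapper = fsspec.get_mapper(
    target,
    connection_string=os.environ["FLOW_STORAGE_CONNECTION_STRING"],
    skip_instance_cache=True,   # don't reuse a cached filesystem instance
    use_listings_cache=False,   # don't cache directory listings
)
```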
---

@TomAugspurger We made a quick attempt with your streamlined
---

Nope, I just implemented the stuff needed to get my workflow running. You could implement it as something like `return path in self.get_mapper()`.
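A hypothetical sketch of where such a line might live, say an exists-style check on a filesystem-backed store class (the class and method names are assumptions, not the actual implementation under discussion):

```python
class StreamlinedStore:
    """Hypothetical filesystem-backed store."""

    def __init__(self, fs, root=""):
        self.fs = fs
        self.root = root

    def get_mapper(self):
        return self.fs.get_mapper(self.root)

    def exists(self, path: str) -> bool:
        # Membership test against the mapper view of the store.
        return path in self.get_mapper()
```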
---

^ won't that trigger a full listing?
---

Ah, I guess since I didn't implement
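For context on the concern: in Python, `key in obj` uses `__contains__` when it is defined; if a class only defines `__iter__`, the membership test falls back to iterating every key, which for a remote store means a full listing. A toy illustration (names here are hypothetical):

```python
import fsspec


class ToyStore:
    """Hypothetical store exposing keys only via iteration."""

    def __init__(self, fs, root):
        self.fs = fs
        self.root = root

    def __iter__(self):
        # Enumerates every object under root.
        return iter(self.fs.find(self.root))


store = ToyStore(fsspec.filesystem("memory"), "/target")
# With no __contains__ defined, `in` falls back to __iter__,
# i.e. a full listing of the store:
".zmetadata" in store
```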
---

@TomAugspurger @martindurant @sharkinsspatial so in the spirit of sorting this out once and for all - what would our plan of action look like? I don't really feel comfortable just whacking in our own implementation for the sake of pushing this forwards, especially if it's something

Is this something we can do easily? How would I do that?
---

@TomAugspurger is that:

These? Should I add these in and try again?
---

Perhaps, though fsspec/adlfs#230 would likely need to be fixed first.
---

Just updating to point to this flow: https://github.com/pangeo-forge/pangeo-forge-azure-bakery/blob/main/flow_test/oisst_recipe.py as this is the 'new' Recipe -> Flow conversion format, and we're still getting the failures.