From 2df8d871f4ff662702229e1077cb3544860c5dfe Mon Sep 17 00:00:00 2001 From: rwidom Date: Mon, 15 Jan 2024 10:14:45 -0500 Subject: [PATCH] Prevent iNaturalist from running alongside any other DAGs (#3025) --- .../providers/provider_api_scripts/inaturalist.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/catalog/dags/providers/provider_api_scripts/inaturalist.py b/catalog/dags/providers/provider_api_scripts/inaturalist.py index b16e66d4b14..532ba0f92d5 100644 --- a/catalog/dags/providers/provider_api_scripts/inaturalist.py +++ b/catalog/dags/providers/provider_api_scripts/inaturalist.py @@ -391,14 +391,13 @@ def create_ingestion_workflow(): task_id="get_batches", python_callable=INaturalistDataIngester.get_batches, op_kwargs={ - "batch_length": 2_000_000, + "batch_length": 1_000_000, }, execution_timeout=timedelta(minutes=1), ) - # In testing this locally, the longest full iteration took 39 minutes, - # median was 18 minutes. We should probably adjust the timeouts with - # more info from production runs. + # In testing this locally with batch length 2_000_000, the longest full + # iteration took 39 minutes, median was 18 minutes. load_transformed_data = PythonOperator.partial( task_id="load_transformed_data", python_callable=INaturalistDataIngester.load_transformed_data, @@ -413,6 +412,14 @@ def create_ingestion_workflow(): doc_md=( "Load one batch of data from source tables to target table." ), + # Use all of the available pool slots. + pool_slots=128, + # Default priority_weight is 1, higher numbers are more important. + priority_weight=0, + # Particularly towards the beginning there will be lots of + # of downstream / dependent tasks, and we don't want airflow to + # consider that in scheduling. + weight_rule="absolute", ).expand( op_args=XComArg(get_batches, "return_value"), )