Skip to content

Commit

Permalink
Prevent iNaturalist from running alongside any other DAGs (#3025)
Browse files Browse the repository at this point in the history
  • Loading branch information
rwidom authored Jan 15, 2024
1 parent 6e8b73e commit 2df8d87
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions catalog/dags/providers/provider_api_scripts/inaturalist.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,14 +391,13 @@ def create_ingestion_workflow():
task_id="get_batches",
python_callable=INaturalistDataIngester.get_batches,
op_kwargs={
"batch_length": 2_000_000,
"batch_length": 1_000_000,
},
execution_timeout=timedelta(minutes=1),
)

# In testing this locally, the longest full iteration took 39 minutes,
# median was 18 minutes. We should probably adjust the timeouts with
# more info from production runs.
# In testing this locally with batch length 2_000_000, the longest full
# iteration took 39 minutes, median was 18 minutes.
load_transformed_data = PythonOperator.partial(
task_id="load_transformed_data",
python_callable=INaturalistDataIngester.load_transformed_data,
Expand All @@ -413,6 +412,14 @@ def create_ingestion_workflow():
doc_md=(
"Load one batch of data from source tables to target table."
),
# Use all of the available pool slots.
pool_slots=128,
# Default priority_weight is 1, higher numbers are more important.
priority_weight=0,
# Particularly towards the beginning there will be lots of
# of downstream / dependent tasks, and we don't want airflow to
# consider that in scheduling.
weight_rule="absolute",
).expand(
op_args=XComArg(get_batches, "return_value"),
)
Expand Down

0 comments on commit 2df8d87

Please sign in to comment.