Airflow test #145

Merged · 39 commits · Dec 19, 2024
Commits
d7c36a9  test (AaronClaydon, Dec 17, 2024)
cf12234  test (AaronClaydon, Dec 17, 2024)
90d4d2c  test (AaronClaydon, Dec 17, 2024)
be41d4b  test (AaronClaydon, Dec 17, 2024)
18a8a18  test (AaronClaydon, Dec 17, 2024)
c56bd01  test (AaronClaydon, Dec 17, 2024)
105e0c0  test (AaronClaydon, Dec 17, 2024)
e107fa3  nice (AaronClaydon, Dec 17, 2024)
b39bebe  test (AaronClaydon, Dec 18, 2024)
bf4054d  test (AaronClaydon, Dec 18, 2024)
154eb35  test (AaronClaydon, Dec 18, 2024)
a2a9f18  test (AaronClaydon, Dec 18, 2024)
213f63c  test (AaronClaydon, Dec 18, 2024)
2c9429d  secrets (AaronClaydon, Dec 18, 2024)
a200a90  test (AaronClaydon, Dec 18, 2024)
c71c482  test (AaronClaydon, Dec 18, 2024)
2df2d87  test (AaronClaydon, Dec 18, 2024)
1529f09  test (AaronClaydon, Dec 18, 2024)
fdaa7c2  test (AaronClaydon, Dec 18, 2024)
86186cf  ignore realtime (AaronClaydon, Dec 18, 2024)
5182ed3  test (AaronClaydon, Dec 18, 2024)
c379bbe  indexer (AaronClaydon, Dec 18, 2024)
a2bfbd4  logic (AaronClaydon, Dec 18, 2024)
58bf80a  test (AaronClaydon, Dec 18, 2024)
550c6f2  bug (AaronClaydon, Dec 18, 2024)
9ebe352  performance (AaronClaydon, Dec 18, 2024)
af10554  conc (AaronClaydon, Dec 18, 2024)
32c139a  test (AaronClaydon, Dec 19, 2024)
307ae1d  test (AaronClaydon, Dec 19, 2024)
10ec0e3  test (AaronClaydon, Dec 19, 2024)
5cb156d  test (AaronClaydon, Dec 19, 2024)
2cfd2e0  fix (AaronClaydon, Dec 19, 2024)
c697ee2  test (AaronClaydon, Dec 19, 2024)
a3b0071  test (AaronClaydon, Dec 19, 2024)
68ce873  test (AaronClaydon, Dec 19, 2024)
4b9f676  test (AaronClaydon, Dec 19, 2024)
0d38668  start (AaronClaydon, Dec 19, 2024)
00d2855  test (AaronClaydon, Dec 19, 2024)
6f9e5a7  test (AaronClaydon, Dec 19, 2024)
180 changes: 180 additions & 0 deletions airflow/dags/batch-data-import.py
@@ -0,0 +1,180 @@
"""
This is an example dag for using the KubernetesPodOperator.
"""

from kubernetes.client import models as k8s
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
from airflow.providers.slack.notifications.slack_webhook import send_slack_webhook_notification
from airflow.utils.task_group import TaskGroup

import yaml

from pathlib import Path

default_args = {
'owner': 'airflow'
}

def generate_data_job(dataset : str, instance_size : str = "small", taskgroup : TaskGroup = None):
return generate_job(dataset, ["data-importer", "dataset", "--id", dataset], instance_size=instance_size, taskgroup=taskgroup)

def generate_job(name : str, command : str, instance_size : str = "small", taskgroup : TaskGroup = None):
name = f"data-import-{name}"

tolerations = []
node_selector = None
container_resources = None
if instance_size == "medium" or instance_size == "large":
node_selector = {"cloud.google.com/gke-nodepool": "batch-burst-node-pool"}
tolerations.append(k8s.V1Toleration(effect="NoSchedule", key="BATCH_BURST", operator="Equal", value="true"))

if instance_size == "medium":
memory_requests = "20Gi"
elif instance_size == "large":
memory_requests = "40Gi"

container_resources = k8s.V1ResourceRequirements(requests={"memory": memory_requests})

k = KubernetesPodOperator(
namespace='default',
image='ghcr.io/travigo/travigo:main',
image_pull_policy='Always',
arguments=command,
name=name,
task_id=name,
is_delete_operator_pod=True,
hostnetwork=False,
startup_timeout_seconds=1000,
tolerations=tolerations,
node_selector=node_selector,
container_resources=container_resources,
trigger_rule="all_done",
task_group=taskgroup,
# on_success_callback=[
# send_slack_webhook_notification(
# slack_webhook_conn_id="slack-dataimport",
# text="The task {{ ti.task_id }} was successful",
# )
# ],
on_failure_callback=[
send_slack_webhook_notification(
slack_webhook_conn_id="slack-dataimport",
text="The task {{ ti.task_id }} failed",
)
],
env_vars = [
k8s.V1EnvVar(
name = "TRAVIGO_BODS_API_KEY",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="travigo-bods-api", key="api_key"))
),
k8s.V1EnvVar(
name = "TRAVIGO_IE_NATIONALTRANSPORT_API_KEY",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="travigo-ie-nationaltransport-api", key="api_key"))
),
k8s.V1EnvVar(
name = "TRAVIGO_NATIONALRAIL_USERNAME",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="travigo-nationalrail-credentials", key="username"))
),
k8s.V1EnvVar(
name = "TRAVIGO_NATIONALRAIL_PASSWORD",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="travigo-nationalrail-credentials", key="password"))
),
k8s.V1EnvVar(
name = "TRAVIGO_NETWORKRAIL_USERNAME",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="travigo-networkrail-credentials", key="username"))
),
k8s.V1EnvVar(
name = "TRAVIGO_NETWORKRAIL_PASSWORD",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="travigo-networkrail-credentials", key="password"))
),
k8s.V1EnvVar(
name = "TRAVIGO_SE_TRAFIKLAB_STATIC_API_KEY",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="travigo-trafiklab-sweden-static", key="api_key"))
),
k8s.V1EnvVar(
name = "TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="travigo-trafiklab-sweden-realtime", key="api_key"))
),

k8s.V1EnvVar(
name = "TRAVIGO_MONGODB_CONNECTION",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="travigo-mongodb-admin-travigo", key="connectionString.standard"))
),
k8s.V1EnvVar(
name = "TRAVIGO_ELASTICSEARCH_ADDRESS",
value = "https://primary-es-http.elastic:9200"
),
k8s.V1EnvVar(
name = "TRAVIGO_ELASTICSEARCH_USERNAME",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="travigo-elasticsearch-user", key="username"))
),
k8s.V1EnvVar(
name = "TRAVIGO_ELASTICSEARCH_PASSWORD",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="travigo-elasticsearch-user", key="password"))
),
k8s.V1EnvVar(
name = "TRAVIGO_REDIS_ADDRESS",
value = "redis-headless.redis:6379"
),
k8s.V1EnvVar(
name = "TRAVIGO_REDIS_PASSWORD",
value_from = k8s.V1EnvVarSource(secret_key_ref=k8s.V1SecretKeySelector(name="redis-password", key="password"))
)
]
)

return k

with DAG(
dag_id='batch-data-import',
default_args=default_args,
schedule_interval="0 7 * * *",
start_date=days_ago(2),
catchup=False,
max_active_runs=1,
concurrency=2,
) as dag:
start = DummyOperator(task_id="start")
end = DummyOperator(task_id="end")

stop_linker = generate_job("stop-linker", [ "data-linker", "run", "--type", "stops" ])
stop_indexer = generate_job("stop-indexer", [ "indexer", "stops" ])

taskgroups = {
"small": TaskGroup("small"),
"medium": TaskGroup("medium"),
"large": TaskGroup("large"),
}

stop_linker >> stop_indexer

pathlist = Path("/opt/airflow/dags/repo/data/datasources").glob('**/*.yaml')
for path in pathlist:
# because path is object not string
path_in_str = str(path)

with open(path_in_str) as stream:
try:
yaml_file = yaml.safe_load(stream)

source_identifier = yaml_file["identifier"]

for dataset in yaml_file["datasets"]:
if "importdestination" in dataset and dataset["importdestination"] == "realtime-queue":
continue

dataset_identifier = dataset["identifier"]

dataset_size = "small"
if "datasetsize" in dataset:
dataset_size = dataset["datasetsize"]

import_job = generate_data_job(f"{source_identifier}-{dataset_identifier}", instance_size=dataset_size, taskgroup=taskgroups[dataset_size])
except yaml.YAMLError as exc:
print(exc)

start >> taskgroups["small"] >> taskgroups["medium"] >> taskgroups["large"] >> stop_linker >> stop_indexer >> end
1 change: 1 addition & 0 deletions data/datasources/de-gtfs.yaml
@@ -7,6 +7,7 @@ datasets:
  - identifier: gtfs-schedule
    format: gtfs-schedule
    source: "https://download.gtfs.de/germany/free/latest.zip"
    datasetsize: large
    supportedobjects:
      operators: true
      stops: true
1 change: 1 addition & 0 deletions data/datasources/fr-ilevia-lille.yaml
@@ -7,6 +7,7 @@ datasets:
  - identifier: gtfs-schedule
    format: gtfs-schedule
    source: "https://media.ilevia.fr/opendata/gtfs.zip"
    datasetsize: medium
    supportedobjects:
      operators: true
      stops: true
1 change: 1 addition & 0 deletions data/datasources/gb-dft.yaml
@@ -13,6 +13,7 @@ datasets:
  - identifier: bods-gtfs-schedule
    format: gtfs-schedule
    source: "https://data.bus-data.dft.gov.uk/timetable/download/gtfs-file/all/"
    datasetsize: large
    supportedobjects:
      services: true
      journeys: true
1 change: 1 addition & 0 deletions data/datasources/ie-tfi.yaml
@@ -7,6 +7,7 @@ datasets:
  - identifier: gtfs-schedule
    format: gtfs-schedule
    source: "https://www.transportforireland.ie/transitData/Data/GTFS_Realtime.zip"
    datasetsize: large
    supportedobjects:
      operators: true
      stops: true
1 change: 1 addition & 0 deletions data/datasources/se-trafiklab.yaml
@@ -7,6 +7,7 @@ datasets:
  - identifier: gtfs-schedule
    format: gtfs-schedule
    source: "https://opendata.samtrafiken.se/gtfs-sweden/sweden.zip"
    datasetsize: large
    sourceauthentication:
      query:
        key: TRAVIGO_SE_TRAFIKLAB_STATIC_API_KEY
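
The new datasetsize key in these datasource files is what the DAG loop in batch-data-import.py above keys off: datasets with no datasetsize fall back to "small", and each one lands in the matching task group. Below is a minimal standalone sketch of that lookup; the inline YAML is a trimmed, hypothetical snippet (the second dataset entry is invented purely to show the realtime-queue skip), not a copy of any real file.

import yaml

# Trimmed, hypothetical datasource snippet; real files live in data/datasources/*.yaml.
EXAMPLE_SOURCE = """
identifier: gb-dft
datasets:
  - identifier: bods-gtfs-schedule
    format: gtfs-schedule
    datasetsize: large
  - identifier: example-realtime-feed   # invented entry, only here to show the skip
    importdestination: realtime-queue
"""

yaml_file = yaml.safe_load(EXAMPLE_SOURCE)
source_identifier = yaml_file["identifier"]

for dataset in yaml_file["datasets"]:
    # The batch DAG skips datasets destined for the realtime queue.
    if dataset.get("importdestination") == "realtime-queue":
        continue

    # No datasetsize key means the dataset runs in the "small" task group.
    dataset_size = dataset.get("datasetsize", "small")
    print(f"{source_identifier}-{dataset['identifier']} -> {dataset_size}")
    # Output: gb-dft-bods-gtfs-schedule -> large

The .get() calls are just a compact equivalent of the "key in dataset" checks the DAG uses; the default and the skip behave the same way.
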
2 changes: 2 additions & 0 deletions pkg/dataimporter/datasets/dataset.go
@@ -16,6 +16,8 @@ type DataSet struct {
	Source               string
	SourceAuthentication SourceAuthentication `json:"-"`

	DatasetSize string

	UnpackBundle     BundleFormat `json:"-"`
	SupportedObjects SupportedObjects
	IgnoreObjects    IgnoreObjects