Merge pull request #208 from GoogleCloudPlatform/dag-improvements
Dag improvements
fellipeamedeiros authored Oct 21, 2024
2 parents 93a4823 + eae201b commit fda05b4
Showing 12 changed files with 121 additions and 95 deletions.
3 changes: 3 additions & 0 deletions cloud-composer-etl/.gcloudignore
@@ -0,0 +1,3 @@
venv
README.md
assets
6 changes: 4 additions & 2 deletions cloud-composer-etl/README.md
@@ -113,10 +113,12 @@ gcloud builds submit . --config build/cloudbuild_destroy.yaml
```

## Known issues
If you face problems to delete the Service Networking peering:
If you face problems deleting the Service Networking peering, that is expected; see this [link for reference](https://cloud.google.com/vpc/docs/configure-private-services-access#removing-connection).

```
Error: Unable to remove Service Networking Connection, err: Error waiting for Delete Service Networking Connection: Error code 9, message: Failed to delete connection; Producer services (e.g. CloudSQL, Cloud Memstore, etc.) are still using this connection
```
Go to the [Console](https://console.cloud.google.com/networking/peering/list) and manually delete the peering.

Workaround: Go to the [Console](https://console.cloud.google.com/networking/peering/list) and manually delete the peering.

Then, run the Cloud Build Destroy job again.
72 changes: 0 additions & 72 deletions cloud-composer-etl/dags/bigquery_transform.py

This file was deleted.

48 changes: 48 additions & 0 deletions cloud-composer-etl/dags/dag_with_sla.py
@@ -0,0 +1,48 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For more information about SLAs, please check https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#slas

from datetime import datetime, timedelta
from airflow import models
from airflow.operators.bash_operator import BashOperator

def my_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
# add your code here, for example, it can send a notification to your team chat
print(f"Dag {dag.dag_id} tasks {task_list} missed SLA")


with models.DAG(
dag_id='dag_with_sla',
description='This dag shows how to set up SLA for each task',
schedule_interval="*/5 * * * *",
start_date=datetime(2022, 1, 1),
catchup=False,
sla_miss_callback=my_sla_miss_callback,
tags=["example", "has_sla"]
) as dag:

task_1 = BashOperator(
task_id='task_1',
bash_command='sleep 5',
dag=dag,
sla=timedelta(seconds=10)) # from dag start time

task_2 = BashOperator(
task_id='task_2',
bash_command='sleep 20',
dag=dag,
sla=timedelta(seconds=20)) # from dag start time

task_1 >> task_2
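
The `my_sla_miss_callback` above only prints to the task log. In practice the callback usually pushes a message to a chat or paging tool; a minimal sketch of that idea, assuming a hypothetical `CHAT_WEBHOOK_URL` environment variable and the `requests` library (neither is part of this commit):

```python
# Sketch only: a notification-sending SLA miss callback. CHAT_WEBHOOK_URL is a
# hypothetical environment variable, not something defined in this repository.
import os
import requests


def notify_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    message = f"DAG {dag.dag_id} missed its SLA for tasks: {task_list}"
    webhook_url = os.environ.get("CHAT_WEBHOOK_URL")
    if webhook_url:
        # Most chat webhooks (Google Chat, Slack, ...) accept a simple JSON payload.
        requests.post(webhook_url, json={"text": message}, timeout=10)
    else:
        # Fall back to the task log when no webhook is configured.
        print(message)
```

Because `task_1` and `task_2` sleep for 5 and 20 seconds against 10- and 20-second SLAs measured from the DAG run start, the second task will typically exceed its SLA, which is what exercises the callback.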
@@ -20,25 +20,25 @@
from airflow import models
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator

from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator, BigQueryInsertJobOperator

CONN_ID = "pgCitibike"
DATASET_NAME = "citibike"
GCS_DATA_LAKE_BUCKET = os.environ.get("GCS_DATA_LAKE_BUCKET")

PROJECT_ID = os.environ.get("GCP_PROJECT")

with models.DAG(
dag_id='datalake_to_dw',
dag_id='from_data_lake_to_data_warehouse',
description='Import data from the data lake to the data warehouse in BigQuery',
schedule_interval="@once",
start_date=datetime(2022, 1, 1),
catchup=False,
tags=["example", "bigquery"],
) as dag:

trigger_datalake_dag = TriggerDagRunOperator(
task_id="trigger_datalake_dag",
trigger_dag_id="postgres_to_datalake",
task_id="trigger_data_lake_dag",
trigger_dag_id="from_database_to_data_lake",
wait_for_completion=True,
poke_interval=10, # seconds
execution_date="{{ execution_date }}"
@@ -72,6 +72,43 @@
write_disposition='WRITE_TRUNCATE',
)

create_bike_trips_table = BigQueryInsertJobOperator(
task_id="create_bike_trips_table",
configuration={
"query": {
"query": '''SELECT
bikeid,
COUNT(*) AS trip_count,
MAX(starttime) AS last_start,
MAX(stoptime) AS last_stop,
ARRAY_AGG( STRUCT(
ss.name AS start_station_name,
t.starttime AS start_time,
es.name AS end_station_name,
t.stoptime AS end_time,
tripduration AS trip_duration) ) AS trips
FROM
`{0}.citibike.trips` t
JOIN
`{0}.citibike.stations` AS ss ON (t.start_station_id = ss.station_id)
JOIN
`{0}.citibike.stations` AS es ON (t.end_station_id = es.station_id)
GROUP BY
bikeid
'''.format(PROJECT_ID),
"useLegacySql": False,
"destinationTable": {
"projectId": PROJECT_ID,
"datasetId": "citibike",
"tableId": "bike_trips",
},
"createDisposition": "CREATE_IF_NEEDED",
"writeDisposition": "WRITE_TRUNCATE",
}
},
)

# task dependency
trigger_datalake_dag >> create_dataset
create_dataset >> load_stations
create_dataset >> load_trips
create_dataset >> [load_stations, load_trips]
[load_stations, load_trips] >> create_bike_trips_table
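
The new `create_bike_trips_table` task materializes a per-bike trip summary into `citibike.bike_trips`. For debugging the query outside Airflow, roughly the same job can be submitted with the BigQuery Python client; a sketch under the assumption that `google-cloud-bigquery` is installed and the same `GCP_PROJECT` environment variable is set (the query here is shortened for illustration; the DAG runs the full ARRAY_AGG/STRUCT statement shown above):

```python
# Sketch only: submits a query job with the same destination table and
# dispositions as the BigQueryInsertJobOperator configuration above.
import os
from google.cloud import bigquery

project_id = os.environ["GCP_PROJECT"]  # same variable the DAG reads

# Shortened query for illustration; the DAG uses the full ARRAY_AGG/STRUCT statement.
query = f"""
SELECT bikeid, COUNT(*) AS trip_count
FROM `{project_id}.citibike.trips`
GROUP BY bikeid
"""

client = bigquery.Client(project=project_id)
job_config = bigquery.QueryJobConfig(
    destination=f"{project_id}.citibike.bike_trips",
    create_disposition="CREATE_IF_NEEDED",
    write_disposition="WRITE_TRUNCATE",
)
client.query(query, job_config=job_config).result()  # blocks until the job finishes
```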
@@ -21,19 +21,19 @@

from airflow import models
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.operators.dummy import DummyOperator


CONN_ID = "pgCitibike"
GCS_DATA_LAKE_BUCKET = os.environ.get("GCS_DATA_LAKE_BUCKET")


with models.DAG(
dag_id='postgres_to_datalake',
dag_id='from_database_to_data_lake',
description='Export data from the relational database to data lake in Google Cloud Storage',
start_date=datetime(2022, 1, 1),
schedule_interval="0 1 * * *",
catchup=False,
tags=['cloudsql', 'postgres', 'gcs'],
tags=['cloudsql', 'postgres', 'gcs', 'data_lake'],
) as dag:

task_stations = PostgresToGCSOperator(
@@ -27,11 +27,12 @@


with models.DAG(
dag_id='postgres_restore',
dag_id='restore_postgres_backup',
description='Restore a PostgreSQL database backup',
start_date=datetime(2022, 1, 1),
schedule_interval="@once",
catchup=False,
tags=['example'],
tags=['example', 'infrastructure'],
) as dag:

import_body = {"importContext": {"fileType": "sql", "uri": FILE_NAME, "database":"citibike"}}
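
The `import_body` dict follows the Cloud SQL Admin API's `importContext` schema. The truncated remainder of this DAG presumably passes it to a Cloud SQL import task; a sketch of how that typically looks with the Google provider's `CloudSQLImportInstanceOperator` (the instance name and file URI below are placeholders, not values from this repository):

```python
# Sketch only, intended to sit inside the `with models.DAG(...)` block above.
# INSTANCE_NAME and FILE_NAME are placeholders, not values from this repository.
import os
from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLImportInstanceOperator

INSTANCE_NAME = os.environ.get("CLOUDSQL_INSTANCE_NAME", "my-postgres-instance")
FILE_NAME = "gs://my-backup-bucket/citibike_dump.sql"

import_body = {"importContext": {"fileType": "sql", "uri": FILE_NAME, "database": "citibike"}}

sql_restore_task = CloudSQLImportInstanceOperator(
    task_id="sql_restore_task",
    instance=INSTANCE_NAME,
    body=import_body,
)
```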
1 change: 1 addition & 0 deletions cloud-composer-etl/infra/composer.tf
@@ -20,6 +20,7 @@ module "composer" {
region = var.region
composer_env_name = var.composer_env_name
composer_service_account = google_service_account.service_account.email
image_version = var.image_version
environment_size = "ENVIRONMENT_SIZE_SMALL"
labels = local.resource_labels

14 changes: 7 additions & 7 deletions cloud-composer-etl/infra/network.tf
@@ -14,17 +14,17 @@


resource "google_compute_network" "vpc_network" {
name = var.network_name
description = "VPC for Data Platform"
routing_mode = "GLOBAL"
name = var.network_name
description = "VPC for Data Platform"
routing_mode = "GLOBAL"
auto_create_subnetworks = false
}

resource "google_compute_subnetwork" "composer_subnetwork" {
name = var.composer_env_name
ip_cidr_range = var.composer_ip_ranges.nodes
region = var.region
network = google_compute_network.vpc_network.id
name = var.composer_env_name
ip_cidr_range = var.composer_ip_ranges.nodes
region = var.region
network = google_compute_network.vpc_network.id
private_ip_google_access = true

secondary_ip_range {
2 changes: 1 addition & 1 deletion cloud-composer-etl/infra/sql.tf
@@ -13,7 +13,7 @@
# limitations under the License.

locals {
db_instance_name = "postgres-${random_id.db_name_suffix.hex}"
db_instance_name = "${var.composer_env_name}-${random_id.db_name_suffix.hex}"
airflow_conn_sample_db = "gcpcloudsql://airflow:${random_password.db_password.result}@${google_sql_database_instance.instance.private_ip_address}:5432/citibike"
}

1 change: 1 addition & 0 deletions cloud-composer-etl/infra/terraform.tfvars
@@ -14,6 +14,7 @@

network_name = "cloud-composer-etl-vpc"
composer_env_name = "composer-af2"
image_version = "composer-2.9.6-airflow-2.9.3"

resource_labels = {
env = "sandbox"
5 changes: 5 additions & 0 deletions cloud-composer-etl/infra/variables.tf
@@ -40,6 +40,11 @@ variable "composer_env_name" {
description = "Cloud Composer environment name"
}

variable "image_version" {
type = string
description = "Cloud Composer image version"
}

variable "composer_ip_ranges" {
type = map(string)
description = "Composer 2 runs on GKE, so inform here the IP ranges you want to use"
