diff --git a/cloud-composer-etl/.gcloudignore b/cloud-composer-etl/.gcloudignore
new file mode 100644
index 0000000..50c7e83
--- /dev/null
+++ b/cloud-composer-etl/.gcloudignore
@@ -0,0 +1,3 @@
+venv
+README.md
+assets
diff --git a/cloud-composer-etl/README.md b/cloud-composer-etl/README.md
index 92c3698..8c5d1c2 100644
--- a/cloud-composer-etl/README.md
+++ b/cloud-composer-etl/README.md
@@ -113,10 +113,17 @@
 gcloud builds submit . --config build/cloudbuild_destroy.yaml
 ```
 
 ## Known issues
-If you face problems to delete the Service Networking peering:
+If you have problems deleting the Service Networking peering, that is expected. See the [reference documentation](https://cloud.google.com/vpc/docs/configure-private-services-access#removing-connection).
+
 ```
 Error: Unable to remove Service Networking Connection, err: Error waiting for Delete Service Networking Connection: Error code 9, message: Failed to delete connection; Producer services (e.g. CloudSQL, Cloud Memstore, etc.) are still using this connection
 ```
-Go to the [Console](https://console.cloud.google.com/networking/peering/list) and manually delete the peering.
+
+Workaround: Go to the [Console](https://console.cloud.google.com/networking/peering/list) and manually delete the peering. Then run the Cloud Build destroy job again.
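+
+Alternatively, deleting the peering with the gcloud CLI should also work. The private services access peering is typically named `servicenetworking-googleapis-com`; verify the name in the Console first:
+```
+gcloud compute networks peerings delete servicenetworking-googleapis-com --network=cloud-composer-etl-vpc
+```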
-""" -import os -from datetime import datetime -from airflow import models -from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator - - -PROJECT_ID = os.environ.get("GCP_PROJECT") - -with models.DAG( - dag_id='bigquery_transform', - schedule_interval="@once", - start_date=datetime(2022, 1, 1), - catchup=False, - tags=["example", "bigquery"], -) as dag: - - BIKE_TRIPS = ( - '''SELECT - bikeid, - COUNT(*) AS trip_count, - MAX(starttime) AS last_start, - MAX(stoptime) AS last_stop, - ARRAY_AGG( STRUCT( - ss.name AS start_station_name, - t.starttime AS start_time, - es.name AS end_station_name, - t.stoptime AS end_time, - tripduration AS trip_duration) ) AS trips - FROM - `{0}.citibike.trips` t - JOIN - `{0}.citibike.stations` AS ss ON (t.start_station_id = ss.station_id) - JOIN - `{0}.citibike.stations` AS es ON (t.end_station_id = es.station_id) - GROUP BY - bikeid - '''.format(PROJECT_ID) - ) - - create_bike_trips_table = BigQueryInsertJobOperator( - task_id="create_bike_trips_table", - configuration={ - "query": { - "query": BIKE_TRIPS, - "useLegacySql": False, - "destinationTable": { - "projectId": PROJECT_ID, - "datasetId": "citibike", - "tableId": "bike_trips", - }, - "createDisposition": "CREATE_IF_NEEDED", - "writeDisposition": "WRITE_TRUNCATE", - } - }, - ) diff --git a/cloud-composer-etl/dags/dag_with_sla.py b/cloud-composer-etl/dags/dag_with_sla.py new file mode 100644 index 0000000..9b9db98 --- /dev/null +++ b/cloud-composer-etl/dags/dag_with_sla.py @@ -0,0 +1,48 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+    # add your code here; for example, it could send a notification to your team chat
+    print(f"Dag {dag.dag_id} tasks {task_list} missed SLA")
+
+
+with models.DAG(
+    dag_id='dag_with_sla',
+    description='This DAG shows how to set an SLA for each task',
+    schedule_interval="*/5 * * * *",
+    start_date=datetime(2022, 1, 1),
+    catchup=False,
+    sla_miss_callback=my_sla_miss_callback,
+    tags=["example", "has_sla"]
+) as dag:
+
+    task_1 = BashOperator(
+        task_id='task_1',
+        bash_command='sleep 5',
+        dag=dag,
+        sla=timedelta(seconds=10))  # measured from the DAG run start, not the task start
+
+    task_2 = BashOperator(
+        task_id='task_2',
+        bash_command='sleep 20',
+        dag=dag,
+        sla=timedelta(seconds=20))  # measured from the DAG run start, not the task start
+
+    task_1 >> task_2
diff --git a/cloud-composer-etl/dags/datalake_to_dw.py b/cloud-composer-etl/dags/from_data_lake_to_data_warehouse.py
similarity index 53%
rename from cloud-composer-etl/dags/datalake_to_dw.py
rename to cloud-composer-etl/dags/from_data_lake_to_data_warehouse.py
index 47888f4..793bea5 100644
--- a/cloud-composer-etl/dags/datalake_to_dw.py
+++ b/cloud-composer-etl/dags/from_data_lake_to_data_warehouse.py
@@ -20,16 +20,16 @@
 from airflow import models
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
-from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator
-
+from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator, BigQueryInsertJobOperator
 
 CONN_ID = "pgCitibike"
 DATASET_NAME = "citibike"
 GCS_DATA_LAKE_BUCKET = os.environ.get("GCS_DATA_LAKE_BUCKET")
-
+PROJECT_ID = os.environ.get("GCP_PROJECT")
 
 with models.DAG(
-    dag_id='datalake_to_dw',
+    dag_id='from_data_lake_to_data_warehouse',
+    description='Import data from the data lake to the data warehouse in BigQuery',
     schedule_interval="@once",
     start_date=datetime(2022, 1, 1),
     catchup=False,
@@ -37,8 +37,8 @@
 ) as dag:
 
     trigger_datalake_dag = TriggerDagRunOperator(
-        task_id="trigger_datalake_dag",
-        trigger_dag_id="postgres_to_datalake",
+        task_id="trigger_data_lake_dag",
+        trigger_dag_id="from_database_to_data_lake",
         wait_for_completion=True,
         poke_interval=10, # seconds
         execution_date="{{ execution_date }}"
 
@@ -72,6 +72,45 @@
         write_disposition='WRITE_TRUNCATE',
     )
 
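+    # Join trips with the station names and nest each bike's trips into an
+    # ARRAY of STRUCTs, producing one denormalized row per bikeid.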
"WRITE_TRUNCATE", + } + }, + ) + +# task dependency trigger_datalake_dag >> create_dataset -create_dataset >> load_stations -create_dataset >> load_trips +create_dataset >> [load_stations, load_trips] +[load_stations, load_trips] >> create_bike_trips_table diff --git a/cloud-composer-etl/dags/postgres_to_datalake.py b/cloud-composer-etl/dags/from_database_to_data_lake.py similarity index 90% rename from cloud-composer-etl/dags/postgres_to_datalake.py rename to cloud-composer-etl/dags/from_database_to_data_lake.py index bba61bd..b102375 100644 --- a/cloud-composer-etl/dags/postgres_to_datalake.py +++ b/cloud-composer-etl/dags/from_database_to_data_lake.py @@ -21,7 +21,6 @@ from airflow import models from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator -from airflow.operators.dummy import DummyOperator CONN_ID = "pgCitibike" @@ -29,11 +28,12 @@ with models.DAG( - dag_id='postgres_to_datalake', + dag_id='from_database_to_data_lake', + description='Export data from the relational database to data lake in Google Cloud Storage', start_date=datetime(2022, 1, 1), schedule_interval="0 1 * * *", catchup=False, - tags=['cloudsql', 'postgres', 'gcs'], + tags=['cloudsql', 'postgres', 'gcs', 'data_lake'], ) as dag: task_stations = PostgresToGCSOperator( diff --git a/cloud-composer-etl/dags/postgres_restore.py b/cloud-composer-etl/dags/restore_postgres_backup.py similarity index 90% rename from cloud-composer-etl/dags/postgres_restore.py rename to cloud-composer-etl/dags/restore_postgres_backup.py index cbf02c0..47a6cc3 100644 --- a/cloud-composer-etl/dags/postgres_restore.py +++ b/cloud-composer-etl/dags/restore_postgres_backup.py @@ -27,11 +27,12 @@ with models.DAG( - dag_id='postgres_restore', + dag_id='restore_postgres_backup', + description='Restore a PostgresSQL database backup', start_date=datetime(2022, 1, 1), schedule_interval="@once", catchup=False, - tags=['example'], + tags=['example', 'infrastructure'], ) as dag: import_body = {"importContext": {"fileType": "sql", "uri": FILE_NAME, "database":"citibike"}} diff --git a/cloud-composer-etl/infra/composer.tf b/cloud-composer-etl/infra/composer.tf index 3783e0b..d94aabd 100644 --- a/cloud-composer-etl/infra/composer.tf +++ b/cloud-composer-etl/infra/composer.tf @@ -20,6 +20,7 @@ module "composer" { region = var.region composer_env_name = var.composer_env_name composer_service_account = google_service_account.service_account.email + image_version = var.image_version environment_size = "ENVIRONMENT_SIZE_SMALL" labels = local.resource_labels diff --git a/cloud-composer-etl/infra/network.tf b/cloud-composer-etl/infra/network.tf index bf01007..294ba13 100644 --- a/cloud-composer-etl/infra/network.tf +++ b/cloud-composer-etl/infra/network.tf @@ -14,17 +14,17 @@ resource "google_compute_network" "vpc_network" { - name = var.network_name - description = "VPC for Data Platform" - routing_mode = "GLOBAL" + name = var.network_name + description = "VPC for Data Platform" + routing_mode = "GLOBAL" auto_create_subnetworks = false } resource "google_compute_subnetwork" "composer_subnetwork" { - name = var.composer_env_name - ip_cidr_range = var.composer_ip_ranges.nodes - region = var.region - network = google_compute_network.vpc_network.id + name = var.composer_env_name + ip_cidr_range = var.composer_ip_ranges.nodes + region = var.region + network = google_compute_network.vpc_network.id private_ip_google_access = true secondary_ip_range { diff --git a/cloud-composer-etl/infra/sql.tf b/cloud-composer-etl/infra/sql.tf index 
   environment_size         = "ENVIRONMENT_SIZE_SMALL"
   labels                   = local.resource_labels
 
diff --git a/cloud-composer-etl/infra/network.tf b/cloud-composer-etl/infra/network.tf
index bf01007..294ba13 100644
--- a/cloud-composer-etl/infra/network.tf
+++ b/cloud-composer-etl/infra/network.tf
@@ -14,17 +14,17 @@
 
 resource "google_compute_network" "vpc_network" {
-  name         = var.network_name
-  description  = "VPC for Data Platform"
-  routing_mode = "GLOBAL"
+  name                    = var.network_name
+  description             = "VPC for Data Platform"
+  routing_mode            = "GLOBAL"
   auto_create_subnetworks = false
 }
 
 resource "google_compute_subnetwork" "composer_subnetwork" {
-  name          = var.composer_env_name
-  ip_cidr_range = var.composer_ip_ranges.nodes
-  region        = var.region
-  network       = google_compute_network.vpc_network.id
+  name                     = var.composer_env_name
+  ip_cidr_range            = var.composer_ip_ranges.nodes
+  region                   = var.region
+  network                  = google_compute_network.vpc_network.id
   private_ip_google_access = true
 
   secondary_ip_range {
diff --git a/cloud-composer-etl/infra/sql.tf b/cloud-composer-etl/infra/sql.tf
index d76ae4d..0147e03 100644
--- a/cloud-composer-etl/infra/sql.tf
+++ b/cloud-composer-etl/infra/sql.tf
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 locals {
-  db_instance_name       = "postgres-${random_id.db_name_suffix.hex}"
+  db_instance_name       = "${var.composer_env_name}-${random_id.db_name_suffix.hex}"
   airflow_conn_sample_db = "gcpcloudsql://airflow:${random_password.db_password.result}@${google_sql_database_instance.instance.private_ip_address}:5432/citibike"
 }
diff --git a/cloud-composer-etl/infra/terraform.tfvars b/cloud-composer-etl/infra/terraform.tfvars
index 1134d58..fb18661 100644
--- a/cloud-composer-etl/infra/terraform.tfvars
+++ b/cloud-composer-etl/infra/terraform.tfvars
@@ -14,6 +14,7 @@
 
 network_name      = "cloud-composer-etl-vpc"
 composer_env_name = "composer-af2"
+image_version     = "composer-2.9.6-airflow-2.9.3"
 
 resource_labels = {
   env = "sandbox"
diff --git a/cloud-composer-etl/infra/variables.tf b/cloud-composer-etl/infra/variables.tf
index c0238ad..302a99c 100644
--- a/cloud-composer-etl/infra/variables.tf
+++ b/cloud-composer-etl/infra/variables.tf
@@ -40,6 +40,11 @@ variable "composer_env_name" {
   description = "Cloud Composer environment name"
 }
 
+variable "image_version" {
+  type        = string
+  description = "Cloud Composer image version"
+}
+
 variable "composer_ip_ranges" {
   type        = map(string)
   description = "Composer 2 runs on GKE, so inform here the IP ranges you want to use"