From 52a45f2aee107dfd6fde04ce92e77cf7b61c4e5c Mon Sep 17 00:00:00 2001 From: Brad Miro Date: Mon, 17 Jul 2023 16:53:52 -0400 Subject: [PATCH] fix: upgrade dataplex tables to managed, create new zone, remove manual table creation --- dataplex.tf | 131 +++++++++-- main.tf | 56 ++--- outputs.tf | 2 +- src/bigquery.py | 11 +- src/sql/view_ecommerce.sql | 10 +- src/yaml/copy-data.yaml | 111 ++++++++++ ...-project-setup.yaml => project-setup.yaml} | 207 ++++++++++-------- workflows.tf | 80 ++++++- 8 files changed, 459 insertions(+), 149 deletions(-) create mode 100644 src/yaml/copy-data.yaml rename src/yaml/{initial-workflow-project-setup.yaml => project-setup.yaml} (59%) diff --git a/dataplex.tf b/dataplex.tf index 14350e6a..958eb3e0 100644 --- a/dataplex.tf +++ b/dataplex.tf @@ -15,10 +15,9 @@ */ resource "google_project_service_identity" "dataplex_sa" { - provider = google-beta - project = module.project-services.project_id - service = "dataplex.googleapis.com" - depends_on = [time_sleep.wait_after_all_workflows] + provider = google-beta + project = module.project-services.project_id + service = "dataplex.googleapis.com" } resource "google_dataplex_lake" "gcp_primary" { @@ -31,59 +30,147 @@ resource "google_dataplex_lake" "gcp_primary" { gcp-lake = "exists" } - project = module.project-services.project_id - depends_on = [time_sleep.wait_after_all_workflows] + project = module.project-services.project_id } -#zone -resource "google_dataplex_zone" "gcp_primary_zone" { +#zone - raw +resource "google_dataplex_zone" "gcp_primary_raw" { discovery_spec { enabled = true } lake = google_dataplex_lake.gcp_primary.name location = var.region - name = "gcp-primary-zone" + name = "gcp-primary-raw" resource_spec { location_type = "SINGLE_REGION" } type = "RAW" - description = "Zone for thelook_ecommerce" - display_name = "Zone 1" + description = "Zone for thelook_ecommerce image data" + display_name = "images" labels = {} project = module.project-services.project_id - depends_on = [time_sleep.wait_after_all_workflows] + + } -#give dataplex access to biglake bucket -resource "google_project_iam_member" "dataplex_bucket_access" { +#zone - curated, for staging the data +resource "google_dataplex_zone" "gcp_primary_staging" { + discovery_spec { + enabled = true + } + + lake = google_dataplex_lake.gcp_primary.name + location = var.region + name = "gcp-primary-staging" + + resource_spec { + location_type = "SINGLE_REGION" + } + + type = "CURATED" + description = "Zone for thelook_ecommerce tabular data" + display_name = "staging" + labels = {} + project = module.project-services.project_id +} + +#zone - curated, for BI +resource "google_dataplex_zone" "gcp_primary_curated_bi" { + discovery_spec { + enabled = true + } + + lake = google_dataplex_lake.gcp_primary.name + location = var.region + name = "gcp-primary-curated" + + resource_spec { + location_type = "SINGLE_REGION" + } + + type = "CURATED" + description = "Zone for thelook_ecommerce tabular data" + display_name = "business_intelligence" + labels = {} + project = module.project-services.project_id +} + +# Assets are listed below. Assets need to wait for data to be copied to be created. 
+ +#asset +resource "google_dataplex_asset" "gcp_primary_textocr" { + name = "gcp-primary-textocr" + location = var.region + + lake = google_dataplex_lake.gcp_primary.name + dataplex_zone = google_dataplex_zone.gcp_primary_raw.name + + discovery_spec { + enabled = true + } + + resource_spec { + name = "projects/${module.project-services.project_id}/buckets/${google_storage_bucket.textocr_images_bucket.name}" + type = "STORAGE_BUCKET" + } + + project = module.project-services.project_id + depends_on = [time_sleep.wait_after_all_resources, google_project_iam_member.dataplex_bucket_access] + +} + +#asset +resource "google_dataplex_asset" "gcp_primary_ga4_obfuscated_sample_ecommerce" { + name = "gcp-primary-ga4-obfuscated-sample-ecommerce" + location = var.region + + lake = google_dataplex_lake.gcp_primary.name + dataplex_zone = google_dataplex_zone.gcp_primary_raw.name + + discovery_spec { + enabled = true + } + + resource_spec { + name = "projects/${module.project-services.project_id}/buckets/${google_storage_bucket.ga4_images_bucket.name}" + type = "STORAGE_BUCKET" + } + project = module.project-services.project_id - role = "roles/dataplex.serviceAgent" - member = "serviceAccount:${google_project_service_identity.dataplex_sa.email}" - depends_on = [time_sleep.wait_after_all_workflows] + depends_on = [time_sleep.wait_after_all_resources, google_project_iam_member.dataplex_bucket_access] + } #asset -resource "google_dataplex_asset" "gcp_primary_asset" { - name = "gcp-primary-asset" +resource "google_dataplex_asset" "gcp_primary_tables" { + name = "gcp-primary-tables" location = var.region lake = google_dataplex_lake.gcp_primary.name - dataplex_zone = google_dataplex_zone.gcp_primary_zone.name + dataplex_zone = google_dataplex_zone.gcp_primary_staging.name discovery_spec { enabled = true } resource_spec { - name = "projects/${module.project-services.project_id}/buckets/${google_storage_bucket.raw_bucket.name}" + name = "projects/${module.project-services.project_id}/buckets/${google_storage_bucket.tables_bucket.name}" type = "STORAGE_BUCKET" } project = module.project-services.project_id - depends_on = [time_sleep.wait_after_all_workflows, google_project_iam_member.dataplex_bucket_access] + depends_on = [time_sleep.wait_after_all_resources, google_project_iam_member.dataplex_bucket_access] } + + +#give dataplex access to biglake bucket +resource "google_project_iam_member" "dataplex_bucket_access" { + project = module.project-services.project_id + role = "roles/dataplex.serviceAgent" + member = "serviceAccount:${google_project_service_identity.dataplex_sa.email}" +} diff --git a/main.tf b/main.tf index 10eb58b6..d45a2662 100644 --- a/main.tf +++ b/main.tf @@ -120,6 +120,29 @@ resource "google_storage_bucket" "provisioning_bucket" { } +resource "google_storage_bucket" "ga4_images_bucket" { + name = "gcp-${var.use_case_short}-ga4-images-${random_id.id.hex}" + project = module.project-services.project_id + location = var.region + uniform_bucket_level_access = true + force_destroy = var.force_destroy +} + +resource "google_storage_bucket" "textocr_images_bucket" { + name = "gcp-${var.use_case_short}-textocr-images-${random_id.id.hex}" + project = module.project-services.project_id + location = var.region + uniform_bucket_level_access = true + force_destroy = var.force_destroy +} + +resource "google_storage_bucket" "tables_bucket" { + name = "gcp-${var.use_case_short}-tables-${random_id.id.hex}" + project = module.project-services.project_id + location = var.region + uniform_bucket_level_access = 
true + force_destroy = var.force_destroy +} resource "google_storage_bucket_object" "pyspark_file" { bucket = google_storage_bucket.provisioning_bucket.name @@ -132,41 +155,20 @@ resource "google_storage_bucket_object" "pyspark_file" { } -#we will use this as a wait and to make sure every other resource in this project has completed. -#we will then make the last four workflow steps dependent on this +# Resources are dependent on one another. We will ensure the following set of resources are created before proceeding. resource "time_sleep" "wait_after_all_resources" { create_duration = "120s" depends_on = [ module.project-services, google_storage_bucket.provisioning_bucket, - google_project_service_identity.workflows, google_bigquery_dataset.gcp_lakehouse_ds, google_bigquery_connection.gcp_lakehouse_connection, google_project_iam_member.connectionPermissionGrant, google_workflows_workflow.project_setup, - data.google_storage_project_service_account.gcs_account - ] -} - -#execute workflows -data "google_client_config" "current" { -} - -data "http" "call_workflows_initial_project_setup" { - url = "https://workflowexecutions.googleapis.com/v1/projects/${module.project-services.project_id}/locations/${var.region}/workflows/${google_workflows_workflow.project_setup.name}/executions" - method = "POST" - request_headers = { - Accept = "application/json" - Authorization = "Bearer ${data.google_client_config.current.access_token}" } - depends_on = [ - time_sleep.wait_after_all_resources - ] -} - -resource "time_sleep" "wait_after_all_workflows" { - create_duration = "180s" - - depends_on = [ - data.http.call_workflows_initial_project_setup, + google_dataplex_zone.gcp_primary_raw, + google_dataplex_zone.gcp_primary_staging, + google_dataplex_zone.gcp_primary_curated_bi, + data.google_storage_project_service_account.gcs_account, + data.http.call_workflows_copy_data ] } diff --git a/outputs.tf b/outputs.tf index 3b6c36fa..08994f8c 100644 --- a/outputs.tf +++ b/outputs.tf @@ -16,7 +16,7 @@ output "workflow_return_project_setup" { description = "Output of the project setup workflow" - value = data.http.call_workflows_initial_project_setup.response_body + value = data.http.call_workflows_project_setup.response_body } output "lookerstudio_report_url" { diff --git a/src/bigquery.py b/src/bigquery.py index db0f123c..3ad9e093 100644 --- a/src/bigquery.py +++ b/src/bigquery.py @@ -17,7 +17,11 @@ from pyspark.sql import SparkSession import os -spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() +spark = SparkSession \ + .builder \ + .appName("spark-bigquery-demo") \ + .enableHiveSupport() \ + .getOrCreate() catalog = os.getenv("lakehouse_catalog", "lakehouse_catalog") database = os.getenv("lakehouse_db", "lakehouse_db") @@ -30,6 +34,9 @@ # used by the connector. spark.conf.set("temporaryGcsBucket", bucket) +# Delete the BigLake Catalog if it currently exists to ensure proper setup. +spark.sql(f"DROP NAMESPACE IF EXISTS {catalog} CASCADE;") + # Create BigLake Catalog and Database if they are not already created. spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {catalog};") spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database};") @@ -38,7 +45,7 @@ # Load data from BigQuery. 
events = spark.read.format("bigquery") \ - .option("table", "gcp_lakehouse_ds.gcp_tbl_events") \ + .option("table", "gcp_primary_staging.stage_thelook_ecommerce_events") \ .load() events.createOrReplaceTempView("events") diff --git a/src/sql/view_ecommerce.sql b/src/sql/view_ecommerce.sql index b4d5c903..a83dfba2 100644 --- a/src/sql/view_ecommerce.sql +++ b/src/sql/view_ecommerce.sql @@ -51,21 +51,21 @@ SELECT u.longitude user_long, u.traffic_source user_traffic_source FROM - gcp_lakehouse_ds.gcp_tbl_orders o + gcp_primary_staging.stage_thelook_ecommerce_orders o INNER JOIN - gcp_lakehouse_ds.gcp_tbl_order_items i + gcp_primary_staging.stage_thelook_ecommerce_order_items i ON o.order_id = i.order_id INNER JOIN - `gcp_lakehouse_ds.gcp_tbl_products` p + `gcp_primary_staging.stage_thelook_ecommerce_products` p ON i.product_id = p.id INNER JOIN - `gcp_lakehouse_ds.gcp_tbl_distribution_centers` d + `gcp_primary_staging.stage_thelook_ecommerce_distribution_centers` d ON p.distribution_center_id = d.id INNER JOIN - `gcp_lakehouse_ds.gcp_tbl_users` u + `gcp_primary_staging.stage_thelook_ecommerce_users` u ON o.user_id = u.id ; diff --git a/src/yaml/copy-data.yaml b/src/yaml/copy-data.yaml new file mode 100644 index 00000000..5ea802ec --- /dev/null +++ b/src/yaml/copy-data.yaml @@ -0,0 +1,111 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+main:
+    params: []
+    steps:
+        - init:
+            # Define local variables from terraform env variables
+            assign:
+                - source_bucket_name: "data-analytics-demos"
+                - dest_ga4_images_bucket_name: ${ga4_images_bucket}
+                - dest_textocr_images_bucket_name: ${textocr_images_bucket}
+                - dest_tables_bucket_name: ${tables_bucket}
+                - images_zone_name: ${images_zone_name}
+                - tables_zone_name: ${tables_zone_name}
+                - lake_name: ${lake_name}
+        - sub_copy_data:
+            parallel:
+                branches:
+                    - copy_textocr_images:
+                        steps:
+                            - copy_textocr_images_call:
+                                call: copy_objects
+                                args:
+                                    source_bucket_name: $${source_bucket_name}
+                                    prefix: "stage/TextOCR_images"
+                                    dest_bucket_name: $${dest_textocr_images_bucket_name}
+                                result: copy_textocr_images_output
+                    - copy_ga4_images:
+                        steps:
+                            - copy_ga4_images_call:
+                                call: copy_objects
+                                args:
+                                    source_bucket_name: $${source_bucket_name}
+                                    prefix: "stage/ga4_obfuscated_sample_ecommerce_images"
+                                    dest_bucket_name: $${dest_ga4_images_bucket_name}
+                                result: copy_ga4_output
+                    - copy_new_york_taxi_trips_tables:
+                        steps:
+                            - copy_new_york_taxi_trips_tables_call:
+                                call: copy_objects
+                                args:
+                                    source_bucket_name: $${source_bucket_name}
+                                    prefix: "new-york-taxi-trips"
+                                    dest_bucket_name: $${dest_tables_bucket_name}
+                                result: copy_new_york_taxi_trips_tables_output
+                    - copy_thelook_ecommerce_tables:
+                        steps:
+                            - copy_thelook_ecommerce_tables_call:
+                                call: copy_objects
+                                args:
+                                    source_bucket_name: $${source_bucket_name}
+                                    prefix: "stage/thelook_ecommerce"
+                                    dest_bucket_name: $${dest_tables_bucket_name}
+                                result: copy_thelook_ecommerce_tables_output
+
+# Subworkflow to copy initial objects
+copy_objects:
+    params: [source_bucket_name, prefix, dest_bucket_name]
+    steps:
+        - list_objects:
+            call: googleapis.storage.v1.objects.list
+            args:
+                bucket: $${source_bucket_name}
+                prefix: $${prefix}
+            result: list_result
+        - start_counter:
+            assign:
+                - copied_objects: 0
+        - copy_objects:
+            parallel:
+                shared: [copied_objects]
+                for:
+                    value: object
+                    index: i
+                    in: $${list_result.items}
+                    steps:
+                        - copy:
+                            try:
+                                steps:
+                                    - copy_object:
+                                        call: googleapis.storage.v1.objects.copy
+                                        args:
+                                            sourceBucket: $${source_bucket_name}
+                                            sourceObject: $${text.url_encode(object.name)}
+                                            destinationBucket: $${dest_bucket_name}
+                                            destinationObject: $${text.url_encode(object.name)}
+                                        result: copy_result
+                                    - save_result:
+                                        assign:
+                                            - copied_objects: $${copied_objects + 1}
+                            except:
+                                as: e
+                                raise:
+                                    exception: $${e}
+                                    sourceBucket: $${source_bucket_name}
+                                    sourceObject: $${object.name}
+                                    destinationBucket: $${dest_bucket_name}
+        - finish:
+            return: $${copied_objects + " objects copied"}
diff --git a/src/yaml/initial-workflow-project-setup.yaml b/src/yaml/project-setup.yaml
similarity index 59%
rename from src/yaml/initial-workflow-project-setup.yaml
rename to src/yaml/project-setup.yaml
index 2933348b..935cd2d2 100644
--- a/src/yaml/initial-workflow-project-setup.yaml
+++ b/src/yaml/project-setup.yaml
@@ -12,110 +12,142 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# This defines the Google Workflow for the Analytics lakehouse Solution: https://console.cloud.google.com/products/solutions/details/analytics-lakehouse
+# This Workflow executes through Terraform. 
For Google Workflows executed via Terraform, variables are defined such that: +# +# - Terraform environment variables are denoted by $ +# - Google Workflow variables are escaped via $$ +# +# To modify this Workflow to stand alone (no Terraform): +# +# - Replace vars in `main` -> `steps` -> `assign` with your own (or use https://cloud.google.com/workflows/docs/passing-runtime-arguments#gcloud) +# - Change all $$ to $ + main: params: [] steps: - - sub_copy_objects: - call: copy_objects - result: output + - init: + # Define local variables from terraform env variables + assign: + - temp_bucket_name: ${temp_bucket} + - dataproc_service_account_name: ${dataproc_service_account} + - provisioner_bucket_name: ${provisioner_bucket} + - warehouse_bucket_name: ${warehouse_bucket} + - sub_upgrade_dataplex_assets: + call: upgrade_dataplex_assets + result: upgrade_dataplex_assets_output + # TODO: change this to poll for BigQuery table creation + - sub_wait_for_dataplex_discovery: + call: sys.sleep + args: + seconds: 480 - sub_create_tables: call: create_tables - result: output1 + result: create_tables_output - sub_create_iceberg: call: create_iceberg - result: output2 + args: + temp_bucket_name: $${temp_bucket_name} + dataproc_service_account_name: $${dataproc_service_account_name} + provisioner_bucket_name: $${provisioner_bucket_name} + warehouse_bucket_name: $${warehouse_bucket_name} + result: create_iceberg_output - sub_create_taxonomy: call: create_taxonomy - result: output3 + result: create_taxonomy_output -# Subworkflow to copy initial objects -copy_objects: +# Subworkflow to upgrade all Dataplex Assets to Managed +# Subworkflow gets all lakes, then all zones within each lake, then all assets within each zone and upgrades +upgrade_dataplex_assets: steps: - init: assign: - - source_bucket: "data-analytics-demos" - - dest_bucket: ${raw_bucket} - - copied_objects: [] - - list_objects: - call: googleapis.storage.v1.objects.list + - project_id: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} + - location: $${sys.get_env("GOOGLE_CLOUD_LOCATION")} + - zones: [] + - get_lakes: + call: http.get args: - bucket: $${source_bucket} - prefix: "thelook" - result: list_result - - start_counter: + url: $${"https://dataplex.googleapis.com/v1/projects/"+project_id+"/locations/"+location+"/lakes"} + auth: + type: OAuth2 + result: Response + - assign_lakes: assign: - - copied_objects: 0 - - copy_objects: - for: - value: object - index: i - in: $${list_result.items} - steps: - - step1: - try: - steps: - - copy_object: - call: googleapis.storage.v1.objects.copy - args: - sourceBucket: $${source_bucket} - sourceObject: $${text.url_encode(object.name)} - destinationBucket: $${dest_bucket} - destinationObject: $${text.url_encode(object.name)} - result: copy_result - - save_result: - assign: - - copied_objects: $${copied_objects + 1} - except: - as: e - raise: - exception: $${e} - sourceBucket: $${source_bucket} - sourceObject: $${object.name} - destinationBucket: $${dest_bucket} - - finish: - return: $${copied_objects + " objects copied"} + - response_lakes: $${Response.body.lakes} + - get_zones: + for: + value: lake + index: i + in: $${response_lakes} + steps: + - get_zones_in_lake: + call: http.get + args: + url: $${"https://dataplex.googleapis.com/v1/"+lake.name+"/zones"} + auth: + type: OAuth2 + result: Response + - assign_zones: + assign: + - response_zones: $${Response.body.zones} -# Subworkflow to create BigLake tables -create_tables: - steps: - - assignStepTables: - assign: - - results: {} - - location: 
$${sys.get_env("GOOGLE_CLOUD_LOCATION")} - - bucket: ${raw_bucket} - - project_id: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} - - map: - gcp_tbl_order_items: $${"CREATE OR REPLACE EXTERNAL TABLE `gcp_lakehouse_ds.gcp_tbl_order_items` WITH CONNECTION `"+location+".gcp_lakehouse_connection` OPTIONS(format ='Parquet', uris = ['gs://" + bucket + "/thelook_ecommerce/order_items-0*.Parquet'], max_staleness = INTERVAL 30 MINUTE, metadata_cache_mode = 'AUTOMATIC');"} - gcp_tbl_orders: $${"CREATE OR REPLACE EXTERNAL TABLE `gcp_lakehouse_ds.gcp_tbl_orders` WITH CONNECTION `"+location+".gcp_lakehouse_connection` OPTIONS(format ='Parquet', uris = ['gs://" + bucket + "/thelook_ecommerce/orders-*.Parquet'], max_staleness = INTERVAL 30 MINUTE, metadata_cache_mode = 'AUTOMATIC');"} - gcp_tbl_users: $${"CREATE OR REPLACE EXTERNAL TABLE `gcp_lakehouse_ds.gcp_tbl_users` WITH CONNECTION `"+location+".gcp_lakehouse_connection` OPTIONS(format ='Parquet', uris = ['gs://" + bucket + "/thelook_ecommerce/users-*.Parquet'], max_staleness = INTERVAL 30 MINUTE, metadata_cache_mode = 'AUTOMATIC');"} - gcp_tbl_distribution_centers: $${"CREATE OR REPLACE EXTERNAL TABLE `gcp_lakehouse_ds.gcp_tbl_distribution_centers` WITH CONNECTION `"+location+".gcp_lakehouse_connection` OPTIONS(format ='Parquet', uris = ['gs://" + bucket + "/thelook_ecommerce/distribution_centers-*.Parquet'], max_staleness = INTERVAL 30 MINUTE, metadata_cache_mode = 'AUTOMATIC');"} - gcp_tbl_inventory_items: $${"CREATE OR REPLACE EXTERNAL TABLE `gcp_lakehouse_ds.gcp_tbl_inventory_items` WITH CONNECTION `"+location+".gcp_lakehouse_connection` OPTIONS(format ='Parquet', uris = ['gs://" + bucket + "/thelook_ecommerce/inventory_items-*.Parquet'], max_staleness = INTERVAL 30 MINUTE, metadata_cache_mode = 'AUTOMATIC');"} - gcp_tbl_products: $${"CREATE OR REPLACE EXTERNAL TABLE `gcp_lakehouse_ds.gcp_tbl_products` WITH CONNECTION `"+location+".gcp_lakehouse_connection` OPTIONS(format ='Parquet', uris = ['gs://" + bucket + "/thelook_ecommerce/products-0*.Parquet'], max_staleness = INTERVAL 30 MINUTE, metadata_cache_mode = 'AUTOMATIC');"} - - loopStepTables: + - save_zones: + for: + value: zone + index: j + in: $${response_zones} + steps: + - save_to_list: + assign: + - zones: $${list.concat(zones, zone)} + - get_and_upgrade_all_assets: for: - value: key - in: $${keys(map)} + value: zone + index: i + in: $${zones} steps: - - runQuery: - call: googleapis.bigquery.v2.jobs.query - args: - projectId: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} - body: - useLegacySql: false - useQueryCache: false - location: $${sys.get_env("GOOGLE_CLOUD_LOCATION")} - timeoutMs: 600000 - query: $${map[key]} - result: queryResult - - sumStep: - assign: - - results[key]: $${queryResult} + - get_assets_in_zone: + call: http.get + args: + url: $${"https://dataplex.googleapis.com/v1/"+zone.name+"/assets"} + auth: + type: OAuth2 + result: Response + - check_for_assets: + switch: + - condition: $${not("assets" in Response.body)} + next: continue + - assign_assets: + assign: + - response_assets: $${Response.body.assets} + - upgrade_all_assets: + for: + value: asset + index: j + in: $${response_assets} + steps: + - upgrade_asset: + call: http.patch + args: + url: $${"https://dataplex.googleapis.com/v1/"+asset.name+"?updateMask=resourceSpec.readAccessMode"} + auth: + type: OAuth2 + body: + resourceSpec: + readAccessMode: "MANAGED" + +# Subworkflow to create BigQuery views +create_tables: + steps: # Create and Assign Views - assignStepPolicies: assign: - - marketing_user: ${marketing_user} - - 
data_analyst_user: ${data_analyst_user} + - results: {} + # - marketing_user: ${marketing_user} + # - data_analyst_user: ${data_analyst_user} - policy_map: + # create something that duplicates the table and adds the policy # row_policy_usa_filter: $${"CREATE OR REPLACE ROW ACCESS POLICY usa_filter ON `" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + ".gcp_lakehouse_ds.gcp_tbl_users` GRANT TO ('serviceAccount:" + data_analyst_user + "') FILTER USING (Country = 'United States')"} # row_policy_product_category_filter: $${"CREATE OR REPLACE ROW ACCESS POLICY product_category_filter ON `" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + ".gcp_lakehouse_ds.gcp_tbl_products` GRANT TO ('serviceAccount:" + marketing_user + "') FILTER USING (Category = 'Swim' or Category = 'Active' or Category = 'Fashion Hoodies & Sweatshirts')"} create_view_ecommerce: $${"call gcp_lakehouse_ds.create_view_ecommerce()"} @@ -143,6 +175,7 @@ create_tables: # Subworkflow to create BLMS and Iceberg tables create_iceberg: + params: [temp_bucket_name, provisioner_bucket_name, dataproc_service_account_name, warehouse_bucket_name] steps: - assign_values: assign: @@ -150,10 +183,6 @@ create_iceberg: - location: $${sys.get_env("GOOGLE_CLOUD_LOCATION")} - connection_name: bq_spark_connection - batch_name: $${"initial-setup-"+text.substring(sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID"),0,7)} - - dataproc_service_account: ${dataproc_service_account} - - provisioner_bucket_name: ${provisioner_bucket} - - warehouse_bucket_name: "gs://${warehouse_bucket}/warehouse" - - temp_bucket_name: ${temp_bucket} - lakehouse_catalog: lakehouse_catalog - lakehouse_database: lakehouse_database - bq_dataset: gcp_lakehouse_ds @@ -178,8 +207,8 @@ create_iceberg: "spark.sql.catalog.lakehouse_catalog.catalog-impl": "org.apache.iceberg.gcp.biglake.BigLakeCatalog" "spark.sql.catalog.lakehouse_catalog.gcp_location": "$${location}" "spark.sql.catalog.lakehouse_catalog.gcp_project": "$${project_id}" - "spark.sql.catalog.lakehouse_catalog.warehouse": "$${warehouse_bucket_name}" - "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.14.1" + "spark.sql.catalog.lakehouse_catalog.warehouse": $${"gs://"+warehouse_bucket_name+"/warehouse"} + "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.2.1" "spark.dataproc.driverEnv.lakehouse_catalog": $${lakehouse_catalog} "spark.dataproc.driverEnv.lakehouse_database": $${lakehouse_database} "spark.dataproc.driverEnv.temp_bucket": $${temp_bucket_name} @@ -188,7 +217,7 @@ create_iceberg: environmentConfig: executionConfig: - serviceAccount: $${dataproc_service_account} + serviceAccount: $${dataproc_service_account_name} subnetworkUri: "dataproc-subnet" query: batchId: $${batch_name} diff --git a/workflows.tf b/workflows.tf index 060139a9..e24b20e0 100644 --- a/workflows.tf +++ b/workflows.tf @@ -56,14 +56,41 @@ resource "google_project_iam_member" "workflows_sa_roles" { ] } +# Workflow to copy data from prod GCS bucket to private buckets +# NOTE: google_storage_bucket..name omits the `gs://` prefix. +# You can use google_storage_bucket..url to include the prefix. 
+resource "google_workflows_workflow" "copy_data" { + name = "copy_data" + project = module.project-services.project_id + region = var.region + description = "Copies data and performs project setup" + service_account = google_service_account.workflows_sa.email + source_contents = templatefile("${path.module}/src/yaml/copy-data.yaml", { + textocr_images_bucket = google_storage_bucket.textocr_images_bucket.name, + ga4_images_bucket = google_storage_bucket.ga4_images_bucket.name, + tables_bucket = google_storage_bucket.tables_bucket.name, + images_zone_name = google_dataplex_zone.gcp_primary_raw.name, + tables_zone_name = google_dataplex_zone.gcp_primary_staging.name, + lake_name = google_dataplex_lake.gcp_primary.name + }) + + depends_on = [ + google_project_iam_member.workflows_sa_roles, + google_project_iam_member.dataproc_sa_roles + ] + +} + +# Workflow to set up project resources +# Note: google_storage_bucket..name omits the `gs://` prefix. +# You can use google_storage_bucket..url to include the prefix. resource "google_workflows_workflow" "project_setup" { - name = "initial-workflow-project-setup" + name = "project-setup" project = module.project-services.project_id region = var.region description = "Copies data and performs project setup" service_account = google_service_account.workflows_sa.email - source_contents = templatefile("${path.module}/src/yaml/initial-workflow-project-setup.yaml", { - raw_bucket = google_storage_bucket.raw_bucket.name, + source_contents = templatefile("${path.module}/src/yaml/project-setup.yaml", { data_analyst_user = google_service_account.data_analyst_user.email, marketing_user = google_service_account.marketing_user.email, dataproc_service_account = google_service_account.dataproc_service_account.email, @@ -78,3 +105,50 @@ resource "google_workflows_workflow" "project_setup" { ] } + +# execute workflows after all resources are created +# # get a token to execute the workflows +data "google_client_config" "current" { +} + +# # execute the copy data workflow +data "http" "call_workflows_copy_data" { + url = "https://workflowexecutions.googleapis.com/v1/projects/${module.project-services.project_id}/locations/${var.region}/workflows/${google_workflows_workflow.copy_data.name}/executions" + method = "POST" + request_headers = { + Accept = "application/json" + Authorization = "Bearer ${data.google_client_config.current.access_token}" } + depends_on = [ + google_workflows_workflow.copy_data, + google_storage_bucket.textocr_images_bucket, + google_storage_bucket.ga4_images_bucket, + google_storage_bucket.tables_bucket + ] +} + +# # execute the other project setup workflow +data "http" "call_workflows_project_setup" { + url = "https://workflowexecutions.googleapis.com/v1/projects/${module.project-services.project_id}/locations/${var.region}/workflows/${google_workflows_workflow.project_setup.name}/executions" + method = "POST" + request_headers = { + Accept = "application/json" + Authorization = "Bearer ${data.google_client_config.current.access_token}" } + depends_on = [ + google_workflows_workflow.project_setup, + google_dataplex_asset.gcp_primary_textocr, + google_dataplex_asset.gcp_primary_ga4_obfuscated_sample_ecommerce, + google_dataplex_asset.gcp_primary_tables + ] +} + +# Wait for the project setup workflow to finish. This step should take about +# 12 minutes total. Completing this is not a blocker to begin exploring the +# deployment, but we pause for five minutes to give some resources time to +# spin up. 
+resource "time_sleep" "wait_after_all_workflows" { + create_duration = "300s" + + depends_on = [ + data.http.call_workflows_project_setup, + ] +}