fix: upgrade dataplex tables to managed, create new zone, remove manual table creation
bradmiro authored Jul 17, 2023
1 parent ed7088b commit 52a45f2
Showing 8 changed files with 459 additions and 149 deletions.
131 changes: 109 additions & 22 deletions dataplex.tf
@@ -15,10 +15,9 @@
 */
 
 resource "google_project_service_identity" "dataplex_sa" {
-  provider   = google-beta
-  project    = module.project-services.project_id
-  service    = "dataplex.googleapis.com"
-  depends_on = [time_sleep.wait_after_all_workflows]
+  provider = google-beta
+  project  = module.project-services.project_id
+  service  = "dataplex.googleapis.com"
 }
 
 resource "google_dataplex_lake" "gcp_primary" {
@@ -31,59 +30,147 @@ resource "google_dataplex_lake" "gcp_primary" {
     gcp-lake = "exists"
   }
 
-  project    = module.project-services.project_id
-  depends_on = [time_sleep.wait_after_all_workflows]
+  project = module.project-services.project_id
 
 }
 
-#zone
-resource "google_dataplex_zone" "gcp_primary_zone" {
+#zone - raw
+resource "google_dataplex_zone" "gcp_primary_raw" {
   discovery_spec {
     enabled = true
   }
 
   lake     = google_dataplex_lake.gcp_primary.name
   location = var.region
-  name     = "gcp-primary-zone"
+  name     = "gcp-primary-raw"
 
   resource_spec {
     location_type = "SINGLE_REGION"
   }
 
   type         = "RAW"
-  description  = "Zone for thelook_ecommerce"
-  display_name = "Zone 1"
+  description  = "Zone for thelook_ecommerce image data"
+  display_name = "images"
   labels       = {}
   project      = module.project-services.project_id
-  depends_on   = [time_sleep.wait_after_all_workflows]
-
-
 }
 
-#give dataplex access to biglake bucket
-resource "google_project_iam_member" "dataplex_bucket_access" {
+#zone - curated, for staging the data
+resource "google_dataplex_zone" "gcp_primary_staging" {
+  discovery_spec {
+    enabled = true
+  }
+
+  lake     = google_dataplex_lake.gcp_primary.name
+  location = var.region
+  name     = "gcp-primary-staging"
+
+  resource_spec {
+    location_type = "SINGLE_REGION"
+  }
+
+  type         = "CURATED"
+  description  = "Zone for thelook_ecommerce tabular data"
+  display_name = "staging"
+  labels       = {}
+  project      = module.project-services.project_id
+}
+
+#zone - curated, for BI
+resource "google_dataplex_zone" "gcp_primary_curated_bi" {
+  discovery_spec {
+    enabled = true
+  }
+
+  lake     = google_dataplex_lake.gcp_primary.name
+  location = var.region
+  name     = "gcp-primary-curated"
+
+  resource_spec {
+    location_type = "SINGLE_REGION"
+  }
+
+  type         = "CURATED"
+  description  = "Zone for thelook_ecommerce tabular data"
+  display_name = "business_intelligence"
+  labels       = {}
+  project      = module.project-services.project_id
+}
+
+# Assets are listed below. Assets need to wait for data to be copied before they can be created.
+
+#asset
+resource "google_dataplex_asset" "gcp_primary_textocr" {
+  name     = "gcp-primary-textocr"
+  location = var.region
+
+  lake          = google_dataplex_lake.gcp_primary.name
+  dataplex_zone = google_dataplex_zone.gcp_primary_raw.name
+
+  discovery_spec {
+    enabled = true
+  }
+
+  resource_spec {
+    name = "projects/${module.project-services.project_id}/buckets/${google_storage_bucket.textocr_images_bucket.name}"
+    type = "STORAGE_BUCKET"
+  }
+
+  project    = module.project-services.project_id
+  depends_on = [time_sleep.wait_after_all_resources, google_project_iam_member.dataplex_bucket_access]
+}
+
+#asset
+resource "google_dataplex_asset" "gcp_primary_ga4_obfuscated_sample_ecommerce" {
+  name     = "gcp-primary-ga4-obfuscated-sample-ecommerce"
+  location = var.region
+
+  lake          = google_dataplex_lake.gcp_primary.name
+  dataplex_zone = google_dataplex_zone.gcp_primary_raw.name
+
+  discovery_spec {
+    enabled = true
+  }
+
+  resource_spec {
+    name = "projects/${module.project-services.project_id}/buckets/${google_storage_bucket.ga4_images_bucket.name}"
+    type = "STORAGE_BUCKET"
+  }
+
   project = module.project-services.project_id
-  role    = "roles/dataplex.serviceAgent"
-  member  = "serviceAccount:${google_project_service_identity.dataplex_sa.email}"
-  depends_on = [time_sleep.wait_after_all_workflows]
+  depends_on = [time_sleep.wait_after_all_resources, google_project_iam_member.dataplex_bucket_access]
 }
 
 #asset
-resource "google_dataplex_asset" "gcp_primary_asset" {
-  name     = "gcp-primary-asset"
+resource "google_dataplex_asset" "gcp_primary_tables" {
+  name     = "gcp-primary-tables"
   location = var.region
 
   lake          = google_dataplex_lake.gcp_primary.name
-  dataplex_zone = google_dataplex_zone.gcp_primary_zone.name
+  dataplex_zone = google_dataplex_zone.gcp_primary_staging.name
 
   discovery_spec {
     enabled = true
   }
 
   resource_spec {
-    name = "projects/${module.project-services.project_id}/buckets/${google_storage_bucket.raw_bucket.name}"
+    name = "projects/${module.project-services.project_id}/buckets/${google_storage_bucket.tables_bucket.name}"
    type = "STORAGE_BUCKET"
   }
 
   project    = module.project-services.project_id
-  depends_on = [time_sleep.wait_after_all_workflows, google_project_iam_member.dataplex_bucket_access]
+  depends_on = [time_sleep.wait_after_all_resources, google_project_iam_member.dataplex_bucket_access]
 
 }
+
+#give dataplex access to biglake bucket
+resource "google_project_iam_member" "dataplex_bucket_access" {
+  project = module.project-services.project_id
+  role    = "roles/dataplex.serviceAgent"
+  member  = "serviceAccount:${google_project_service_identity.dataplex_sa.email}"
+}
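The lake, zone, and asset pattern above generalizes. As a minimal sketch (not part of this commit), registering one more bucket in the raw zone would follow the same shape; extra_images_bucket is a hypothetical google_storage_bucket resource, not one defined in this repository.

# Hypothetical example only: "extra_images_bucket" does not exist in this module.
resource "google_dataplex_asset" "gcp_primary_extra_images" {
  name     = "gcp-primary-extra-images"
  location = var.region

  lake          = google_dataplex_lake.gcp_primary.name
  dataplex_zone = google_dataplex_zone.gcp_primary_raw.name

  # Let Dataplex discovery crawl the bucket and surface its contents as entities.
  discovery_spec {
    enabled = true
  }

  resource_spec {
    name = "projects/${module.project-services.project_id}/buckets/${google_storage_bucket.extra_images_bucket.name}"
    type = "STORAGE_BUCKET"
  }

  project = module.project-services.project_id
  # Same gating as the assets above: data must be copied and the service agent granted access first.
  depends_on = [time_sleep.wait_after_all_resources, google_project_iam_member.dataplex_bucket_access]
}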
56 changes: 29 additions & 27 deletions main.tf
@@ -120,6 +120,29 @@ resource "google_storage_bucket" "provisioning_bucket" {
 
 }
 
+resource "google_storage_bucket" "ga4_images_bucket" {
+  name                        = "gcp-${var.use_case_short}-ga4-images-${random_id.id.hex}"
+  project                     = module.project-services.project_id
+  location                    = var.region
+  uniform_bucket_level_access = true
+  force_destroy               = var.force_destroy
+}
+
+resource "google_storage_bucket" "textocr_images_bucket" {
+  name                        = "gcp-${var.use_case_short}-textocr-images-${random_id.id.hex}"
+  project                     = module.project-services.project_id
+  location                    = var.region
+  uniform_bucket_level_access = true
+  force_destroy               = var.force_destroy
+}
+
+resource "google_storage_bucket" "tables_bucket" {
+  name                        = "gcp-${var.use_case_short}-tables-${random_id.id.hex}"
+  project                     = module.project-services.project_id
+  location                    = var.region
+  uniform_bucket_level_access = true
+  force_destroy               = var.force_destroy
+}
+
 resource "google_storage_bucket_object" "pyspark_file" {
   bucket = google_storage_bucket.provisioning_bucket.name
@@ -132,41 +155,20 @@ resource "google_storage_bucket_object" "pyspark_file" {
 
 }
 
-#we will use this as a wait and to make sure every other resource in this project has completed.
-#we will then make the last four workflow steps dependent on this
-resource "time_sleep" "wait_after_all_resources" {
-  create_duration = "120s"
-  depends_on = [
-    module.project-services,
-    google_storage_bucket.provisioning_bucket,
-    google_project_service_identity.workflows,
-    google_bigquery_dataset.gcp_lakehouse_ds,
-    google_bigquery_connection.gcp_lakehouse_connection,
-    google_project_iam_member.connectionPermissionGrant,
-    google_workflows_workflow.project_setup,
-    data.google_storage_project_service_account.gcs_account
-  ]
-}
-
+# Resources are dependent on one another. We will ensure the following set of resources are created before proceeding.
 #execute workflows
 data "google_client_config" "current" {
 }
 
-data "http" "call_workflows_initial_project_setup" {
-  url    = "https://workflowexecutions.googleapis.com/v1/projects/${module.project-services.project_id}/locations/${var.region}/workflows/${google_workflows_workflow.project_setup.name}/executions"
-  method = "POST"
-  request_headers = {
-    Accept        = "application/json"
-    Authorization = "Bearer ${data.google_client_config.current.access_token}" }
-  depends_on = [
-    time_sleep.wait_after_all_resources
-  ]
-}
-
 resource "time_sleep" "wait_after_all_workflows" {
   create_duration = "180s"
 
   depends_on = [
-    data.http.call_workflows_initial_project_setup,
+    google_dataplex_zone.gcp_primary_raw,
+    google_dataplex_zone.gcp_primary_staging,
+    google_dataplex_zone.gcp_primary_curated_bi,
+    data.google_storage_project_service_account.gcs_account,
+    data.http.call_workflows_copy_data
   ]
 }
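The three buckets added above share a single shape. As a sketch of an alternative (not what this commit does), they could be collapsed with for_each; the resource name data_buckets and the key list are illustrative.

# Illustrative alternative, not part of this commit.
resource "google_storage_bucket" "data_buckets" {
  for_each = toset(["ga4-images", "textocr-images", "tables"])

  # each.key supplies the middle segment of the bucket name.
  name                        = "gcp-${var.use_case_short}-${each.key}-${random_id.id.hex}"
  project                     = module.project-services.project_id
  location                    = var.region
  uniform_bucket_level_access = true
  force_destroy               = var.force_destroy
}

The trade-off is that downstream references become map lookups (e.g. google_storage_bucket.data_buckets["tables"].name), which may be why the commit keeps three named resources.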
2 changes: 1 addition & 1 deletion outputs.tf
@@ -16,7 +16,7 @@
 
 output "workflow_return_project_setup" {
   description = "Output of the project setup workflow"
-  value       = data.http.call_workflows_initial_project_setup.response_body
+  value       = data.http.call_workflows_project_setup.response_body
 }
 
 output "lookerstudio_report_url" {
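As a sketch, the copy-data workflow invoked in main.tf could be surfaced the same way; workflow_return_copy_data is a hypothetical output name, while data.http.call_workflows_copy_data is the data source referenced above.

# Hypothetical companion output, not part of this commit.
output "workflow_return_copy_data" {
  description = "Output of the copy data workflow"
  value       = data.http.call_workflows_copy_data.response_body
}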
11 changes: 9 additions & 2 deletions src/bigquery.py
@@ -17,7 +17,11 @@
 from pyspark.sql import SparkSession
 import os
 
-spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
+spark = SparkSession \
+    .builder \
+    .appName("spark-bigquery-demo") \
+    .enableHiveSupport() \
+    .getOrCreate()
 
 catalog = os.getenv("lakehouse_catalog", "lakehouse_catalog")
 database = os.getenv("lakehouse_db", "lakehouse_db")
@@ -30,6 +34,9 @@
 # used by the connector.
 spark.conf.set("temporaryGcsBucket", bucket)
 
+# Delete the BigLake Catalog if it currently exists to ensure proper setup.
+spark.sql(f"DROP NAMESPACE IF EXISTS {catalog} CASCADE;")
+
 # Create BigLake Catalog and Database if they are not already created.
 spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {catalog};")
 spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database};")
@@ -38,7 +45,7 @@
 
 # Load data from BigQuery.
 events = spark.read.format("bigquery") \
-    .option("table", "gcp_lakehouse_ds.gcp_tbl_events") \
+    .option("table", "gcp_primary_staging.stage_thelook_ecommerce_events") \
     .load()
 events.createOrReplaceTempView("events")
10 changes: 5 additions & 5 deletions src/sql/view_ecommerce.sql
@@ -51,21 +51,21 @@ SELECT
   u.longitude user_long,
   u.traffic_source user_traffic_source
 FROM
-  gcp_lakehouse_ds.gcp_tbl_orders o
+  gcp_primary_staging.stage_thelook_ecommerce_orders o
 INNER JOIN
-  gcp_lakehouse_ds.gcp_tbl_order_items i
+  gcp_primary_staging.stage_thelook_ecommerce_order_items i
 ON
   o.order_id = i.order_id
 INNER JOIN
-  `gcp_lakehouse_ds.gcp_tbl_products` p
+  `gcp_primary_staging.stage_thelook_ecommerce_products` p
 ON
   i.product_id = p.id
 INNER JOIN
-  `gcp_lakehouse_ds.gcp_tbl_distribution_centers` d
+  `gcp_primary_staging.stage_thelook_ecommerce_distribution_centers` d
 ON
   p.distribution_center_id = d.id
 INNER JOIN
-  `gcp_lakehouse_ds.gcp_tbl_users` u
+  `gcp_primary_staging.stage_thelook_ecommerce_users` u
 ON
   o.user_id = u.id
 ;
