This guide walks you through creating a batch Spark job that transforms data from Google Cloud Storage to Google Cloud Storage.
The source code used in this guide is located here.
You can generate the input table below using the create-dataset job:
________________________________________________________________________________
|value |timestamp |date |
|_______________________________|_______________________________|_______________|
|Lorem ipsum dolor sit amet |2022-01-25 02:35:55.630 UTC |2022-01-25 |
|_______________________________|_______________________________|_______________|
partitioned by date_partition=YYYY-MM-DD (a copy of the date column)

The job splits each value into words and counts them, producing the following output table:
_______________________________________________________________
|words |count |timestamp |date |
|_______|_______|_______________________________|_______________|
|Lorem |1 |2022-01-25 02:36:42.630 UTC |2022-01-25 |
|ipsum |1 |2022-01-25 02:36:42.630 UTC |2022-01-25 |
|dolor |1 |2022-01-25 02:36:42.630 UTC |2022-01-25 |
|sit |1 |2022-01-25 02:36:42.630 UTC |2022-01-25 |
|amet |1 |2022-01-25 02:36:42.630 UTC |2022-01-25 |
|_______|_______|_______________________________|_______________|
partitioned by date_partition=YYYY-MM-DD (a copy of the date column)
- Compile the application jar.
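If the project builds with sbt (assumed here, since the jar is picked up from target/scala-2.12), a command along these lines compiles it; if the job needs its dependencies bundled, use sbt assembly with the sbt-assembly plugin instead:
# Build the application jar with sbt (assumed build tool; adjust to your project)
sbt package
Then set the environment variables used in the rest of this guide, create the bucket, and upload the jar: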
export PROJECT_ID="your_project_id"
export CLUSTER_ID="your_cluster_id"
export REGION="your_region"
export BUCKET="gs://your_bucket-batch-gcs-to-gcs"
export SPARK_APP_PATH="${BUCKET}/your_spark_app.jar"
export BUCKET_INPUT_PATH="${BUCKET}/your_data_input_path/"
export BUCKET_OUTPUT_PATH="${BUCKET}/your_data_output_path/"
export BUCKET_CLASS=YOUR_CLASS # e.g. STANDARD
gsutil mb -p ${PROJECT_ID} -c ${BUCKET_CLASS} -l ${REGION} ${BUCKET}
gsutil cp target/scala-2.12/{YOUR_LOCAL_JAR}.jar $SPARK_APP_PATH
Jobs Submit Reference Documentation
Prerequisite: create a Dataproc cluster. Note: since this is a batch job, instead of creating a persistent cluster you can follow the Composer guide to run the job as a pipeline that creates an ephemeral cluster and deletes it after the job finishes.
gcloud dataproc jobs submit spark \
--cluster=$CLUSTER_ID \
--jar=$SPARK_APP_PATH \
--region=$REGION \
-- --inputPath=$BUCKET_INPUT_PATH \
--outputPath=$BUCKET_OUTPUT_PATH
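One way to check the result is to list the jobs submitted to the cluster and their status:
# List Spark jobs on the cluster to verify the submission finished successfully
gcloud dataproc jobs list \
    --region=$REGION \
    --cluster=$CLUSTER_ID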
Workflow Reference Documentation 1, Workflow Reference Documentation 2
Before running the workflow, you must update the gcp-dataproc-workflow/workflow.yaml file with the values of the variables that are passed as arguments above.
This example workflow will create an ephemeral cluster to run the job (Managed Cluster), but it can also be configured to select an existing cluster (Cluster Selector).
export BUCKET_WORKFLOW_PATH="${BUCKET}/config/batch-gcs-gcs-workflow.yaml"
gsutil cp gcp-dataproc-workflow/batch-gcs-gcs-workflow.yaml $BUCKET_WORKFLOW_PATH
gcloud dataproc workflow-templates instantiate-from-file \
--file=$BUCKET_WORKFLOW_PATH \
--region=$REGION \
--async
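Since the template is instantiated with --async, the command returns immediately; you can follow the resulting operation with:
# The instantiation runs as a long-running operation; list operations to track it
gcloud dataproc operations list --region=$REGION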
Serverless Batches Submit Reference Documentation
gcloud dataproc batches submit spark \
--region=$REGION \
--jar=$SPARK_APP_PATH \
--subnet="your_subnet" \
-- --inputPath=$BUCKET_INPUT_PATH \
--outputPath=$BUCKET_OUTPUT_PATH
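You can check the state of the run by listing the serverless batches in the region:
# List Serverless Spark batches to check whether the run succeeded
gcloud dataproc batches list --region=$REGION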
You now have a populated output Parquet table at the following path:
echo ${BUCKET_OUTPUT_PATH}
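Listing the output path should show the date_partition=YYYY-MM-DD/ prefixes written by the job, each containing Parquet part files:
# Inspect the Hive-style partition layout written by the job
gsutil ls ${BUCKET_OUTPUT_PATH}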
You can load the Parquet files into a new BigQuery native table by following the steps below.
This will result in a BigQuery table partitioned by date=YYYY-MM-DD instead of date_partition=YYYY-MM-DD.
The --replace flag overwrites the table if it already exists.
export BQ_DATASET=your_dataset
export BQ_TABLE=your_table
export SRC_PARQUET_DATA="${BUCKET_OUTPUT_PATH}*.parquet"
bq load --replace --source_format=PARQUET --time_partitioning_field="date" ${BQ_DATASET}.${BQ_TABLE} ${SRC_PARQUET_DATA}
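A quick query confirms the load; the date literal below is only an example taken from the sample data:
# Sample the native table, filtering on its date partitioning column
bq query --use_legacy_sql=false \
    "SELECT * FROM ${BQ_DATASET}.${BQ_TABLE} WHERE date = '2022-01-25' LIMIT 10"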
You can create a BigQuery external table that points to the data in the GCS bucket. This is done by first creating an external table definition and then creating the table from that definition. The external table definition can be partitioned as long as the data follows the default Hive partitioning layout.
export BQ_DATASET=your_dataset
export BQ_TABLE=your_external_table
bq mkdef \
--source_format=PARQUET \
--hive_partitioning_mode=AUTO \
--hive_partitioning_source_uri_prefix=${BUCKET_OUTPUT_PATH} \
--require_hive_partition_filter=True \
${BUCKET_OUTPUT_PATH}*.parquet > external_table_definition.json
bq mk --external_table_definition=external_table_definition.json ${PROJECT_ID}:${BQ_DATASET}.${BQ_TABLE}
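Because the definition sets --require_hive_partition_filter=True, every query against this external table must filter on date_partition; for example (the date is only illustrative):
# Query the external table; the partition filter is mandatory with this definition
bq query --use_legacy_sql=false \
    "SELECT * FROM ${BQ_DATASET}.${BQ_TABLE} WHERE date_partition = '2022-01-25' LIMIT 10"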
All code snippets within this document are provided under the following terms.
Copyright 2022 Google. This software is provided as-is, without warranty or representation for any use or purpose. Your use of it is subject to your agreement with Google.