This guide walks you through creating a Spark Structured Streaming job that transforms data from Kafka and writes it to Google Cloud Storage.
The source code used in this guide is located here.
For example, given a Kafka message with the following value:
+--------------------------+
|value                     |
+--------------------------+
|Lorem ipsum dolor sit amet|
+--------------------------+
the job produces one windowed word-count row per word:
+-----------------------+-----------------------+-----+-----+---------------------------+----------+
|window.start           |window.end             |words|count|timestamp                  |date      |
+-----------------------+-----------------------+-----+-----+---------------------------+----------+
|2022-01-25 02:35:50 UTC|2022-01-25 02:36:00 UTC|Lorem|1    |2022-01-25 02:36:42.630 UTC|2022-01-25|
|2022-01-25 02:35:50 UTC|2022-01-25 02:36:00 UTC|ipsum|1    |2022-01-25 02:36:42.630 UTC|2022-01-25|
|2022-01-25 02:35:50 UTC|2022-01-25 02:36:00 UTC|dolor|1    |2022-01-25 02:36:42.630 UTC|2022-01-25|
|2022-01-25 02:35:50 UTC|2022-01-25 02:36:00 UTC|sit  |1    |2022-01-25 02:36:42.630 UTC|2022-01-25|
|2022-01-25 02:35:50 UTC|2022-01-25 02:36:00 UTC|amet |1    |2022-01-25 02:36:42.630 UTC|2022-01-25|
+-----------------------+-----------------------+-----+-----+---------------------------+----------+
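The rows above come from a windowed word count. A minimal sketch of that transformation (illustrative, not the repo's exact source; lines stands for the streaming DataFrame of decoded Kafka records, with value and timestamp columns):

import org.apache.spark.sql.functions._

// Split each sentence into words, then count words per event-time window.
val counts = lines
  .select(explode(split(col("value"), "\\s+")).as("words"), col("timestamp"))
  .withWatermark("timestamp", "10 seconds")                        // tolerate late data
  .groupBy(window(col("timestamp"), "10 seconds"), col("words"))   // event-time window
  .count()
  .select(
    col("window.start"), col("window.end"), col("words"), col("count"),
    current_timestamp().as("timestamp"),      // processing time, as in the table above
    to_date(col("window.start")).as("date"))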
TODO - make the output partitioned (e.g. by the date column); one possible approach is sketched below.
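A minimal sketch of that TODO, assuming the sink from the end-to-end sketch further below (counts, outputPath, and checkpointPath are the names used there):

import org.apache.spark.sql.DataFrame

// Hypothetical variant of the Parquet sink: partition the output by the
// date column so each day lands under .../date=YYYY-MM-DD/.
def writePartitioned(counts: DataFrame, outputPath: String, checkpointPath: String) =
  counts.writeStream
    .format("parquet")
    .option("path", outputPath)
    .option("checkpointLocation", checkpointPath)
    .partitionBy("date")
    .start()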
To follow this guide you will need to:
- Have a Kafka cluster running to connect to (dev-tools instructions below)
- Compile the application jar
- Create a Dataproc cluster to run the streaming job
Set the following environment variables, which the remaining steps rely on:
export PROJECT_ID="your_project_id"
export CLUSTER_ID="your_cluster_id"
export REGION="your_region"
export BUCKET="gs://your_bucket-streaming-kafka-gcs"
export SPARK_APP_PATH="${BUCKET}/your_spark_app.jar"
export KAFKA_BROKERS="your_kafka_brokers"
export TOPIC="your_kafka_topic"
export BUCKET_CHECKPOINT_PATH="${BUCKET}/your_stream_checkpoint_path/"
export WATERMARK_PERIOD="your_period"  # e.g. "10 seconds"
export WINDOW_PERIOD="your_period"     # e.g. "10 seconds"
export TRIGGER_PERIOD="your_period"    # e.g. "10 seconds"
export BUCKET_OUTPUT_PATH="${BUCKET}/your_stream_data_output_path/"
export BUCKET_CLASS=YOUR_CLASS # e.g. STANDARD
Create the GCS bucket:
gsutil mb -p ${PROJECT_ID} -c ${BUCKET_CLASS} -l ${REGION} ${BUCKET}
Copy the compiled application jar to the bucket:
gsutil cp target/scala-2.12/{YOUR_LOCAL_JAR}.jar $SPARK_APP_PATH
Submit the streaming job (see the jobs submit reference documentation for the full set of flags):
gcloud dataproc jobs submit spark \
--cluster=$CLUSTER_ID \
--jar=$SPARK_APP_PATH \
--region=$REGION \
-- --brokers=$KAFKA_BROKERS \
--topic=$TOPIC \
--checkpointPath=$BUCKET_CHECKPOINT_PATH \
--watermark="$WATERMARK_PERIOD" \
--windowDuration="$WINDOW_PERIOD" \
--triggerProcTime="$TRIGGER_PERIOD" \
--outputPath=$BUCKET_OUTPUT_PATH
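To make the mapping from flags to code concrete, here is a minimal end-to-end sketch of such a job (not the repo's actual source; argument parsing is elided and example values are hard-coded, with the corresponding flag noted in a comment):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object StreamingKafkaToGcs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("streaming-kafka-gcs").getOrCreate()

    // Source: subscribe to the Kafka topic and decode the message payload.
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker-1:9092")  // --brokers
      .option("subscribe", "my-topic")                     // --topic
      .load()
      .selectExpr("CAST(value AS STRING) AS value", "timestamp")

    // Transformation: the windowed word count sketched earlier.
    val counts = lines
      .select(explode(split(col("value"), "\\s+")).as("words"), col("timestamp"))
      .withWatermark("timestamp", "10 seconds")                       // --watermark
      .groupBy(window(col("timestamp"), "10 seconds"), col("words"))  // --windowDuration
      .count()
      .select(col("window.start"), col("window.end"), col("words"), col("count"),
        current_timestamp().as("timestamp"), to_date(col("window.start")).as("date"))

    // Sink: append Parquet files on GCS, checkpointing so the job can resume.
    counts.writeStream
      .format("parquet")
      .option("path", "gs://your_bucket/stream_data_output/")              // --outputPath
      .option("checkpointLocation", "gs://your_bucket/stream_checkpoint/") // --checkpointPath
      .trigger(Trigger.ProcessingTime("10 seconds"))                       // --triggerProcTime
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}

Note that the Kafka source requires the spark-sql-kafka connector on the classpath, typically bundled into the assembled jar.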
After the job has processed some WordCount events from Kafka, the output Parquet table is populated at the following path:
echo ${BUCKET_OUTPUT_PATH}
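If you want to spot-check the output first, a quick batch read works (a hypothetical check run from spark-shell, reusing the exported path):

// In spark-shell: read the streaming output back as a plain batch DataFrame.
val df = spark.read.parquet(sys.env("BUCKET_OUTPUT_PATH"))
df.printSchema()
df.show(20, truncate = false)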
You can load the Parquet files into BigQuery by following the steps below. The --replace flag overwrites the table if it already exists.
export GENERATED_DATE_PARTITIONED_TABLE="your_dataset.your_stream_table_imported"
export SRC_PARQUET_DATA="${BUCKET_OUTPUT_PATH}*.parquet"
bq load --replace --source_format=PARQUET ${GENERATED_DATE_PARTITIONED_TABLE} ${SRC_PARQUET_DATA}
If you quickly want a Kafka cluster on GCP that you can send sentence messages to, one option is:
- Go to the GCP Marketplace and launch an Apache Kafka Server on CentOS 8.4.
- Go to your GCP Compute Engine VM Instances and locate the created Kafka instance.
- Open 3 SSH sessions to the instance and run the following commands:
# SSH 1
# Start Zookeeper
cd /opt/kafka/
sudo bin/zookeeper-server-start.sh config/zookeeper.properties
# SSH 2
# Start Kafka
cd /opt/kafka/
sudo bin/kafka-server-start.sh config/server.properties
# SSH 3
# Create topic and open console to send messages
cd /opt/kafka/
bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic my-topic --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
Each line you type into the producer console is sent to the topic as a single message (e.g. Lorem ipsum dolor sit amet).
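If you would rather send test sentences programmatically than through the console producer, here is a minimal producer sketch (hypothetical; assumes the kafka-clients library is on the classpath):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SentenceProducer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  // Send one sentence as a single message to the topic created above.
  val producer = new KafkaProducer[String, String](props)
  producer.send(new ProducerRecord[String, String]("my-topic", "Lorem ipsum dolor sit amet"))
  producer.flush()
  producer.close()
}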
Be aware of the incurred costs.
All code snippets within this document are provided under the following terms.
Copyright 2022 Google. This software is provided as-is, without warranty or representation for any use or purpose. Your use of it is subject to your agreement with Google.