Merge pull request #11 from Cloudbakers/feature/npredey-replay-pipeline
Creation of Replay Pipeline for Splunk export
rarsan authored Aug 25, 2021
2 parents f489a5e + be0ab52 commit 130bca7
Showing 6 changed files with 80 additions and 28 deletions.
9 changes: 7 additions & 2 deletions README.md
@@ -31,7 +31,7 @@ dataflow_job_batch_count | (Optional) Batch count of messages in single request
dataflow_job_disable_certificate_validation | (Optional) Boolean to disable SSL certificate validation (default false)
dataflow_job_udf_gcs_path | (Optional) GCS path for JavaScript file (default No UDF used)
dataflow_job_udf_function_name | (Optional) Name of JavaScript function to be called (default No UDF used)
dataflow_template_path | (Optional) Dataflow template GCS path (default 'gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk'). Override this for version pinning.
dataflow_template_version | (Optional) Dataflow template release version (default 'latest'). Override this for version pinning, e.g. '2021-08-02-00_RC00'. Specify the version only; the template GCS path is derived automatically: 'gs://dataflow-templates/`version`/Cloud_PubSub_to_Splunk'
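
For example, a minimal sketch of pinning the template release from the command line (the version string is illustrative):

```shell
# Pin the Pub/Sub to Splunk Dataflow template to a specific release
$ terraform apply -var="dataflow_template_version=2021-08-02-00_RC00"
```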

### Getting Started

@@ -74,6 +74,11 @@ $ terraform output dataflow_log_export_dashboad

2. Visit the newly created Monitoring Dashboard in the Cloud Console by replacing {dashboard_id} in the following URL: https://console.cloud.google.com/monitoring/dashboards/builder/{dashboard_id}

#### Deploy replay pipeline

In the `replay.tf` file, uncomment the `splunk_dataflow_replay` resource, then run `terraform plan` followed by `terraform apply`.

Once the replay pipeline is no longer needed (the number of messages in the Pub/Sub deadletter topic has dropped to 0), comment the `splunk_dataflow_replay` resource back out and repeat the `plan` and `apply` sequence above.
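
A minimal sketch of that cycle, assuming the resource in `replay.tf` has been uncommented (and later re-commented) as described above:

```shell
# Deploy the replay pipeline after uncommenting the splunk_dataflow_replay resource
$ terraform plan
$ terraform apply

# Once the deadletter topic is drained, re-comment the resource and apply again;
# Terraform will destroy the replay job
$ terraform plan
$ terraform apply
```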

### Cleanup

@@ -86,7 +91,7 @@ $ terraform destroy

* Support KMS-encrypted HEC token
* Expose logging level knob
* Create replay pipeline
* ~~Create replay pipeline~~
* ~~Create secure network for self-contained setup if existing network is not provided~~
* ~~Add Cloud Monitoring dashboard~~

18 changes: 16 additions & 2 deletions main.tf
@@ -21,25 +21,39 @@ provider "google" {
region = var.region
}

# Generate new random hex to be used for bucket name
resource "random_id" "bucket_suffix" {
byte_length = 4
}

# Generate a new random id each time the template version changes, used in the
# pipeline job name to force job replacement rather than an in-place update
resource "random_id" "dataflow_job_instance" {
byte_length = 2
keepers = {
dataflow_template_version = var.dataflow_template_version
}
}

locals {
dataflow_temporary_gcs_bucket_name = "${var.project}-${var.dataflow_job_name}-${random_id.bucket_suffix.hex}"
dataflow_temporary_gcs_bucket_path = "tmp/"

dataflow_splunk_template_gcs_path = "gs://dataflow-templates/${var.dataflow_template_version}/Cloud_PubSub_to_Splunk"
dataflow_pubsub_template_gcs_path = "gs://dataflow-templates/${var.dataflow_template_version}/Cloud_PubSub_to_Cloud_PubSub"

subnet_name = coalesce(var.subnet, "${var.network}-${var.region}")
project_log_sink_name = "${var.dataflow_job_name}-project-log-sink"
organization_log_sink_name = "${var.dataflow_job_name}-organization-log-sink"

dataflow_main_job_name = "${var.dataflow_job_name}-main-${random_id.dataflow_job_instance.hex}"
dataflow_replay_job_name = "${var.dataflow_job_name}-replay-${random_id.dataflow_job_instance.hex}"

dataflow_input_topic_name = "${var.dataflow_job_name}-input-topic"
dataflow_input_subscription_name = "${var.dataflow_job_name}-input-subscription"
dataflow_output_deadletter_topic_name = "${var.dataflow_job_name}-deadletter-topic"
dataflow_output_deadletter_sub_name = "${var.dataflow_job_name}-deadletter-subscription"

dataflow_replay_job_name = "${var.dataflow_job_name}-replay"

# dataflow job parameters (not externalized for this project)
dataflow_job_include_pubsub_message = true
}
14 changes: 7 additions & 7 deletions monitoring.tf
@@ -54,7 +54,7 @@ resource "google_monitoring_dashboard" "splunk-export-pipeline-dashboard" {
"scorecard": {
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"dataflow.googleapis.com/job/elements_produced_count\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${var.dataflow_job_name}\" metric.label.\"ptransform\"=\"WriteToSplunk/Create KV pairs/Inject Keys\"",
"filter": "metric.type=\"dataflow.googleapis.com/job/elements_produced_count\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${local.dataflow_main_job_name}\" metric.label.\"ptransform\"=\"WriteToSplunk/Create KV pairs/Inject Keys\"",
"aggregation": {
"alignmentPeriod": "3600s",
"perSeriesAligner": "ALIGN_SUM",
@@ -136,7 +136,7 @@ resource "google_monitoring_dashboard" "splunk-export-pipeline-dashboard" {
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"dataflow.googleapis.com/job/elements_produced_count\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${var.dataflow_job_name}\" metric.label.\"pcollection\"=\"WriteToSplunk/Create KV pairs/Inject Keys.out0\"",
"filter": "metric.type=\"dataflow.googleapis.com/job/elements_produced_count\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${local.dataflow_main_job_name}\" metric.label.\"pcollection\"=\"WriteToSplunk/Create KV pairs/Inject Keys.out0\"",
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_RATE"
@@ -196,7 +196,7 @@ resource "google_monitoring_dashboard" "splunk-export-pipeline-dashboard" {
"scorecard": {
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/dataflow/outbound-successful-events\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${var.dataflow_job_name}\"",
"filter": "metric.type=\"custom.googleapis.com/dataflow/outbound-successful-events\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${local.dataflow_main_job_name}\"",
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MAX",
@@ -216,7 +216,7 @@
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/dataflow/outbound-failed-events\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${var.dataflow_job_name}\"",
"filter": "metric.type=\"custom.googleapis.com/dataflow/outbound-failed-events\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${local.dataflow_main_job_name}\"",
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN",
@@ -230,7 +230,7 @@
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/dataflow/total-failed-messages\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${var.dataflow_job_name}\"",
"filter": "metric.type=\"custom.googleapis.com/dataflow/total-failed-messages\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${local.dataflow_main_job_name}\"",
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN",
@@ -460,7 +460,7 @@ resource "google_monitoring_dashboard" "splunk-export-pipeline-dashboard" {
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"dataflow.googleapis.com/job/current_num_vcpus\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${var.dataflow_job_name}\"",
"filter": "metric.type=\"dataflow.googleapis.com/job/current_num_vcpus\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${local.dataflow_main_job_name}\"",
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN",
@@ -526,7 +526,7 @@ resource "google_monitoring_dashboard" "splunk-export-pipeline-dashboard" {
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"dataflow.googleapis.com/job/system_lag\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${var.dataflow_job_name}\"",
"filter": "metric.type=\"dataflow.googleapis.com/job/system_lag\" resource.type=\"dataflow_job\" resource.label.\"job_name\"=\"${local.dataflow_main_job_name}\"",
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN",
11 changes: 2 additions & 9 deletions pipeline.tf
@@ -42,16 +42,9 @@ resource "google_storage_bucket_object" "dataflow_job_temp_object" {
bucket = google_storage_bucket.dataflow_job_temp_bucket.name
}

resource "random_id" "dataflow_job_instance" {
byte_length = 2
keepers = {
template_gcs_path = var.dataflow_template_path
}
}

resource "google_dataflow_job" "dataflow_job" {
name = "${var.dataflow_job_name}-${random_id.dataflow_job_instance.hex}"
template_gcs_path = random_id.dataflow_job_instance.keepers.template_gcs_path
name = local.dataflow_main_job_name
template_gcs_path = local.dataflow_splunk_template_gcs_path
temp_gcs_location = "gs://${local.dataflow_temporary_gcs_bucket_name}/${local.dataflow_temporary_gcs_bucket_path}"
machine_type = var.dataflow_job_machine_type
max_workers = var.dataflow_job_machine_count
44 changes: 44 additions & 0 deletions replay.tf
@@ -0,0 +1,44 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

/*
The replay job should stay commented out while the main export pipeline is initially deployed.
When the replay job needs to be run, uncomment the resource below and deploy the replay pipeline.
From the CLI, this may look like `terraform apply -target="google_dataflow_job.splunk_dataflow_replay"`.
After the deadletter Pub/Sub topic has no more messages, comment the resource back out and run a regular Terraform deployment (e.g. `terraform apply`); Terraform will automatically destroy the replay job.
Usage documentation for `terraform apply -target` is here: https://www.terraform.io/docs/cli/commands/apply.html
*/

/*
resource "google_dataflow_job" "splunk_dataflow_replay" {
name = local.dataflow_replay_job_name
template_gcs_path = local.dataflow_pubsub_template_gcs_path
temp_gcs_location = "gs://${local.dataflow_temporary_gcs_bucket_name}/${local.dataflow_temporary_gcs_bucket_path}"
machine_type = var.dataflow_job_machine_type
max_workers = var.dataflow_job_machine_count
parameters = {
inputSubscription = google_pubsub_subscription.dataflow_deadletter_pubsub_sub.id
outputTopic = google_pubsub_topic.dataflow_input_pubsub_topic.id
}
region = var.region
network = var.network
subnetwork = "regions/${var.region}/subnetworks/${local.subnet_name}"
ip_configuration = "WORKER_IP_PRIVATE"
depends_on = [
google_compute_subnetwork.splunk_subnet
]
}
*/
12 changes: 4 additions & 8 deletions variables.tf
@@ -72,14 +72,10 @@ variable "splunk_hec_token" {

# Dataflow job parameters

variable "dataflow_template_path" {
description = "Dataflow template path. Defaults to latest version of Google-hosted Pub/Sub to Splunk template"
default = "gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk"

validation {
condition = can(regex("gs://.+", var.dataflow_template_path))
error_message = "Splunk Dataflow template path must be a GCS object path gs://<bucket_name>/<path> ."
}
variable "dataflow_template_version" {
type = string
description = "Dataflow template version for the replay job."
default = "latest"
}

variable "dataflow_job_name" {
