Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compress_batches feature #1

Merged
merged 8 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
inherit_from: .rubocop_todo.yml

AllCops:
# Matches the minimum version in .travis.yml
TargetRubyVersion: 2.4

Style/StringLiterals:
EnforcedStyle: "double_quotes"

# New cops: https://docs.rubocop.org/en/latest/versioning/
Layout/EmptyLinesAroundAttributeAccessor:
Enabled: true

Layout/SpaceAroundMethodCallOperator:
Enabled: true

Lint/RaiseException:
Enabled: true

Lint/StructNewOverride:
Enabled: true

Style/ExponentialNotation:
Enabled: true

Style/HashEachMethods:
Enabled: true

Style/HashTransformKeys:
Enabled: true

Style/HashTransformValues:
Enabled: true

Style/SlicingWithRange:
Enabled: true

Style/TrailingCommaInHashLiteral:
EnforcedStyleForMultiline: comma

Style/TrailingCommaInArrayLiteral:
EnforcedStyleForMultiline: comma

Style/TrailingCommaInArguments:
EnforcedStyleForMultiline: comma
98 changes: 98 additions & 0 deletions .rubocop_todo.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
---
# This configuration was generated by
# `rubocop --auto-gen-config`
# on 2020-05-12 17:34:19 +0100 using RuboCop version 0.83.0.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
# versions of RuboCop, may require this file to be generated again.

# Offense count: 3
# Configuration parameters: IgnoredMethods.
Metrics/AbcSize:
Max: 50

Metrics/BlockLength:
Enabled: false

Metrics/ClassLength:
Enabled: false

Metrics/MethodLength:
Max: 30

# Offense count: 1
# Configuration parameters: IgnoredPatterns.
# SupportedStyles: snake_case, camelCase
Naming/MethodName:
EnforcedStyle: snake_case

# Offense count: 3
# Cop supports --auto-correct.
# Configuration parameters: AutoCorrect, EnforcedStyle.
# SupportedStyles: nested, compact
Style/ClassAndModuleChildren:
Exclude:
- 'lib/fluent/plugin/in_gcloud_pubsub.rb'
- 'lib/fluent/plugin/out_gcloud_pubsub.rb'
- 'test/test_helper.rb'

# Offense count: 6
Style/Documentation:
Exclude:
- 'spec/**/*'
- 'test/**/*'
- 'lib/fluent/plugin/gcloud_pubsub/client.rb'
- 'lib/fluent/plugin/in_gcloud_pubsub.rb'
- 'lib/fluent/plugin/out_gcloud_pubsub.rb'

# Offense count: 1
# Configuration parameters: AllowedVariables.
Style/GlobalVars:
Exclude:
- 'test/test_helper.rb'

# Offense count: 1
# Configuration parameters: MinBodyLength.
Style/GuardClause:
Exclude:
- 'lib/fluent/plugin/gcloud_pubsub/client.rb'

# Offense count: 2
# Cop supports --auto-correct.
Style/IfUnlessModifier:
Exclude:
- 'lib/fluent/plugin/gcloud_pubsub/client.rb'

# Offense count: 1
Style/MethodMissingSuper:
Exclude:
- 'test/test_helper.rb'

# Offense count: 1
Style/MissingRespondToMissing:
Exclude:
- 'test/test_helper.rb'

# Offense count: 260
# Cop supports --auto-correct.
# Configuration parameters: EnforcedStyle, ConsistentQuotesInMultiline.
# SupportedStyles: single_quotes, double_quotes
Style/StringLiterals:
Exclude:
- 'Gemfile'
- 'Rakefile'
- 'fluent-plugin-gcloud-pubsub-custom.gemspec'
- 'lib/fluent/plugin/gcloud_pubsub/client.rb'
- 'lib/fluent/plugin/in_gcloud_pubsub.rb'
- 'lib/fluent/plugin/out_gcloud_pubsub.rb'
- 'test/plugin/test_in_gcloud_pubsub.rb'
- 'test/plugin/test_out_gcloud_pubsub.rb'
- 'test/test_helper.rb'

# Offense count: 36
# Cop supports --auto-correct.
# Configuration parameters: AutoCorrect, AllowHeredoc, AllowURI, URISchemes, IgnoreCopDirectives, IgnoredPatterns.
# URISchemes: http, https
Layout/LineLength:
Max: 120
1 change: 1 addition & 0 deletions .ruby-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2.6.5
13 changes: 8 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
language: ruby

rvm:
- 2.4.6
- 2.5.5
- 2.6.3
- 2.5
- 2.6
- 2.7
- ruby-head

gemfile:
- Gemfile
- Gemfile

branches:
only:
- master
- gocardless

before_install: gem update bundler
script: bundle exec rake test
script:
- bundle exec rake test
- bundle exec rubocop

sudo: false

Expand Down
4 changes: 3 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
source 'https://rubygems.org'
# frozen_string_literal: true

source "https://rubygems.org"

gemspec
75 changes: 73 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Use `gcloud_pubsub` output plugin.
max_messages 1000
max_total_size 9800000
max_message_size 4000000
compress_batches false
<buffer>
@type memory
flush_interval 1s
Expand Down Expand Up @@ -92,7 +93,11 @@ Use `gcloud_pubsub` output plugin.
- `max_message_size` (optional, default: `4000000` = `4MB`)
- Messages exceeding `max_message_size` are not published because Pub/Sub clients cannot receive it.
- `attribute_keys` (optional, default: `[]`)
- Publishing the set fields as attributes.
- Extract these fields from the record and send them as attributes on the Pub/Sub message. Cannot be set if compress_batches is enabled.
- `metric_prefix` (optional, default: `fluentd_output_gcloud_pubsub`)
- The prefix for Prometheus metric names
- `compress_batches` (optional, default: `false`)
- If set to `true`, messages will be batched and compressed before publication. See [message compression](#message-compression) for details.

### Pull messages

Expand Down Expand Up @@ -147,18 +152,84 @@ Use `gcloud_pubsub` input plugin.
- `pull_threads` (optional, default: `1`)
- Set number of threads to pull messages.
- `attribute_keys` (optional, default: `[]`)
- Specify the key of the attribute to be emitted as the field of record.
- Acquire these fields from attributes on the Pub/Sub message and merge them into the record.
- `parse_error_action` (optional, default: `exception`)
- Set error type when parsing messages fails.
- `exception`: Raise exception. Messages are not acknowledged.
- `warning`: Only logging as warning.
- `metric_prefix` (optional, default: `fluentd_input_gcloud_pubsub`)
- The prefix for Prometheus metric names
- `enable_rpc` (optional, default: `false`)
- If `true` is specified, HTTP RPC to stop or start pulling message is enabled.
- `rpc_bind` (optional, default: `0.0.0.0`)
- Bind IP address for HTTP RPC.
- `rpc_port` (optional, default: `24680`)
- Port for HTTP RPC.

## Message compression

The `compress_batches` option can be used to enable the compression of messages
_before_ publication to Pub/Sub.

This works by collecting the buffered messages, taking up to `max_total_size` or
`max_message_size` input records, then compressing them with Zlib (i.e.
gzip/Deflate) before publishing them as a single message to the Pub/Sub topic.

When transporting large volumes of records via Pub/Sub, e.g. multiple Terabytes
per month, this can lead to significant cost savings, as typically the CPU time
required to compress the messages will be minimal in comparison to the Pub/Sub
costs.

The compression ratio achievable will vary largely depending on the homogeneity
of the input records, but typically will be 50% at the very minimum and often
around 80-90%.

In order to achieve good compression, consider the following:
- Ensure that the buffer is being filled with a reasonable batch of messages: do
not use `flush_mode immediate`, and keep the `flush_interval` value
sufficiently high. Use the Prometheus metrics to determine how many records
are being published per message.
- Keep the `max_messages` and `max_message_size` values high (the defaults are
optimal).
- If there are many different sources of messages being mixed and routed to a
single `gcloud_pubsub` output, use multiple outputs (which will each have
their own buffer) through tagging or [labelling][fluentd-labels].

[fluentd-labels]: https://docs.fluentd.org/quickstart/life-of-a-fluentd-event#labels

The receiving end must be able to decode these compressed batches of messages,
which it can determine via an attribute set on the Pub/Sub message. The
`gcloud_pubsub` input plugin will do this transparently, decompressing any
messages which contain a batch of records and normally processing any messages
which represent just a single record.
Therefore, as long as all of the receivers are updated with support for
compressed batches first, it's then possible to gradually roll out this feature.

## Prometheus metrics

The input and output plugins expose several metrics in order to monitor
performance:

- `fluentd_output_gcloud_pubsub_compression_enabled`
- Gauge: Whether compression/batching is enabled
- `fluentd_output_gcloud_pubsub_messages_published_per_batch`
- Histogram: Number of records published to Pub/Sub per buffer flush
- `fluentd_output_gcloud_pubsub_messages_published_bytes`
- Histogram: Total size in bytes of the records published to Pub/Sub
- `fluentd_output_gcloud_pubsub_messages_compression_duration_seconds`
- Histogram: Time taken to compress a batch of messages
- `fluentd_output_gcloud_pubsub_messages_messages_compressed_size_per_original_size_ratio`
- Histogram: Compression ratio achieved on a batch of messages, expressed in
terms of space saved.

- `fluentd_input_gcloud_pubsub_pull_errors_total`
- Counter: Errors encountered while pulling or processing messages (split by a
`retryable` label)
- `fluentd_input_gcloud_pubsub_messages_pulled`
- Histogram: Number of Pub/Sub messages pulled by the subscriber on each invocation
- `fluentd_input_gcloud_pubsub_messages_pulled_bytes`
- Histogram: Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation

## Contributing

1. Fork it
Expand Down
12 changes: 7 additions & 5 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
require 'bundler'
# frozen_string_literal: true

require "bundler"
Bundler::GemHelper.install_tasks

require 'rake/testtask'
require "rake/testtask"

Rake::TestTask.new(:test) do |test|
test.libs << 'lib' << 'test'
test.test_files = FileList['test/plugin/test_*.rb']
test.libs << "lib" << "test"
test.test_files = FileList["test/plugin/test_*.rb"]
test.verbose = true
end

task :default => [:build]
task default: [:build]
16 changes: 11 additions & 5 deletions fluent-plugin-gcloud-pubsub-custom.gemspec
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# encoding: utf-8
$:.push File.expand_path('../lib', __FILE__)
# frozen_string_literal: true

$LOAD_PATH.push File.expand_path("lib", __dir__)

Gem::Specification.new do |gem|
gem.name = "fluent-plugin-gcloud-pubsub-custom"
Expand All @@ -10,17 +11,22 @@ Gem::Specification.new do |gem|
gem.version = "1.3.2"
gem.authors = ["Yoshihiro MIYAI"]
gem.email = "msparrow17@gmail.com"
gem.has_rdoc = false
gem.files = `git ls-files`.split("\n")
gem.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
gem.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
gem.require_paths = ['lib']
gem.executables = `git ls-files -- bin/*`.split("\n").map { |f| File.basename(f) }
gem.require_paths = ["lib"]

gem.add_runtime_dependency "fluentd", [">= 0.14.15", "< 2"]
gem.add_runtime_dependency "google-cloud-pubsub", "~> 0.30.0"

# Use the same version constraint as fluent-plugin-prometheus currently specifies
gem.add_runtime_dependency "prometheus-client", "< 0.10"

gem.add_development_dependency "bundler"
gem.add_development_dependency "pry"
gem.add_development_dependency "pry-byebug"
gem.add_development_dependency "rake"
gem.add_development_dependency "rubocop", "~>0.83"
gem.add_development_dependency "test-unit"
gem.add_development_dependency "test-unit-rr"
end
Loading