Skip to content

Commit

Permalink
Persistent storage in queued_retry, backed by file storage extension (#…
Browse files Browse the repository at this point in the history
…3274)

Persistent queue implementation within queued_retry, aimed at being compatible with Jaeger's [BoundedQueue](https://github.com/jaegertracing/jaeger/blob/master/pkg/queue/bounded_queue.go) interface (providing a simple replacement) and backed by the [file storage extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage) for storing the WAL.

Currently, to run the persistent queue, OpenTelemetry Collector Contrib with `enable_unstable` build tag is required.

**Link to tracking Issue:** #2285 

[Design doc](https://docs.google.com/document/d/1Y4vNthCGdYI61ezeAzL5dXWgiZ73y9eSjIDitk3zXsU/edit#)

**Testing:** Unit Tests and manual testing, more to come

**Documentation:** README.md updated, including an example
  • Loading branch information
pmm-sumo authored Sep 10, 2021
1 parent 47e125a commit eca1ba7
Show file tree
Hide file tree
Showing 21 changed files with 2,035 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ jobs:
- attach_to_workspace
- run:
name: Build collector for all archs
command: grep ^binaries-all-sys Makefile|fmt -w 1|tail -n +2|circleci tests split|xargs make
command: grep ^binaries-all-sys Makefile|fmt -w 1|grep -v binaries-all-sys|circleci tests split|xargs make
- run:
name: Log checksums to console
command: shasum -a 256 bin/*
Expand Down
10 changes: 10 additions & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,13 @@ jobs:
with:
name: collector-binaries
path: ./bin.tar
- name: Build Unstable Collector for All Architectures
run: make binaries-all-sys-unstable
- name: Create Unstable Collector Binaries Archive
run: tar -cvf bin-unstable.tar ./bin/*unstable
- name: Upload Unstable Collector Binaries
uses: actions/upload-artifact@v2.2.4
with:
name: collector-binaries-unstable
path: ./bin-unstable.tar

36 changes: 34 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ gotestinstall:

.PHONY: gotest
gotest:
@$(MAKE) for-all CMD="make test"
@$(MAKE) for-all CMD="make test test-unstable"

.PHONY: gobenchmark
gobenchmark:
Expand All @@ -81,7 +81,7 @@ gotest-with-cover:

.PHONY: golint
golint:
@$(MAKE) for-all CMD="make lint"
@$(MAKE) for-all CMD="make lint lint-unstable"

.PHONY: goimpi
goimpi:
Expand Down Expand Up @@ -148,6 +148,11 @@ otelcol:
go generate ./...
$(MAKE) build-binary-internal

# otelcol-unstable: runs `go generate ./...` and then builds the collector through
# the build-binary-internal-unstable target (which passes -tags enable_unstable).
# The two steps are separate recipe lines on purpose: only the $(MAKE) line is
# treated as a recursive invocation.
.PHONY: otelcol-unstable
otelcol-unstable:
go generate ./...
$(MAKE) build-binary-internal-unstable

.PHONY: run
run:
GO111MODULE=on go run --race ./cmd/otelcol/... --config ${RUN_CONFIG} ${RUN_ARGS}
Expand Down Expand Up @@ -213,6 +218,9 @@ docker-otelcol:
# binaries-all-sys: aggregate target that builds the collector for each listed
# OS/architecture pair.
# Keep all prerequisites on this single line: .circleci/config.yml reads them
# textually (grep for lines starting with this target name, then `fmt -w 1`)
# and feeds them to `circleci tests split`.
.PHONY: binaries-all-sys
binaries-all-sys: binaries-darwin_amd64 binaries-darwin_arm64 binaries-linux_amd64 binaries-linux_arm64 binaries-windows_amd64

# binaries-all-sys-unstable: aggregate target that builds the enable_unstable
# variant of the collector for each listed OS/architecture pair.
# Keep all prerequisites on this single line: the CircleCI job greps the Makefile
# for lines starting with "binaries-all-sys", which also matches this rule, so the
# prerequisites listed here are picked up by that job as well.
.PHONY: binaries-all-sys-unstable
binaries-all-sys-unstable: binaries-darwin_amd64-unstable binaries-darwin_arm64-unstable binaries-linux_amd64-unstable binaries-linux_arm64-unstable binaries-windows_amd64-unstable

.PHONY: binaries-darwin_amd64
binaries-darwin_amd64:
GOOS=darwin GOARCH=amd64 $(MAKE) build-binary-internal
Expand All @@ -233,10 +241,34 @@ binaries-linux_arm64:
binaries-windows_amd64:
GOOS=windows GOARCH=amd64 EXTENSION=.exe $(MAKE) build-binary-internal

# Per-platform builds of the collector with the enable_unstable build tag.
# Each target sets GOOS and GOARCH (plus EXTENSION=.exe for Windows) in the
# environment of a recursive make that runs build-binary-internal-unstable.
.PHONY: binaries-darwin_amd64-unstable
binaries-darwin_amd64-unstable:
GOOS=darwin GOARCH=amd64 $(MAKE) build-binary-internal-unstable

.PHONY: binaries-darwin_arm64-unstable
binaries-darwin_arm64-unstable:
GOOS=darwin GOARCH=arm64 $(MAKE) build-binary-internal-unstable

.PHONY: binaries-linux_amd64-unstable
binaries-linux_amd64-unstable:
GOOS=linux GOARCH=amd64 $(MAKE) build-binary-internal-unstable

.PHONY: binaries-linux_arm64-unstable
binaries-linux_arm64-unstable:
GOOS=linux GOARCH=arm64 $(MAKE) build-binary-internal-unstable

# Windows is the only platform here that passes a file extension.
.PHONY: binaries-windows_amd64-unstable
binaries-windows_amd64-unstable:
GOOS=windows GOARCH=amd64 EXTENSION=.exe $(MAKE) build-binary-internal-unstable

# build-binary-internal: compiles ./cmd/otelcol into
# ./bin/otelcol_<GOOS>_<GOARCH><EXTENSION> with CGO disabled and -trimpath.
# GOOS, GOARCH and EXTENSION are supplied by the calling binaries-* targets;
# $(BUILD_INFO) is passed through to `go build` unchanged.
.PHONY: build-binary-internal
build-binary-internal:
	GO111MODULE=on CGO_ENABLED=0 go build -trimpath \
		-o ./bin/otelcol_$(GOOS)_$(GOARCH)$(EXTENSION) \
		$(BUILD_INFO) ./cmd/otelcol

# build-binary-internal-unstable: same build as build-binary-internal, but with
# `-tags enable_unstable` and an `_unstable` suffix on the output file name.
# GOOS, GOARCH and EXTENSION are supplied by the calling binaries-*-unstable targets.
# NOTE(review): the suffix is appended after $(EXTENSION), so the Windows build
# produces a file ending in `.exe_unstable`. The build-and-test workflow archives
# `./bin/*unstable`, so moving the suffix before the extension would also require
# changing that glob.
.PHONY: build-binary-internal-unstable
build-binary-internal-unstable:
	GO111MODULE=on CGO_ENABLED=0 go build -trimpath \
		-o ./bin/otelcol_$(GOOS)_$(GOARCH)$(EXTENSION)_unstable \
		$(BUILD_INFO) -tags enable_unstable ./cmd/otelcol

.PHONY: deb-rpm-package
%-package: ARCH ?= amd64
%-package:
Expand Down
8 changes: 8 additions & 0 deletions Makefile.Common
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ IMPI=impi
test:
@echo $(ALL_PKGS) | xargs -n 10 $(GOTEST) $(GOTEST_OPT)

# test-unstable: runs $(GOTEST) $(GOTEST_OPT) with `-tags enable_unstable` over
# $(ALL_PKGS). The package list is piped through `xargs -n 10`, so each test
# invocation receives at most 10 packages. The command itself is not echoed (@).
.PHONY: test-unstable
test-unstable:
@echo $(ALL_PKGS) | xargs -n 10 $(GOTEST) $(GOTEST_OPT) -tags enable_unstable

.PHONY: benchmark
benchmark:
$(GOTEST) -bench=. -run=notests ./...
Expand All @@ -25,6 +29,10 @@ fmt:
lint:
$(LINT) run --allow-parallel-runners

# lint-unstable: runs $(LINT) with the same options as the lint target plus
# `--build-tags enable_unstable`, so files guarded by that build tag are linted too.
.PHONY: lint-unstable
lint-unstable:
$(LINT) run --allow-parallel-runners --build-tags enable_unstable

.PHONY: impi
impi:
@$(IMPI) --local go.opentelemetry.io/collector --scheme stdThirdPartyLocal ./...
82 changes: 81 additions & 1 deletion exporter/exporterhelper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,92 @@ The following configuration options can be modified:
- `sending_queue`
- `enabled` (default = true)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `queue_size` (default = 5000): Maximum number of batches kept in memory before data; ignored if `enabled` is `false`;
- `queue_size` (default = 5000): Maximum number of batches kept in memory or on disk (for persistent storage) before dropping; ignored if `enabled` is `false`
User should calculate this as `num_seconds * requests_per_second` where:
- `num_seconds` is the number of seconds to buffer in case of a backend outage
- `requests_per_second` is the average number of requests per second.
- `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension
(note, `enable_unstable` build tag needs to be enabled first, see below for more details)
- `resource_to_telemetry_conversion`
- `enabled` (default = false): If `enabled` is `true`, all the resource attributes will be converted to metric labels by default.
- `timeout` (default = 5s): Time to wait per individual attempt to send data to a backend.

The full list of settings exposed for this helper exporter are documented [here](factory.go).

### Persistent Queue

**Status: under development**

> :warning: The capability is under development and currently can be enabled only in OpenTelemetry
> Collector Contrib with `enable_unstable` build tag set.
When `persistent_storage_enabled` is set to true, the queue is buffered to disk using the
[file storage extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage).
If the collector instance is killed while items remain in the persistent queue, the items are picked up on restart
and exporting continues.

```
┌─Consumer #1─┐
│ ┌───┐ │
──────Deleted────── ┌───►│ │ 1 │ ├───► Success
Waiting in channel x x x │ │ └───┘ │
for consumer ───┐ x x x │ │ │
│ x x x │ └─────────────┘
▼ x x x │
┌─────────────────────────────────────────x─────x───┐ │ ┌─Consumer #2─┐
│ x x x │ │ │ ┌───┐ │
│ ┌───┐ ┌───┐ ┌───┐ ┌─x─┐ ┌───┐ ┌─x─┐ ┌─x─┐ │ │ │ │ 2 │ ├───► Permanent -> X
│ n+1 │ n │ ... │ 6 │ │ 5 │ │ 4 │ │ 3 │ │ 2 │ │ 1 │ ├────┼───►│ └───┘ │ failure
│ └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ │ │ │ │
│ │ │ └─────────────┘
└───────────────────────────────────────────────────┘ │
▲ ▲ ▲ ▲ │ ┌─Consumer #3─┐
│ │ │ │ │ │ ┌───┐ │
│ │ │ │ │ │ │ 3 │ ├───► (in progress)
write read └─────┬─────┘ ├───►│ └───┘ │
index index │ │ │ │
▲ │ │ └─────────────┘
│ │ │
│ currently │ ┌─Consumer #4─┐
│ dispatched │ │ ┌───┐ │ Temporary
│ └───►│ │ 4 │ ├───► failure
│ │ └───┘ │ │
│ │ │ │
│ └─────────────┘ │
│ ▲ │
│ └── Retry ───────┤
│ │
│ │
└────────────────────────────────────── Requeuing ◄────── Retry limit exceeded ───┘
```

Example:

```
receivers:
  otlp:
    protocols:
      grpc:
exporters:
  otlp:
    endpoint: <ENDPOINT>
    sending_queue:
      persistent_storage_enabled: true
extensions:
  file_storage:
    directory: /var/lib/storage/otc
    timeout: 10s
service:
  extensions: [file_storage]
  pipelines:
    metrics:
      receivers: [otlp]
      exporters: [otlp]
    logs:
      receivers: [otlp]
      exporters: [otlp]
    traces:
      receivers: [otlp]
      exporters: [otlp]
```
28 changes: 24 additions & 4 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,26 @@ type request interface {
onError(error) request
// Returns the count of spans/metric points or log records.
count() int
// marshal serializes the current request into a byte stream
marshal() ([]byte, error)
// onProcessingFinished calls the optional callback function to handle cleanup after all processing is finished
onProcessingFinished()
// setOnProcessingFinished allows to set an optional callback function to do the cleanup (e.g. remove the item from persistent queue)
setOnProcessingFinished(callback func())
}

// requestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request
type requestUnmarshaler func([]byte) (request, error)

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
send(req request) error
}

// baseRequest is a base implementation for the request.
type baseRequest struct {
ctx context.Context
ctx context.Context
processingFinishedCallback func()
}

func (req *baseRequest) context() context.Context {
Expand All @@ -72,6 +82,16 @@ func (req *baseRequest) setContext(ctx context.Context) {
req.ctx = ctx
}

// setOnProcessingFinished stores callback as the request's processing-finished hook,
// replacing any callback stored earlier. The stored callback is the one that
// onProcessingFinished invokes; passing nil leaves the request without a hook.
func (req *baseRequest) setOnProcessingFinished(callback func()) {
req.processingFinishedCallback = callback
}

// onProcessingFinished invokes the callback stored in processingFinishedCallback.
// It is a no-op when no callback has been stored.
func (req *baseRequest) onProcessingFinished() {
	if req.processingFinishedCallback == nil {
		// Nothing was registered for this request.
		return
	}
	req.processingFinishedCallback()
}

// baseSettings represents all the options that users can configure.
type baseSettings struct {
componentOptions []componenthelper.Option
Expand Down Expand Up @@ -159,7 +179,7 @@ type baseExporter struct {
qrSender *queuedRetrySender
}

func newBaseExporter(cfg config.Exporter, set component.ExporterCreateSettings, bs *baseSettings) *baseExporter {
func newBaseExporter(cfg config.Exporter, set component.ExporterCreateSettings, bs *baseSettings, signal config.DataType, reqUnmarshaler requestUnmarshaler) *baseExporter {
be := &baseExporter{
Component: componenthelper.New(bs.componentOptions...),
}
Expand All @@ -169,7 +189,7 @@ func newBaseExporter(cfg config.Exporter, set component.ExporterCreateSettings,
ExporterID: cfg.ID(),
ExporterCreateSettings: set,
})
be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.qrSender = newQueuedRetrySender(cfg.ID(), signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender

return be
Expand All @@ -189,7 +209,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
}

// If no error then start the queuedRetrySender.
return be.qrSender.start()
return be.qrSender.start(ctx, host)
}

// Shutdown all senders and exporter and is invoked during service shutdown.
Expand Down
16 changes: 15 additions & 1 deletion exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/model/pdata"
)

var (
Expand All @@ -37,7 +39,7 @@ var (
)

func TestBaseExporter(t *testing.T) {
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions())
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(), "", nopRequestUnmarshaler())
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
}
Expand All @@ -51,6 +53,8 @@ func TestBaseExporterWithOptions(t *testing.T) {
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(DefaultTimeoutSettings())),
"",
nopRequestUnmarshaler(),
)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
Expand All @@ -64,3 +68,13 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
require.Equal(t, codes.Unset, sd.Status().Code, "SpanData %v", sd)
}
}

// nopTracePusher returns a ConsumeTracesFunc that ignores both of its arguments
// and always reports success (nil error).
func nopTracePusher() consumerhelper.ConsumeTracesFunc {
	return func(context.Context, pdata.Traces) error { return nil }
}

// nopRequestUnmarshaler builds a requestUnmarshaler for tests by passing the
// no-op trace pusher to newTraceRequestUnmarshalerFunc.
func nopRequestUnmarshaler() requestUnmarshaler {
	pusher := nopTracePusher()
	return newTraceRequestUnmarshalerFunc(pusher)
}
32 changes: 32 additions & 0 deletions exporter/exporterhelper/consumers_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporterhelper

// consumersQueue is largely based on queue.BoundedQueue and matches the subset used in the collector.
// It describes a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue
// (queue.BoundedQueue) or by a disk-based queue (persistentQueue).
type consumersQueue interface {
// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
StartConsumers(num int, callback func(item interface{}))
// Produce is used by the producer to submit a new item to the queue. It returns false if the item
// wasn't added to the queue due to queue overflow.
Produce(item interface{}) bool
// Stop stops all consumers, as well as the length reporter if started,
// and releases the items channel. It blocks until all consumers have stopped.
Stop()
// Size returns the current size of the queue.
// NOTE(review): presumably the number of items currently queued; confirm against the implementations.
Size() int
}
Loading

0 comments on commit eca1ba7

Please sign in to comment.