Skip to content

Commit

Permalink
Clarify that memory limiter refuses data, doesn't drop it
Browse files Browse the repository at this point in the history
Contributes to open-telemetry#1084

- Clarify what the memory limiter does.
- Set expectations from receivers, how they are supposed to react
  when the memory limiter refuses the data.

All receivers must adhere to this contract. See for example
an issue opened against filelog receiver:
open-telemetry/opentelemetry-collector-contrib#20511

Note that there are no functional changes to the memory limiter.

Future work: one additional thing we can do is implement a backoff
logic in the memory limiter. When in memory limited mode the processor
can introduce pauses before it returns from the ConsumeLogs/Traces/Metrics
call. This will allow to slow down the inflow of data into the Collector
and give time for the pipeline to clear and memory usage to return to the
normal. This needs to be explored further.
  • Loading branch information
tigrannajaryan committed Mar 31, 2023
1 parent d4c25d4 commit ba858b4
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 60 deletions.
28 changes: 20 additions & 8 deletions processor/memorylimiterprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,32 @@ on the configured processors, it is important to put checks in place regarding
memory usage.

The memory_limiter processor allows to perform periodic checks of memory
usage if it exceeds defined limits will begin dropping data and forcing GC to reduce
usage if it exceeds defined limits will begin refusing data and forcing GC to reduce
memory consumption.

The memory_limiter uses soft and hard memory limits. Hard limit is always above or equal
the soft limit.

When the memory usage exceeds the soft limit the processor will start dropping the data and
return errors to the preceding component it in the pipeline (which should be normally a
receiver).
When the memory usage exceeds the soft limit the processor will enter the memory limited
mode and will start refusing the data by returning errors to the preceding component
in the pipeline that made the ConsumeLogs/Trace/Metrics function call.
The preceding component should be normally a receiver.

When the memory usage is above the hard limit in addition to dropping the data the
In memory limited mode the error returned by ConsumeLogs/Trace/Metrics function is a
non-permanent error. When receivers see this error they are expected to retry sending
the same data. The receivers may also apply a backpressure to their data sources
in order to slow down the inflow of data into the Collector and allow the memory usage
to go below the limits.

>Warning: if the component preceding the memory limiter in the pipeline does not correctly
retry and send the data again after ConsumeLogs/Trace/Metrics functions return then that
data will be permanently lost. We consider such components incorrectly implemented.

When the memory usage is above the hard limit in addition to refusing the data the
processor will forcedly perform garbage collection in order to try to free memory.

When the memory usage drop below the soft limit, the normal operation is resumed (data
will not longer be dropped and no forced garbage collection will be performed).
will no longer be refused and no forced garbage collection will be performed).

The difference between the soft limit and hard limits is defined via `spike_limit_mib`
configuration option. The value of this option should be selected in a way that ensures
Expand All @@ -39,8 +50,9 @@ A good starting point for `spike_limit_mib` is 20% of the hard limit. Bigger
Note that while the processor can help mitigate out of memory situations,
it is not a replacement for properly sizing and configuring the
collector. Keep in mind that if the soft limit is crossed, the collector will
return errors to all receive operations until enough memory is freed. This will
result in dropped data.
return errors to all receive operations until enough memory is freed. This may
eventually result in dropped data since the receivers may not be able to hold back
and retry the data indefinitely.

It is highly recommended to configure `ballastextension` as well as the
`memory_limiter` processor on every collector. The ballast should be configured to
Expand Down
3 changes: 1 addition & 2 deletions processor/memorylimiterprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
// limitations under the License.

// Package memorylimiterprocessor provides a processor for OpenTelemetry Service pipeline
// that drops data on the pipeline according to the current state of memory
// usage.
// that refuses data on the pipeline according to the current state of memory usage.
package memorylimiterprocessor // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor"

import (
Expand Down
46 changes: 23 additions & 23 deletions processor/memorylimiterprocessor/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ const (
)

var (
// errForcedDrop will be returned to callers of ConsumeTraceData to indicate
// that data is being dropped due to high memory usage.
errForcedDrop = errors.New("data dropped due to high memory usage")
// errDataRefused will be returned to callers of ConsumeTraceData to indicate
// that data is being refused due to high memory usage.
errDataRefused = errors.New("data refused due to high memory usage")

// Construction errors

Expand Down Expand Up @@ -70,8 +70,8 @@ type memoryLimiter struct {
memCheckWait time.Duration
ballastSize uint64

// forceDrop is used atomically to indicate when data should be dropped.
forceDrop *atomic.Bool
// mustRefuse is used to indicate when data should be refused.
mustRefuse *atomic.Bool

ticker *time.Ticker

Expand Down Expand Up @@ -129,7 +129,7 @@ func newMemoryLimiter(set processor.CreateSettings, cfg *Config) (*memoryLimiter
ticker: time.NewTicker(cfg.CheckInterval),
readMemStatsFn: runtime.ReadMemStats,
logger: logger,
forceDrop: &atomic.Bool{},
mustRefuse: &atomic.Bool{},
obsrep: obsrep,
}

Expand Down Expand Up @@ -180,15 +180,15 @@ func (ml *memoryLimiter) shutdown(context.Context) error {

func (ml *memoryLimiter) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
numSpans := td.SpanCount()
if ml.forceDrop.Load() {
if ml.mustRefuse.Load() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the
// callstack.
// callstack and that the receiver will correctly retry the refused data again.
ml.obsrep.TracesRefused(ctx, numSpans)

return td, errForcedDrop
return td, errDataRefused
}

// Even if the next consumer returns error record the data as accepted by
Expand All @@ -199,14 +199,14 @@ func (ml *memoryLimiter) processTraces(ctx context.Context, td ptrace.Traces) (p

func (ml *memoryLimiter) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
numDataPoints := md.DataPointCount()
if ml.forceDrop.Load() {
if ml.mustRefuse.Load() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the
// callstack.
ml.obsrep.MetricsRefused(ctx, numDataPoints)
return md, errForcedDrop
return md, errDataRefused
}

// Even if the next consumer returns error record the data as accepted by
Expand All @@ -217,15 +217,15 @@ func (ml *memoryLimiter) processMetrics(ctx context.Context, md pmetric.Metrics)

func (ml *memoryLimiter) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
numRecords := ld.LogRecordCount()
if ml.forceDrop.Load() {
if ml.mustRefuse.Load() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the
// callstack.
ml.obsrep.LogsRefused(ctx, numRecords)

return ld, errForcedDrop
return ld, errDataRefused
}

// Even if the next consumer returns error record the data as accepted by
Expand Down Expand Up @@ -288,33 +288,33 @@ func (ml *memoryLimiter) checkMemLimits() {
ms = ml.doGCandReadMemStats()
}

// Remember current dropping state.
wasForcingDrop := ml.forceDrop.Load()
// Remember current state.
wasRefusing := ml.mustRefuse.Load()

// Check if the memory usage is above the soft limit.
mustForceDrop := ml.usageChecker.aboveSoftLimit(ms)
mustRefuse := ml.usageChecker.aboveSoftLimit(ms)

if wasForcingDrop && !mustForceDrop {
// Was previously dropping but enough memory is available now, no need to limit.
if wasRefusing && !mustRefuse {
// Was previously refusing but enough memory is available now, no need to limit.
ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms))
}

if !wasForcingDrop && mustForceDrop {
if !wasRefusing && mustRefuse {
// We are above soft limit, do a GC if it wasn't done recently and see if
// it brings memory usage below the soft limit.
if time.Since(ml.lastGCDone) > minGCIntervalWhenSoftLimited {
ml.logger.Info("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms))
ms = ml.doGCandReadMemStats()
// Check the limit again to see if GC helped.
mustForceDrop = ml.usageChecker.aboveSoftLimit(ms)
mustRefuse = ml.usageChecker.aboveSoftLimit(ms)
}

if mustForceDrop {
ml.logger.Warn("Memory usage is above soft limit. Dropping data.", memstatToZapField(ms))
if mustRefuse {
ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms))
}
}

ml.forceDrop.Store(mustForceDrop)
ml.mustRefuse.Store(mustRefuse)
}

type memUsageChecker struct {
Expand Down
54 changes: 27 additions & 27 deletions processor/memorylimiterprocessor/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
usageChecker: memUsageChecker{
memAllocLimit: 1024,
},
forceDrop: &atomic.Bool{},
mustRefuse: &atomic.Bool{},
readMemStatsFn: func(ms *runtime.MemStats) {
ms.Alloc = currentMemAlloc
},
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
// Above memAllocLimit.
currentMemAlloc = 1800
ml.checkMemLimits()
assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md))
assert.Equal(t, errDataRefused, mp.ConsumeMetrics(ctx, md))

// Check ballast effect
ml.ballastSize = 1000
Expand All @@ -153,7 +153,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
// Above memAllocLimit even accountiing for ballast.
currentMemAlloc = 1800 + ml.ballastSize
ml.checkMemLimits()
assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md))
assert.Equal(t, errDataRefused, mp.ConsumeMetrics(ctx, md))

// Restore ballast to default.
ml.ballastSize = 0
Expand All @@ -169,7 +169,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
// Above memSpikeLimit.
currentMemAlloc = 550
ml.checkMemLimits()
assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md))
assert.Equal(t, errDataRefused, mp.ConsumeMetrics(ctx, md))

}

Expand All @@ -181,7 +181,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
usageChecker: memUsageChecker{
memAllocLimit: 1024,
},
forceDrop: &atomic.Bool{},
mustRefuse: &atomic.Bool{},
readMemStatsFn: func(ms *runtime.MemStats) {
ms.Alloc = currentMemAlloc
},
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
// Above memAllocLimit.
currentMemAlloc = 1800
ml.checkMemLimits()
assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td))
assert.Equal(t, errDataRefused, tp.ConsumeTraces(ctx, td))

// Check ballast effect
ml.ballastSize = 1000
Expand All @@ -222,7 +222,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
// Above memAllocLimit even accountiing for ballast.
currentMemAlloc = 1800 + ml.ballastSize
ml.checkMemLimits()
assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td))
assert.Equal(t, errDataRefused, tp.ConsumeTraces(ctx, td))

// Restore ballast to default.
ml.ballastSize = 0
Expand All @@ -238,7 +238,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
// Above memSpikeLimit.
currentMemAlloc = 550
ml.checkMemLimits()
assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td))
assert.Equal(t, errDataRefused, tp.ConsumeTraces(ctx, td))

}

Expand All @@ -250,7 +250,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
usageChecker: memUsageChecker{
memAllocLimit: 1024,
},
forceDrop: &atomic.Bool{},
mustRefuse: &atomic.Bool{},
readMemStatsFn: func(ms *runtime.MemStats) {
ms.Alloc = currentMemAlloc
},
Expand Down Expand Up @@ -278,7 +278,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
// Above memAllocLimit.
currentMemAlloc = 1800
ml.checkMemLimits()
assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld))
assert.Equal(t, errDataRefused, lp.ConsumeLogs(ctx, ld))

// Check ballast effect
ml.ballastSize = 1000
Expand All @@ -291,7 +291,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
// Above memAllocLimit even accountiing for ballast.
currentMemAlloc = 1800 + ml.ballastSize
ml.checkMemLimits()
assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld))
assert.Equal(t, errDataRefused, lp.ConsumeLogs(ctx, ld))

// Restore ballast to default.
ml.ballastSize = 0
Expand All @@ -307,7 +307,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
// Above memSpikeLimit.
currentMemAlloc = 550
ml.checkMemLimits()
assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld))
assert.Equal(t, errDataRefused, lp.ConsumeLogs(ctx, ld))
}

func TestGetDecision(t *testing.T) {
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestGetDecision(t *testing.T) {
})
}

func TestDropDecision(t *testing.T) {
func TestRefuseDecision(t *testing.T) {
decison1000Limit30Spike30, err := newPercentageMemUsageChecker(1000, 60, 30)
require.NoError(t, err)
decison1000Limit60Spike50, err := newPercentageMemUsageChecker(1000, 60, 50)
Expand All @@ -364,46 +364,46 @@ func TestDropDecision(t *testing.T) {
name string
usageChecker memUsageChecker
ms *runtime.MemStats
shouldDrop bool
shouldRefuse bool
}{
{
name: "should drop over limit",
name: "should refuse over limit",
usageChecker: *decison1000Limit30Spike30,
ms: &runtime.MemStats{Alloc: 600},
shouldDrop: true,
shouldRefuse: true,
},
{
name: "should not drop",
name: "should not refuse",
usageChecker: *decison1000Limit30Spike30,
ms: &runtime.MemStats{Alloc: 100},
shouldDrop: false,
shouldRefuse: false,
},
{
name: "should not drop spike, fixed usageChecker",
name: "should not refuse spike, fixed usageChecker",
usageChecker: memUsageChecker{
memAllocLimit: 600,
memSpikeLimit: 500,
},
ms: &runtime.MemStats{Alloc: 300},
shouldDrop: true,
ms: &runtime.MemStats{Alloc: 300},
shouldRefuse: true,
},
{
name: "should drop, spike, percentage usageChecker",
name: "should refuse, spike, percentage usageChecker",
usageChecker: *decison1000Limit60Spike50,
ms: &runtime.MemStats{Alloc: 300},
shouldDrop: true,
shouldRefuse: true,
},
{
name: "should drop, spike, percentage usageChecker",
name: "should refuse, spike, percentage usageChecker",
usageChecker: *decison1000Limit40Spike20,
ms: &runtime.MemStats{Alloc: 250},
shouldDrop: true,
shouldRefuse: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
shouldDrop := test.usageChecker.aboveSoftLimit(test.ms)
assert.Equal(t, test.shouldDrop, shouldDrop)
shouldRefuse := test.usageChecker.aboveSoftLimit(test.ms)
assert.Equal(t, test.shouldRefuse, shouldRefuse)
})
}
}
Expand Down

0 comments on commit ba858b4

Please sign in to comment.