Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add internal/otelarrow package for shared code between otelarrow exporter, receiver #33579

Merged
merged 32 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9e00bf2
Internal package draft.
jmacd Jun 14, 2024
8ba5cb6
codeowners, versions
jmacd Jun 14, 2024
ccd157e
update otelarrow exporter
jmacd Jun 14, 2024
abf0398
otelarrow receiver
jmacd Jun 14, 2024
ffbdd88
lint
jmacd Jun 14, 2024
e4cb5a4
Add changelog
jmacd Jun 14, 2024
cda097c
edit
jmacd Jun 14, 2024
372c417
tidier
jmacd Jun 14, 2024
b2d1cc6
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Jun 17, 2024
ead7857
revert components
jmacd Jun 17, 2024
22c8a36
tidy
jmacd Jun 17, 2024
702a1a6
lint
jmacd Jun 17, 2024
9d1338e
argggh gomod
jmacd Jun 17, 2024
ef4ea8c
goporto
jmacd Jun 17, 2024
7654e56
Merge branch 'main' into jmacd/internalarrow
jmacd Jun 18, 2024
2804a9f
tidy
jmacd Jun 18, 2024
2eb27fa
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Jun 20, 2024
346b4b3
mod update
jmacd Jun 20, 2024
c9c73b5
Merge branch 'main' into jmacd/internalarrow
jmacd Jun 21, 2024
893b772
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Jul 8, 2024
3504b7a
tidy
jmacd Jul 8, 2024
3921073
Merge branch 'jmacd/internalarrow' of github.com:jmacd/opentelemetry-…
jmacd Jul 8, 2024
f717a98
Merge branch 'main' into jmacd/internalarrow
codeboten Jul 12, 2024
6312574
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Jul 15, 2024
8272b13
tidy
jmacd Jul 15, 2024
1f63d7c
Merge branch 'jmacd/internalarrow' of github.com:jmacd/opentelemetry-…
jmacd Jul 15, 2024
920cde0
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Jul 16, 2024
1095b64
Apply suggestions from code review
jmacd Jul 16, 2024
0bceca9
Merge branch 'jmacd/internalarrow' of github.com:jmacd/opentelemetry-…
jmacd Jul 16, 2024
bf321b5
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Jul 16, 2024
11ca343
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Jul 17, 2024
268c53f
update from https://github.com/open-telemetry/otel-arrow/pull/231
jmacd Jul 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-internals.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver, otelarrowexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: OTel-Arrow internal packages moved into this repository.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33567]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: New integration testing between otelarrowexporter and otelarrowreceiver.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ internal/k8stest/ @open-teleme
internal/kafka/ @open-telemetry/collector-contrib-approvers @pavolloffay @MovieStoreGuy
internal/kubelet/ @open-telemetry/collector-contrib-approvers @dmitryax
internal/metadataproviders/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @dashpole
internal/otelarrow/ @open-telemetry/collector-contrib-approvers @jmacd @moh-osman3
internal/pdatautil/ @open-telemetry/collector-contrib-approvers @djaglowski
internal/sharedcomponent/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers
internal/splunk/ @open-telemetry/collector-contrib-approvers @dmitryax
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ body:
- internal/kafka
- internal/kubelet
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/sharedcomponent
- internal/splunk
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ body:
- internal/kafka
- internal/kubelet
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/sharedcomponent
- internal/splunk
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ body:
- internal/kafka
- internal/kubelet
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/sharedcomponent
- internal/splunk
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ body:
- internal/kafka
- internal/kubelet
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/sharedcomponent
- internal/splunk
Expand Down
1 change: 1 addition & 0 deletions internal/otelarrow/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
20 changes: 20 additions & 0 deletions internal/otelarrow/admission/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Admission Package

## Overview

The admission package provides a BoundedQueue object, which is a semaphore implementation that limits the number of bytes admitted into a collector pipeline. Additionally, the BoundedQueue limits the number of waiters that can block on a call to `bq.Acquire(ctx, sz)`.

This package is an experiment to improve the behavior of Collector pipelines having their `exporterhelper` configured to apply backpressure. This package is meant to be used in receivers, via an interceptor or custom logic. Therefore, the BoundedQueue helps limit memory within the entire collector pipeline by limiting two dimensions that cause memory issues:
1. bytes: large requests that enter the collector pipeline can require large allocations even if downstream components will eventually limit or ratelimit the request.
2. waiters: limiting on bytes alone is not enough because requests that enter the pipeline and block on `bq.Acquire()` can still consume memory within the receiver. If there are enough waiters this can be a significant contribution to memory usage.

## Usage

Create a new BoundedQueue by calling `bq := admission.NewBoundedQueue(maxLimitBytes, maxLimitWaiters)`

Within the component call `bq.Acquire(ctx, requestSize)` which will either
1. succeed immediately if there is enough available memory
2. fail immediately if there are too many waiters
3. block until context cancelation or enough bytes become available

Once a request has finished processing and is sent downstream, call `bq.Release(requestSize)` to allow waiters to be admitted for processing. Release should only fail if releasing more bytes than previously acquired.
152 changes: 152 additions & 0 deletions internal/otelarrow/admission/boundedqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"

import (
"context"
"fmt"
"sync"

"github.com/google/uuid"
orderedmap "github.com/wk8/go-ordered-map/v2"
)

// ErrTooManyWaiters is returned when a request cannot be admitted
// immediately and the configured limit on waiting requests has already been
// reached.
var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters")

// BoundedQueue is a byte-counting semaphore with a bounded number of
// waiters. Requests reserve bytes with Acquire or TryAcquire and return them
// with Release; requests that do not fit wait in arrival order.
type BoundedQueue struct {
	// Limits set once by NewBoundedQueue.
	maxLimitBytes   int64 // largest total of reserved bytes
	maxLimitWaiters int64 // largest number of requests allowed to wait

	// lock is held around every access to the fields below.
	lock           sync.Mutex
	currentBytes   int64 // bytes currently reserved
	currentWaiters int64 // requests currently counted as waiting
	// waiters holds the blocked requests, oldest first, keyed by a
	// per-request UUID.
	waiters *orderedmap.OrderedMap[uuid.UUID, waiter]
}

// waiter describes one request blocked in Acquire.
type waiter struct {
	// ID is the key under which the waiter is stored in BoundedQueue.waiters.
	ID uuid.UUID
	// pendingBytes is the number of bytes the request is waiting to reserve.
	pendingBytes int64
	// readyCh is closed by Release once pendingBytes have been reserved on
	// the waiter's behalf.
	readyCh chan struct{}
}

// NewBoundedQueue returns an empty BoundedQueue that allows at most
// maxLimitBytes bytes to be reserved at once and at most maxLimitWaiters
// requests to wait for bytes.
func NewBoundedQueue(maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
	bq := &BoundedQueue{waiters: orderedmap.New[uuid.UUID, waiter]()}
	bq.maxLimitBytes = maxLimitBytes
	bq.maxLimitWaiters = maxLimitWaiters
	return bq
}

// admit decides, under the queue lock, what happens to a request for
// pendingBytes:
//
//   - (false, error) when the request exceeds the byte limit and so can never
//     be satisfied, or when it would have to wait but the waiter limit is
//     reached (ErrTooManyWaiters);
//   - (true, nil) when the bytes fit and have been reserved;
//   - (false, nil) when the caller must wait; in that case the request has
//     already been counted in currentWaiters.
func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	switch {
	case pendingBytes > bq.maxLimitBytes:
		// Larger than the whole budget: waiting would never help.
		return false, fmt.Errorf("rejecting request, request size larger than configured limit")

	case bq.currentBytes+pendingBytes <= bq.maxLimitBytes:
		// Fits right now, reserve immediately.
		bq.currentBytes += pendingBytes
		return true, nil

	case bq.currentWaiters+1 > bq.maxLimitWaiters:
		// Would have to wait, but the waiting room is full.
		return false, ErrTooManyWaiters

	default:
		// The caller will wait; count it before the lock is dropped.
		bq.currentWaiters++
		return false, nil
	}
}

// Acquire reserves pendingBytes from the queue, blocking if necessary until
// enough bytes have been released or ctx is done.
//
// It returns nil once the bytes are reserved; the caller is then expected to
// hand them back with Release. It returns an error, with nothing left
// reserved, when the request is larger than the configured byte limit, when
// the request would have to wait but the waiter limit is reached
// (ErrTooManyWaiters), or when ctx is done before the wait completes. The
// cancellation error wraps ctx.Err().
func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
	success, err := bq.admit(pendingBytes)
	if err != nil || success {
		return err
	}

	// admit has counted this request as a waiter but has already dropped the
	// lock, so a Release may have run in between without seeing it. Re-check
	// the budget under the lock before parking, otherwise that wakeup is lost
	// and the request could block although the bytes are available.
	bq.lock.Lock()
	if bq.currentBytes+pendingBytes <= bq.maxLimitBytes {
		bq.currentBytes += pendingBytes
		bq.currentWaiters--
		bq.lock.Unlock()
		return nil
	}

	// Otherwise register as a waiter and wait for bytes to be released.
	curWaiter := waiter{
		pendingBytes: pendingBytes,
		readyCh:      make(chan struct{}),
	}

	// Generate a key that is not already in use.
	for {
		id := uuid.New()
		if _, keyExists := bq.waiters.Get(id); keyExists {
			continue
		}
		curWaiter.ID = id
		bq.waiters.Set(id, curWaiter)
		break
	}
	bq.lock.Unlock()
	// TODO: instrument this code path.

	select {
	case <-curWaiter.readyCh:
		// Release reserved the bytes on our behalf and removed the waiter.
		return nil
	case <-ctx.Done():
		err = fmt.Errorf("context canceled: %w", ctx.Err())

		bq.lock.Lock()
		_, found := bq.waiters.Delete(curWaiter.ID)
		if found {
			// Canceled before being admitted: stop counting as a waiter.
			bq.currentWaiters--
		}
		bq.lock.Unlock()

		if !found {
			// Release admitted this waiter concurrently with the
			// cancellation, so pendingBytes are already charged to the
			// queue. The caller gets an error and will not call Release,
			// so hand the bytes back here to avoid leaking them.
			if relErr := bq.Release(pendingBytes); relErr != nil {
				return fmt.Errorf("%w (returning admitted bytes: %v)", err, relErr)
			}
		}
		return err
	}
}

// Release returns pendingBytes to the queue and then admits waiters, oldest
// first, for as long as the next waiter's request fits within the byte
// limit. Admission stops at the first waiter that does not fit, so waiters
// are served in arrival order.
//
// It returns an error if more bytes are released than are currently
// reserved. In that case the reserved count is reset to zero rather than
// left negative (a negative count would let later requests exceed the byte
// limit), and waiters that now fit are still admitted before the error is
// returned.
func (bq *BoundedQueue) Release(pendingBytes int64) error {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	var err error
	bq.currentBytes -= pendingBytes
	if bq.currentBytes < 0 {
		bq.currentBytes = 0
		err = fmt.Errorf("released more bytes than acquired")
	}

	for bq.waiters.Len() > 0 {
		next := bq.waiters.Oldest()
		nextWaiter := next.Value
		if bq.currentBytes+nextWaiter.pendingBytes > bq.maxLimitBytes {
			// The oldest waiter does not fit yet; keep everyone queued.
			break
		}

		// Reserve the bytes on the waiter's behalf, then wake it.
		bq.currentBytes += nextWaiter.pendingBytes
		bq.currentWaiters--
		close(nextWaiter.readyCh)
		if _, found := bq.waiters.Delete(next.Key); !found {
			return fmt.Errorf("deleting waiter that doesn't exist")
		}
	}

	return err
}

// TryAcquire reserves pendingBytes only if they fit within the byte limit
// right now. It never blocks and never registers a waiter; it reports
// whether the bytes were reserved.
func (bq *BoundedQueue) TryAcquire(pendingBytes int64) bool {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	wanted := bq.currentBytes + pendingBytes
	if wanted > bq.maxLimitBytes {
		return false
	}
	bq.currentBytes = wanted
	return true
}
Loading