Skip to content

Commit

Permalink
Use container/list for subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
mwear committed Jun 10, 2024
1 parent a3016a3 commit a0e93fe
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 31 deletions.
54 changes: 31 additions & 23 deletions extension/healthcheckv2extension/internal/status/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package status // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"

import (
"container/list"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -81,13 +82,16 @@ type subscription struct {
verbosity Verbosity
}

// UnsubscribeFunc is a function used to unsubscribe from a stream.
type UnsubscribeFunc func()

// Aggregator records individual status events for components and aggregates statuses for the
// pipelines they belong to and the collector overall.
type Aggregator struct {
// mu protects aggregateStatus and subscriptions from concurrent modification
mu sync.RWMutex
aggregateStatus *AggregateStatus
subscriptions map[string][]*subscription
subscriptions map[string]*list.List
aggregationFunc aggregationFunc
}

Expand All @@ -98,7 +102,7 @@ func NewAggregator(errPriority ErrorPriority) *Aggregator {
Event: &component.StatusEvent{},
ComponentStatusMap: make(map[string]*AggregateStatus),
},
subscriptions: make(map[string][]*subscription),
subscriptions: make(map[string]*list.List),
aggregationFunc: newAggregationFunc(errPriority),
}
}
Expand Down Expand Up @@ -164,7 +168,8 @@ func (a *Aggregator) RecordStatus(source *component.InstanceID, event *component
// It is possible to subscribe to a pipeline that has not yet reported. An initial nil
// will be sent on the channel and events will start streaming if and when it starts reporting.
// A `Verbose` verbosity specifies that subtrees should be returned with the *AggregateStatus.
func (a *Aggregator) Subscribe(scope Scope, verbosity Verbosity) <-chan *AggregateStatus {
// To unsubscribe, call the returned UnsubscribeFunc.
func (a *Aggregator) Subscribe(scope Scope, verbosity Verbosity) (<-chan *AggregateStatus, UnsubscribeFunc) {
a.mu.Lock()
defer a.mu.Unlock()

Expand All @@ -180,42 +185,45 @@ func (a *Aggregator) Subscribe(scope Scope, verbosity Verbosity) <-chan *Aggrega
statusCh: make(chan *AggregateStatus, 1),
verbosity: verbosity,
}
subList, ok := a.subscriptions[key]
if !ok {
subList = list.New()
a.subscriptions[key] = subList
}
el := subList.PushBack(sub)

a.subscriptions[key] = append(a.subscriptions[key], sub)
sub.statusCh <- st

return sub.statusCh
}

// Unbsubscribe removes a stream from further status updates.
func (a *Aggregator) Unsubscribe(statusCh <-chan *AggregateStatus) {
a.mu.Lock()
defer a.mu.Unlock()

for scope, subs := range a.subscriptions {
for i, sub := range subs {
if sub.statusCh == statusCh {
a.subscriptions[scope] = append(subs[:i], subs[i+1:]...)
return
}
unsubFunc := func() {
subList.Remove(el)
if subList.Front() == nil {
delete(a.subscriptions, key)
}
}

sub.statusCh <- st

return sub.statusCh, unsubFunc
}

// Close terminates all existing subscriptions.
func (a *Aggregator) Close() {
a.mu.Lock()
defer a.mu.Unlock()

for _, subs := range a.subscriptions {
for _, sub := range subs {
for _, subList := range a.subscriptions {
for el := subList.Front(); el != nil; el = el.Next() {
sub := el.Value.(*subscription)
close(sub.statusCh)
}
}
}

func (a *Aggregator) notifySubscribers(scope Scope, status *AggregateStatus) {
for _, sub := range a.subscriptions[scope.toKey()] {
subList, ok := a.subscriptions[scope.toKey()]
if !ok {
return
}
for el := subList.Front(); el != nil; el = el.Next() {
sub := el.Value.(*subscription)
// clear unread events
select {
case <-sub.statusCh:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,14 @@ func TestStreaming(t *testing.T) {
traces := testhelpers.NewPipelineMetadata("traces")
metrics := testhelpers.NewPipelineMetadata("metrics")

traceEvents := agg.Subscribe(status.Scope(traces.PipelineID.String()), status.Concise)
metricEvents := agg.Subscribe(status.Scope(metrics.PipelineID.String()), status.Concise)
allEvents := agg.Subscribe(status.ScopeAll, status.Concise)
traceEvents, traceUnsub := agg.Subscribe(status.Scope(traces.PipelineID.String()), status.Concise)
defer traceUnsub()

metricEvents, metricUnsub := agg.Subscribe(status.Scope(metrics.PipelineID.String()), status.Concise)
defer metricUnsub()

allEvents, allUnsub := agg.Subscribe(status.ScopeAll, status.Concise)
defer allUnsub()

assert.Nil(t, <-traceEvents)
assert.Nil(t, <-metricEvents)
Expand Down Expand Up @@ -373,7 +378,8 @@ func TestStreamingVerbose(t *testing.T) {
traces := testhelpers.NewPipelineMetadata("traces")
tracesKey := toPipelineKey(traces.PipelineID)

allEvents := agg.Subscribe(status.ScopeAll, status.Verbose)
allEvents, unsub := agg.Subscribe(status.ScopeAll, status.Verbose)
defer unsub()

t.Run("zero value", func(t *testing.T) {
st := <-allEvents
Expand Down Expand Up @@ -431,8 +437,8 @@ func TestUnsubscribe(t *testing.T) {

traces := testhelpers.NewPipelineMetadata("traces")

traceEvents := agg.Subscribe(status.Scope(traces.PipelineID.String()), status.Concise)
allEvents := agg.Subscribe(status.ScopeAll, status.Concise)
traceEvents, traceUnsub := agg.Subscribe(status.Scope(traces.PipelineID.String()), status.Concise)
allEvents, allUnsub := agg.Subscribe(status.ScopeAll, status.Concise)

assert.Nil(t, <-traceEvents)
assert.NotNil(t, <-allEvents)
Expand All @@ -441,14 +447,14 @@ func TestUnsubscribe(t *testing.T) {
testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusStarting)
assertEventsRecvdMatch(t, component.StatusStarting, traceEvents, allEvents)

agg.Unsubscribe(traceEvents)
traceUnsub()

// Pipeline OK
testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusOK)
assertNoEventsRecvd(t, traceEvents)
assertEventsRecvdMatch(t, component.StatusOK, allEvents)

agg.Unsubscribe(allEvents)
allUnsub()

// Stop pipeline
testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusStopping)
Expand Down

0 comments on commit a0e93fe

Please sign in to comment.