Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context to ReporterV2 #11451

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Added support for using PYTHON_EXE to control what Python interpreter is used
by `make` and `mage`. Example: `export PYTHON_EXE=python2.7`. {pull}11212[11212]
- Prometheus helper for metricbeat contains now `Namespace` field for `prometheus.MetricsMappings` {pull}11424[11424]
- New method `Context()` in ReporterV2 to obtain the context of the report. {pull}11451[11451]
2 changes: 2 additions & 0 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ to implement Modules and their associated MetricSets.
package mb

import (
"context"
"fmt"
"net/url"
"time"
Expand Down Expand Up @@ -181,6 +182,7 @@ type PushMetricSet interface {
type ReporterV2 interface {
Event(event Event) bool // Event reports a single successful event.
Error(err error) bool
Context() context.Context
}

// PushReporterV2 is used by a MetricSet to report events, errors, or errors with
Expand Down
39 changes: 29 additions & 10 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package module

import (
"context"
"fmt"
"math/rand"
"sync"
Expand Down Expand Up @@ -181,9 +182,9 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {

// Events and errors are reported through this.
reporter := &eventReporter{
msw: msw,
out: out,
done: done,
msw: msw,
out: out,
context: &eventReporterContext{done},
}

switch ms := msw.MetricSet.(type) {
Expand Down Expand Up @@ -299,10 +300,10 @@ type reporter interface {
// used by MetricSet implementations to report an event(s), an error, or an error
// with some additional metadata.
type eventReporter struct {
msw *metricSetWrapper
done <-chan struct{}
out chan<- beat.Event
start time.Time // Start time of the current fetch (or zero for push sources).
msw *metricSetWrapper
context context.Context
out chan<- beat.Event
start time.Time // Start time of the current fetch (or zero for push sources).
}

// startFetchTimer demarcates the start of a new fetch. The elapsed time of a
Expand All @@ -311,7 +312,25 @@ func (r *eventReporter) StartFetchTimer() { r.start = time.Now() }
func (r *eventReporter) V1() mb.PushReporter {
return reporterV1{v2: r.V2(), module: r.msw.module.Name()}
}
func (r *eventReporter) V2() mb.PushReporterV2 { return reporterV2{r} }
func (r *eventReporter) V2() mb.PushReporterV2 { return reporterV2{r} }
func (r *eventReporter) Context() context.Context { return r.context }

// eventReporterContext implements context.Context by wrapping a channel
type eventReporterContext struct {
done <-chan struct{}
}

func (r *eventReporterContext) Deadline() (time.Time, bool) { return time.Time{}, false }
func (r *eventReporterContext) Done() <-chan struct{} { return r.done }
func (r *eventReporterContext) Err() error {
select {
case <-r.done:
return context.Canceled
default:
return nil
}
}
func (r *eventReporterContext) Value(key interface{}) interface{} { return nil }

// reporterV1 wraps V2 to provide a v1 interface.
type reporterV1 struct {
Expand All @@ -334,7 +353,7 @@ type reporterV2 struct {
*eventReporter
}

func (r reporterV2) Done() <-chan struct{} { return r.done }
func (r reporterV2) Done() <-chan struct{} { return r.context.Done() }
func (r reporterV2) Error(err error) bool { return r.Event(mb.Event{Error: err}) }
func (r reporterV2) Event(event mb.Event) bool {
if event.Took == 0 && !r.start.IsZero() {
Expand Down Expand Up @@ -363,7 +382,7 @@ func (r reporterV2) Event(event mb.Event) bool {
event.Namespace = r.msw.Registration().Namespace
}
beatEvent := event.BeatEvent(r.msw.module.Name(), r.msw.MetricSet.Name(), r.msw.module.eventModifiers...)
if !writeEvent(r.done, r.out, beatEvent) {
if !writeEvent(r.context.Done(), r.out, beatEvent) {
return false
}
r.msw.stats.events.Add(1)
Expand Down
37 changes: 25 additions & 12 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ that Metricbeat does it and with the same validations.
package testing

import (
"context"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -183,8 +184,9 @@ func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.Reporting

// CapturingReporterV2 is a reporter used for testing which stores all events and errors
type CapturingReporterV2 struct {
events []mb.Event
errs []error
context context.Context
events []mb.Event
errs []error
}

// Event is used to report an event
Expand All @@ -204,6 +206,11 @@ func (r *CapturingReporterV2) GetEvents() []mb.Event {
return r.events
}

// Context returns reporter context
func (r *CapturingReporterV2) Context() context.Context {
return r.context
}

// GetErrors returns all reported errors
func (r *CapturingReporterV2) GetErrors() []error {
return r.errs
Expand All @@ -212,15 +219,15 @@ func (r *CapturingReporterV2) GetErrors() []error {
// ReportingFetchV2 runs the given reporting metricset and returns all of the
// events and errors that occur during that period.
func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) {
Copy link
Contributor

@sayden sayden Apr 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid that I believe that we shouldn't add context at all 😅 There are two main rules when implementing context:

  • context.Context must be propagated from the initial call so func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) must be func ReportingFetchV2(ctx context.Context, metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error). Propagation is key when using context.
  • context.Context must not be stored anyhow. It's a continuation of the previous, if you follow the rule of propagation, you won't store it at all.

Finally, I don't see a real benefit of using context here. I mean that it looks like it's a "we can use Context here, as many other approaches" instead of a "it's a clear use case of Context". Also, I feel that we are replacing a piece with a explicit implementation which was already working. Explicit code is idiomatic in Go :)

A not so old post about this: https://dave.cheney.net/2017/08/20/context-isnt-for-cancellation

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid that I believe that we shouldn't add context at all 😅 There are two main rules when implementing context:
* context.Context must be propagated from the initial call so func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) must be func ReportingFetchV2(ctx context.Context, metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error). Propagation is key when using context.

* `context.Context` must not be stored anyhow. It's a continuation of the previous, if you follow the rule of propagation, you won't store it at all.

In general I agree with that, but I'd like to avoid adding yet another interface (two of them if adding the one returning an error, plus its testing related code). We'd have to evaluate if it worths to add more interfaces for this.

Also this is a pattern also used in the standard library, for example HTTP handlers receive a Request that has a Context().

The context is only being stored where it is created, in the wrapper. We can continue with future refactorizations to replace Start(done <-chan struct{}) with Start(ctx context.Context) so we have a context that can be propagated from there.

Finally, I don't see a real benefit of using context here. I mean that it looks like it's a "we can use Context here, as many other approaches" instead of a "it's a clear use case of Context". Also, I feel that we are replacing a piece with a explicit implementation which was already working. Explicit code is idiomatic in Go :)

Well, most of our modules make requests to other services and many client libraries accept contexts.
The main benefit is that we can start using a context that makes sense in the current module lifecycle instead of using Background(), TODO() or avoiding APIs with contexts.

It is not using it "as many other approaches", it is using it as the approach that afaik is more widely used in the standard and third party libraries (even if we can discuss if it is the best approach).

A not so old post about this: https://dave.cheney.net/2017/08/20/context-isnt-for-cancellation

Nice read, but I think that start using something like tomb would require a huge effort in the case of Metricbeat, while getting a context from a done channel we already have is quite straightforward. Also many libraries we use expect contexts, if we use other things we still need to convert them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the first version of Beats was written, Context did not exist yet and we started with the done channel. Would we build it again I would assume Context would play quite a role in it. Long term I think more and more parts should be switched over to context but we need to go baby steps here (like in this PR). At least that is my thinking.

r := &CapturingReporterV2{}
r := &CapturingReporterV2{context: context.Background()}
metricSet.Fetch(r)
return r.events, r.errs
}

// ReportingFetchV2Error runs the given reporting metricset and returns all of the
// events and errors that occur during that period.
func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event, []error) {
r := &CapturingReporterV2{}
r := &CapturingReporterV2{context: context.Background()}
err := metricSet.Fetch(r)
if err != nil {
r.errs = append(r.errs, err)
Expand Down Expand Up @@ -310,15 +317,15 @@ func NewPushMetricSetV2(t testing.TB, config interface{}) mb.PushMetricSetV2 {
// capturingPushReporterV2 stores all the events and errors from a metricset's
// Run method.
type capturingPushReporterV2 struct {
doneC chan struct{}
context context.Context
eventsC chan mb.Event
}

// report writes an event to the output channel and returns true. If the output
// is closed it returns false.
func (r *capturingPushReporterV2) report(event mb.Event) bool {
select {
case <-r.doneC:
case <-r.context.Done():
// Publisher is stopped.
return false
case r.eventsC <- event:
Expand All @@ -336,18 +343,24 @@ func (r *capturingPushReporterV2) Error(err error) bool {
return r.report(mb.Event{Error: err})
}

// Done returns the Done channel for this reporter.
// Context returns the reporter context
func (r *capturingPushReporterV2) Context() context.Context {
return r.context
}

// Done returns the Done channel for this reporter context
func (r *capturingPushReporterV2) Done() <-chan struct{} {
return r.doneC
return r.context.Done()
}

// RunPushMetricSetV2 run the given push metricset for the specific amount of
// time and returns all of the events and errors that occur during that period.
func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2) []mb.Event {
var (
r = &capturingPushReporterV2{doneC: make(chan struct{}), eventsC: make(chan mb.Event)}
wg sync.WaitGroup
events []mb.Event
ctx, cancel = context.WithCancel(context.Background())
r = &capturingPushReporterV2{context: ctx, eventsC: make(chan mb.Event)}
wg sync.WaitGroup
events []mb.Event
)
wg.Add(2)

Expand All @@ -364,7 +377,7 @@ func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.Push
// Consumer
go func() {
defer wg.Done()
defer close(r.doneC)
defer cancel()

timer := time.NewTimer(timeout)
defer timer.Stop()
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/docker/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package container

import (
"context"

"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/pkg/errors"
Expand Down Expand Up @@ -65,7 +63,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// This is based on https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/list-containers.
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
// Fetch a list of all containers.
containers, err := m.dockerClient.ContainerList(context.Background(), types.ContainerListOptions{})
containers, err := m.dockerClient.ContainerList(r.Context(), types.ContainerListOptions{})
if err != nil {
return errors.Wrap(err, "failed to get docker containers list")
}
Expand Down
7 changes: 2 additions & 5 deletions metricbeat/module/docker/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package event

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -77,15 +76,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Run listens for docker events and reports them
func (m *MetricSet) Run(reporter mb.PushReporterV2) {
ctx, cancel := context.WithCancel(context.Background())
options := types.EventsOptions{
Since: fmt.Sprintf("%d", time.Now().Unix()),
}

defer m.dockerClient.Close()

for {
events, errors := m.dockerClient.Events(ctx, options)
events, errors := m.dockerClient.Events(reporter.Context(), options)

WATCH:
for {
Expand All @@ -100,9 +98,8 @@ func (m *MetricSet) Run(reporter mb.PushReporterV2) {
time.Sleep(1 * time.Second)
break WATCH

case <-reporter.Done():
case <-reporter.Context().Done():
m.logger.Debug("docker", "event watcher stopped")
cancel()
return
}
}
Expand Down