Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor pusher to allow multiple implementations #488

Merged
merged 1 commit into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions cmd/synthetic-monitoring-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@ import (
"syscall"
"time"

"github.com/jpillora/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/grpclog"

"github.com/grafana/synthetic-monitoring-agent/internal/adhoc"
"github.com/grafana/synthetic-monitoring-agent/internal/checks"
"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/http"
"github.com/grafana/synthetic-monitoring-agent/internal/k6runner"
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
pusherV1 "github.com/grafana/synthetic-monitoring-agent/internal/pusher/v1"
"github.com/grafana/synthetic-monitoring-agent/internal/version"
"github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
"github.com/jpillora/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/grpclog"
)

const exitFail = 1
Expand All @@ -49,6 +51,7 @@ func run(args []string, stdout io.Writer) error {
enablePProf = flags.Bool("enable-pprof", false, "exposes profiling data via HTTP /debug/pprof/ endpoint")
httpListenAddr = flags.String("listen-address", "localhost:4050", "listen address")
k6URI = flags.String("k6-uri", "k6", "how to run k6 (path or URL)")
selectedPublisher = flags.String("publisher", pusherV1.Name, "publisher type (EXPERIMENTAL)")
)

flags.Var(&features, "features", "optional feature flags")
Expand Down Expand Up @@ -190,7 +193,6 @@ func run(args []string, stdout io.Writer) error {
return httpServer.Run(httpListener)
})

publishCh := make(chan pusher.Payload)
tenantCh := make(chan synthetic_monitoring.Tenant)

conn, err := dialAPIServer(ctx, *grpcApiServerAddr, *grpcInsecure, *apiToken)
Expand All @@ -205,11 +207,23 @@ func run(args []string, stdout io.Writer) error {
k6Runner = k6runner.New(*k6URI)
}

tm := pusher.NewTenantManager(ctx, synthetic_monitoring.NewTenantsClient(conn), tenantCh, 15*time.Minute)

pusherRegistry := pusher.NewRegistry[pusher.Factory]()
pusherRegistry.MustRegister(pusherV1.Name, pusherV1.NewPublisher)

publisherFactory, err := pusherRegistry.Lookup(*selectedPublisher)
if err != nil {
return fmt.Errorf("creating publisher: %w", err)
}

publisher := publisherFactory(ctx, tm, zl.With().Str("subsystem", "publisher").Str("version", *selectedPublisher).Logger(), promRegisterer)

checksUpdater, err := checks.NewUpdater(checks.UpdaterOptions{
Conn: conn,
Logger: zl.With().Str("subsystem", "updater").Logger(),
Backoff: newConnectionBackoff(),
PublishCh: publishCh,
Publisher: publisher,
TenantCh: tenantCh,
IsConnected: readynessHandler.Set,
PromRegisterer: promRegisterer,
Expand All @@ -229,7 +243,7 @@ func run(args []string, stdout io.Writer) error {
Conn: conn,
Logger: zl.With().Str("subsystem", "adhoc").Logger(),
Backoff: newConnectionBackoff(),
PublishCh: publishCh,
Publisher: publisher,
TenantCh: tenantCh,
PromRegisterer: promRegisterer,
Features: features,
Expand All @@ -244,14 +258,6 @@ func run(args []string, stdout io.Writer) error {
})
}

tm := pusher.NewTenantManager(ctx, synthetic_monitoring.NewTenantsClient(conn), tenantCh, 15*time.Minute)

publisher := pusher.NewPublisher(tm, publishCh, zl.With().Str("subsystem", "publisher").Logger(), promRegisterer)

g.Go(func() error {
return publisher.Run(ctx)
})

return g.Wait()
}

Expand Down
29 changes: 15 additions & 14 deletions internal/adhoc/adhoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@ import (
"io"
"time"

"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/k6runner"
"github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto"
"github.com/grafana/synthetic-monitoring-agent/internal/prober"
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
"github.com/grafana/synthetic-monitoring-agent/internal/version"
sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/prompb"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"

"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/k6runner"
"github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto"
"github.com/grafana/synthetic-monitoring-agent/internal/prober"
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
"github.com/grafana/synthetic-monitoring-agent/internal/version"
sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
)

// Handler is in charge of retrieving ad-hoc checks from the
Expand All @@ -34,7 +35,7 @@ type Handler struct {
backoff Backoffer
probe *sm.Probe
metrics metrics
publishCh chan<- pusher.Payload
publisher pusher.Publisher
tenantCh chan<- sm.Tenant
runnerFactory func(context.Context, *sm.AdHocRequest) (*runner, error)
grpcAdhocChecksClientFactory func(conn ClientConn) (sm.AdHocChecksClient, error)
Expand Down Expand Up @@ -100,7 +101,7 @@ type HandlerOpts struct {
Conn ClientConn
Logger zerolog.Logger
Backoff Backoffer
PublishCh chan<- pusher.Payload
Publisher pusher.Publisher
TenantCh chan<- sm.Tenant
PromRegisterer prometheus.Registerer
Features feature.Collection
Expand Down Expand Up @@ -138,7 +139,7 @@ func NewHandler(opts HandlerOpts) (*Handler, error) {
logger: opts.Logger,
features: opts.Features,
backoff: opts.Backoff,
publishCh: opts.PublishCh,
publisher: opts.Publisher,
tenantCh: opts.TenantCh,
runnerFactory: opts.runnerFactory,
grpcAdhocChecksClientFactory: opts.grpcAdhocChecksClientFactory,
Expand Down Expand Up @@ -361,7 +362,7 @@ func (h *Handler) handleAdHocCheck(ctx context.Context, ahReq *sm.AdHocRequest)
return err
}

go runner.Run(ctx, ahReq.AdHocCheck.TenantId, h.publishCh)
go runner.Run(ctx, ahReq.AdHocCheck.TenantId, h.publisher)

// If there's a tenant in the request, this should be forwarded
// to the changes handler.
Expand Down Expand Up @@ -446,7 +447,7 @@ func (l *jsonLogger) Log(keyvals ...interface{}) error {

// Run runs the specified prober once and captures the results using
// jsonLogger.
func (r *runner) Run(ctx context.Context, tenantId int64, publishCh chan<- pusher.Payload) {
func (r *runner) Run(ctx context.Context, tenantId int64, publisher pusher.Publisher) {
r.logger.Info().Msg("running ad-hoc check")

registry := prometheus.NewRegistry()
Expand Down Expand Up @@ -503,7 +504,7 @@ func (r *runner) Run(ctx context.Context, tenantId int64, publishCh chan<- pushe
Str("check_name", r.prober.Name()).
Msg("ad-hoc check done")

publishCh <- adhocData{
publisher.Publish(adhocData{
tenantId: tenantId,
streams: Streams{
{
Expand All @@ -516,7 +517,7 @@ func (r *runner) Run(ctx context.Context, tenantId int64, publishCh chan<- pushe
},
},
},
}
})

r.logger.Debug().
Str("id", r.id).
Expand Down
23 changes: 15 additions & 8 deletions internal/adhoc/adhoc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ import (
"testing"
"time"

"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/prober/logger"
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
Expand All @@ -19,6 +15,11 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/prober/logger"
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
)

func TestNewHandler(t *testing.T) {
Expand All @@ -28,7 +29,7 @@ func TestNewHandler(t *testing.T) {
opts := HandlerOpts{
Conn: nil,
Logger: zerolog.New(io.Discard),
PublishCh: make(chan pusher.Payload),
Publisher: channelPublisher(make(chan pusher.Payload)),
TenantCh: make(chan sm.Tenant),
PromRegisterer: prometheus.NewPedanticRegistry(),
Features: features,
Expand All @@ -41,7 +42,7 @@ func TestNewHandler(t *testing.T) {
require.Equal(t, opts.Conn, h.api.conn)
require.Equal(t, opts.Logger, h.logger)
require.Equal(t, opts.Features, h.features)
require.Equal(t, opts.PublishCh, h.publishCh)
require.Equal(t, opts.Publisher, h.publisher)
require.NotNil(t, h.runnerFactory)
require.NotNil(t, h.grpcAdhocChecksClientFactory)
require.Nil(t, h.probe, "probe should not be set at this point")
Expand All @@ -61,7 +62,7 @@ func TestHandlerRun(t *testing.T) {

opts := HandlerOpts{
Logger: logger,
PublishCh: publishCh,
Publisher: channelPublisher(publishCh),
TenantCh: make(chan sm.Tenant),
PromRegisterer: prometheus.NewPedanticRegistry(),
Features: features,
Expand Down Expand Up @@ -93,6 +94,12 @@ func TestHandlerRun(t *testing.T) {
require.Len(t, payload.Streams(), 1)
}

type channelPublisher chan pusher.Payload

func (c channelPublisher) Publish(payload pusher.Payload) {
c <- payload
}

type grpcTestError struct {
code codes.Code
msg string
Expand Down Expand Up @@ -178,7 +185,7 @@ func TestHandlerRunErrors(t *testing.T) {
opts := HandlerOpts{
Conn: &grpcTestConn{},
Logger: logger,
PublishCh: publishCh,
Publisher: channelPublisher(publishCh),
Backoff: testBackoff{},
TenantCh: make(chan sm.Tenant),
PromRegisterer: prometheus.NewPedanticRegistry(),
Expand Down
27 changes: 14 additions & 13 deletions internal/checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ import (
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/prompb"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/k6runner"
"github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto"
Expand All @@ -21,13 +29,6 @@ import (
"github.com/grafana/synthetic-monitoring-agent/internal/version"
"github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/prompb"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Error string
Expand Down Expand Up @@ -65,15 +66,15 @@ type Updater struct {
logger zerolog.Logger
features feature.Collection
backoff Backoffer
publishCh chan<- pusher.Payload
publisher pusher.Publisher
tenantCh chan<- sm.Tenant
IsConnected func(bool)
probe *sm.Probe
scrapersMutex sync.Mutex
scrapers map[int64]*scraper.Scraper
metrics metrics
k6Runner k6runner.Runner
scraperFactory func(context.Context, sm.Check, chan<- pusher.Payload, sm.Probe, zerolog.Logger, prometheus.Counter, *prometheus.CounterVec, k6runner.Runner) (*scraper.Scraper, error)
scraperFactory func(context.Context, sm.Check, pusher.Publisher, sm.Probe, zerolog.Logger, prometheus.Counter, *prometheus.CounterVec, k6runner.Runner) (*scraper.Scraper, error)
}

type apiInfo struct {
Expand All @@ -97,13 +98,13 @@ type UpdaterOptions struct {
Conn *grpc.ClientConn
Logger zerolog.Logger
Backoff Backoffer
PublishCh chan<- pusher.Payload
Publisher pusher.Publisher
TenantCh chan<- sm.Tenant
IsConnected func(bool)
PromRegisterer prometheus.Registerer
Features feature.Collection
K6Runner k6runner.Runner
ScraperFactory func(context.Context, sm.Check, chan<- pusher.Payload, sm.Probe, zerolog.Logger, prometheus.Counter, *prometheus.CounterVec, k6runner.Runner) (*scraper.Scraper, error)
ScraperFactory func(context.Context, sm.Check, pusher.Publisher, sm.Probe, zerolog.Logger, prometheus.Counter, *prometheus.CounterVec, k6runner.Runner) (*scraper.Scraper, error)
}

func NewUpdater(opts UpdaterOptions) (*Updater, error) {
Expand Down Expand Up @@ -214,7 +215,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) {
logger: opts.Logger,
features: opts.Features,
backoff: opts.Backoff,
publishCh: opts.PublishCh,
publisher: opts.Publisher,
tenantCh: opts.TenantCh,
IsConnected: opts.IsConnected,
scrapers: make(map[int64]*scraper.Scraper),
Expand Down Expand Up @@ -820,7 +821,7 @@ func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check sm.Check
return err
}

scraper, err := c.scraperFactory(ctx, check, c.publishCh, *c.probe, c.logger, scrapeCounter, scrapeErrorCounter, c.k6Runner)
scraper, err := c.scraperFactory(ctx, check, c.publisher, *c.probe, c.logger, scrapeCounter, scrapeErrorCounter, c.k6Runner)
if err != nil {
return fmt.Errorf("cannot create new scraper: %w", err)
}
Expand Down
Loading