Skip to content

Commit

Permalink
Use local IDs in scraper, global IDs in pusher
Browse files Browse the repository at this point in the history
In order to avoid confusion as to which kind of ID is expected where,
introduce a model package that takes a synthetic_monitoring.Check and
produces a wrapped one that has the IDs decoded.

Introduce a model.GlobaID type to make it explicit which kind of ID is
expected. In the scraper use local IDs, except for the lists of checks,
which need to use a global one. In the publisher, use global IDs.

This allows the compiler to flag misuses.
  • Loading branch information
mem committed Sep 8, 2023
1 parent 3cbf949 commit beadaab
Show file tree
Hide file tree
Showing 15 changed files with 371 additions and 146 deletions.
19 changes: 11 additions & 8 deletions internal/adhoc/adhoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/k6runner"
"github.com/grafana/synthetic-monitoring-agent/internal/model"
"github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto"
"github.com/grafana/synthetic-monitoring-agent/internal/prober"
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
Expand Down Expand Up @@ -362,7 +363,7 @@ func (h *Handler) handleAdHocCheck(ctx context.Context, ahReq *sm.AdHocRequest)
return err
}

go runner.Run(ctx, ahReq.AdHocCheck.TenantId, h.publisher)
go runner.Run(ctx, model.GlobalID(ahReq.AdHocCheck.TenantId), h.publisher)

// If there's a tenant in the request, this should be forwarded
// to the changes handler.
Expand All @@ -383,10 +384,12 @@ func defaultGrpcAdhocChecksClientFactory(conn ClientConn) (sm.AdHocChecksClient,
}

func (h *Handler) defaultRunnerFactory(ctx context.Context, req *sm.AdHocRequest) (*runner, error) {
check := sm.Check{
Target: req.AdHocCheck.Target,
Timeout: req.AdHocCheck.Timeout,
Settings: req.AdHocCheck.Settings,
check := model.Check{
Check: sm.Check{
Target: req.AdHocCheck.Target,
Timeout: req.AdHocCheck.Timeout,
Settings: req.AdHocCheck.Settings,
},
}

p, target, err := h.proberFactory.New(ctx, h.logger, check)
Expand Down Expand Up @@ -447,7 +450,7 @@ func (l *jsonLogger) Log(keyvals ...interface{}) error {

// Run runs the specified prober once and captures the results using
// jsonLogger.
func (r *runner) Run(ctx context.Context, tenantId int64, publisher pusher.Publisher) {
func (r *runner) Run(ctx context.Context, tenantId model.GlobalID, publisher pusher.Publisher) {
r.logger.Info().Msg("running ad-hoc check")

registry := prometheus.NewRegistry()
Expand Down Expand Up @@ -531,7 +534,7 @@ type TimeSeries = []prompb.TimeSeries
type Streams = []logproto.Stream

type adhocData struct {
tenantId int64
tenantId model.GlobalID
streams Streams
}

Expand All @@ -543,6 +546,6 @@ func (d adhocData) Streams() Streams {
return d.streams
}

func (d adhocData) Tenant() int64 {
func (d adhocData) Tenant() model.GlobalID {
return d.tenantId
}
98 changes: 58 additions & 40 deletions internal/checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/k6runner"
"github.com/grafana/synthetic-monitoring-agent/internal/model"
"github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto"
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
"github.com/grafana/synthetic-monitoring-agent/internal/scraper"
Expand Down Expand Up @@ -71,10 +72,10 @@ type Updater struct {
IsConnected func(bool)
probe *sm.Probe
scrapersMutex sync.Mutex
scrapers map[int64]*scraper.Scraper
scrapers map[model.GlobalID]*scraper.Scraper
metrics metrics
k6Runner k6runner.Runner
scraperFactory func(context.Context, sm.Check, pusher.Publisher, sm.Probe, zerolog.Logger, scraper.Incrementer, scraper.IncrementerVec, k6runner.Runner) (*scraper.Scraper, error)
scraperFactory func(context.Context, model.Check, pusher.Publisher, sm.Probe, zerolog.Logger, scraper.Incrementer, scraper.IncrementerVec, k6runner.Runner) (*scraper.Scraper, error)
}

type apiInfo struct {
Expand Down Expand Up @@ -104,7 +105,7 @@ type UpdaterOptions struct {
PromRegisterer prometheus.Registerer
Features feature.Collection
K6Runner k6runner.Runner
ScraperFactory func(context.Context, sm.Check, pusher.Publisher, sm.Probe, zerolog.Logger, scraper.Incrementer, scraper.IncrementerVec, k6runner.Runner) (*scraper.Scraper, error)
ScraperFactory func(context.Context, model.Check, pusher.Publisher, sm.Probe, zerolog.Logger, scraper.Incrementer, scraper.IncrementerVec, k6runner.Runner) (*scraper.Scraper, error)
}

func NewUpdater(opts UpdaterOptions) (*Updater, error) {
Expand Down Expand Up @@ -155,6 +156,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) {
}, []string{
"type",
"tenantId",
"regionId",
})

if err := opts.PromRegisterer.Register(scrapesCounter); err != nil {
Expand All @@ -170,6 +172,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) {
"type",
"source",
"tenantId",
"regionId",
})

if err := opts.PromRegisterer.Register(scrapeErrorCounter); err != nil {
Expand Down Expand Up @@ -220,7 +223,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) {
publisher: opts.Publisher,
tenantCh: opts.TenantCh,
IsConnected: opts.IsConnected,
scrapers: make(map[int64]*scraper.Scraper),
scrapers: make(map[model.GlobalID]*scraper.Scraper),
k6Runner: opts.K6Runner,
scraperFactory: scraperFactory,
metrics: metrics{
Expand Down Expand Up @@ -418,7 +421,7 @@ func (c *Updater) loop(ctx context.Context) (bool, error) {

for cID, scraper := range c.scrapers {
knownChecks.Checks = append(knownChecks.Checks, sm.EntityRef{
Id: cID,
Id: int64(cID),
LastModified: scraper.LastModified(),
})
}
Expand Down Expand Up @@ -567,7 +570,7 @@ func (c *Updater) processChanges(ctx context.Context, cc sm.Checks_GetChangesCli
}
}

func (c *Updater) handleCheckAdd(ctx context.Context, check sm.Check) error {
func (c *Updater) handleCheckAdd(ctx context.Context, check model.Check) error {
c.metrics.changesCounter.WithLabelValues("add").Inc()

if err := check.Validate(); err != nil {
Expand All @@ -577,19 +580,19 @@ func (c *Updater) handleCheckAdd(ctx context.Context, check sm.Check) error {
c.scrapersMutex.Lock()
defer c.scrapersMutex.Unlock()

if running, found := c.scrapers[check.Id]; found {
if running, found := c.scrapers[check.GlobalID()]; found {
// we can get here if the API sent us a check add twice:
// once during the initial connection and another right
// after that. The window for that is small, but it
// exists.

return fmt.Errorf("check with id %d already exists (version %s)", check.Id, running.ConfigVersion())
return fmt.Errorf("check with id %d already exists (version %s)", check.GlobalID(), running.ConfigVersion())
}

return c.addAndStartScraperWithLock(ctx, check)
}

func (c *Updater) handleCheckUpdate(ctx context.Context, check sm.Check) error {
func (c *Updater) handleCheckUpdate(ctx context.Context, check model.Check) error {
c.metrics.changesCounter.WithLabelValues("update").Inc()

if err := check.Validate(); err != nil {
Expand All @@ -604,10 +607,12 @@ func (c *Updater) handleCheckUpdate(ctx context.Context, check sm.Check) error {

// handleCheckUpdateWithLock is the bottom half of handleCheckUpdate. It
// MUST be called with the scrapersMutex lock held.
func (c *Updater) handleCheckUpdateWithLock(ctx context.Context, check sm.Check) error {
scraper, found := c.scrapers[check.Id]
func (c *Updater) handleCheckUpdateWithLock(ctx context.Context, check model.Check) error {
cid := check.GlobalID()

scraper, found := c.scrapers[cid]
if !found {
withCheckID(c.logger.Warn(), check.Id).Msg("update request for an unknown check")
c.logger.Warn().Int64("check_id", check.Id).Int("region_id", check.RegionId).Msg("update request for an unknown check")
return c.addAndStartScraperWithLock(ctx, check)
}

Expand All @@ -616,29 +621,31 @@ func (c *Updater) handleCheckUpdateWithLock(ctx context.Context, check sm.Check)

scraper.Stop()
checkType := scraper.CheckType().String()
delete(c.scrapers, check.Id)
delete(c.scrapers, cid)

c.metrics.runningScrapers.WithLabelValues(checkType).Dec()

return c.addAndStartScraperWithLock(ctx, check)
}

func (c *Updater) handleCheckDelete(ctx context.Context, check sm.Check) error {
func (c *Updater) handleCheckDelete(ctx context.Context, check model.Check) error {
c.metrics.changesCounter.WithLabelValues("delete").Inc()

cid := check.GlobalID()

c.scrapersMutex.Lock()
defer c.scrapersMutex.Unlock()

scraper, found := c.scrapers[check.Id]
scraper, found := c.scrapers[cid]
if !found {
withCheckID(c.logger.Warn(), check.Id).Msg("delete request for an unknown check")
c.logger.Warn().Int64("check_id", check.Id).Int("region_id", check.RegionId).Msg("delete request for an unknown check")
return errors.New("check not found")
}

scraper.Stop()
checkType := scraper.CheckType().String()

delete(c.scrapers, check.Id)
delete(c.scrapers, cid)

c.metrics.runningScrapers.WithLabelValues(checkType).Dec()

Expand All @@ -661,7 +668,7 @@ func (c *Updater) handleCheckDelete(ctx context.Context, check sm.Check) error {
// We have to do this exactly once per reconnect. It's up to the calling code
// to ensure this.
func (c *Updater) handleFirstBatch(ctx context.Context, changes *sm.Changes) {
newChecks := make(map[int64]struct{})
newChecks := make(map[model.GlobalID]struct{})

c.scrapersMutex.Lock()
defer c.scrapersMutex.Unlock()
Expand All @@ -672,16 +679,19 @@ func (c *Updater) handleFirstBatch(ctx context.Context, changes *sm.Changes) {

switch checkChange.Operation {
case sm.CheckOperation_CHECK_ADD:
if err := c.handleInitialChangeAddWithLock(ctx, checkChange.Check); err != nil {
var check model.Check
check.FromSM(checkChange.Check)

if err := c.handleInitialChangeAddWithLock(ctx, check); err != nil {
c.metrics.changeErrorsCounter.WithLabelValues("add").Inc()
withCheckID(c.logger.Error().Err(err), checkChange.Check.Id).
c.logger.Error().Err(err).Int64("check_id", check.Id).Int("region_id", check.RegionId).
Msg("adding check failed, dropping check")
continue
}

// add this to the list of checks we have seen during
// this operation
newChecks[checkChange.Check.Id] = struct{}{}
newChecks[check.GlobalID()] = struct{}{}

default:
// we should never hit this because the first time we
Expand All @@ -700,7 +710,10 @@ func (c *Updater) handleFirstBatch(ctx context.Context, changes *sm.Changes) {
continue
}

withCheckID(c.logger.Debug(), id).
cid, rid := model.GetLocalAndRegionIDs(id)
c.logger.Debug().
Int64("check_id", cid).
Int("region_id", rid).
Msg("stopping scraper during first batch handling")

checkType := scraper.CheckType().String()
Expand All @@ -718,8 +731,8 @@ func (c *Updater) handleFirstBatch(ctx context.Context, changes *sm.Changes) {
// and changes the operation to an update if necessary.
//
// This function MUST be called with the scrapers mutex held.
func (c *Updater) handleInitialChangeAddWithLock(ctx context.Context, check sm.Check) error {
if running, found := c.scrapers[check.Id]; found {
func (c *Updater) handleInitialChangeAddWithLock(ctx context.Context, check model.Check) error {
if running, found := c.scrapers[check.GlobalID()]; found {
oldVersion := running.ConfigVersion()
newVersion := check.ConfigVersion()

Expand Down Expand Up @@ -762,21 +775,24 @@ func (c *Updater) handleChangeBatch(ctx context.Context, changes *sm.Changes, fi
for _, checkChange := range changes.Checks {
c.logger.Debug().Interface("check change", checkChange).Msg("got check change")

var check model.Check
check.FromSM(checkChange.Check)

switch checkChange.Operation {
case sm.CheckOperation_CHECK_ADD:
if err := c.handleCheckAdd(ctx, checkChange.Check); err != nil {
if err := c.handleCheckAdd(ctx, check); err != nil {
c.metrics.changeErrorsCounter.WithLabelValues("add").Inc()
c.logger.Error().Err(err).Msg("handling check add")
}

case sm.CheckOperation_CHECK_UPDATE:
if err := c.handleCheckUpdate(ctx, checkChange.Check); err != nil {
if err := c.handleCheckUpdate(ctx, check); err != nil {
c.metrics.changeErrorsCounter.WithLabelValues("update").Inc()
c.logger.Error().Err(err).Msg("handling check update")
}

case sm.CheckOperation_CHECK_DELETE:
if err := c.handleCheckDelete(ctx, checkChange.Check); err != nil {
if err := c.handleCheckDelete(ctx, check); err != nil {
c.metrics.changeErrorsCounter.WithLabelValues("delete").Inc()
c.logger.Error().Err(err).Msg("handling check delete")
}
Expand All @@ -788,7 +804,7 @@ func (c *Updater) handleChangeBatch(ctx context.Context, changes *sm.Changes, fi
// scrapers managed by this updater and starts running it.
//
// This MUST be called with the scrapersMutex held.
func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check sm.Check) error {
func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check model.Check) error {
// This is a good place to filter out checks by feature flags.
//
// If we need to accept checks based on whether a feature flag
Expand All @@ -814,11 +830,22 @@ func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check sm.Check

checkType := check.Type().String()

scrapeCounter := c.metrics.scrapesCounter.WithLabelValues(checkType, strconv.FormatInt(check.TenantId, 10))
tidStr := strconv.FormatInt(check.TenantId, 10)
ridStr := strconv.Itoa(check.RegionId)

scrapeCounter, err := c.metrics.scrapesCounter.GetMetricWith(prometheus.Labels{
"type": checkType,
"tenantId": tidStr,
"regionId": ridStr,
})
if err != nil {
return err
}

scrapeErrorCounter, err := c.metrics.scrapeErrorCounter.CurryWith(prometheus.Labels{
"type": checkType,
"tenantId": strconv.FormatInt(check.TenantId, 10),
"tenantId": tidStr,
"regionId": ridStr,
})
if err != nil {
return err
Expand All @@ -829,7 +856,7 @@ func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check sm.Check
return fmt.Errorf("cannot create new scraper: %w", err)
}

c.scrapers[check.Id] = scraper
c.scrapers[check.GlobalID()] = scraper

go scraper.Run(ctx)

Expand Down Expand Up @@ -858,12 +885,3 @@ func sleepCtx(ctx context.Context, d time.Duration) error {

return err
}

// Helper to add a check ID and optionally a region ID to a log message.
func withCheckID(ev *zerolog.Event, checkID int64) *zerolog.Event {
if localID, regionID, err := sm.GlobalIDToLocalID(checkID); err != nil {
ev = ev.Int("region_id", regionID)
checkID = localID
}
return ev.Int64("check_id", checkID)
}
12 changes: 7 additions & 5 deletions internal/checks/checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/k6runner"
"github.com/grafana/synthetic-monitoring-agent/internal/model"
"github.com/grafana/synthetic-monitoring-agent/internal/prober"
"github.com/grafana/synthetic-monitoring-agent/internal/prober/logger"
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
Expand Down Expand Up @@ -183,7 +184,8 @@ func TestHandleCheckOp(t *testing.T) {
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

check := sm.Check{
var check model.Check
check.FromSM(sm.Check{
Id: 5000,
TenantId: 1,
Frequency: 1000,
Expand All @@ -196,12 +198,12 @@ func TestHandleCheckOp(t *testing.T) {
},
Created: 0,
Modified: 0,
}
})

scraperExists := func() bool {
u.scrapersMutex.Lock()
defer u.scrapersMutex.Unlock()
_, found := u.scrapers[check.Id]
_, found := u.scrapers[check.GlobalID()]
return found
}

Expand Down Expand Up @@ -276,11 +278,11 @@ func (testProber) Probe(ctx context.Context, target string, registry *prometheus
type testProbeFactory struct {
}

func (f testProbeFactory) New(ctx context.Context, logger zerolog.Logger, check sm.Check) (prober.Prober, string, error) {
func (f testProbeFactory) New(ctx context.Context, logger zerolog.Logger, check model.Check) (prober.Prober, string, error) {
return testProber{}, check.Target, nil
}

func testScraperFactory(ctx context.Context, check sm.Check, publisher pusher.Publisher, _ sm.Probe, logger zerolog.Logger, scrapeCounter scraper.Incrementer, scrapeErrorCounter scraper.IncrementerVec, k6Runner k6runner.Runner) (*scraper.Scraper, error) {
func testScraperFactory(ctx context.Context, check model.Check, publisher pusher.Publisher, _ sm.Probe, logger zerolog.Logger, scrapeCounter scraper.Incrementer, scrapeErrorCounter scraper.IncrementerVec, k6Runner k6runner.Runner) (*scraper.Scraper, error) {
return scraper.NewWithOpts(
ctx,
check,
Expand Down
Loading

0 comments on commit beadaab

Please sign in to comment.