From beadaabb8b8408de166f851125867ad703911d53 Mon Sep 17 00:00:00 2001
From: "Marcelo E. Magallon"
Date: Wed, 6 Sep 2023 11:01:49 -0600
Subject: [PATCH] Use local IDs in scraper, global IDs in pusher

In order to avoid confusion about which kind of ID is expected where,
introduce a model package that takes a synthetic_monitoring.Check and
produces a wrapped one with the IDs decoded.

Introduce a model.GlobalID type to make it explicit which kind of ID is
expected. In the scraper, use local IDs, except for the list of known
checks, which needs global ones. In the publisher, use global IDs. This
allows the compiler to flag misuses.
---
 internal/adhoc/adhoc.go                  |  19 ++--
 internal/checks/checks.go                |  98 ++++++++++-------
 internal/checks/checks_test.go           |  12 +-
 internal/model/model.go                  |  68 ++++++++++++
 internal/model/model_test.go             | 133 +++++++++++++++++++++++
 internal/prober/prober.go                |  19 ++--
 internal/pusher/clients.go               |   9 --
 internal/pusher/pusher.go                |   3 +-
 internal/pusher/v1/pusher.go             |  17 +--
 internal/pusher/v2/options.go            |   5 +-
 internal/pusher/v2/publisher.go          |  18 +--
 internal/pusher/v2/tenant_pusher.go      |   9 +-
 internal/pusher/v2/tenant_pusher_test.go |  39 ++++---
 internal/scraper/scraper.go              |  34 +++---
 internal/scraper/scraper_test.go         |  34 +++---
 15 files changed, 371 insertions(+), 146 deletions(-)
 create mode 100644 internal/model/model.go
 create mode 100644 internal/model/model_test.go

diff --git a/internal/adhoc/adhoc.go b/internal/adhoc/adhoc.go
index deb8dd4b..f07fe78e 100644
--- a/internal/adhoc/adhoc.go
+++ b/internal/adhoc/adhoc.go
@@ -18,6 +18,7 @@ import (
 	"github.com/grafana/synthetic-monitoring-agent/internal/feature"
 	"github.com/grafana/synthetic-monitoring-agent/internal/k6runner"
+	"github.com/grafana/synthetic-monitoring-agent/internal/model"
 	"github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto"
 	"github.com/grafana/synthetic-monitoring-agent/internal/prober"
 	"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
@@ -362,7 +363,7 @@ func (h *Handler) handleAdHocCheck(ctx context.Context, ahReq *sm.AdHocRequest)
 		return err
 	}

-	go runner.Run(ctx, ahReq.AdHocCheck.TenantId, h.publisher)
+	go runner.Run(ctx, model.GlobalID(ahReq.AdHocCheck.TenantId), h.publisher)

 	// If there's a tenant in the request, this should be forwarded
 	// to the changes handler.
@@ -383,10 +384,12 @@ func defaultGrpcAdhocChecksClientFactory(conn ClientConn) (sm.AdHocChecksClient,
 }

 func (h *Handler) defaultRunnerFactory(ctx context.Context, req *sm.AdHocRequest) (*runner, error) {
-	check := sm.Check{
-		Target:   req.AdHocCheck.Target,
-		Timeout:  req.AdHocCheck.Timeout,
-		Settings: req.AdHocCheck.Settings,
+	check := model.Check{
+		Check: sm.Check{
+			Target:   req.AdHocCheck.Target,
+			Timeout:  req.AdHocCheck.Timeout,
+			Settings: req.AdHocCheck.Settings,
+		},
 	}

 	p, target, err := h.proberFactory.New(ctx, h.logger, check)
@@ -447,7 +450,7 @@ func (l *jsonLogger) Log(keyvals ...interface{}) error {

 // Run runs the specified prober once and captures the results using
 // jsonLogger.
-func (r *runner) Run(ctx context.Context, tenantId int64, publisher pusher.Publisher) { +func (r *runner) Run(ctx context.Context, tenantId model.GlobalID, publisher pusher.Publisher) { r.logger.Info().Msg("running ad-hoc check") registry := prometheus.NewRegistry() @@ -531,7 +534,7 @@ type TimeSeries = []prompb.TimeSeries type Streams = []logproto.Stream type adhocData struct { - tenantId int64 + tenantId model.GlobalID streams Streams } @@ -543,6 +546,6 @@ func (d adhocData) Streams() Streams { return d.streams } -func (d adhocData) Tenant() int64 { +func (d adhocData) Tenant() model.GlobalID { return d.tenantId } diff --git a/internal/checks/checks.go b/internal/checks/checks.go index bf79029a..a4fe84f4 100644 --- a/internal/checks/checks.go +++ b/internal/checks/checks.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/synthetic-monitoring-agent/internal/feature" "github.com/grafana/synthetic-monitoring-agent/internal/k6runner" + "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto" "github.com/grafana/synthetic-monitoring-agent/internal/pusher" "github.com/grafana/synthetic-monitoring-agent/internal/scraper" @@ -71,10 +72,10 @@ type Updater struct { IsConnected func(bool) probe *sm.Probe scrapersMutex sync.Mutex - scrapers map[int64]*scraper.Scraper + scrapers map[model.GlobalID]*scraper.Scraper metrics metrics k6Runner k6runner.Runner - scraperFactory func(context.Context, sm.Check, pusher.Publisher, sm.Probe, zerolog.Logger, scraper.Incrementer, scraper.IncrementerVec, k6runner.Runner) (*scraper.Scraper, error) + scraperFactory func(context.Context, model.Check, pusher.Publisher, sm.Probe, zerolog.Logger, scraper.Incrementer, scraper.IncrementerVec, k6runner.Runner) (*scraper.Scraper, error) } type apiInfo struct { @@ -104,7 +105,7 @@ type UpdaterOptions struct { PromRegisterer prometheus.Registerer Features feature.Collection K6Runner k6runner.Runner - ScraperFactory func(context.Context, sm.Check, pusher.Publisher, sm.Probe, zerolog.Logger, scraper.Incrementer, scraper.IncrementerVec, k6runner.Runner) (*scraper.Scraper, error) + ScraperFactory func(context.Context, model.Check, pusher.Publisher, sm.Probe, zerolog.Logger, scraper.Incrementer, scraper.IncrementerVec, k6runner.Runner) (*scraper.Scraper, error) } func NewUpdater(opts UpdaterOptions) (*Updater, error) { @@ -155,6 +156,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) { }, []string{ "type", "tenantId", + "regionId", }) if err := opts.PromRegisterer.Register(scrapesCounter); err != nil { @@ -170,6 +172,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) { "type", "source", "tenantId", + "regionId", }) if err := opts.PromRegisterer.Register(scrapeErrorCounter); err != nil { @@ -220,7 +223,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) { publisher: opts.Publisher, tenantCh: opts.TenantCh, IsConnected: opts.IsConnected, - scrapers: make(map[int64]*scraper.Scraper), + scrapers: make(map[model.GlobalID]*scraper.Scraper), k6Runner: opts.K6Runner, scraperFactory: scraperFactory, metrics: metrics{ @@ -418,7 +421,7 @@ func (c *Updater) loop(ctx context.Context) (bool, error) { for cID, scraper := range c.scrapers { knownChecks.Checks = append(knownChecks.Checks, sm.EntityRef{ - Id: cID, + Id: int64(cID), LastModified: scraper.LastModified(), }) } @@ -567,7 +570,7 @@ func (c *Updater) processChanges(ctx context.Context, cc sm.Checks_GetChangesCli } } -func (c *Updater) handleCheckAdd(ctx context.Context, check sm.Check) 
error { +func (c *Updater) handleCheckAdd(ctx context.Context, check model.Check) error { c.metrics.changesCounter.WithLabelValues("add").Inc() if err := check.Validate(); err != nil { @@ -577,19 +580,19 @@ func (c *Updater) handleCheckAdd(ctx context.Context, check sm.Check) error { c.scrapersMutex.Lock() defer c.scrapersMutex.Unlock() - if running, found := c.scrapers[check.Id]; found { + if running, found := c.scrapers[check.GlobalID()]; found { // we can get here if the API sent us a check add twice: // once during the initial connection and another right // after that. The window for that is small, but it // exists. - return fmt.Errorf("check with id %d already exists (version %s)", check.Id, running.ConfigVersion()) + return fmt.Errorf("check with id %d already exists (version %s)", check.GlobalID(), running.ConfigVersion()) } return c.addAndStartScraperWithLock(ctx, check) } -func (c *Updater) handleCheckUpdate(ctx context.Context, check sm.Check) error { +func (c *Updater) handleCheckUpdate(ctx context.Context, check model.Check) error { c.metrics.changesCounter.WithLabelValues("update").Inc() if err := check.Validate(); err != nil { @@ -604,10 +607,12 @@ func (c *Updater) handleCheckUpdate(ctx context.Context, check sm.Check) error { // handleCheckUpdateWithLock is the bottom half of handleCheckUpdate. It // MUST be called with the scrapersMutex lock held. -func (c *Updater) handleCheckUpdateWithLock(ctx context.Context, check sm.Check) error { - scraper, found := c.scrapers[check.Id] +func (c *Updater) handleCheckUpdateWithLock(ctx context.Context, check model.Check) error { + cid := check.GlobalID() + + scraper, found := c.scrapers[cid] if !found { - withCheckID(c.logger.Warn(), check.Id).Msg("update request for an unknown check") + c.logger.Warn().Int64("check_id", check.Id).Int("region_id", check.RegionId).Msg("update request for an unknown check") return c.addAndStartScraperWithLock(ctx, check) } @@ -616,29 +621,31 @@ func (c *Updater) handleCheckUpdateWithLock(ctx context.Context, check sm.Check) scraper.Stop() checkType := scraper.CheckType().String() - delete(c.scrapers, check.Id) + delete(c.scrapers, cid) c.metrics.runningScrapers.WithLabelValues(checkType).Dec() return c.addAndStartScraperWithLock(ctx, check) } -func (c *Updater) handleCheckDelete(ctx context.Context, check sm.Check) error { +func (c *Updater) handleCheckDelete(ctx context.Context, check model.Check) error { c.metrics.changesCounter.WithLabelValues("delete").Inc() + cid := check.GlobalID() + c.scrapersMutex.Lock() defer c.scrapersMutex.Unlock() - scraper, found := c.scrapers[check.Id] + scraper, found := c.scrapers[cid] if !found { - withCheckID(c.logger.Warn(), check.Id).Msg("delete request for an unknown check") + c.logger.Warn().Int64("check_id", check.Id).Int("region_id", check.RegionId).Msg("delete request for an unknown check") return errors.New("check not found") } scraper.Stop() checkType := scraper.CheckType().String() - delete(c.scrapers, check.Id) + delete(c.scrapers, cid) c.metrics.runningScrapers.WithLabelValues(checkType).Dec() @@ -661,7 +668,7 @@ func (c *Updater) handleCheckDelete(ctx context.Context, check sm.Check) error { // We have to do this exactly once per reconnect. It's up to the calling code // to ensure this. 
func (c *Updater) handleFirstBatch(ctx context.Context, changes *sm.Changes) { - newChecks := make(map[int64]struct{}) + newChecks := make(map[model.GlobalID]struct{}) c.scrapersMutex.Lock() defer c.scrapersMutex.Unlock() @@ -672,16 +679,19 @@ func (c *Updater) handleFirstBatch(ctx context.Context, changes *sm.Changes) { switch checkChange.Operation { case sm.CheckOperation_CHECK_ADD: - if err := c.handleInitialChangeAddWithLock(ctx, checkChange.Check); err != nil { + var check model.Check + check.FromSM(checkChange.Check) + + if err := c.handleInitialChangeAddWithLock(ctx, check); err != nil { c.metrics.changeErrorsCounter.WithLabelValues("add").Inc() - withCheckID(c.logger.Error().Err(err), checkChange.Check.Id). + c.logger.Error().Err(err).Int64("check_id", check.Id).Int("region_id", check.RegionId). Msg("adding check failed, dropping check") continue } // add this to the list of checks we have seen during // this operation - newChecks[checkChange.Check.Id] = struct{}{} + newChecks[check.GlobalID()] = struct{}{} default: // we should never hit this because the first time we @@ -700,7 +710,10 @@ func (c *Updater) handleFirstBatch(ctx context.Context, changes *sm.Changes) { continue } - withCheckID(c.logger.Debug(), id). + cid, rid := model.GetLocalAndRegionIDs(id) + c.logger.Debug(). + Int64("check_id", cid). + Int("region_id", rid). Msg("stopping scraper during first batch handling") checkType := scraper.CheckType().String() @@ -718,8 +731,8 @@ func (c *Updater) handleFirstBatch(ctx context.Context, changes *sm.Changes) { // and changes the operation to an update if necessary. // // This function MUST be called with the scrapers mutex held. -func (c *Updater) handleInitialChangeAddWithLock(ctx context.Context, check sm.Check) error { - if running, found := c.scrapers[check.Id]; found { +func (c *Updater) handleInitialChangeAddWithLock(ctx context.Context, check model.Check) error { + if running, found := c.scrapers[check.GlobalID()]; found { oldVersion := running.ConfigVersion() newVersion := check.ConfigVersion() @@ -762,21 +775,24 @@ func (c *Updater) handleChangeBatch(ctx context.Context, changes *sm.Changes, fi for _, checkChange := range changes.Checks { c.logger.Debug().Interface("check change", checkChange).Msg("got check change") + var check model.Check + check.FromSM(checkChange.Check) + switch checkChange.Operation { case sm.CheckOperation_CHECK_ADD: - if err := c.handleCheckAdd(ctx, checkChange.Check); err != nil { + if err := c.handleCheckAdd(ctx, check); err != nil { c.metrics.changeErrorsCounter.WithLabelValues("add").Inc() c.logger.Error().Err(err).Msg("handling check add") } case sm.CheckOperation_CHECK_UPDATE: - if err := c.handleCheckUpdate(ctx, checkChange.Check); err != nil { + if err := c.handleCheckUpdate(ctx, check); err != nil { c.metrics.changeErrorsCounter.WithLabelValues("update").Inc() c.logger.Error().Err(err).Msg("handling check update") } case sm.CheckOperation_CHECK_DELETE: - if err := c.handleCheckDelete(ctx, checkChange.Check); err != nil { + if err := c.handleCheckDelete(ctx, check); err != nil { c.metrics.changeErrorsCounter.WithLabelValues("delete").Inc() c.logger.Error().Err(err).Msg("handling check delete") } @@ -788,7 +804,7 @@ func (c *Updater) handleChangeBatch(ctx context.Context, changes *sm.Changes, fi // scrapers managed by this updater and starts running it. // // This MUST be called with the scrapersMutex held. 
-func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check sm.Check) error { +func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check model.Check) error { // This is a good place to filter out checks by feature flags. // // If we need to accept checks based on whether a feature flag @@ -814,11 +830,22 @@ func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check sm.Check checkType := check.Type().String() - scrapeCounter := c.metrics.scrapesCounter.WithLabelValues(checkType, strconv.FormatInt(check.TenantId, 10)) + tidStr := strconv.FormatInt(check.TenantId, 10) + ridStr := strconv.Itoa(check.RegionId) + + scrapeCounter, err := c.metrics.scrapesCounter.GetMetricWith(prometheus.Labels{ + "type": checkType, + "tenantId": tidStr, + "regionId": ridStr, + }) + if err != nil { + return err + } scrapeErrorCounter, err := c.metrics.scrapeErrorCounter.CurryWith(prometheus.Labels{ "type": checkType, - "tenantId": strconv.FormatInt(check.TenantId, 10), + "tenantId": tidStr, + "regionId": ridStr, }) if err != nil { return err @@ -829,7 +856,7 @@ func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check sm.Check return fmt.Errorf("cannot create new scraper: %w", err) } - c.scrapers[check.Id] = scraper + c.scrapers[check.GlobalID()] = scraper go scraper.Run(ctx) @@ -858,12 +885,3 @@ func sleepCtx(ctx context.Context, d time.Duration) error { return err } - -// Helper to add a check ID and optionally a region ID to a log message. -func withCheckID(ev *zerolog.Event, checkID int64) *zerolog.Event { - if localID, regionID, err := sm.GlobalIDToLocalID(checkID); err != nil { - ev = ev.Int("region_id", regionID) - checkID = localID - } - return ev.Int64("check_id", checkID) -} diff --git a/internal/checks/checks_test.go b/internal/checks/checks_test.go index 82200921..32cde254 100644 --- a/internal/checks/checks_test.go +++ b/internal/checks/checks_test.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/synthetic-monitoring-agent/internal/feature" "github.com/grafana/synthetic-monitoring-agent/internal/k6runner" + "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/prober" "github.com/grafana/synthetic-monitoring-agent/internal/prober/logger" "github.com/grafana/synthetic-monitoring-agent/internal/pusher" @@ -183,7 +184,8 @@ func TestHandleCheckOp(t *testing.T) { ctx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() - check := sm.Check{ + var check model.Check + check.FromSM(sm.Check{ Id: 5000, TenantId: 1, Frequency: 1000, @@ -196,12 +198,12 @@ func TestHandleCheckOp(t *testing.T) { }, Created: 0, Modified: 0, - } + }) scraperExists := func() bool { u.scrapersMutex.Lock() defer u.scrapersMutex.Unlock() - _, found := u.scrapers[check.Id] + _, found := u.scrapers[check.GlobalID()] return found } @@ -276,11 +278,11 @@ func (testProber) Probe(ctx context.Context, target string, registry *prometheus type testProbeFactory struct { } -func (f testProbeFactory) New(ctx context.Context, logger zerolog.Logger, check sm.Check) (prober.Prober, string, error) { +func (f testProbeFactory) New(ctx context.Context, logger zerolog.Logger, check model.Check) (prober.Prober, string, error) { return testProber{}, check.Target, nil } -func testScraperFactory(ctx context.Context, check sm.Check, publisher pusher.Publisher, _ sm.Probe, logger zerolog.Logger, scrapeCounter scraper.Incrementer, scrapeErrorCounter scraper.IncrementerVec, k6Runner k6runner.Runner) (*scraper.Scraper, 
error) { +func testScraperFactory(ctx context.Context, check model.Check, publisher pusher.Publisher, _ sm.Probe, logger zerolog.Logger, scrapeCounter scraper.Incrementer, scrapeErrorCounter scraper.IncrementerVec, k6Runner k6runner.Runner) (*scraper.Scraper, error) { return scraper.NewWithOpts( ctx, check, diff --git a/internal/model/model.go b/internal/model/model.go new file mode 100644 index 00000000..661b15d3 --- /dev/null +++ b/internal/model/model.go @@ -0,0 +1,68 @@ +package model + +import ( + "fmt" + + sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring" +) + +type GlobalID int64 + +type Check struct { + sm.Check + RegionId int `json:"regionId"` +} + +func (c *Check) FromSM(check sm.Check) { + // This implementation is a bit wasteful, but it ensures that it + // remains in sync with the protobuf definition. + + data, err := check.Marshal() + if err != nil { + panic(err) + } + + if err := c.Check.Unmarshal(data); err != nil { + panic(err) + } + + cid, crid := GetLocalAndRegionIDs(GlobalID(check.Id)) + tid, trid := GetLocalAndRegionIDs(GlobalID(check.TenantId)) + + if crid != trid { + panic(fmt.Sprintf("inconsistent region ids %d and %d, checkId %d, tenantId %d", crid, trid, check.Id, check.TenantId)) + } + + c.Id = cid + c.TenantId = tid + c.RegionId = crid +} + +func (c *Check) GlobalID() GlobalID { + id, err := sm.LocalIDToGlobalID(c.Id, c.RegionId) + if err != nil { + return GlobalID(c.Id) + } + return GlobalID(id) +} + +func (c *Check) GlobalTenantID() GlobalID { + id, err := sm.LocalIDToGlobalID(c.TenantId, c.RegionId) + if err != nil { + return GlobalID(c.TenantId) + } + return GlobalID(id) +} + +// GetLocalAndRegionIDs takes a Global ID as specified in the sm data +// structures and returns a pair of ids corresponding to the local ID and the +// region ID. If the provided id is already a local one, it's returned without +// modification with the region set to 0. +func GetLocalAndRegionIDs(id GlobalID) (localID int64, regionID int) { + localID, regionID, err := sm.GlobalIDToLocalID(int64(id)) + if err != nil { + // Id is already local, use region 0. 
+ return int64(id), 0 + } + return localID, regionID +} diff --git a/internal/model/model_test.go b/internal/model/model_test.go new file mode 100644 index 00000000..2e8fbf1a --- /dev/null +++ b/internal/model/model_test.go @@ -0,0 +1,133 @@ +package model + +import ( + "testing" + + sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring" + "github.com/stretchr/testify/require" +) + +func TestGetLocalAndRegionIDs(t *testing.T) { + type expected struct { + localID int64 + regionID int + } + + testcases := map[string]struct { + input GlobalID + expected expected + }{ + "local id": { + input: 1, + expected: expected{localID: 1, regionID: 0}, + }, + "min local id, min region id": { + input: localToGlobal(t, sm.MinLocalID, sm.MinRegionID), + expected: expected{localID: sm.MinLocalID, regionID: sm.MinRegionID}, + }, + "min local id, max region id": { + input: localToGlobal(t, sm.MinLocalID, sm.MaxRegionID), + expected: expected{localID: sm.MinLocalID, regionID: sm.MaxRegionID}, + }, + "max local id, min region id": { + input: localToGlobal(t, sm.MaxLocalID, sm.MinRegionID), + expected: expected{localID: sm.MaxLocalID, regionID: sm.MinRegionID}, + }, + "max local id, max region id": { + input: localToGlobal(t, sm.MaxLocalID, sm.MaxRegionID), + expected: expected{localID: sm.MaxLocalID, regionID: sm.MaxRegionID}, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + localID, regionID := GetLocalAndRegionIDs(tc.input) + require.Equal(t, tc.expected.localID, localID) + require.Equal(t, tc.expected.regionID, regionID) + }) + } +} + +func TestCheckFromSM(t *testing.T) { + var ( + testRid = sm.MinRegionID + testCid = int64(sm.MinLocalID) + testTid = int64(sm.MinLocalID + 1) + testGlobalCid = int64(localToGlobal(t, testCid, testRid)) + testGlobalTid = int64(localToGlobal(t, testTid, testRid)) + ) + + type expected struct { + check Check + } + + testcases := map[string]struct { + input sm.Check + expected expected + }{ + "empty check": { + input: sm.Check{}, + expected: expected{check: Check{}}, + }, + "local ids": { + input: sm.Check{ + Id: testCid, + TenantId: testTid, + Settings: sm.CheckSettings{ + Ping: &sm.PingSettings{}, + }, + }, + expected: expected{ + check: Check{ + Check: sm.Check{ + Id: testCid, + TenantId: testTid, + Settings: sm.CheckSettings{ + Ping: &sm.PingSettings{}, + }, + }, + RegionId: 0, + }, + }, + }, + "global ids": { + input: sm.Check{ + Id: testGlobalCid, + TenantId: testGlobalTid, + Settings: sm.CheckSettings{ + Ping: &sm.PingSettings{}, + }, + }, + expected: expected{ + check: Check{ + Check: sm.Check{ + Id: testCid, + TenantId: testTid, + Settings: sm.CheckSettings{ + Ping: &sm.PingSettings{}, + }, + }, + RegionId: testRid, + }, + }, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + var c Check + c.FromSM(tc.input) + + require.Equal(t, tc.expected.check, c) + require.Equal(t, GlobalID(tc.input.Id), c.GlobalID()) + require.Equal(t, GlobalID(tc.input.TenantId), c.GlobalTenantID()) + }) + } +} + +func localToGlobal(t *testing.T, localID int64, regionID int) GlobalID { + t.Helper() + globalID, err := sm.LocalIDToGlobalID(localID, regionID) + require.NoError(t, err) + return GlobalID(globalID) +} diff --git a/internal/prober/prober.go b/internal/prober/prober.go index 332e68bd..3fba567d 100644 --- a/internal/prober/prober.go +++ b/internal/prober/prober.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/grafana/synthetic-monitoring-agent/internal/k6runner" + 
"github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/prober/dns" "github.com/grafana/synthetic-monitoring-agent/internal/prober/http" "github.com/grafana/synthetic-monitoring-agent/internal/prober/icmp" @@ -28,7 +29,7 @@ func Run(ctx context.Context, p Prober, target string, registry *prometheus.Regi } type ProberFactory interface { - New(ctx context.Context, logger zerolog.Logger, check sm.Check) (Prober, string, error) + New(ctx context.Context, logger zerolog.Logger, check model.Check) (Prober, string, error) } type proberFactory struct { @@ -41,7 +42,7 @@ func NewProberFactory(runner k6runner.Runner) ProberFactory { } } -func (f proberFactory) New(ctx context.Context, logger zerolog.Logger, check sm.Check) (Prober, string, error) { +func (f proberFactory) New(ctx context.Context, logger zerolog.Logger, check model.Check) (Prober, string, error) { var ( p Prober target string @@ -50,28 +51,28 @@ func (f proberFactory) New(ctx context.Context, logger zerolog.Logger, check sm. switch checkType := check.Type(); checkType { case sm.CheckTypePing: - p, err = icmp.NewProber(check) + p, err = icmp.NewProber(check.Check) target = check.Target case sm.CheckTypeHttp: - p, err = http.NewProber(ctx, check, logger) + p, err = http.NewProber(ctx, check.Check, logger) target = check.Target case sm.CheckTypeDns: - p, err = dns.NewProber(check) + p, err = dns.NewProber(check.Check) target = check.Settings.Dns.Server case sm.CheckTypeTcp: - p, err = tcp.NewProber(ctx, check, logger) + p, err = tcp.NewProber(ctx, check.Check, logger) target = check.Target case sm.CheckTypeTraceroute: - p, err = traceroute.NewProber(check, logger) + p, err = traceroute.NewProber(check.Check, logger) target = check.Target case sm.CheckTypeK6: if f.runner != nil { - p, err = k6.NewProber(ctx, check, logger, f.runner) + p, err = k6.NewProber(ctx, check.Check, logger, f.runner) target = check.Target } else { err = fmt.Errorf("k6 checks are not enabled") @@ -79,7 +80,7 @@ func (f proberFactory) New(ctx context.Context, logger zerolog.Logger, check sm. case sm.CheckTypeMultiHttp: if f.runner != nil { - p, err = multihttp.NewProber(ctx, check, logger, f.runner) + p, err = multihttp.NewProber(ctx, check.Check, logger, f.runner) target = check.Target } else { err = fmt.Errorf("k6 checks are not enabled") diff --git a/internal/pusher/clients.go b/internal/pusher/clients.go index 29b86d9d..eaa6c13e 100644 --- a/internal/pusher/clients.go +++ b/internal/pusher/clients.go @@ -41,12 +41,3 @@ func ClientFromRemoteInfo(remote *sm.RemoteInfo) (*prom.ClientConfig, error) { clientCfg.Headers["X-Prometheus-Remote-Write-Version"] = "0.1.0" return &clientCfg, nil } - -func GetLocalAndRegionIDs(id int64) (localID int64, regionID int) { - var err error - if localID, regionID, err = sm.GlobalIDToLocalID(id); err != nil { - // Id is already local, use region 0. 
- return id, 0 - } - return localID, regionID -} diff --git a/internal/pusher/pusher.go b/internal/pusher/pusher.go index da51e403..14f08021 100644 --- a/internal/pusher/pusher.go +++ b/internal/pusher/pusher.go @@ -7,12 +7,13 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/rs/zerolog" + "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto" sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring" ) type Payload interface { - Tenant() int64 + Tenant() model.GlobalID Metrics() []prompb.TimeSeries Streams() []logproto.Stream } diff --git a/internal/pusher/v1/pusher.go b/internal/pusher/v1/pusher.go index 50f9d6fd..32be3171 100644 --- a/internal/pusher/v1/pusher.go +++ b/internal/pusher/v1/pusher.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/rs/zerolog" + "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto" "github.com/grafana/synthetic-monitoring-agent/internal/pkg/loki" "github.com/grafana/synthetic-monitoring-agent/internal/pkg/prom" @@ -41,7 +42,7 @@ type publisherImpl struct { tenantManager pusher.TenantProvider logger zerolog.Logger clientsMutex sync.Mutex - clients map[int64]*remoteTarget + clients map[model.GlobalID]*remoteTarget metrics pusher.Metrics } @@ -51,7 +52,7 @@ func NewPublisher(ctx context.Context, tm pusher.TenantProvider, logger zerolog. return &publisherImpl{ ctx: ctx, tenantManager: tm, - clients: make(map[int64]*remoteTarget), + clients: make(map[model.GlobalID]*remoteTarget), logger: logger, metrics: pusher.NewMetrics(promRegisterer), } @@ -68,7 +69,7 @@ func (p *publisherImpl) publish(ctx context.Context, payload pusher.Payload) { // The above tenant ID is potentially a global ID. This is valid // for using internally but in logs and metrics we want to publish // the region and local tenant ID. 
- localID, regionID = pusher.GetLocalAndRegionIDs(tenantID) + localID, regionID = model.GetLocalAndRegionIDs(tenantID) regionStr = strconv.FormatInt(int64(regionID), 10) tenantStr = strconv.FormatInt(localID, 10) @@ -153,13 +154,13 @@ func (p *publisherImpl) pushMetrics(ctx context.Context, client *prom.Client, me return len(*buf), nil } -func (p *publisherImpl) getClient(ctx context.Context, tenantId int64, newClient bool) (*remoteTarget, error) { +func (p *publisherImpl) getClient(ctx context.Context, tenantId model.GlobalID, newClient bool) (*remoteTarget, error) { var ( client *remoteTarget found bool ) - localID, regionID := pusher.GetLocalAndRegionIDs(tenantId) + localID, regionID := model.GetLocalAndRegionIDs(tenantId) p.clientsMutex.Lock() if newClient { @@ -177,7 +178,7 @@ func (p *publisherImpl) getClient(ctx context.Context, tenantId int64, newClient p.logger.Info().Int("regionId", regionID).Int64("tenantId", localID).Msg("fetching tenant credentials") req := sm.TenantInfo{ - Id: tenantId, + Id: int64(tenantId), } tenant, err := p.tenantManager.GetTenant(ctx, &req) if err != nil { @@ -193,7 +194,7 @@ func (p *publisherImpl) updateClient(tenant *sm.Tenant) (*remoteTarget, error) { return nil, fmt.Errorf("creating metrics client configuration: %w", err) } - localID, regionID := pusher.GetLocalAndRegionIDs(tenant.Id) + localID, regionID := model.GetLocalAndRegionIDs(model.GlobalID(tenant.Id)) regionStr := strconv.FormatInt(int64(regionID), 10) tenantStr := strconv.FormatInt(localID, 10) @@ -223,7 +224,7 @@ func (p *publisherImpl) updateClient(tenant *sm.Tenant) (*remoteTarget, error) { } p.clientsMutex.Lock() - p.clients[tenant.Id] = clients + p.clients[model.GlobalID(tenant.Id)] = clients p.clientsMutex.Unlock() p.logger.Debug().Int("regionId", regionID).Int64("tenantId", localID).Int64("stackId", tenant.StackId).Msg("updated client") diff --git a/internal/pusher/v2/options.go b/internal/pusher/v2/options.go index 0356d6c7..c74b0c96 100644 --- a/internal/pusher/v2/options.go +++ b/internal/pusher/v2/options.go @@ -7,6 +7,7 @@ import ( "github.com/rs/zerolog" + "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/pusher" ) @@ -89,8 +90,8 @@ type pusherOptions struct { pool bufferPool } -func (o pusherOptions) withTenant(id int64) pusherOptions { - localID, regionID := pusher.GetLocalAndRegionIDs(id) +func (o pusherOptions) withTenant(id model.GlobalID) pusherOptions { + localID, regionID := model.GetLocalAndRegionIDs(id) o.logger = o.logger.With().Int("region", regionID).Int64("tenant", localID).Logger() o.metrics = o.metrics.WithTenant(localID, regionID) return o diff --git a/internal/pusher/v2/publisher.go b/internal/pusher/v2/publisher.go index 31644dc9..c43f00f0 100644 --- a/internal/pusher/v2/publisher.go +++ b/internal/pusher/v2/publisher.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" + "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/pusher" ) @@ -20,7 +21,7 @@ func NewPublisher(ctx context.Context, tenantProvider pusher.TenantProvider, log ctx: ctx, tenantProvider: tenantProvider, options: defaultPusherOptions, - handlers: make(map[int64]payloadHandler), + handlers: make(map[model.GlobalID]payloadHandler), } impl.options.logger = logger impl.options.metrics = pusher.NewMetrics(pr) @@ -40,7 +41,7 @@ type publisherImpl struct { tenantProvider pusher.TenantProvider options pusherOptions 
handlerMutex sync.Mutex // protects the handlers map - handlers map[int64]payloadHandler + handlers map[model.GlobalID]payloadHandler } var _ pusher.Publisher = &publisherImpl{} @@ -59,15 +60,16 @@ func (p *publisherImpl) Publish(payload pusher.Payload) { handler.publish(payload) } -func (p *publisherImpl) runHandler(tenantID int64, h payloadHandler) { - p.options.logger.Info().Int64("tenantID", tenantID).Msg("started push handler") - defer p.options.logger.Info().Int64("tenantID", tenantID).Msg("stopped push handler") +func (p *publisherImpl) runHandler(tenantID model.GlobalID, h payloadHandler) { + tid, rid := model.GetLocalAndRegionIDs(tenantID) + p.options.logger.Info().Int64("tenant_id", tid).Int("region_id", rid).Msg("started push handler") + defer p.options.logger.Info().Int64("tenant_id", tid).Int("region_id", rid).Msg("stopped push handler") for ok := true; ok && h != nil; { next := h.run(p.ctx) h, ok = p.replaceHandler(tenantID, h, next) if !ok { - p.options.logger.Error().Int64("tenantID", tenantID).Msg("unable to swap handler, tenant hijacked") + p.options.logger.Error().Int64("tenant_id", tid).Int("region_id", rid).Msg("unable to swap handler, tenant hijacked") } } } @@ -80,7 +82,7 @@ func (p *publisherImpl) runHandler(tenantID int64, h payloadHandler) { // // The handler currently in effect is returned, along with whether the handler // was changed or not. -func (p *publisherImpl) replaceHandler(tenantID int64, old, new payloadHandler) (payloadHandler, bool) { +func (p *publisherImpl) replaceHandler(tenantID model.GlobalID, old, new payloadHandler) (payloadHandler, bool) { p.handlerMutex.Lock() defer p.handlerMutex.Unlock() @@ -112,7 +114,7 @@ func (p *publisherImpl) replaceHandler(tenantID int64, old, new payloadHandler) return new, true } -func (p *publisherImpl) getHandler(tenantID int64) (payloadHandler, bool) { +func (p *publisherImpl) getHandler(tenantID model.GlobalID) (payloadHandler, bool) { p.handlerMutex.Lock() defer p.handlerMutex.Unlock() diff --git a/internal/pusher/v2/tenant_pusher.go b/internal/pusher/v2/tenant_pusher.go index cc2fc63b..45abfd3a 100644 --- a/internal/pusher/v2/tenant_pusher.go +++ b/internal/pusher/v2/tenant_pusher.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/prometheus/prompb" "golang.org/x/sync/errgroup" + "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto" "github.com/grafana/synthetic-monitoring-agent/internal/pusher" sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring" @@ -18,8 +19,8 @@ import ( // tenantPusher is in charge of pushing changes for a specific tenant. 
type tenantPusher struct { - tenantID int64 - pushCounter uint64 + tenantID model.GlobalID + pushCounter uint64 // FIXME(mem) logs, metrics queue tenantProvider pusher.TenantProvider options pusherOptions @@ -27,7 +28,7 @@ type tenantPusher struct { var _ payloadHandler = &tenantPusher{} -func newTenantPusher(tenantID int64, tenantProvider pusher.TenantProvider, options pusherOptions) *tenantPusher { +func newTenantPusher(tenantID model.GlobalID, tenantProvider pusher.TenantProvider, options pusherOptions) *tenantPusher { mOptions := options.withType(pusher.LabelValueMetrics) eOptions := options.withType(pusher.LabelValueLogs) tp := &tenantPusher{ @@ -91,7 +92,7 @@ func (p *tenantPusher) run(ctx context.Context) payloadHandler { func (p *tenantPusher) runPushers(ctx context.Context) error { // TODO(adrian): If tenant had the plan in here, we could have different retention for paid tenants. tenant, err := p.tenantProvider.GetTenant(ctx, &sm.TenantInfo{ - Id: p.tenantID, + Id: int64(p.tenantID), }) if err != nil { p.options.metrics.FailedCounter.WithLabelValues(pusher.LabelValueTenant).Inc() diff --git a/internal/pusher/v2/tenant_pusher_test.go b/internal/pusher/v2/tenant_pusher_test.go index b889c6ac..2f90a09f 100644 --- a/internal/pusher/v2/tenant_pusher_test.go +++ b/internal/pusher/v2/tenant_pusher_test.go @@ -11,36 +11,41 @@ import ( "time" "github.com/golang/snappy" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/synthetic-monitoring-agent/internal/pusher" sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring" ) func TestTenantPusher(t *testing.T) { + // This is an extremely basic test that verifies that a tenant pusher + // can be constructed. tenantProvider := testTenantProvider{ 1: &sm.Tenant{ Id: 1, OrgId: 1, - MetricsRemote: nil, - EventsRemote: nil, + MetricsRemote: &sm.RemoteInfo{}, + EventsRemote: &sm.RemoteInfo{}, Status: sm.TenantStatus_ACTIVE, }, } - for title, tc := range map[string]struct { - tenantID int64 - options pusherOptions - }{} { - t.Run(title, func(t *testing.T) { - p := newTenantPusher(tc.tenantID, tenantProvider, tc.options) - deadline, ok := t.Deadline() - if !ok { - deadline = time.Now().Add(time.Minute * 5) - } - ctx, cancel := context.WithDeadline(context.Background(), deadline) - defer cancel() - go p.run(ctx) - }) - } + registry := prometheus.NewPedanticRegistry() + metrics := pusher.NewMetrics(registry) + + p := newTenantPusher(1, tenantProvider, pusherOptions{ + metrics: metrics, + }) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond)) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + p.run(ctx) + wg.Done() + }() + wg.Wait() } func makeRecords(blocks [][]byte) []queueEntry { diff --git a/internal/scraper/scraper.go b/internal/scraper/scraper.go index f26ed3f1..e0d22eaa 100644 --- a/internal/scraper/scraper.go +++ b/internal/scraper/scraper.go @@ -17,11 +17,12 @@ import ( "github.com/mmcloughlin/geohash" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/model" + prom "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" "github.com/rs/zerolog" "github.com/grafana/synthetic-monitoring-agent/internal/k6runner" + "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto" "github.com/grafana/synthetic-monitoring-agent/internal/prober" 
"github.com/grafana/synthetic-monitoring-agent/internal/pusher" @@ -66,7 +67,7 @@ type Scraper struct { checkName string target string logger zerolog.Logger - check sm.Check + check model.Check probe sm.Probe prober prober.Prober stop chan struct{} @@ -80,7 +81,7 @@ type TimeSeries = []prompb.TimeSeries type Streams = []logproto.Stream type probeData struct { - tenantId int64 + tenantId model.GlobalID ts TimeSeries streams Streams } @@ -93,11 +94,11 @@ func (d *probeData) Streams() Streams { return d.streams } -func (d *probeData) Tenant() int64 { +func (d *probeData) Tenant() model.GlobalID { return d.tenantId } -func New(ctx context.Context, check sm.Check, publisher pusher.Publisher, probe sm.Probe, logger zerolog.Logger, scrapeCounter Incrementer, errorCounter IncrementerVec, k6runner k6runner.Runner) (*Scraper, error) { +func New(ctx context.Context, check model.Check, publisher pusher.Publisher, probe sm.Probe, logger zerolog.Logger, scrapeCounter Incrementer, errorCounter IncrementerVec, k6runner k6runner.Runner) (*Scraper, error) { return NewWithOpts(ctx, check, ScraperOpts{ Probe: probe, Publisher: publisher, @@ -117,11 +118,13 @@ type ScraperOpts struct { ProbeFactory prober.ProberFactory } -func NewWithOpts(ctx context.Context, check sm.Check, opts ScraperOpts) (*Scraper, error) { +func NewWithOpts(ctx context.Context, check model.Check, opts ScraperOpts) (*Scraper, error) { checkName := check.Type().String() - logger := withCheckID(opts.Logger.With(), check.Id). + logger := opts.Logger.With(). + Int("region_id", check.RegionId). Int64("tenantId", check.TenantId). + Int64("check_id", check.Id). Str("probe", opts.Probe.Name). Str("target", check.Target). Str("job", check.Job). @@ -414,7 +417,7 @@ func (s Scraper) collectData(ctx context.Context, t time.Time) (*probeData, erro // loki does not support joins streams := s.extractLogs(t, logs.Bytes(), logLabels) - return &probeData{ts: ts, streams: streams, tenantId: s.check.TenantId}, err + return &probeData{ts: ts, streams: streams, tenantId: s.check.GlobalTenantID()}, err } func getProbeMetrics( @@ -887,12 +890,12 @@ func updateHistogramFromMetric(mName, help string, m *dto.Metric, histograms map } func hashMetricNameAndLabels(name string, dtoLabels []*dto.LabelPair) uint64 { - ls := model.LabelSet{ - model.MetricNameLabel: model.LabelValue(name), + ls := prom.LabelSet{ + prom.MetricNameLabel: prom.LabelValue(name), } for _, label := range dtoLabels { - ls[model.LabelName(label.GetName())] = model.LabelValue(label.GetValue()) + ls[prom.LabelName(label.GetName())] = prom.LabelValue(label.GetValue()) } return uint64(ls.Fingerprint()) @@ -912,15 +915,6 @@ func getLabels(m *dto.Metric) map[string]string { return labels } -// Helper to add a check ID and optionally a region ID to a log context. 
-func withCheckID(ctx zerolog.Context, checkID int64) zerolog.Context { - if localID, regionID, err := sm.GlobalIDToLocalID(checkID); err == nil { - ctx = ctx.Int("region_id", regionID) - checkID = localID - } - return ctx.Int64("check_id", checkID) -} - func truncateLabelValue(str string) string { if len(str) > maxLabelValueLength { b := []byte(str[:maxLabelValueLength]) diff --git a/internal/scraper/scraper_test.go b/internal/scraper/scraper_test.go index 016d7a25..7e577a5c 100644 --- a/internal/scraper/scraper_test.go +++ b/internal/scraper/scraper_test.go @@ -22,6 +22,7 @@ import ( "github.com/go-logfmt/logfmt" "github.com/grafana/synthetic-monitoring-agent/internal/k6runner" + "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/pkg/logproto" "github.com/grafana/synthetic-monitoring-agent/internal/prober" dnsProber "github.com/grafana/synthetic-monitoring-agent/internal/prober/dns" @@ -1146,18 +1147,20 @@ func TestScraperCollectData(t *testing.T) { prober: testProber{}, summaries: make(map[uint64]prometheus.Summary), histograms: make(map[uint64]prometheus.Histogram), - check: sm.Check{ - Id: 1, - TenantId: 2, - Frequency: frequency, - Timeout: frequency, - Enabled: true, - Target: checkTarget, - Job: job, - BasicMetricsOnly: true, - Created: modifiedTs, - Modified: modifiedTs, - Labels: tc.checkLabels, + check: model.Check{ + Check: sm.Check{ + Id: 1, + TenantId: 2, + Frequency: frequency, + Timeout: frequency, + Enabled: true, + Target: checkTarget, + Job: job, + BasicMetricsOnly: true, + Created: modifiedTs, + Modified: modifiedTs, + Labels: tc.checkLabels, + }, }, probe: sm.Probe{ Id: 100, @@ -1384,7 +1387,7 @@ type testProbeFactory struct { builder func() prober.Prober } -func (f testProbeFactory) New(ctx context.Context, logger zerolog.Logger, check sm.Check) (prober.Prober, string, error) { +func (f testProbeFactory) New(ctx context.Context, logger zerolog.Logger, check model.Check) (prober.Prober, string, error) { return f.builder(), check.Target, nil } @@ -1401,7 +1404,8 @@ func TestScraperRun(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) t.Cleanup(cancel) - check := sm.Check{ + var check model.Check + check.FromSM(sm.Check{ Id: 1, TenantId: 1000, Frequency: 100, @@ -1412,7 +1416,7 @@ func TestScraperRun(t *testing.T) { Settings: sm.CheckSettings{ Ping: &sm.PingSettings{}, }, - } + }) var counter testCounter errCounter := testCounterVec{counters: make(map[string]Incrementer), t: t}