From a48d5132b0d8c47e29463a74798a5a332494e8f8 Mon Sep 17 00:00:00 2001 From: Barak Amar Date: Tue, 15 Sep 2020 17:16:15 +0300 Subject: [PATCH] Bugfix/installation id not set and collector posting data is dropped (#616) --- api/api_controller.go | 11 -------- api/handler_test.go | 2 -- auth/metadata.go | 56 ++++++++++++++++++++----------------- cmd/lakefs/cmd/init.go | 2 +- cmd/lakefs/cmd/run.go | 30 +++++++------------- config/config.go | 4 ++- gateway/playback_test.go | 2 -- loadtest/local_load_test.go | 2 -- stats/collector.go | 12 -------- 9 files changed, 44 insertions(+), 77 deletions(-) diff --git a/api/api_controller.go b/api/api_controller.go index 2d7157e6558..96e58336ee2 100644 --- a/api/api_controller.go +++ b/api/api_controller.go @@ -249,17 +249,6 @@ func (c *Controller) SetupLakeFSHandler() setupop.SetupLakeFSHandler { }) } - // write metadata - metadata, err := c.deps.Meta.Write() - if err != nil { - return setupop.NewSetupLakeFSDefault(http.StatusInternalServerError). - WithPayload(&models.Error{ - Message: err.Error(), - }) - } - - c.deps.Collector.SetInstallationID(metadata["installation_id"]) - c.deps.Collector.CollectMetadata(metadata) c.deps.Collector.CollectEvent("global", "init") // setup admin user diff --git a/api/handler_test.go b/api/handler_test.go index d37b3dd1382..c441863b7c6 100644 --- a/api/handler_test.go +++ b/api/handler_test.go @@ -71,8 +71,6 @@ func createDefaultAdminUser(authService auth.Service, t *testing.T) *authmodel.C type mockCollector struct{} -func (m *mockCollector) SetInstallationID(_ string) {} - func (m *mockCollector) CollectMetadata(_ map[string]string) {} func (m *mockCollector) CollectEvent(_, _ string) {} diff --git a/auth/metadata.go b/auth/metadata.go index 1caebff4fcf..f659066c0a8 100644 --- a/auth/metadata.go +++ b/auth/metadata.go @@ -1,7 +1,6 @@ package auth import ( - "errors" "runtime" "time" @@ -12,7 +11,6 @@ import ( ) type MetadataManager interface { - InstallationID() (string, error) SetupTimestamp() (time.Time, error) UpdateSetupTimestamp(time.Time) error Write() (map[string]string, error) @@ -35,6 +33,23 @@ func NewDBMetadataManager(version string, database db.Database) *DBMetadataManag } } +func insertOrGetInstallationID(tx db.Tx) (string, error) { + newInstallationID := uuid.New().String() + res, err := tx.Exec(`INSERT INTO auth_installation_metadata (key_name, key_value) + VALUES ($1,$2) + ON CONFLICT DO NOTHING`, + InstallationIDKeyName, newInstallationID) + if err != nil { + return "", err + } + if affected, err := res.RowsAffected(); err != nil { + return "", err + } else if affected == 1 { + return newInstallationID, nil + } + return getInstallationID(tx) +} + func getInstallationID(tx db.Tx) (string, error) { var installationID string err := tx.Get(&installationID, `SELECT key_value FROM auth_installation_metadata WHERE key_name = $1`, @@ -66,16 +81,6 @@ func writeMetadata(tx sqlx.Execer, items map[string]string) error { return nil } -func (d *DBMetadataManager) InstallationID() (string, error) { - installationID, err := d.db.Transact(func(tx db.Tx) (interface{}, error) { - return getInstallationID(tx) - }, db.WithLogger(logging.Dummy()), db.ReadOnly()) - if err != nil { - return "", err - } - return installationID.(string), nil -} - func (d *DBMetadataManager) UpdateSetupTimestamp(ts time.Time) error { _, err := d.db.Transact(func(tx db.Tx) (interface{}, error) { return nil, writeMetadata(tx, map[string]string{ @@ -107,25 +112,24 @@ func (d *DBMetadataManager) Write() (map[string]string, error) { metadata[k] = v } } - - // see if we have existing metadata or we need to generate one _, err = d.db.Transact(func(tx db.Tx) (interface{}, error) { - // get installation ID - if we don't have one we'll generate one - _, err := getInstallationID(tx) - if err != nil && !errors.Is(err, db.ErrNotFound) { + // write metadata + err = writeMetadata(tx, metadata) + if err != nil { return nil, err } - - if err != nil { // i.e. err is db.ErrNotFound - // we don't have an installation ID - let's write one. - installationID := uuid.Must(uuid.NewUUID()).String() - metadata["installation_id"] = installationID - metadata["setup_time"] = time.Now().UTC().Format(time.RFC3339) + // write installation id + installationID, err := insertOrGetInstallationID(tx) + if err == nil { + metadata[InstallationIDKeyName] = installationID } - err = writeMetadata(tx, metadata) - return nil, err + // get setup timestamp + setupTS, err := getSetupTimestamp(tx) + if err == nil { + metadata[SetupTimestampKeyName] = setupTS.UTC().Format(time.RFC3339) + } + return nil, nil }, db.WithLogger(logging.Dummy())) - return metadata, err } diff --git a/cmd/lakefs/cmd/init.go b/cmd/lakefs/cmd/init.go index 5379f5ff185..aac1c7c6ded 100644 --- a/cmd/lakefs/cmd/init.go +++ b/cmd/lakefs/cmd/init.go @@ -57,7 +57,7 @@ var initCmd = &cobra.Command{ ctx, cancelFn := context.WithCancel(context.Background()) processID, bufferedCollectorArgs := cfg.GetStatsBufferedCollectorArgs() - stats := stats.NewBufferedCollector(metadata["installation_id"], processID, bufferedCollectorArgs...) + stats := stats.NewBufferedCollector(metadata[auth.InstallationIDKeyName], processID, bufferedCollectorArgs...) go stats.Run(ctx) stats.CollectMetadata(metadata) stats.CollectEvent("global", "init") diff --git a/cmd/lakefs/cmd/run.go b/cmd/lakefs/cmd/run.go index e9e4f411de8..8afa7c37ca1 100644 --- a/cmd/lakefs/cmd/run.go +++ b/cmd/lakefs/cmd/run.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "math/rand" "net/http" "os" "os/signal" @@ -78,12 +77,17 @@ var runCmd = &cobra.Command{ meta := auth.NewDBMetadataManager(config.Version, dbPool) - installationID, err := meta.InstallationID() + processID, bufferedCollectorArgs := cfg.GetStatsBufferedCollectorArgs() + + // collect and write metadata + metadata, err := meta.Write() if err != nil { - installationID = "" // no installation ID is available + logger.WithError(err).Debug("failed to collect account metadata") } - processID, bufferedCollectorArgs := cfg.GetStatsBufferedCollectorArgs() - stats := stats.NewBufferedCollector(installationID, processID, bufferedCollectorArgs...) + + stats := stats.NewBufferedCollector(metadata[auth.InstallationIDKeyName], processID, bufferedCollectorArgs...) + // send metadata + stats.CollectMetadata(metadata) dedupCleaner := dedup.NewCleaner(blockStore, cataloger.DedupReportChannel()) defer func() { @@ -122,22 +126,8 @@ var runCmd = &cobra.Command{ ctx, cancelFn := context.WithCancel(context.Background()) go stats.Run(ctx) - stats.CollectEvent("global", "run") - // stagger a bit and update metadata - go func() { - // avoid a thundering herd in case we have many lakeFS instances starting together - const maxSplay = 10 * time.Second - randSource := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec - time.Sleep(time.Duration(randSource.Intn(int(maxSplay)))) - - metadata, err := meta.Write() - if err != nil { - logger.WithError(err).Trace("failed to collect account metadata") - return - } - stats.CollectMetadata(metadata) - }() + stats.CollectEvent("global", "run") logging.Default().WithField("listen_address", cfg.GetListenAddress()).Info("starting HTTP server") server := &http.Server{ diff --git a/config/config.go b/config/config.go index b73d020720f..59e0f727d28 100644 --- a/config/config.go +++ b/config/config.go @@ -299,9 +299,11 @@ func (c *Config) GetStatsFlushInterval() time.Duration { } func (c *Config) GetStatsBufferedCollectorArgs() (processID string, opts []stats.BufferedCollectorOpts) { - sender := stats.NewDummySender() + var sender stats.Sender if c.GetStatsEnabled() && Version != UnreleasedVersion { sender = stats.NewHTTPSender(c.GetStatsAddress(), time.Now) + } else { + sender = stats.NewDummySender() } return uuid.Must(uuid.NewUUID()).String(), []stats.BufferedCollectorOpts{ diff --git a/gateway/playback_test.go b/gateway/playback_test.go index 4d4e6c8a0af..7fe82a94d38 100644 --- a/gateway/playback_test.go +++ b/gateway/playback_test.go @@ -93,8 +93,6 @@ func TestMain(m *testing.M) { os.Exit(code) } -func (m *mockCollector) SetInstallationID(installationID string) {} - func (m *mockCollector) CollectMetadata(accountMetadata map[string]string) {} func (m *mockCollector) CollectEvent(class, action string) {} diff --git a/loadtest/local_load_test.go b/loadtest/local_load_test.go index 489f841ba87..4248fc2997d 100644 --- a/loadtest/local_load_test.go +++ b/loadtest/local_load_test.go @@ -45,8 +45,6 @@ func TestMain(m *testing.M) { type mockCollector struct{} -func (m *mockCollector) SetInstallationID(_ string) {} - func (m *mockCollector) CollectMetadata(_ map[string]string) {} func (m *mockCollector) CollectEvent(_, _ string) {} diff --git a/stats/collector.go b/stats/collector.go index b4fb119731f..6197dbee34e 100644 --- a/stats/collector.go +++ b/stats/collector.go @@ -3,7 +3,6 @@ package stats import ( "context" "fmt" - "sync" "time" "github.com/treeverse/lakefs/logging" @@ -16,7 +15,6 @@ const ( ) type Collector interface { - SetInstallationID(installationID string) CollectEvent(class, action string) CollectMetadata(accountMetadata map[string]string) } @@ -79,7 +77,6 @@ type BufferedCollector struct { sendTimeout time.Duration flushTicker FlushTicker done chan bool - mutex *sync.RWMutex installationID string processID string } @@ -125,7 +122,6 @@ func NewBufferedCollector(installationID, processID string, opts ...BufferedColl sendTimeout: DefaultSendTimeout, flushTicker: &TimeTicker{ticker: time.NewTicker(DefaultFlushInterval)}, installationID: installationID, - mutex: &sync.RWMutex{}, processID: processID, } @@ -136,8 +132,6 @@ func NewBufferedCollector(installationID, processID string, opts ...BufferedColl return s } func (s *BufferedCollector) getInstallationID() string { - s.mutex.RLock() - defer s.mutex.RUnlock() return s.installationID } @@ -207,12 +201,6 @@ func makeMetrics(counters keyIndex) []Metric { return metrics } -func (s *BufferedCollector) SetInstallationID(installationID string) { - s.mutex.Lock() - defer s.mutex.Unlock() - s.installationID = installationID -} - func (s *BufferedCollector) CollectMetadata(accountMetadata map[string]string) { entries := make([]MetadataEntry, len(accountMetadata)) i := 0