From c7b80d419db742c82a80fba7f50beec3a896eb0a Mon Sep 17 00:00:00 2001 From: Peter-Jan Brone Date: Thu, 13 Jun 2024 14:55:46 +0200 Subject: [PATCH] Cache gouging params and download contracts in the worker (#1264) This PR adds a small cache to the worker that's being invalidated/updated by event webhooks we receive from the `bus`. I added two benchmarks, a baseline and another with the worker cache enabled. This doesn't really show all the benefits though because a) it will put less strain on the database and b) I have a single node setup. For a multi-node setup this likely makes a big difference. [baseline.txt](https://github.com/user-attachments/files/15740432/baseline.txt) [cached.txt](https://github.com/user-attachments/files/15740433/cached.txt) --------- Co-authored-by: Christopher Schinnerl --- api/events.go | 70 +++- api/webhooks.go | 2 +- bus/bus.go | 15 +- bus/client/webhooks.go | 10 +- cmd/renterd/main.go | 45 ++- internal/bus/events.go | 2 +- internal/node/node.go | 11 +- internal/sql/migrations.go | 6 + internal/test/e2e/cluster.go | 12 +- internal/test/e2e/events_test.go | 60 +--- internal/worker/auth.go | 20 ++ internal/worker/cache.go | 340 ++++++++++++++++++ internal/worker/cache_test.go | 183 ++++++++++ stores/metadata.go | 3 + stores/sql/mysql/main.go | 4 +- .../main/migration_00010_webhook_headers.sql | 1 + stores/sql/mysql/migrations/main/schema.sql | 1 + stores/sql/sqlite/main.go | 4 +- .../main/migration_00010_webhook_headers.sql | 1 + stores/sql/sqlite/migrations/main/schema.sql | 2 +- stores/webhooks.go | 18 +- webhooks/webhooks.go | 43 ++- worker/client/client.go | 7 + worker/mocks_test.go | 9 + worker/worker.go | 47 ++- 25 files changed, 782 insertions(+), 134 deletions(-) create mode 100644 internal/worker/auth.go create mode 100644 internal/worker/cache.go create mode 100644 internal/worker/cache_test.go create mode 100644 stores/sql/mysql/migrations/main/migration_00010_webhook_headers.sql create mode 100644 stores/sql/sqlite/migrations/main/migration_00010_webhook_headers.sql diff --git a/api/events.go b/api/events.go index a2235c92f..f533bc5bf 100644 --- a/api/events.go +++ b/api/events.go @@ -1,6 +1,9 @@ package api import ( + "encoding/json" + "errors" + "fmt" "time" "go.sia.tech/core/types" @@ -8,9 +11,9 @@ import ( ) const ( - ModuleContractSet = "contract_set" ModuleConsensus = "consensus" ModuleContract = "contract" + ModuleContractSet = "contract_set" ModuleSetting = "setting" EventUpdate = "update" @@ -19,6 +22,10 @@ const ( EventRenew = "renew" ) +var ( + ErrUnknownEvent = errors.New("unknown event") +) + type ( EventConsensusUpdate struct { ConsensusState @@ -33,9 +40,8 @@ type ( } EventContractRenew struct { - ContractID types.FileContractID `json:"contractID"` - RenewedFromID types.FileContractID `json:"renewedFromID"` - Timestamp time.Time `json:"timestamp"` + Renewal ContractMetadata `json:"renewal"` + Timestamp time.Time `json:"timestamp"` } EventContractSetUpdate struct { @@ -103,3 +109,59 @@ func (e EventSettingDelete) Event() webhooks.Event { Payload: e, } } + +func ParseEventWebhook(event webhooks.Event) (interface{}, error) { + bytes, err := json.Marshal(event.Payload) + if err != nil { + return nil, err + } + switch event.Module { + case ModuleContract: + switch event.Event { + case EventArchive: + var e EventContractArchive + if err := json.Unmarshal(bytes, &e); err != nil { + return nil, err + } + return e, nil + case EventRenew: + var e EventContractRenew + if err := json.Unmarshal(bytes, &e); err != nil { + return nil, err + } + return e, nil + } + case ModuleContractSet: + if event.Event == EventUpdate { + var e EventContractSetUpdate + if err := json.Unmarshal(bytes, &e); err != nil { + return nil, err + } + return e, nil + } + case ModuleConsensus: + if event.Event == EventUpdate { + var e EventConsensusUpdate + if err := json.Unmarshal(bytes, &e); err != nil { + return nil, err + } + return e, nil + } + case ModuleSetting: + switch event.Event { + case EventUpdate: + var e EventSettingUpdate + if err := json.Unmarshal(bytes, &e); err != nil { + return nil, err + } + return e, nil + case EventDelete: + var e EventSettingDelete + if err := json.Unmarshal(bytes, &e); err != nil { + return nil, err + } + return e, nil + } + } + return nil, fmt.Errorf("%w: module %s event %s", ErrUnknownEvent, event.Module, event.Event) +} diff --git a/api/webhooks.go b/api/webhooks.go index 379da7d55..0bca3d4ca 100644 --- a/api/webhooks.go +++ b/api/webhooks.go @@ -2,7 +2,7 @@ package api import "go.sia.tech/renterd/webhooks" -type WebHookResponse struct { +type WebhookResponse struct { Webhooks []webhooks.Webhook `json:"webhooks"` Queues []webhooks.WebhookQueueInfo `json:"queues"` } diff --git a/bus/bus.go b/bus/bus.go index 987920a0d..306a4f2b7 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -1173,9 +1173,8 @@ func (b *bus) contractIDRenewedHandlerPOST(jc jape.Context) { b.uploadingSectors.HandleRenewal(req.Contract.ID(), req.RenewedFrom) b.events.BroadcastEvent(api.EventContractRenew{ - ContractID: req.Contract.ID(), - RenewedFromID: req.RenewedFrom, - Timestamp: time.Now().UTC(), + Renewal: r, + Timestamp: time.Now().UTC(), }) jc.Encode(r) @@ -2111,7 +2110,7 @@ func (b *bus) webhookHandlerDelete(jc jape.Context) { func (b *bus) webhookHandlerGet(jc jape.Context) { webhooks, queueInfos := b.hooks.Info() - jc.Encode(api.WebHookResponse{ + jc.Encode(api.WebhookResponse{ Queues: queueInfos, Webhooks: webhooks, }) @@ -2122,10 +2121,12 @@ func (b *bus) webhookHandlerPost(jc jape.Context) { if jc.Decode(&req) != nil { return } + err := b.hooks.Register(jc.Request.Context(), webhooks.Webhook{ - Event: req.Event, - Module: req.Module, - URL: req.URL, + Event: req.Event, + Module: req.Module, + URL: req.URL, + Headers: req.Headers, }) if err != nil { jc.Error(fmt.Errorf("failed to add Webhook: %w", err), http.StatusInternalServerError) diff --git a/bus/client/webhooks.go b/bus/client/webhooks.go index e0a52080e..5993e0c5b 100644 --- a/bus/client/webhooks.go +++ b/bus/client/webhooks.go @@ -23,13 +23,19 @@ func (c *Client) DeleteWebhook(ctx context.Context, url, module, event string) e } // RegisterWebhook registers a new webhook for the given URL. -func (c *Client) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook) error { +func (c *Client) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook, opts ...webhooks.HeaderOption) error { + if webhook.Headers == nil { + webhook.Headers = make(map[string]string) + } + for _, opt := range opts { + opt(webhook.Headers) + } err := c.c.WithContext(ctx).POST("/webhooks", webhook, nil) return err } // Webhooks returns all webhooks currently registered. -func (c *Client) Webhooks(ctx context.Context) (resp api.WebHookResponse, err error) { +func (c *Client) Webhooks(ctx context.Context) (resp api.WebhookResponse, err error) { err = c.c.WithContext(ctx).GET("/webhooks", &resp) return } diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index 5f030768b..d9d84a5fd 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -27,6 +27,7 @@ import ( "go.sia.tech/renterd/config" "go.sia.tech/renterd/internal/node" "go.sia.tech/renterd/internal/utils" + iworker "go.sia.tech/renterd/internal/worker" "go.sia.tech/renterd/worker" "go.sia.tech/renterd/worker/s3" "go.sia.tech/web/renterd" @@ -455,11 +456,11 @@ func main() { Network: network, } - type shutdownFn struct { + type shutdownFnEntry struct { name string fn func(context.Context) error } - var shutdownFns []shutdownFn + var shutdownFns []shutdownFnEntry if cfg.Bus.RemoteAddr != "" && len(cfg.Worker.Remotes) != 0 && !cfg.Autopilot.Enabled { logger.Fatal("remote bus, remote worker, and no autopilot -- nothing to do!") @@ -485,7 +486,7 @@ func main() { // Create the webserver. srv := &http.Server{Handler: mux} - shutdownFns = append(shutdownFns, shutdownFn{ + shutdownFns = append(shutdownFns, shutdownFnEntry{ name: "HTTP Server", fn: srv.Shutdown, }) @@ -500,7 +501,7 @@ func main() { if err != nil { logger.Fatal("failed to create bus, err: " + err.Error()) } - shutdownFns = append(shutdownFns, shutdownFn{ + shutdownFns = append(shutdownFns, shutdownFnEntry{ name: "Bus", fn: fn, }) @@ -519,22 +520,27 @@ func main() { var s3Srv *http.Server var s3Listener net.Listener var workers []autopilot.Worker + setupWorkerFn := node.NoopFn if len(cfg.Worker.Remotes) == 0 { if cfg.Worker.Enabled { - w, s3Handler, fn, err := node.NewWorker(cfg.Worker, s3.Opts{ + workerAddr := cfg.HTTP.Address + "/api/worker" + var shutdownFn node.ShutdownFn + w, s3Handler, setupFn, shutdownFn, err := node.NewWorker(cfg.Worker, s3.Opts{ AuthDisabled: cfg.S3.DisableAuth, HostBucketEnabled: cfg.S3.HostBucketEnabled, }, bc, seed, logger) if err != nil { logger.Fatal("failed to create worker: " + err.Error()) } - shutdownFns = append(shutdownFns, shutdownFn{ + setupWorkerFn = func(ctx context.Context) error { + return setupFn(ctx, workerAddr, cfg.HTTP.Password) + } + shutdownFns = append(shutdownFns, shutdownFnEntry{ name: "Worker", - fn: fn, + fn: shutdownFn, }) - mux.Sub["/api/worker"] = utils.TreeMux{Handler: workerAuth(cfg.HTTP.Password, cfg.Worker.AllowUnauthenticatedDownloads)(w)} - workerAddr := cfg.HTTP.Address + "/api/worker" + mux.Sub["/api/worker"] = utils.TreeMux{Handler: iworker.Auth(cfg.HTTP.Password, cfg.Worker.AllowUnauthenticatedDownloads)(w)} wc := worker.NewClient(workerAddr, cfg.HTTP.Password) workers = append(workers, wc) @@ -547,7 +553,7 @@ func main() { if err != nil { logger.Fatal("failed to create listener: " + err.Error()) } - shutdownFns = append(shutdownFns, shutdownFn{ + shutdownFns = append(shutdownFns, shutdownFnEntry{ name: "S3", fn: s3Srv.Shutdown, }) @@ -573,7 +579,7 @@ func main() { } // NOTE: the autopilot shutdown function needs to be called first. - shutdownFns = append(shutdownFns, shutdownFn{ + shutdownFns = append(shutdownFns, shutdownFnEntry{ name: "Autopilot", fn: fn, }) @@ -614,6 +620,11 @@ func main() { } } + // Finish worker setup. + if err := setupWorkerFn(context.Background()); err != nil { + logger.Fatal("failed to setup worker: " + err.Error()) + } + logger.Info("api: Listening on " + l.Addr().String()) if s3Srv != nil { @@ -745,15 +756,3 @@ func runCompatMigrateAutopilotJSONToStore(bc *bus.Client, id, dir string) (err e return nil } - -func workerAuth(password string, unauthenticatedDownloads bool) func(http.Handler) http.Handler { - return func(h http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - if unauthenticatedDownloads && req.Method == http.MethodGet && strings.HasPrefix(req.URL.Path, "/objects/") { - h.ServeHTTP(w, req) - } else { - jape.BasicAuth(password)(h).ServeHTTP(w, req) - } - }) - } -} diff --git a/internal/bus/events.go b/internal/bus/events.go index 17ca458dd..507f33402 100644 --- a/internal/bus/events.go +++ b/internal/bus/events.go @@ -22,7 +22,7 @@ func NewEventBroadcaster(b webhooks.Broadcaster, l *zap.SugaredLogger) EventBroa } } -func (b EventBroadcaster) BroadcastEvent(e webhooks.WebhookEvent) { +func (b EventBroadcaster) BroadcastEvent(e webhooks.EventWebhook) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) if err := b.broadcaster.BroadcastAction(ctx, e.Event()); err != nil { b.logger.Errorw("failed to broadcast event", "event", e, "error", err) diff --git a/internal/node/node.go b/internal/node/node.go index c26feb35c..f07980a23 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -55,9 +55,12 @@ type AutopilotConfig struct { type ( RunFn = func() error + SetupFn = func(context.Context, string, string) error ShutdownFn = func(context.Context) error ) +var NoopFn = func(context.Context) error { return nil } + func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { gatewayDir := filepath.Join(dir, "gateway") if err := os.MkdirAll(gatewayDir, 0700); err != nil { @@ -219,18 +222,18 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht return b.Handler(), shutdownFn, nil } -func NewWorker(cfg config.Worker, s3Opts s3.Opts, b Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, http.Handler, ShutdownFn, error) { +func NewWorker(cfg config.Worker, s3Opts s3.Opts, b Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, http.Handler, SetupFn, ShutdownFn, error) { workerKey := blake2b.Sum256(append([]byte("worker"), seed...)) w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.DownloadMaxMemory, cfg.UploadMaxMemory, cfg.AllowPrivateIPs, l) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } s3Handler, err := s3.New(b, w, l.Named("s3").Sugar(), s3Opts) if err != nil { err = errors.Join(err, w.Shutdown(context.Background())) - return nil, nil, nil, fmt.Errorf("failed to create s3 handler: %w", err) + return nil, nil, nil, nil, fmt.Errorf("failed to create s3 handler: %w", err) } - return w.Handler(), s3Handler, w.Shutdown, nil + return w.Handler(), s3Handler, w.Setup, w.Shutdown, nil } func NewAutopilot(cfg AutopilotConfig, b autopilot.Bus, workers []autopilot.Worker, l *zap.Logger) (http.Handler, RunFn, ShutdownFn, error) { diff --git a/internal/sql/migrations.go b/internal/sql/migrations.go index 867a01ca2..29ce16d13 100644 --- a/internal/sql/migrations.go +++ b/internal/sql/migrations.go @@ -169,6 +169,12 @@ var ( return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00009_json_settings", log) }, }, + { + ID: "00010_webhook_headers", + Migrate: func(tx Tx) error { + return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00010_webhook_headers", log) + }, + }, } } MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration { diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go index 43288bc8c..8d5f93ce5 100644 --- a/internal/test/e2e/cluster.go +++ b/internal/test/e2e/cluster.go @@ -25,6 +25,7 @@ import ( "go.sia.tech/renterd/internal/node" "go.sia.tech/renterd/internal/test" "go.sia.tech/renterd/internal/utils" + iworker "go.sia.tech/renterd/internal/worker" "go.sia.tech/renterd/stores" "go.sia.tech/renterd/worker/s3" "go.sia.tech/web/renterd" @@ -332,12 +333,10 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { busShutdownFns = append(busShutdownFns, bStopFn) // Create worker. - w, s3Handler, wShutdownFn, err := node.NewWorker(workerCfg, s3.Opts{}, busClient, wk, logger) + w, s3Handler, wSetupFn, wShutdownFn, err := node.NewWorker(workerCfg, s3.Opts{}, busClient, wk, logger) tt.OK(err) - - workerAuth := jape.BasicAuth(workerPassword) workerServer := http.Server{ - Handler: workerAuth(w), + Handler: iworker.Auth(workerPassword, false)(w), } var workerShutdownFns []func(context.Context) error @@ -440,6 +439,11 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { SlabBufferMaxSizeSoft: build.DefaultUploadPackingSettings.SlabBufferMaxSizeSoft, })) + // Register the worker + if err := wSetupFn(ctx, workerAddr, workerPassword); err != nil { + tt.Fatalf("failed to register worker, err: %v", err) + } + // Fund the bus. if funding { cluster.MineBlocks(latestHardforkHeight) diff --git a/internal/test/e2e/events_test.go b/internal/test/e2e/events_test.go index dcac5c8f8..8760a4e02 100644 --- a/internal/test/e2e/events_test.go +++ b/internal/test/e2e/events_test.go @@ -20,7 +20,7 @@ import ( // providing an event webhook was registered. func TestEvents(t *testing.T) { // list all events - allEvents := []webhooks.WebhookEvent{ + allEvents := []webhooks.EventWebhook{ api.EventConsensusUpdate{}, api.EventContractArchive{}, api.EventContractRenew{}, @@ -132,11 +132,11 @@ func TestEvents(t *testing.T) { // assert the events we received contain the expected information for _, r := range received { - event, err := parseEvent(r) + event, err := api.ParseEventWebhook(r) tt.OK(err) switch e := event.(type) { case api.EventContractRenew: - if e.ContractID != renewed.ID || e.RenewedFromID != c.ID || e.Timestamp.IsZero() { + if e.Renewal.ID != renewed.ID || e.Renewal.RenewedFrom != c.ID || e.Timestamp.IsZero() { t.Fatalf("unexpected event %+v", e) } case api.EventContractArchive: @@ -168,57 +168,3 @@ func TestEvents(t *testing.T) { } } } - -func parseEvent(event webhooks.Event) (interface{}, error) { - bytes, err := json.Marshal(event.Payload) - if err != nil { - return nil, err - } - switch event.Module { - case api.ModuleContract: - if event.Event == api.EventArchive { - var e api.EventContractArchive - if err := json.Unmarshal(bytes, &e); err != nil { - return nil, err - } - return e, nil - } else if event.Event == api.EventRenew { - var e api.EventContractRenew - if err := json.Unmarshal(bytes, &e); err != nil { - return nil, err - } - return e, nil - } - case api.ModuleContractSet: - if event.Event == api.EventUpdate { - var e api.EventContractSetUpdate - if err := json.Unmarshal(bytes, &e); err != nil { - return nil, err - } - return e, nil - } - case api.ModuleConsensus: - if event.Event == api.EventUpdate { - var e api.EventConsensusUpdate - if err := json.Unmarshal(bytes, &e); err != nil { - return nil, err - } - return e, nil - } - case api.ModuleSetting: - if event.Event == api.EventUpdate { - var e api.EventSettingUpdate - if err := json.Unmarshal(bytes, &e); err != nil { - return nil, err - } - return e, nil - } else if event.Event == api.EventDelete { - var e api.EventSettingDelete - if err := json.Unmarshal(bytes, &e); err != nil { - return nil, err - } - return e, nil - } - } - return nil, fmt.Errorf("unexpected event %+v", event) -} diff --git a/internal/worker/auth.go b/internal/worker/auth.go new file mode 100644 index 000000000..032d2536c --- /dev/null +++ b/internal/worker/auth.go @@ -0,0 +1,20 @@ +package worker + +import ( + "net/http" + "strings" + + "go.sia.tech/jape" +) + +func Auth(password string, unauthenticatedDownloads bool) func(http.Handler) http.Handler { + return func(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if unauthenticatedDownloads && req.Method == http.MethodGet && strings.HasPrefix(req.URL.Path, "/objects/") { + h.ServeHTTP(w, req) + } else { + jape.BasicAuth(password)(h).ServeHTTP(w, req) + } + }) + } +} diff --git a/internal/worker/cache.go b/internal/worker/cache.go new file mode 100644 index 000000000..7841e9249 --- /dev/null +++ b/internal/worker/cache.go @@ -0,0 +1,340 @@ +package worker + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "sort" + "sync" + "time" + + "go.uber.org/zap" + + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/webhooks" +) + +const ( + cacheKeyDownloadContracts = "downloadcontracts" + cacheKeyGougingParams = "gougingparams" + + cacheEntryExpiry = 5 * time.Minute +) + +var ( + errCacheNotReady = errors.New("cache is not ready yet, required webhooks have not been registered") + errCacheOutdated = errors.New("cache is outdated, the value fetched from the bus does not match the cached value") +) + +type memoryCache struct { + items map[string]*cacheEntry + mu sync.RWMutex +} + +type cacheEntry struct { + value interface{} + expiry time.Time +} + +func newMemoryCache() *memoryCache { + return &memoryCache{ + items: make(map[string]*cacheEntry), + } +} + +func (c *memoryCache) Get(key string) (value interface{}, found bool, expired bool) { + c.mu.RLock() + defer c.mu.RUnlock() + entry, ok := c.items[key] + if !ok { + return nil, false, false + } else if time.Now().After(entry.expiry) { + return entry.value, true, true + } + return entry.value, true, false +} + +func (c *memoryCache) Set(key string, value interface{}) { + c.mu.Lock() + defer c.mu.Unlock() + c.items[key] = &cacheEntry{ + value: value, + expiry: time.Now().Add(cacheEntryExpiry), + } +} + +func (c *memoryCache) Invalidate(key string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.items, key) +} + +type ( + Bus interface { + Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) + GougingParams(ctx context.Context) (api.GougingParams, error) + RegisterWebhook(ctx context.Context, wh webhooks.Webhook, opts ...webhooks.HeaderOption) error + } + + WorkerCache interface { + DownloadContracts(ctx context.Context) ([]api.ContractMetadata, error) + GougingParams(ctx context.Context) (api.GougingParams, error) + HandleEvent(event webhooks.Event) error + Initialize(ctx context.Context, workerAPI string, opts ...webhooks.HeaderOption) error + } +) + +type cache struct { + b Bus + + cache *memoryCache + logger *zap.SugaredLogger + + mu sync.Mutex + ready bool +} + +func NewCache(b Bus, logger *zap.Logger) WorkerCache { + return &cache{ + b: b, + + cache: newMemoryCache(), + logger: logger.Sugar().Named("workercache"), + } +} + +func (c *cache) DownloadContracts(ctx context.Context) (contracts []api.ContractMetadata, err error) { + // fetch directly from bus if the cache is not ready + if !c.isReady() { + c.logger.Warn(errCacheNotReady) + contracts, err = c.b.Contracts(ctx, api.ContractsOpts{}) + return + } + + // fetch from bus if it's not cached or expired + value, found, expired := c.cache.Get(cacheKeyDownloadContracts) + if !found || expired { + contracts, err = c.b.Contracts(ctx, api.ContractsOpts{}) + if err == nil { + c.cache.Set(cacheKeyDownloadContracts, contracts) + } + if expired && !contractsEqual(value.([]api.ContractMetadata), contracts) { + c.logger.Warn(fmt.Errorf("%w: key %v", errCacheOutdated, cacheKeyDownloadContracts)) + } + return + } + + return value.([]api.ContractMetadata), nil +} + +func (c *cache) GougingParams(ctx context.Context) (gp api.GougingParams, err error) { + // fetch directly from bus if the cache is not ready + if !c.isReady() { + c.logger.Warn(errCacheNotReady) + gp, err = c.b.GougingParams(ctx) + return + } + + // fetch from bus if it's not cached or expired + value, found, expired := c.cache.Get(cacheKeyGougingParams) + if !found || expired { + gp, err = c.b.GougingParams(ctx) + if err == nil { + c.cache.Set(cacheKeyGougingParams, gp) + } + if expired && !gougingParamsEqual(value.(api.GougingParams), gp) { + c.logger.Warn(fmt.Errorf("%w: key %v", errCacheOutdated, cacheKeyGougingParams)) + } + return + } + + return value.(api.GougingParams), nil +} + +func (c *cache) HandleEvent(event webhooks.Event) (err error) { + log := c.logger.With("module", event.Module, "event", event.Event) + + // parse the event + parsed, err := api.ParseEventWebhook(event) + if err != nil { + log.Errorw("failed to parse event", "error", err) + return err + } + + // handle the event + switch e := parsed.(type) { + case api.EventConsensusUpdate: + log = log.With("bh", e.BlockHeight, "ts", e.Timestamp) + c.handleConsensusUpdate(e) + case api.EventContractArchive: + log = log.With("fcid", e.ContractID, "ts", e.Timestamp) + c.handleContractArchive(e) + case api.EventContractRenew: + log = log.With("fcid", e.Renewal.ID, "renewedFrom", e.Renewal.RenewedFrom, "ts", e.Timestamp) + c.handleContractRenew(e) + case api.EventSettingUpdate: + log = log.With("key", e.Key, "ts", e.Timestamp) + err = c.handleSettingUpdate(e) + case api.EventSettingDelete: + log = log.With("key", e.Key, "ts", e.Timestamp) + c.handleSettingDelete(e) + default: + log.Info("unhandled event", e) + return + } + + // log the outcome + if err != nil { + log.Errorw("failed to handle event", "error", err) + } else { + log.Info("handled event") + } + return +} + +func (c *cache) Initialize(ctx context.Context, workerAPI string, webhookOpts ...webhooks.HeaderOption) error { + eventsURL := fmt.Sprintf("%s/events", workerAPI) + for _, wh := range []webhooks.Webhook{ + webhooks.NewEventWebhook(eventsURL, api.EventConsensusUpdate{}), + webhooks.NewEventWebhook(eventsURL, api.EventContractArchive{}), + webhooks.NewEventWebhook(eventsURL, api.EventContractRenew{}), + webhooks.NewEventWebhook(eventsURL, api.EventSettingUpdate{}), + } { + if err := c.b.RegisterWebhook(ctx, wh, webhookOpts...); err != nil { + return fmt.Errorf("failed to register webhook '%s', err: %v", wh, err) + } + } + c.mu.Lock() + c.ready = true + c.mu.Unlock() + return nil +} + +func (c *cache) isReady() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.ready +} + +func (c *cache) handleConsensusUpdate(event api.EventConsensusUpdate) { + // return early if the doesn't have gouging params to update + value, found, _ := c.cache.Get(cacheKeyGougingParams) + if !found { + return + } + + // update gouging params + gp := value.(api.GougingParams) + gp.ConsensusState = event.ConsensusState + gp.TransactionFee = event.TransactionFee + c.cache.Set(cacheKeyGougingParams, gp) +} + +func (c *cache) handleContractArchive(event api.EventContractArchive) { + // return early if the cache doesn't have contracts + value, found, _ := c.cache.Get(cacheKeyDownloadContracts) + if !found { + return + } + contracts := value.([]api.ContractMetadata) + + // remove the contract from the cache + for i, contract := range contracts { + if contract.ID == event.ContractID { + contracts = append(contracts[:i], contracts[i+1:]...) + break + } + } + c.cache.Set(cacheKeyDownloadContracts, contracts) +} + +func (c *cache) handleContractRenew(event api.EventContractRenew) { + // return early if the cache doesn't have contracts + value, found, _ := c.cache.Get(cacheKeyDownloadContracts) + if !found { + return + } + contracts := value.([]api.ContractMetadata) + + // update the renewed contract in the cache + for i, contract := range contracts { + if contract.ID == event.Renewal.RenewedFrom { + contracts[i] = event.Renewal + break + } + } + + c.cache.Set(cacheKeyDownloadContracts, contracts) +} + +func (c *cache) handleSettingDelete(e api.EventSettingDelete) { + if e.Key == api.SettingGouging || e.Key == api.SettingRedundancy { + c.cache.Invalidate(cacheKeyGougingParams) + } +} + +func (c *cache) handleSettingUpdate(e api.EventSettingUpdate) (err error) { + // return early if the cache doesn't have gouging params to update + value, found, _ := c.cache.Get(cacheKeyGougingParams) + if !found { + return nil + } + gp := value.(api.GougingParams) + + // marshal the updated value + data, err := json.Marshal(e.Update) + if err != nil { + return fmt.Errorf("couldn't marshal the given value, error: %v", err) + } + + // unmarshal into the appropriated setting and update the cache + switch e.Key { + case api.SettingGouging: + var gs api.GougingSettings + if err := json.Unmarshal(data, &gs); err != nil { + return fmt.Errorf("couldn't update gouging settings, invalid request body, %t", e.Update) + } else if err := gs.Validate(); err != nil { + return fmt.Errorf("couldn't update gouging settings, error: %v", err) + } + + gp.GougingSettings = gs + c.cache.Set(cacheKeyGougingParams, gp) + case api.SettingRedundancy: + var rs api.RedundancySettings + if err := json.Unmarshal(data, &rs); err != nil { + return fmt.Errorf("couldn't update redundancy settings, invalid request body, %t", e.Update) + } else if err := rs.Validate(); err != nil { + return fmt.Errorf("couldn't update redundancy settings, error: %v", err) + } + + gp.RedundancySettings = rs + c.cache.Set(cacheKeyGougingParams, gp) + default: + } + + return nil +} + +func contractsEqual(x, y []api.ContractMetadata) bool { + if len(x) != len(y) { + return false + } + sort.Slice(x, func(i, j int) bool { return x[i].ID.String() < x[j].ID.String() }) + sort.Slice(y, func(i, j int) bool { return y[i].ID.String() < y[j].ID.String() }) + for i, c := range x { + if c.ID.String() != y[i].ID.String() { + return false + } + } + return true +} + +func gougingParamsEqual(x, y api.GougingParams) bool { + var xb bytes.Buffer + var yb bytes.Buffer + json.NewEncoder(&xb).Encode(x) + json.NewEncoder(&yb).Encode(y) + return bytes.Equal(xb.Bytes(), yb.Bytes()) +} diff --git a/internal/worker/cache_test.go b/internal/worker/cache_test.go new file mode 100644 index 000000000..0b419a29c --- /dev/null +++ b/internal/worker/cache_test.go @@ -0,0 +1,183 @@ +package worker + +import ( + "context" + "strings" + "testing" + "time" + + "go.sia.tech/core/types" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/test" + "go.sia.tech/renterd/webhooks" + + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +type mockBus struct { + contracts []api.ContractMetadata + gougingParams api.GougingParams +} + +func (m *mockBus) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) { + return m.contracts, nil +} +func (m *mockBus) GougingParams(ctx context.Context) (api.GougingParams, error) { + return m.gougingParams, nil +} +func (m *mockBus) RegisterWebhook(ctx context.Context, wh webhooks.Webhook, opts ...webhooks.HeaderOption) error { + return nil +} + +func newMockBus() *mockBus { + return &mockBus{ + contracts: []api.ContractMetadata{ + testContractMetadata(1), + testContractMetadata(2), + testContractMetadata(3), + }, + gougingParams: api.GougingParams{ + RedundancySettings: test.RedundancySettings, + GougingSettings: test.GougingSettings, + TransactionFee: types.Siacoins(1), + ConsensusState: api.ConsensusState{ + BlockHeight: 1, + LastBlockTime: api.TimeRFC3339{}, + Synced: true, + }, + }, + } +} + +func TestWorkerCache(t *testing.T) { + // observe logs + observedZapCore, observedLogs := observer.New(zap.DebugLevel) + + // create mock bus and cache + c, b, mc := newTestCache(zap.New(observedZapCore)) + + // assert using cache before it's initialized prints a warning + contracts, err := c.DownloadContracts(context.Background()) + if err != nil { + t.Fatal(err) + } else if len(contracts) != 3 { + t.Fatal("expected 3 contracts, got", len(contracts)) + } + gp, err := c.GougingParams(context.Background()) + if err != nil { + t.Fatal(err) + } else if gp.RedundancySettings != test.RedundancySettings { + t.Fatal("expected redundancy settings to match", gp.RedundancySettings, test.RedundancySettings) + } else if gp.GougingSettings != test.GougingSettings { + t.Fatal("expected gouging settings to match", gp.GougingSettings, test.GougingSettings) + } else if !gp.TransactionFee.Equals(types.Siacoins(1)) { + t.Fatal("expected transaction fee to match", gp.TransactionFee, types.Siacoins(1)) + } + + // assert warnings are printed when the cache is not ready yet + if logs := observedLogs.FilterLevelExact(zap.WarnLevel); logs.Len() != 2 { + t.Fatal("expected 2 warnings, got", logs.Len()) + } else if lines := observedLogs.TakeAll(); lines[0].Message != lines[1].Message { + t.Fatal("expected same message, got", lines[0].Message, lines[1].Message) + } else if !strings.Contains(lines[0].Message, errCacheNotReady.Error()) { + t.Fatal("expected error message to contain 'cache is not ready yet', got", lines[0].Message) + } + + // initialize the cache + if err := c.Initialize(context.Background(), ""); err != nil { + t.Fatal(err) + } + + // fetch contracts & gouging params so they're cached + _, err = c.DownloadContracts(context.Background()) + if err != nil { + t.Fatal(err) + } + _, err = c.GougingParams(context.Background()) + if err != nil { + t.Fatal(err) + } + + // update bus contracts & expire cache entry manually + b.contracts = append(b.contracts, testContractMetadata(4)) + contracts, err = c.DownloadContracts(context.Background()) + if err != nil { + t.Fatal(err) + } else if len(contracts) != 3 { + t.Fatal("expected 3 contracts, got", len(contracts)) + } + mc.mu.Lock() + mc.items[cacheKeyDownloadContracts].expiry = time.Now().Add(-1 * time.Minute) + mc.mu.Unlock() + + // fetch contracts again, assert we have 4 now and we printed a warning to indicate the cache entry was invalid + contracts, err = c.DownloadContracts(context.Background()) + if err != nil { + t.Fatal(err) + } else if len(contracts) != 4 { + t.Fatal("expected 4 contracts, got", len(contracts)) + } else if logs := observedLogs.FilterLevelExact(zap.WarnLevel); logs.Len() != 1 { + t.Fatal("expected 1 warning, got", logs.Len(), logs.All()) + } else if lines := observedLogs.TakeAll(); !strings.Contains(lines[0].Message, errCacheOutdated.Error()) || !strings.Contains(lines[0].Message, cacheKeyDownloadContracts) { + t.Fatal("expected error message to contain 'cache is outdated', got", lines[0].Message) + } + + // update gouging params & expire cache entry manually + b.gougingParams.TransactionFee = b.gougingParams.TransactionFee.Mul64(2) + + // expire cache entry manually + mc.mu.Lock() + mc.items[cacheKeyGougingParams].expiry = time.Now().Add(-1 * time.Minute) + mc.mu.Unlock() + + // fetch contracts again, assert we have 4 now and we printed a warning to indicate the cache entry was invalid + gp, err = c.GougingParams(context.Background()) + if err != nil { + t.Fatal(err) + } else if !gp.TransactionFee.Equals(b.gougingParams.TransactionFee) { + t.Fatal("expected transaction fee to be updated, got", gp.TransactionFee) + } else if logs := observedLogs.FilterLevelExact(zap.WarnLevel); logs.Len() != 1 { + t.Fatal("expected 1 warning, got", logs.Len(), logs.All()) + } else if lines := observedLogs.TakeAll(); !strings.Contains(lines[0].Message, errCacheOutdated.Error()) || !strings.Contains(lines[0].Message, cacheKeyGougingParams) { + t.Fatal("expected error message to contain 'cache is outdated', got", lines[0].Message) + } + + // assert the worker cache handles every event + _ = observedLogs.TakeAll() // clear logs + for _, event := range []webhooks.EventWebhook{ + api.EventConsensusUpdate{}, + api.EventContractArchive{}, + api.EventContractRenew{}, + api.EventSettingUpdate{}, + api.EventSettingDelete{}, + } { + if err := c.HandleEvent(event.Event()); err != nil { + t.Fatal(err) + } + } + for _, entry := range observedLogs.TakeAll() { + if strings.Contains(entry.Message, "unhandled event") { + t.Fatal("expected no unhandled event, got", entry) + } + } +} + +func newTestCache(logger *zap.Logger) (WorkerCache, *mockBus, *memoryCache) { + b := newMockBus() + c := newMemoryCache() + return &cache{ + b: b, + cache: c, + logger: logger.Sugar(), + }, b, c +} + +func testContractMetadata(n int) api.ContractMetadata { + return api.ContractMetadata{ + ID: types.FileContractID{byte(n)}, + HostKey: types.PublicKey{byte(n)}, + WindowStart: 0, + WindowEnd: 10, + } +} diff --git a/stores/metadata.go b/stores/metadata.go index efe0fd941..498c151bd 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -631,6 +631,9 @@ func (s *SQLStore) AddRenewedContract(ctx context.Context, c rhpv2.ContractRevis return err } + // Populate host. + newContract.Host = oldContract.Host + s.addKnownContract(c.ID()) renewed = newContract return nil diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index c7fe51f99..98b77710f 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -512,7 +512,7 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, FROM ( SELECT CONCAT(?, SUBSTR(object_id, ?)) FROM objects - WHERE object_id LIKE ? + WHERE object_id LIKE ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?) ) as i )`, @@ -528,7 +528,7 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, UPDATE objects SET object_id = CONCAT(?, SUBSTR(object_id, ?)), db_directory_id = ? - WHERE object_id LIKE ? + WHERE object_id LIKE ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, prefixNew, utf8.RuneCountInString(prefixOld)+1, dirID, diff --git a/stores/sql/mysql/migrations/main/migration_00010_webhook_headers.sql b/stores/sql/mysql/migrations/main/migration_00010_webhook_headers.sql new file mode 100644 index 000000000..3fc6af932 --- /dev/null +++ b/stores/sql/mysql/migrations/main/migration_00010_webhook_headers.sql @@ -0,0 +1 @@ +ALTER TABLE `webhooks` ADD COLUMN `headers` JSON DEFAULT ('{}'); \ No newline at end of file diff --git a/stores/sql/mysql/migrations/main/schema.sql b/stores/sql/mysql/migrations/main/schema.sql index 4fd6f063b..a8b52e0f3 100644 --- a/stores/sql/mysql/migrations/main/schema.sql +++ b/stores/sql/mysql/migrations/main/schema.sql @@ -419,6 +419,7 @@ CREATE TABLE `webhooks` ( `module` varchar(255) NOT NULL, `event` varchar(255) NOT NULL, `url` varchar(255) NOT NULL, + `headers` JSON DEFAULT ('{}'), PRIMARY KEY (`id`), UNIQUE KEY `idx_module_event_url` (`module`,`event`,`url`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index c14ee5b05..d3f4d65c6 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -524,8 +524,8 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, UPDATE objects SET object_id = ? || SUBSTR(object_id, ?), db_directory_id = ? - WHERE object_id LIKE ? - AND SUBSTR(object_id, 1, ?) = ? + WHERE object_id LIKE ? + AND SUBSTR(object_id, 1, ?) = ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, prefixNew, utf8.RuneCountInString(prefixOld)+1, dirID, diff --git a/stores/sql/sqlite/migrations/main/migration_00010_webhook_headers.sql b/stores/sql/sqlite/migrations/main/migration_00010_webhook_headers.sql new file mode 100644 index 000000000..7e3d0453b --- /dev/null +++ b/stores/sql/sqlite/migrations/main/migration_00010_webhook_headers.sql @@ -0,0 +1 @@ +ALTER TABLE `webhooks` ADD COLUMN `headers` text DEFAULT '{}'; \ No newline at end of file diff --git a/stores/sql/sqlite/migrations/main/schema.sql b/stores/sql/sqlite/migrations/main/schema.sql index aa03a862d..aaff040ee 100644 --- a/stores/sql/sqlite/migrations/main/schema.sql +++ b/stores/sql/sqlite/migrations/main/schema.sql @@ -147,7 +147,7 @@ CREATE INDEX `idx_ephemeral_accounts_requires_sync` ON `ephemeral_accounts`(`req CREATE TABLE `autopilots` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`identifier` text NOT NULL UNIQUE,`config` text,`current_period` integer DEFAULT 0); -- dbWebhook -CREATE TABLE `webhooks` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`module` text NOT NULL,`event` text NOT NULL,`url` text NOT NULL); +CREATE TABLE `webhooks` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`module` text NOT NULL,`event` text NOT NULL,`url` text NOT NULL,`headers` text DEFAULT ('{}')); CREATE UNIQUE INDEX `idx_module_event_url` ON `webhooks`(`module`,`event`,`url`); -- dbObjectUserMetadata diff --git a/stores/webhooks.go b/stores/webhooks.go index 4db325698..ec2b95d3c 100644 --- a/stores/webhooks.go +++ b/stores/webhooks.go @@ -15,6 +15,8 @@ type ( Module string `gorm:"uniqueIndex:idx_module_event_url;NOT NULL;size:255"` Event string `gorm:"uniqueIndex:idx_module_event_url;NOT NULL;size:255"` URL string `gorm:"uniqueIndex:idx_module_event_url;NOT NULL;size:255"` + + Headers map[string]string `gorm:"serializer:json"` } ) @@ -35,14 +37,15 @@ func (s *SQLStore) DeleteWebhook(ctx context.Context, wb webhooks.Webhook) error }) } -func (s *SQLStore) AddWebhook(ctx context.Context, wb webhooks.Webhook) error { +func (s *SQLStore) AddWebhook(ctx context.Context, wh webhooks.Webhook) error { return s.retryTransaction(ctx, func(tx *gorm.DB) error { return tx.Clauses(clause.OnConflict{ DoNothing: true, }).Create(&dbWebhook{ - Module: wb.Module, - Event: wb.Event, - URL: wb.URL, + Module: wh.Module, + Event: wh.Event, + URL: wh.URL, + Headers: wh.Headers, }).Error }) } @@ -55,9 +58,10 @@ func (s *SQLStore) Webhooks(ctx context.Context) ([]webhooks.Webhook, error) { var whs []webhooks.Webhook for _, wb := range dbWebhooks { whs = append(whs, webhooks.Webhook{ - Module: wb.Module, - Event: wb.Event, - URL: wb.URL, + Module: wb.Module, + Event: wb.Event, + URL: wb.URL, + Headers: wb.Headers, }) } return whs, nil diff --git a/webhooks/webhooks.go b/webhooks/webhooks.go index 8b079c05c..9a4d290fa 100644 --- a/webhooks/webhooks.go +++ b/webhooks/webhooks.go @@ -3,6 +3,7 @@ package webhooks import ( "bytes" "context" + "encoding/base64" "encoding/json" "errors" "fmt" @@ -29,6 +30,14 @@ type ( } ) +type HeaderOption func(headers map[string]string) + +func WithBasicAuth(username, password string) HeaderOption { + return func(headers map[string]string) { + headers["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte(username+":"+password)) + } +} + type NoopBroadcaster struct{} func (NoopBroadcaster) BroadcastAction(_ context.Context, _ Event) error { return nil } @@ -40,9 +49,10 @@ const ( type ( Webhook struct { - Module string `json:"module"` - Event string `json:"event"` - URL string `json:"url"` + Module string `json:"module"` + Event string `json:"event"` + URL string `json:"url"` + Headers map[string]string `json:"headers,omitempty"` } WebhookQueueInfo struct { @@ -57,12 +67,12 @@ type ( Payload interface{} `json:"payload,omitempty"` } - WebhookEvent interface { + EventWebhook interface { Event() Event } ) -func NewEventWebhook(url string, e WebhookEvent) Webhook { +func NewEventWebhook(url string, e EventWebhook) Webhook { return Webhook{ Module: e.Event().Module, Event: e.Event().Event, @@ -84,9 +94,10 @@ type Manager struct { } type eventQueue struct { - ctx context.Context - logger *zap.SugaredLogger - url string + ctx context.Context + logger *zap.SugaredLogger + headers map[string]string + url string mu sync.Mutex isDequeueing bool @@ -105,9 +116,10 @@ func (m *Manager) BroadcastAction(_ context.Context, event Event) error { queue, exists := m.queues[hook.URL] if !exists { queue = &eventQueue{ - ctx: m.shutdownCtx, - logger: m.logger, - url: hook.URL, + ctx: m.shutdownCtx, + logger: m.logger, + headers: hook.Headers, + url: hook.URL, } m.queues[hook.URL] = queue } @@ -174,7 +186,7 @@ func (m *Manager) Register(ctx context.Context, wh Webhook) error { defer cancel() // Test URL. - err := sendEvent(ctx, wh.URL, Event{ + err := sendEvent(ctx, wh.URL, wh.Headers, Event{ Event: WebhookEventPing, }) if err != nil { @@ -207,7 +219,7 @@ func (q *eventQueue) dequeue() { q.events = q.events[1:] q.mu.Unlock() - err := sendEvent(q.ctx, q.url, next) + err := sendEvent(q.ctx, q.url, q.headers, next) if err != nil { q.logger.Errorf("failed to send Webhook event %v to %v: %v", next.String(), q.url, err) } @@ -247,7 +259,7 @@ func NewManager(logger *zap.SugaredLogger, store WebhookStore) (*Manager, error) return m, nil } -func sendEvent(ctx context.Context, url string, action Event) error { +func sendEvent(ctx context.Context, url string, headers map[string]string, action Event) error { body, err := json.Marshal(action) if err != nil { return err @@ -257,6 +269,9 @@ func sendEvent(ctx context.Context, url string, action Event) error { if err != nil { return err } + for k, v := range headers { + req.Header.Set(k, v) + } defer io.ReadAll(req.Body) // always drain body resp, err := http.DefaultClient.Do(req) diff --git a/worker/client/client.go b/worker/client/client.go index 71fd200ad..c1ab8a70e 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -16,6 +16,7 @@ import ( "go.sia.tech/jape" "go.sia.tech/renterd/api" "go.sia.tech/renterd/object" + "go.sia.tech/renterd/webhooks" ) // A Client provides methods for interacting with a worker. @@ -268,6 +269,12 @@ func (c *Client) UploadStats() (resp api.UploadStatsResponse, err error) { return } +// RegisterEvent register an event. +func (c *Client) RegisterEvent(ctx context.Context, e webhooks.Event) (err error) { + err = c.c.WithContext(ctx).POST("/events", e, nil) + return +} + func (c *Client) object(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (_ io.ReadCloser, _ http.Header, err error) { values := url.Values{} values.Set("bucket", url.QueryEscape(bucket)) diff --git a/worker/mocks_test.go b/worker/mocks_test.go index ae5b3f359..b67d80c52 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -85,6 +85,7 @@ type busMock struct { *syncerMock *walletMock *webhookBroadcasterMock + *webhookStoreMock } func newBusMock(cs *contractStoreMock, hs *hostStoreMock, os *objectStoreMock) *busMock { @@ -688,3 +689,11 @@ type webhookBroadcasterMock struct{} func (*webhookBroadcasterMock) BroadcastAction(context.Context, webhooks.Event) error { return nil } + +var _ WebhookStore = (*webhookStoreMock)(nil) + +type webhookStoreMock struct{} + +func (*webhookStoreMock) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook, opts ...webhooks.HeaderOption) error { + return nil +} diff --git a/worker/worker.go b/worker/worker.go index fa9ad3de8..842e541f2 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -26,6 +26,7 @@ import ( "go.sia.tech/renterd/api" "go.sia.tech/renterd/build" "go.sia.tech/renterd/internal/utils" + iworker "go.sia.tech/renterd/internal/worker" "go.sia.tech/renterd/object" "go.sia.tech/renterd/webhooks" "go.sia.tech/renterd/worker/client" @@ -79,6 +80,7 @@ type ( HostStore ObjectStore SettingStore + WebhookStore Syncer Wallet @@ -155,6 +157,10 @@ type ( WalletSign(ctx context.Context, txn *types.Transaction, toSign []types.Hash256, cf types.CoveredFields) error } + WebhookStore interface { + RegisterWebhook(ctx context.Context, webhook webhooks.Webhook, opts ...webhooks.HeaderOption) error + } + ConsensusState interface { ConsensusState(ctx context.Context) (api.ConsensusState, error) } @@ -208,6 +214,7 @@ type worker struct { uploadManager *uploadManager accounts *accounts + cache iworker.WorkerCache priceTables *priceTables transportPoolV3 *transportPoolV3 @@ -1224,6 +1231,25 @@ func (w *worker) idHandlerGET(jc jape.Context) { jc.Encode(w.id) } +func (w *worker) eventsHandler(jc jape.Context) { + var event webhooks.Event + if jc.Decode(&event) != nil { + return + } else if event.Event == webhooks.WebhookEventPing { + jc.ResponseWriter.WriteHeader(http.StatusOK) + return + } + + err := w.cache.HandleEvent(event) + if errors.Is(err, api.ErrUnknownEvent) { + jc.ResponseWriter.WriteHeader(http.StatusAccepted) + return + } else if err != nil { + jc.Error(err, http.StatusBadRequest) + return + } +} + func (w *worker) memoryGET(jc jape.Context) { jc.Encode(api.MemoryResponse{ Download: w.downloadManager.mm.Status(), @@ -1275,20 +1301,23 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush return nil, errors.New("uploadMaxMemory cannot be 0") } + cache := iworker.NewCache(b, l) + l = l.Named("worker").Named(id) - ctx, cancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) w := &worker{ alerts: alerts.WithOrigin(b, fmt.Sprintf("worker.%s", id)), allowPrivateIPs: allowPrivateIPs, contractLockingDuration: contractLockingDuration, + cache: cache, id: id, bus: b, masterKey: masterKey, logger: l.Sugar(), startTime: time.Now(), uploadingPackedSlabs: make(map[string]struct{}), - shutdownCtx: ctx, - shutdownCtxCancel: cancel, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCancel, } w.initAccounts(b) @@ -1308,6 +1337,8 @@ func (w *worker) Handler() http.Handler { "GET /account/:hostkey": w.accountHandlerGET, "GET /id": w.idHandlerGET, + "POST /events": w.eventsHandler, + "GET /memory": w.memoryGET, "GET /rhp/contracts": w.rhpContractsHandlerGET, @@ -1336,6 +1367,12 @@ func (w *worker) Handler() http.Handler { }) } +// Setup initializes the worker cache. +func (w *worker) Setup(ctx context.Context, apiURL, apiPassword string) error { + webhookOpts := []webhooks.HeaderOption{webhooks.WithBasicAuth("", apiPassword)} + return w.cache.Initialize(ctx, apiURL, webhookOpts...) +} + // Shutdown shuts down the worker. func (w *worker) Shutdown(ctx context.Context) error { // cancel shutdown context @@ -1555,13 +1592,13 @@ func (w *worker) GetObject(ctx context.Context, bucket, path string, opts api.Do opts.Range.Length = hor.Range.Length // fetch gouging params - gp, err := w.bus.GougingParams(ctx) + gp, err := w.cache.GougingParams(ctx) if err != nil { return nil, fmt.Errorf("couldn't fetch gouging parameters from bus: %w", err) } // fetch all contracts - contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{}) + contracts, err := w.cache.DownloadContracts(ctx) if err != nil { return nil, fmt.Errorf("couldn't fetch contracts from bus: %w", err) }