Skip to content

Commit

Permalink
Cache gouging params and download contracts in the worker (#1264)
Browse files Browse the repository at this point in the history
This PR adds a small cache to the worker that's being
invalidated/updated by event webhooks we receive from the `bus`.

I added two benchmarks, a baseline and another with the worker cache
enabled. This doesn't really show all the benefits though because a) it
will put less strain on the database and b) I have a single node setup.
For a multi-node setup this likely makes a big difference.


[baseline.txt](https://github.com/user-attachments/files/15740432/baseline.txt)

[cached.txt](https://github.com/user-attachments/files/15740433/cached.txt)

---------

Co-authored-by: Christopher Schinnerl <chris@sia.tech>
  • Loading branch information
peterjan and ChrisSchinnerl committed Jun 13, 2024
1 parent 57c3e5f commit c7b80d4
Show file tree
Hide file tree
Showing 25 changed files with 782 additions and 134 deletions.
70 changes: 66 additions & 4 deletions api/events.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package api

import (
"encoding/json"
"errors"
"fmt"
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/webhooks"
)

const (
ModuleContractSet = "contract_set"
ModuleConsensus = "consensus"
ModuleContract = "contract"
ModuleContractSet = "contract_set"
ModuleSetting = "setting"

EventUpdate = "update"
Expand All @@ -19,6 +22,10 @@ const (
EventRenew = "renew"
)

var (
ErrUnknownEvent = errors.New("unknown event")
)

type (
EventConsensusUpdate struct {
ConsensusState
Expand All @@ -33,9 +40,8 @@ type (
}

EventContractRenew struct {
ContractID types.FileContractID `json:"contractID"`
RenewedFromID types.FileContractID `json:"renewedFromID"`
Timestamp time.Time `json:"timestamp"`
Renewal ContractMetadata `json:"renewal"`
Timestamp time.Time `json:"timestamp"`
}

EventContractSetUpdate struct {
Expand Down Expand Up @@ -103,3 +109,59 @@ func (e EventSettingDelete) Event() webhooks.Event {
Payload: e,
}
}

func ParseEventWebhook(event webhooks.Event) (interface{}, error) {
bytes, err := json.Marshal(event.Payload)
if err != nil {
return nil, err
}
switch event.Module {
case ModuleContract:
switch event.Event {
case EventArchive:
var e EventContractArchive
if err := json.Unmarshal(bytes, &e); err != nil {
return nil, err
}
return e, nil
case EventRenew:
var e EventContractRenew
if err := json.Unmarshal(bytes, &e); err != nil {
return nil, err
}
return e, nil
}
case ModuleContractSet:
if event.Event == EventUpdate {
var e EventContractSetUpdate
if err := json.Unmarshal(bytes, &e); err != nil {
return nil, err
}
return e, nil
}
case ModuleConsensus:
if event.Event == EventUpdate {
var e EventConsensusUpdate
if err := json.Unmarshal(bytes, &e); err != nil {
return nil, err
}
return e, nil
}
case ModuleSetting:
switch event.Event {
case EventUpdate:
var e EventSettingUpdate
if err := json.Unmarshal(bytes, &e); err != nil {
return nil, err
}
return e, nil
case EventDelete:
var e EventSettingDelete
if err := json.Unmarshal(bytes, &e); err != nil {
return nil, err
}
return e, nil
}
}
return nil, fmt.Errorf("%w: module %s event %s", ErrUnknownEvent, event.Module, event.Event)
}
2 changes: 1 addition & 1 deletion api/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package api

import "go.sia.tech/renterd/webhooks"

type WebHookResponse struct {
type WebhookResponse struct {
Webhooks []webhooks.Webhook `json:"webhooks"`
Queues []webhooks.WebhookQueueInfo `json:"queues"`
}
15 changes: 8 additions & 7 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,9 +1173,8 @@ func (b *bus) contractIDRenewedHandlerPOST(jc jape.Context) {

b.uploadingSectors.HandleRenewal(req.Contract.ID(), req.RenewedFrom)
b.events.BroadcastEvent(api.EventContractRenew{
ContractID: req.Contract.ID(),
RenewedFromID: req.RenewedFrom,
Timestamp: time.Now().UTC(),
Renewal: r,
Timestamp: time.Now().UTC(),
})

jc.Encode(r)
Expand Down Expand Up @@ -2111,7 +2110,7 @@ func (b *bus) webhookHandlerDelete(jc jape.Context) {

func (b *bus) webhookHandlerGet(jc jape.Context) {
webhooks, queueInfos := b.hooks.Info()
jc.Encode(api.WebHookResponse{
jc.Encode(api.WebhookResponse{
Queues: queueInfos,
Webhooks: webhooks,
})
Expand All @@ -2122,10 +2121,12 @@ func (b *bus) webhookHandlerPost(jc jape.Context) {
if jc.Decode(&req) != nil {
return
}

err := b.hooks.Register(jc.Request.Context(), webhooks.Webhook{
Event: req.Event,
Module: req.Module,
URL: req.URL,
Event: req.Event,
Module: req.Module,
URL: req.URL,
Headers: req.Headers,
})
if err != nil {
jc.Error(fmt.Errorf("failed to add Webhook: %w", err), http.StatusInternalServerError)
Expand Down
10 changes: 8 additions & 2 deletions bus/client/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@ func (c *Client) DeleteWebhook(ctx context.Context, url, module, event string) e
}

// RegisterWebhook registers a new webhook for the given URL.
func (c *Client) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook) error {
func (c *Client) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook, opts ...webhooks.HeaderOption) error {
if webhook.Headers == nil {
webhook.Headers = make(map[string]string)
}
for _, opt := range opts {
opt(webhook.Headers)
}
err := c.c.WithContext(ctx).POST("/webhooks", webhook, nil)
return err
}

// Webhooks returns all webhooks currently registered.
func (c *Client) Webhooks(ctx context.Context) (resp api.WebHookResponse, err error) {
func (c *Client) Webhooks(ctx context.Context) (resp api.WebhookResponse, err error) {
err = c.c.WithContext(ctx).GET("/webhooks", &resp)
return
}
45 changes: 22 additions & 23 deletions cmd/renterd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.sia.tech/renterd/config"
"go.sia.tech/renterd/internal/node"
"go.sia.tech/renterd/internal/utils"
iworker "go.sia.tech/renterd/internal/worker"
"go.sia.tech/renterd/worker"
"go.sia.tech/renterd/worker/s3"
"go.sia.tech/web/renterd"
Expand Down Expand Up @@ -455,11 +456,11 @@ func main() {
Network: network,
}

type shutdownFn struct {
type shutdownFnEntry struct {
name string
fn func(context.Context) error
}
var shutdownFns []shutdownFn
var shutdownFns []shutdownFnEntry

if cfg.Bus.RemoteAddr != "" && len(cfg.Worker.Remotes) != 0 && !cfg.Autopilot.Enabled {
logger.Fatal("remote bus, remote worker, and no autopilot -- nothing to do!")
Expand All @@ -485,7 +486,7 @@ func main() {

// Create the webserver.
srv := &http.Server{Handler: mux}
shutdownFns = append(shutdownFns, shutdownFn{
shutdownFns = append(shutdownFns, shutdownFnEntry{
name: "HTTP Server",
fn: srv.Shutdown,
})
Expand All @@ -500,7 +501,7 @@ func main() {
if err != nil {
logger.Fatal("failed to create bus, err: " + err.Error())
}
shutdownFns = append(shutdownFns, shutdownFn{
shutdownFns = append(shutdownFns, shutdownFnEntry{
name: "Bus",
fn: fn,
})
Expand All @@ -519,22 +520,27 @@ func main() {
var s3Srv *http.Server
var s3Listener net.Listener
var workers []autopilot.Worker
setupWorkerFn := node.NoopFn
if len(cfg.Worker.Remotes) == 0 {
if cfg.Worker.Enabled {
w, s3Handler, fn, err := node.NewWorker(cfg.Worker, s3.Opts{
workerAddr := cfg.HTTP.Address + "/api/worker"
var shutdownFn node.ShutdownFn
w, s3Handler, setupFn, shutdownFn, err := node.NewWorker(cfg.Worker, s3.Opts{
AuthDisabled: cfg.S3.DisableAuth,
HostBucketEnabled: cfg.S3.HostBucketEnabled,
}, bc, seed, logger)
if err != nil {
logger.Fatal("failed to create worker: " + err.Error())
}
shutdownFns = append(shutdownFns, shutdownFn{
setupWorkerFn = func(ctx context.Context) error {
return setupFn(ctx, workerAddr, cfg.HTTP.Password)
}
shutdownFns = append(shutdownFns, shutdownFnEntry{
name: "Worker",
fn: fn,
fn: shutdownFn,
})

mux.Sub["/api/worker"] = utils.TreeMux{Handler: workerAuth(cfg.HTTP.Password, cfg.Worker.AllowUnauthenticatedDownloads)(w)}
workerAddr := cfg.HTTP.Address + "/api/worker"
mux.Sub["/api/worker"] = utils.TreeMux{Handler: iworker.Auth(cfg.HTTP.Password, cfg.Worker.AllowUnauthenticatedDownloads)(w)}
wc := worker.NewClient(workerAddr, cfg.HTTP.Password)
workers = append(workers, wc)

Expand All @@ -547,7 +553,7 @@ func main() {
if err != nil {
logger.Fatal("failed to create listener: " + err.Error())
}
shutdownFns = append(shutdownFns, shutdownFn{
shutdownFns = append(shutdownFns, shutdownFnEntry{
name: "S3",
fn: s3Srv.Shutdown,
})
Expand All @@ -573,7 +579,7 @@ func main() {
}

// NOTE: the autopilot shutdown function needs to be called first.
shutdownFns = append(shutdownFns, shutdownFn{
shutdownFns = append(shutdownFns, shutdownFnEntry{
name: "Autopilot",
fn: fn,
})
Expand Down Expand Up @@ -614,6 +620,11 @@ func main() {
}
}

// Finish worker setup.
if err := setupWorkerFn(context.Background()); err != nil {
logger.Fatal("failed to setup worker: " + err.Error())
}

logger.Info("api: Listening on " + l.Addr().String())

if s3Srv != nil {
Expand Down Expand Up @@ -745,15 +756,3 @@ func runCompatMigrateAutopilotJSONToStore(bc *bus.Client, id, dir string) (err e

return nil
}

func workerAuth(password string, unauthenticatedDownloads bool) func(http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if unauthenticatedDownloads && req.Method == http.MethodGet && strings.HasPrefix(req.URL.Path, "/objects/") {
h.ServeHTTP(w, req)
} else {
jape.BasicAuth(password)(h).ServeHTTP(w, req)
}
})
}
}
2 changes: 1 addition & 1 deletion internal/bus/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewEventBroadcaster(b webhooks.Broadcaster, l *zap.SugaredLogger) EventBroa
}
}

func (b EventBroadcaster) BroadcastEvent(e webhooks.WebhookEvent) {
func (b EventBroadcaster) BroadcastEvent(e webhooks.EventWebhook) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
if err := b.broadcaster.BroadcastAction(ctx, e.Event()); err != nil {
b.logger.Errorw("failed to broadcast event", "event", e, "error", err)
Expand Down
11 changes: 7 additions & 4 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ type AutopilotConfig struct {

type (
RunFn = func() error
SetupFn = func(context.Context, string, string) error
ShutdownFn = func(context.Context) error
)

var NoopFn = func(context.Context) error { return nil }

func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) {
gatewayDir := filepath.Join(dir, "gateway")
if err := os.MkdirAll(gatewayDir, 0700); err != nil {
Expand Down Expand Up @@ -219,18 +222,18 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht
return b.Handler(), shutdownFn, nil
}

func NewWorker(cfg config.Worker, s3Opts s3.Opts, b Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, http.Handler, ShutdownFn, error) {
func NewWorker(cfg config.Worker, s3Opts s3.Opts, b Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, http.Handler, SetupFn, ShutdownFn, error) {
workerKey := blake2b.Sum256(append([]byte("worker"), seed...))
w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.DownloadMaxMemory, cfg.UploadMaxMemory, cfg.AllowPrivateIPs, l)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
s3Handler, err := s3.New(b, w, l.Named("s3").Sugar(), s3Opts)
if err != nil {
err = errors.Join(err, w.Shutdown(context.Background()))
return nil, nil, nil, fmt.Errorf("failed to create s3 handler: %w", err)
return nil, nil, nil, nil, fmt.Errorf("failed to create s3 handler: %w", err)
}
return w.Handler(), s3Handler, w.Shutdown, nil
return w.Handler(), s3Handler, w.Setup, w.Shutdown, nil
}

func NewAutopilot(cfg AutopilotConfig, b autopilot.Bus, workers []autopilot.Worker, l *zap.Logger) (http.Handler, RunFn, ShutdownFn, error) {
Expand Down
6 changes: 6 additions & 0 deletions internal/sql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ var (
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00009_json_settings", log)
},
},
{
ID: "00010_webhook_headers",
Migrate: func(tx Tx) error {
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00010_webhook_headers", log)
},
},
}
}
MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
Expand Down
12 changes: 8 additions & 4 deletions internal/test/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.sia.tech/renterd/internal/node"
"go.sia.tech/renterd/internal/test"
"go.sia.tech/renterd/internal/utils"
iworker "go.sia.tech/renterd/internal/worker"
"go.sia.tech/renterd/stores"
"go.sia.tech/renterd/worker/s3"
"go.sia.tech/web/renterd"
Expand Down Expand Up @@ -332,12 +333,10 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster {
busShutdownFns = append(busShutdownFns, bStopFn)

// Create worker.
w, s3Handler, wShutdownFn, err := node.NewWorker(workerCfg, s3.Opts{}, busClient, wk, logger)
w, s3Handler, wSetupFn, wShutdownFn, err := node.NewWorker(workerCfg, s3.Opts{}, busClient, wk, logger)
tt.OK(err)

workerAuth := jape.BasicAuth(workerPassword)
workerServer := http.Server{
Handler: workerAuth(w),
Handler: iworker.Auth(workerPassword, false)(w),
}

var workerShutdownFns []func(context.Context) error
Expand Down Expand Up @@ -440,6 +439,11 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster {
SlabBufferMaxSizeSoft: build.DefaultUploadPackingSettings.SlabBufferMaxSizeSoft,
}))

// Register the worker
if err := wSetupFn(ctx, workerAddr, workerPassword); err != nil {
tt.Fatalf("failed to register worker, err: %v", err)
}

// Fund the bus.
if funding {
cluster.MineBlocks(latestHardforkHeight)
Expand Down
Loading

0 comments on commit c7b80d4

Please sign in to comment.