Skip to content

Commit

Permalink
bus: update pin manager store
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Aug 15, 2024
1 parent 747232c commit 46dc438
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 17 deletions.
2 changes: 1 addition & 1 deletion bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func New(ctx context.Context, am *alerts.Manager, wm WebhooksManager, cm ChainMa
}

// create pin manager
b.pinMgr = ibus.NewPinManager(b.alerts, wm, store, store, defaultPinUpdateInterval, defaultPinRateWindow, l)
b.pinMgr = ibus.NewPinManager(b.alerts, wm, store, defaultPinUpdateInterval, defaultPinRateWindow, l)

// create chain subscriber
b.cs = ibus.NewChainSubscriber(wm, cm, store, w, announcementMaxAge, l)
Expand Down
25 changes: 10 additions & 15 deletions internal/bus/pinmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,18 @@ import (
)

type (
AutopilotStore interface {
Store interface {
Autopilot(ctx context.Context, id string) (api.Autopilot, error)
UpdateAutopilot(ctx context.Context, ap api.Autopilot) error
}

SettingStore interface {
Setting(ctx context.Context, key string) (string, error)
UpdateAutopilot(ctx context.Context, ap api.Autopilot) error
UpdateSetting(ctx context.Context, key, value string) error
}
)

type (
pinManager struct {
a alerts.Alerter
as AutopilotStore
ss SettingStore
s Store
broadcaster webhooks.Broadcaster

updateInterval time.Duration
Expand All @@ -54,11 +50,10 @@ type (
// NewPinManager returns a new PinManager, responsible for pinning prices to a
// fixed value in an underlying currency. The returned pin manager is already
// running and can be stopped by calling Shutdown.
func NewPinManager(alerts alerts.Alerter, broadcaster webhooks.Broadcaster, as AutopilotStore, ss SettingStore, updateInterval, rateWindow time.Duration, l *zap.Logger) *pinManager {
func NewPinManager(alerts alerts.Alerter, broadcaster webhooks.Broadcaster, s Store, updateInterval, rateWindow time.Duration, l *zap.Logger) *pinManager {
pm := &pinManager{
a: alerts,
as: as,
ss: ss,
s: s,
broadcaster: broadcaster,

logger: l.Named("pricemanager").Sugar(),
Expand Down Expand Up @@ -110,7 +105,7 @@ func (pm *pinManager) averageRate() decimal.Decimal {

func (pm *pinManager) pinnedSettings(ctx context.Context) (api.PricePinSettings, error) {
var ps api.PricePinSettings
if pss, err := pm.ss.Setting(ctx, api.SettingPricePinning); err != nil {
if pss, err := pm.s.Setting(ctx, api.SettingPricePinning); err != nil {
return api.PricePinSettings{}, err
} else if err := json.Unmarshal([]byte(pss), &ps); err != nil {
pm.logger.Panicf("failed to unmarshal pinned settings '%s': %v", pss, err)
Expand Down Expand Up @@ -185,7 +180,7 @@ func (pm *pinManager) run() {
func (pm *pinManager) updateAutopilotSettings(ctx context.Context, autopilotID string, pins api.AutopilotPins, rate decimal.Decimal) error {
var updated bool

ap, err := pm.as.Autopilot(ctx, autopilotID)
ap, err := pm.s.Autopilot(ctx, autopilotID)
if err != nil {
return err
}
Expand Down Expand Up @@ -222,7 +217,7 @@ func (pm *pinManager) updateAutopilotSettings(ctx context.Context, autopilotID s
}

// update autopilto
return pm.as.UpdateAutopilot(ctx, ap)
return pm.s.UpdateAutopilot(ctx, ap)
}

func (pm *pinManager) updateExchangeRates(currency string, rate float64) error {
Expand All @@ -249,7 +244,7 @@ func (pm *pinManager) updateGougingSettings(ctx context.Context, pins api.Gougin

// fetch gouging settings
var gs api.GougingSettings
if gss, err := pm.ss.Setting(ctx, api.SettingGouging); err != nil {
if gss, err := pm.s.Setting(ctx, api.SettingGouging); err != nil {
return err
} else if err := json.Unmarshal([]byte(gss), &gs); err != nil {
pm.logger.Panicf("failed to unmarshal gouging settings '%s': %v", gss, err)
Expand Down Expand Up @@ -319,7 +314,7 @@ func (pm *pinManager) updateGougingSettings(ctx context.Context, pins api.Gougin

// update settings
bytes, _ := json.Marshal(gs)
err = pm.ss.UpdateSetting(ctx, api.SettingGouging, string(bytes))
err = pm.s.UpdateSetting(ctx, api.SettingGouging, string(bytes))

// broadcast event
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/bus/pinmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestPinManager(t *testing.T) {
defer forex.Close()

// create a pinmanager
pm := NewPinManager(a, eb, ms, ms, testUpdateInterval, time.Minute, zap.NewNop())
pm := NewPinManager(a, eb, ms, testUpdateInterval, time.Minute, zap.NewNop())
defer func() {
if err := pm.Shutdown(context.Background()); err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 46dc438

Please sign in to comment.