Skip to content

Commit

Permalink
feat: remove queue from the service
Browse files Browse the repository at this point in the history
  • Loading branch information
jrwbabylonlab committed Dec 6, 2024
1 parent f462bf3 commit 6bbd20e
Show file tree
Hide file tree
Showing 12 changed files with 18 additions and 347 deletions.
8 changes: 1 addition & 7 deletions cmd/staking-expiry-checker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/babylonlabs-io/staking-expiry-checker/internal/db"
"github.com/babylonlabs-io/staking-expiry-checker/internal/observability/metrics"
"github.com/babylonlabs-io/staking-expiry-checker/internal/poller"
"github.com/babylonlabs-io/staking-expiry-checker/internal/queue"
"github.com/babylonlabs-io/staking-expiry-checker/internal/services"
)

Expand Down Expand Up @@ -53,12 +52,7 @@ func main() {
log.Fatal().Err(err).Msg("error while creating btc client")
}

qm, err := queue.NewQueueManager(&cfg.Queue)
if err != nil {
log.Fatal().Err(err).Msg("error while creating queue manager")
}

delegationService := services.NewService(dbClient, btcClient, qm)
delegationService := services.NewService(dbClient, btcClient)
if err != nil {
log.Fatal().Err(err).Msg("error while creating delegation service")
}
Expand Down
8 changes: 0 additions & 8 deletions config/config-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ btc:
net-params: testnet
rpc-user: rpcuser
rpc-pass: rpcpass
queue:
queue_user: user # can be replaced by values in .env file
queue_password: password
url: "localhost:5672"
processing_timeout: 5 # 5 second
msg_max_retry_attempts: 10
requeue_delay_time: 300
queue_type: quorum
metrics:
host: 0.0.0.0
port: 2112
8 changes: 0 additions & 8 deletions config/config-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ btc:
net-params: testnet
rpc-user: rpcuser
rpc-pass: rpcpass
queue:
queue_user: user # can be replaced by values in .env file
queue_password: password
url: "localhost:5672"
processing_timeout: 5 # 5 second
msg_max_retry_attempts: 3
requeue_delay_time: 60
queue_type: quorum
metrics:
host: 0.0.0.0
port: 2112
2 changes: 1 addition & 1 deletion contrib/images/staking-expiry-checker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.21-alpine AS builder
FROM golang:1.23.1-alpine AS builder

ARG VERSION="HEAD"

Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
module github.com/babylonlabs-io/staking-expiry-checker

go 1.21.6
go 1.23.1

require (
github.com/babylonlabs-io/staking-queue-client v0.4.1
github.com/btcsuite/btcd v0.24.0
github.com/rabbitmq/amqp091-go v1.9.0
github.com/spf13/viper v1.18.2
)

Expand Down
14 changes: 4 additions & 10 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@ import (
"os"
"strings"

queue "github.com/babylonlabs-io/staking-queue-client/config"
"github.com/spf13/viper"
)

type Config struct {
Poller PollerConfig `mapstructure:"poller"`
Db DbConfig `mapstructure:"db"`
Btc BtcConfig `mapstructure:"btc"`
Queue queue.QueueConfig `mapstructure:"queue"`
Metrics MetricsConfig `mapstructure:"metrics"`
Poller PollerConfig `mapstructure:"poller"`
Db DbConfig `mapstructure:"db"`
Btc BtcConfig `mapstructure:"btc"`
Metrics MetricsConfig `mapstructure:"metrics"`
}

func (cfg *Config) Validate() error {
Expand All @@ -34,10 +32,6 @@ func (cfg *Config) Validate() error {
return err
}

if err := cfg.Queue.Validate(); err != nil {
return err
}

return nil
}

Expand Down
14 changes: 0 additions & 14 deletions internal/observability/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ var (
metricsRouter *chi.Mux
pollDurationHistogram *prometheus.HistogramVec
btcClientDurationHistogram *prometheus.HistogramVec
queueSendErrorCounter prometheus.Counter
)

// Init initializes the metrics package.
Expand Down Expand Up @@ -89,18 +88,9 @@ func registerMetrics() {
[]string{"function", "status"},
)

// add a counter for the number of errors from the fail to push message into queue
queueSendErrorCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "queue_send_error_count",
Help: "The total number of errors when sending messages to the queue",
},
)

prometheus.MustRegister(
pollDurationHistogram,
btcClientDurationHistogram,
queueSendErrorCounter,
)
}

Expand All @@ -126,7 +116,3 @@ func RecordBtcClientMetrics[T any](clientRequest func() (T, error)) (T, error) {

return result, err
}

func RecordQueueSendError() {
queueSendErrorCounter.Inc()
}
55 changes: 0 additions & 55 deletions internal/queue/queue.go

This file was deleted.

21 changes: 7 additions & 14 deletions internal/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,18 @@ import (

"github.com/babylonlabs-io/staking-expiry-checker/internal/btcclient"
"github.com/babylonlabs-io/staking-expiry-checker/internal/db"
"github.com/babylonlabs-io/staking-expiry-checker/internal/queue"
queueclient "github.com/babylonlabs-io/staking-queue-client/client"
"github.com/rs/zerolog/log"
)

type Service struct {
db db.DbInterface
btc btcclient.BtcInterface
queueManager *queue.QueueManager
db db.DbInterface
btc btcclient.BtcInterface
}

func NewService(db db.DbInterface, btc btcclient.BtcInterface, qm *queue.QueueManager) *Service {
func NewService(db db.DbInterface, btc btcclient.BtcInterface) *Service {
return &Service{
db: db,
btc: btc,
queueManager: qm,
db: db,
btc: btc,
}
}

Expand All @@ -43,11 +39,8 @@ func (s *Service) ProcessExpiredDelegations(ctx context.Context) error {
}

for _, delegation := range expiredDelegations {
ev := queueclient.NewExpiredStakingEvent(delegation.StakingTxHashHex, delegation.TxType)
if err := s.queueManager.SendExpiredStakingEvent(ctx, ev); err != nil {
log.Error().Err(err).Msg("Error sending expired staking event")
return err
}
// TODO: Process the expired delegation.
log.Info().Msgf("Found a expired delegation, do nothing now: %v", delegation.ID)
// After successfully sending the event, delete the entry from the database.
if err := s.db.DeleteExpiredDelegation(ctx, delegation.ID); err != nil {
log.Error().Err(err).Msg("Error deleting expired delegation")
Expand Down
8 changes: 0 additions & 8 deletions tests/config-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ btc:
net-params: testnet
rpc-user: rpcuser
rpc-pass: rpcpass
queue:
queue_user: user # can be replaced by values in .env file
queue_password: password
url: "localhost:5672"
processing_timeout: 60 # 5 second
msg_max_retry_attempts: 2
requeue_delay_time: 5
queue_type: quorum
metrics:
host: 0.0.0.0
port: 2113
Loading

0 comments on commit 6bbd20e

Please sign in to comment.