Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define v2 api endpoint #73

Merged
merged 10 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ run-unprocessed-events-replay-local:
--replay

generate-mock-interface:
cd internal/db && mockery --name=DBClient --output=../../tests/mocks --outpkg=mocks --filename=mock_db_client.go
cd internal/clients/ordinals && mockery --name=OrdinalsClientInterface --output=../../../tests/mocks --outpkg=mocks --filename=mock_ordinal_client.go
cd internal/shared/db/client && mockery --name=DBClient --output=../../../../tests/mocks --outpkg=mocks --filename=mock_db_client.go
cd internal/v1/db/client && mockery --name=V1DBClient --output=../../../../tests/mocks --outpkg=mocks --filename=mock_v1_db_client.go
cd internal/v2/db/client && mockery --name=V2DBClient --output=../../../../tests/mocks --outpkg=mocks --filename=mock_v2_db_client.go
cd internal/shared/http/clients/ordinals && mockery --name=OrdinalsClient --output=../../../../../tests/mocks --outpkg=mocks --filename=mock_ordinal_client.go

test:
./bin/local-startup.sh;
go test -v -cover -p 1 ./... -count=1


build-swagger:
swag init --parseDependency --parseInternal -d cmd/staking-api-service,internal/api,internal/types
swag init --parseDependency --parseInternal -d cmd/staking-api-service,internal/shared/api,internal/shared/types,internal/v1/api/handlers,internal/v2/api/handlers
39 changes: 24 additions & 15 deletions cmd/staking-api-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (

"github.com/babylonlabs-io/staking-api-service/cmd/staking-api-service/cli"
"github.com/babylonlabs-io/staking-api-service/cmd/staking-api-service/scripts"
"github.com/babylonlabs-io/staking-api-service/internal/api"
"github.com/babylonlabs-io/staking-api-service/internal/clients"
"github.com/babylonlabs-io/staking-api-service/internal/config"
"github.com/babylonlabs-io/staking-api-service/internal/db/model"
"github.com/babylonlabs-io/staking-api-service/internal/observability/healthcheck"
"github.com/babylonlabs-io/staking-api-service/internal/observability/metrics"
"github.com/babylonlabs-io/staking-api-service/internal/queue"
"github.com/babylonlabs-io/staking-api-service/internal/services"
"github.com/babylonlabs-io/staking-api-service/internal/types"
"github.com/babylonlabs-io/staking-api-service/internal/shared/api"
"github.com/babylonlabs-io/staking-api-service/internal/shared/config"
dbclients "github.com/babylonlabs-io/staking-api-service/internal/shared/db/clients"
dbmodel "github.com/babylonlabs-io/staking-api-service/internal/shared/db/model"
"github.com/babylonlabs-io/staking-api-service/internal/shared/http/clients"
"github.com/babylonlabs-io/staking-api-service/internal/shared/observability/healthcheck"
"github.com/babylonlabs-io/staking-api-service/internal/shared/observability/metrics"
queueclients "github.com/babylonlabs-io/staking-api-service/internal/shared/queue/clients"
"github.com/babylonlabs-io/staking-api-service/internal/shared/services"
"github.com/babylonlabs-io/staking-api-service/internal/shared/types"
"github.com/joho/godotenv"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -63,24 +64,32 @@ func main() {
metricsPort := cfg.Metrics.GetMetricsPort()
metrics.Init(metricsPort)

err = model.Setup(ctx, cfg)
err = dbmodel.Setup(ctx, cfg)
if err != nil {
log.Fatal().Err(err).Msg("error while setting up staking db model")
}

// initialize clients package which is used to interact with external services
clients := clients.New(cfg)
services, err := services.New(ctx, cfg, params, finalityProviders, clients)

dbClients, err := dbclients.New(ctx, cfg)
if err != nil {
log.Fatal().Err(err).Msg("error while setting up staking db clients")
}

services, err := services.New(ctx, cfg, params, finalityProviders, clients, dbClients)
if err != nil {
log.Fatal().Err(err).Msg("error while setting up staking services layer")
}

// Start the event queue processing
queues := queue.New(cfg.Queue, services)
queueClients := queueclients.New(ctx, cfg.Queue, services)

// Check if the scripts flag is set
if cli.GetReplayFlag() {
log.Info().Msg("Replay flag is set. Starting replay of unprocessable messages.")
err := scripts.ReplayUnprocessableMessages(ctx, cfg, queues, services.DbClient)

err := scripts.ReplayUnprocessableMessages(ctx, cfg, queueClients, dbClients.SharedDBClient)
if err != nil {
log.Fatal().Err(err).Msg("error while replaying unprocessable messages")
}
Expand All @@ -94,9 +103,9 @@ func main() {
return
}

queues.StartReceivingMessages()
queueClients.StartReceivingMessages()

healthcheckErr := healthcheck.StartHealthCheckCron(ctx, queues, cfg.Server.HealthCheckInterval)
healthcheckErr := healthcheck.StartHealthCheckCron(ctx, queueClients, cfg.Server.HealthCheckInterval)
if healthcheckErr != nil {
log.Fatal().Err(healthcheckErr).Msg("error while starting health check cron")
}
Expand Down
17 changes: 11 additions & 6 deletions cmd/staking-api-service/scripts/pubkey_address_backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@ import (
"context"
"fmt"

"github.com/babylonlabs-io/staking-api-service/internal/config"
"github.com/babylonlabs-io/staking-api-service/internal/db"
"github.com/babylonlabs-io/staking-api-service/internal/utils"
"github.com/babylonlabs-io/staking-api-service/internal/shared/config"
dbclient "github.com/babylonlabs-io/staking-api-service/internal/shared/db/client"
"github.com/babylonlabs-io/staking-api-service/internal/shared/utils"
v1dbclient "github.com/babylonlabs-io/staking-api-service/internal/v1/db/client"
"github.com/rs/zerolog/log"
)

func BackfillPubkeyAddressesMappings(ctx context.Context, cfg *config.Config) error {
dbClient, err := db.New(ctx, cfg.Db)
client, err := dbclient.NewMongoClient(ctx, cfg.Db)
if err != nil {
return fmt.Errorf("failed to create db client: %w", err)
}
v1dbClient, err := v1dbclient.New(ctx, client, cfg.Db)
if err != nil {
return fmt.Errorf("failed to create db client: %w", err)
}
pageToken := ""
var count int
for {
result, err := dbClient.ScanDelegationsPaginated(ctx, pageToken)
result, err := v1dbClient.ScanDelegationsPaginated(ctx, pageToken)
if err != nil {
return fmt.Errorf("failed to scan delegations: %w", err)
}
Expand All @@ -29,7 +34,7 @@ func BackfillPubkeyAddressesMappings(ctx context.Context, cfg *config.Config) er
if err != nil {
return fmt.Errorf("failed to derive btc addresses: %w", err)
}
if err := dbClient.InsertPkAddressMappings(
if err := v1dbClient.InsertPkAddressMappings(
ctx, delegation.StakerPkHex, addresses.Taproot,
addresses.NativeSegwitOdd, addresses.NativeSegwitEven,
); err != nil {
Expand Down
35 changes: 12 additions & 23 deletions cmd/staking-api-service/scripts/replay_unprocessed_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"errors"
"fmt"

"github.com/babylonlabs-io/staking-api-service/internal/config"
"github.com/babylonlabs-io/staking-api-service/internal/db"
"github.com/babylonlabs-io/staking-api-service/internal/queue"
"github.com/babylonlabs-io/staking-api-service/internal/shared/config"
dbclient "github.com/babylonlabs-io/staking-api-service/internal/shared/db/client"
queueclients "github.com/babylonlabs-io/staking-api-service/internal/shared/queue/clients"
queueClient "github.com/babylonlabs-io/staking-queue-client/client"
"github.com/rs/zerolog/log"
)
Expand All @@ -17,9 +17,7 @@ type GenericEvent struct {
EventType queueClient.EventType `json:"event_type"`
}

func ReplayUnprocessableMessages(ctx context.Context, cfg *config.Config, queues *queue.Queues, db db.DBClient) (err error) {
fmt.Println("Starting to replay unprocessable messages...")

func ReplayUnprocessableMessages(ctx context.Context, cfg *config.Config, queues *queueclients.QueueClients, db dbclient.DBClient) (err error) {
// Fetch unprocessable messages
unprocessableMessages, err := db.FindUnprocessableMessages(ctx)
if err != nil {
Expand All @@ -30,15 +28,12 @@ func ReplayUnprocessableMessages(ctx context.Context, cfg *config.Config, queues
messageCount := len(unprocessableMessages)

// Inform the user of the number of unprocessable messages
fmt.Printf("There are %d unprocessable messages.\n", messageCount)
if messageCount == 0 {
return errors.New("no unprocessable messages to replay")
}

// Process each unprocessable message
for i, msg := range unprocessableMessages {
fmt.Printf("Processing message %d/%d: %s\n", i+1, messageCount, msg.MessageBody)

for _, msg := range unprocessableMessages {
var genericEvent GenericEvent
if err := json.Unmarshal([]byte(msg.MessageBody), &genericEvent); err != nil {
return errors.New("failed to unmarshal event message")
Expand All @@ -53,34 +48,28 @@ func ReplayUnprocessableMessages(ctx context.Context, cfg *config.Config, queues
if err := db.DeleteUnprocessableMessage(ctx, msg.Receipt); err != nil {
return errors.New("failed to delete unprocessable message")
}

fmt.Printf("Message %d/%d processed and deleted successfully.\n", i+1, messageCount)
}

log.Info().Msg("Reprocessing of unprocessable messages completed.")
fmt.Println("Reprocessing of unprocessable messages completed.")
return
}

// processEventMessage processes the event message based on its EventType.
func processEventMessage(ctx context.Context, queues *queue.Queues, event GenericEvent, messageBody string) error {
fmt.Printf("Sending message to the queue for event type: %v\n", event.EventType)

func processEventMessage(ctx context.Context, queues *queueclients.QueueClients, event GenericEvent, messageBody string) error {
switch event.EventType {
case queueClient.ActiveStakingEventType:
return queues.ActiveStakingQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.ActiveStakingQueueClient.SendMessage(ctx, messageBody)
case queueClient.UnbondingStakingEventType:
return queues.UnbondingStakingQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.UnbondingStakingQueueClient.SendMessage(ctx, messageBody)
case queueClient.WithdrawStakingEventType:
return queues.WithdrawStakingQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.WithdrawStakingQueueClient.SendMessage(ctx, messageBody)
case queueClient.ExpiredStakingEventType:
return queues.ExpiredStakingQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.ExpiredStakingQueueClient.SendMessage(ctx, messageBody)
case queueClient.StatsEventType:
return queues.StatsQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.StatsQueueClient.SendMessage(ctx, messageBody)
case queueClient.BtcInfoEventType:
return queues.BtcInfoQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.BtcInfoQueueClient.SendMessage(ctx, messageBody)
default:
fmt.Printf("Error: unknown event type: %v\n", event.EventType)
return fmt.Errorf("unknown event type: %v", event.EventType)
}
}
Loading
Loading