Skip to content

Commit

Permalink
feat: instrumenting bridge for better observibility and log correlation
Browse files Browse the repository at this point in the history
  • Loading branch information
sameh-farouk committed Oct 30, 2023
1 parent 9d56a80 commit c9080e7
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 77 deletions.
33 changes: 26 additions & 7 deletions bridge/tfchain_bridge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
flag "github.com/spf13/pflag"
"github.com/threefoldtech/tfchain_bridge/pkg"
"github.com/threefoldtech/tfchain_bridge/pkg/bridge"
"github.com/threefoldtech/tfchain_bridge/pkg/logger"
)

func main() {
Expand All @@ -30,11 +31,12 @@ func main() {

flag.Parse()

log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout})
zerolog.SetGlobalLevel(zerolog.InfoLevel)
log.Logger = zerolog.New(os.Stdout).With().Timestamp().Uint("version", logger.VERSION).Logger()
if debug {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
log.Debug().Msg("debug mode enabled")
} else {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -43,23 +45,40 @@ func main() {
timeout, timeoutCancel := context.WithTimeout(ctx, time.Second*15)
defer timeoutCancel()

br, err := bridge.NewBridge(timeout, bridgeCfg)
br, address, err := bridge.NewBridge(timeout, bridgeCfg)
if err != nil {
panic(err)
log.Fatal().
Err(err).
Str("event_type", "bridge_aborted").
Dict("event", zerolog.Dict().
Str("tfchain_url", bridgeCfg.TfchainURL).
Str("stellar_horizon_url", bridgeCfg.StellarHorizonUrl).
Str("stellar_network", bridgeCfg.StellarNetwork).
Bool("Rescan_flag", bridgeCfg.RescanBridgeAccount)).
Msg("bridge instance can not be created") // no source yet
}
log_source := logger.New_log_source(address, bridgeCfg)

log.Logger = zerolog.New(os.Stdout).With().Interface("source", log_source).Logger()

sigs := make(chan os.Signal, 1)

signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

go func() {
log.Info().Msg("awaiting signal")
log.Debug().Msg("awaiting signal")
<-sigs
log.Info().Msg("shutting now")
log.Debug().Msg("shutting now")
cancel()
}()

if err = br.Start(ctx); err != nil && err != context.Canceled {
log.Fatal().Err(err).Msg("exited unexpectedly")
log.Fatal().
Err(err).
Str("event_type", "bridge_unexpectedly_exited").
Msg("bridge instance exited unexpectedly")
}
log.Info().
Str("event_type", "bridge_stopped").
Msg("bridge instance stopped")
}
46 changes: 29 additions & 17 deletions bridge/tfchain_bridge/pkg/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/threefoldtech/tfchain_bridge/pkg"
"github.com/threefoldtech/tfchain_bridge/pkg/stellar"
Expand All @@ -24,20 +25,20 @@ type Bridge struct {
depositFee int64
}

func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, error) {
func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, string, error) {
subClient, err := subpkg.NewSubstrateClient(cfg.TfchainURL, cfg.TfchainSeed)
if err != nil {
return nil, err
return nil, "", err
}

blockPersistency, err := pkg.InitPersist(cfg.PersistencyFile)
if err != nil {
return nil, err
return nil, "", err
}

wallet, err := stellar.NewStellarWallet(ctx, &cfg.StellarConfig)
if err != nil {
return nil, err
return nil, "", err
}

if cfg.RescanBridgeAccount {
Expand All @@ -46,18 +47,18 @@ func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, error) {
// and mint accordingly
err = blockPersistency.SaveStellarCursor("0")
if err != nil {
return nil, err
return nil, "", err
}
err = blockPersistency.SaveHeight(0)
if err != nil {
return nil, err
return nil, "", err
}
}

// fetch the configured depositfee
depositFee, err := subClient.GetDepositFee()
if err != nil {
return nil, err
return nil, "", err
}

bridge := &Bridge{
Expand All @@ -67,31 +68,45 @@ func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, error) {
config: &cfg,
depositFee: depositFee,
}

return bridge, nil
// stat deposit fee?
return bridge, wallet.GetKeypair().Address(), nil
}

func (bridge *Bridge) Start(ctx context.Context) error {
log.Info().
Str("event_type", "bridge_started").
Dict("event", zerolog.Dict().
Bool("rescan_flag", bridge.config.RescanBridgeAccount).
Int64("deposit_fee", bridge.depositFee)).
Msg("bridge instance started")
height, err := bridge.blockPersistency.GetHeight()
if err != nil {
return errors.Wrap(err, "failed to get block height from persistency")
}

log.Info().Msg("starting stellar subscription...")
log.Debug().
Msg("starting stellar subscription...")
stellarSub := make(chan stellar.MintEventSubscription)
go func() {
defer close(stellarSub)
if err = bridge.wallet.StreamBridgeStellarTransactions(ctx, stellarSub, height.StellarCursor); err != nil {
log.Fatal().Msgf("failed to monitor bridge account %s", err.Error())
log.Fatal().
Err(err).
Str("event_type", "bridge_unexpectedly_exited").
Msg("failed to monitor bridge account")
}
}()

log.Info().Msg("starting tfchain subscription...")
log.Debug().
Msg("starting tfchain subscription...")
tfchainSub := make(chan subpkg.EventSubscription)
go func() {
defer close(tfchainSub)
if err := bridge.subClient.SubscribeTfchainBridgeEvents(ctx, tfchainSub); err != nil {
log.Fatal().Msgf("failed to subscribe to tfchain %s", err.Error())
log.Fatal().
Err(err).
Str("event_type", "bridge_unexpectedly_exited").
Msg("failed to subscribe to tfchain")
}
}()

Expand Down Expand Up @@ -125,7 +140,6 @@ func (bridge *Bridge) Start(ctx context.Context) error {
}
return errors.Wrap(err, "failed to handle withdraw ready")
}
log.Info().Uint64("ID", withdawReadyEvent.ID).Msg("withdraw processed")
}
for _, refundExpiredEvent := range data.Events.RefundExpiredEvents {
err := bridge.handleRefundExpired(ctx, refundExpiredEvent)
Expand All @@ -141,7 +155,6 @@ func (bridge *Bridge) Start(ctx context.Context) error {
}
return errors.Wrap(err, "failed to handle refund ready")
}
log.Info().Str("hash", refundReadyEvent.Hash).Msg("refund processed")
}
case data := <-stellarSub:
if data.Err != nil {
Expand All @@ -154,9 +167,8 @@ func (bridge *Bridge) Start(ctx context.Context) error {
if errors.Is(err, pkg.ErrTransactionAlreadyMinted) {
continue
}
return errors.Wrap(err, "failed to handle mint")
return errors.Wrap(err, "failed to handle mint") // mint could be initiated already but there is a problem saving the cursor
}
log.Info().Str("hash", mEvent.Tx.Hash).Msg("mint processed")
}
case <-ctx.Done():
return ctx.Err()
Expand Down
50 changes: 34 additions & 16 deletions bridge/tfchain_bridge/pkg/bridge/mint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
hProtocol "github.com/stellar/go/protocols/horizon"
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
Expand All @@ -15,6 +16,9 @@ import (

// mint handler for stellar
func (bridge *Bridge) mint(ctx context.Context, senders map[string]*big.Int, tx hProtocol.Transaction) error {
logger := log.Logger.With().Str("span_id", tx.ID).Logger()
refund_contex := context.Background()

minted, err := bridge.subClient.IsMintedAlready(tx.Hash)
if err != nil {
if !errors.Is(err, substrate.ErrMintTransactionNotFound) {
Expand All @@ -23,18 +27,27 @@ func (bridge *Bridge) mint(ctx context.Context, senders map[string]*big.Int, tx
}

if minted {
log.Info().Str("tx_id", tx.Hash).Msg("transaction is already minted")
logger.Info().
Str("event_type", "mint_skipped").
Msg("transaction is already minted")
return pkg.ErrTransactionAlreadyMinted
}

if len(senders) == 0 {
return nil
}
logger.Info().
Str("event_type", "transfer_initiated").
Dict("event", zerolog.Dict().
Str("type", "deposit")).
Msgf("transfer with id %s initiated", tx.ID)

// only one payment in transaction is allowed
if len(senders) > 1 {
log.Info().Msgf("cannot process mint transaction, multiple senders found, refunding now")
refund_contex = context.WithValue(refund_contex, "refund_reason", "multiple senders found")

for sender, depositAmount := range senders {
return bridge.refund(context.Background(), sender, depositAmount.Int64(), tx)
return bridge.refund(refund_contex, sender, depositAmount.Int64(), tx) // how this should be refund the multiple sender ?
}
}

Expand All @@ -46,37 +59,35 @@ func (bridge *Bridge) mint(ctx context.Context, senders map[string]*big.Int, tx
}

if tx.Memo == "" {
log.Info().Str("tx_id", tx.Hash).Msg("transaction has empty memo, refunding now")
return bridge.refund(context.Background(), receiver, depositedAmount.Int64(), tx)
refund_contex = context.WithValue(refund_contex, "refund_reason", "transaction has empty memo")
return bridge.refund(refund_contex, receiver, depositedAmount.Int64(), tx)
}

if tx.MemoType == "return" {
log.Debug().Str("tx_id", tx.Hash).Msg("transaction has a return memo hash, skipping this transaction")
logger.Debug().Str("tx_id", tx.Hash).Msg("transaction has a return memo hash, skipping this transaction")
// save cursor
cursor := tx.PagingToken()
err := bridge.blockPersistency.SaveStellarCursor(cursor)
if err != nil {
log.Err(err).Msg("error while saving cursor")
return err
return errors.Wrap(err, "error while saving cursor")
}
log.Info().Msg("stellar cursor saved")
return nil
}

// if the deposited amount is lower than the depositfee, trigger a refund
if depositedAmount.Cmp(big.NewInt(bridge.depositFee)) <= 0 {
return bridge.refund(context.Background(), receiver, depositedAmount.Int64(), tx)
refund_contex = context.WithValue(refund_contex, "refund_reason", "deposited amount is lower than the deposit fee")
return bridge.refund(refund_contex, receiver, depositedAmount.Int64(), tx)
}

destinationSubstrateAddress, err := bridge.getSubstrateAddressFromMemo(tx.Memo)
if err != nil {
log.Info().Msgf("error while decoding tx memo: %s", err.Error())
logger.Debug().Err(err).Msgf("error while decoding tx memo")
// memo is not formatted correctly, issue a refund
return bridge.refund(context.Background(), receiver, depositedAmount.Int64(), tx)
refund_contex = context.WithValue(refund_contex, "refund_reason", "memo is not formatted correctly")
return bridge.refund(refund_contex, receiver, depositedAmount.Int64(), tx)
}

log.Info().Int64("amount", depositedAmount.Int64()).Str("tx_id", tx.Hash).Msgf("target substrate address to mint on: %s", destinationSubstrateAddress)

accountID, err := substrate.FromAddress(destinationSubstrateAddress)
if err != nil {
return err
Expand All @@ -87,11 +98,18 @@ func (bridge *Bridge) mint(ctx context.Context, senders map[string]*big.Int, tx
return err
}

logger.Info().
Str("type_event", "mint_proposed").
Dict("event", zerolog.Dict().
Int64("amount", depositedAmount.Int64()).
Str("tx_id", tx.Hash).
Str("destination_address", destinationSubstrateAddress)).
Msgf("mint proposed. target substrate address: %s", destinationSubstrateAddress)

// save cursor
cursor := tx.PagingToken()
if err = bridge.blockPersistency.SaveStellarCursor(cursor); err != nil {
log.Err(err).Msgf("error while saving cursor")
return err
return errors.Wrap(err, "error while saving cursor")
}

return nil
Expand Down
Loading

0 comments on commit c9080e7

Please sign in to comment.