Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Development feat structured logging #881

Merged
merged 20 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,422 changes: 1,422 additions & 0 deletions bridge/docs/observability.md

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion bridge/docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ Refer to [production](./production.md) for more information on how to setup a pr

When you have setup the bridge in either development or production mode you can start bridging.

See [bridging](./bridging.md) for more information on how to bridge.
See [bridging](./bridging.md) for more information on how to bridge.

## Log schema
Bridge validators emit simple structured event logs to improve observability and to enable tracing of workflows and data.
You can find more about the log schema and how it improves the observability of the system in [the bridge observability document](./observability.md).
42 changes: 30 additions & 12 deletions bridge/tfchain_bridge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"syscall"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
flag "github.com/spf13/pflag"
"github.com/threefoldtech/tfchain_bridge/pkg"
"github.com/threefoldtech/tfchain_bridge/pkg/bridge"
"github.com/threefoldtech/tfchain_bridge/pkg/logger"
)

func main() {
Expand All @@ -30,36 +30,54 @@ func main() {

flag.Parse()

log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout})
zerolog.SetGlobalLevel(zerolog.InfoLevel)
if debug {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
log.Debug().Msg("debug mode enabled")
}
logger.InitLogger(debug)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

timeout, timeoutCancel := context.WithTimeout(ctx, time.Second*15)
defer timeoutCancel()

br, err := bridge.NewBridge(timeout, bridgeCfg)
br, address, err := bridge.NewBridge(timeout, bridgeCfg)
if err != nil {
panic(err)
log.Fatal().
Err(err).
Str("event_action", "bridge_init_aborted").
Str("event_kind", "error").
Str("category", "availability").
Msg("the bridge instance cannot be started")
}
sourceLogEntry := logger.SourceCommonLogEntry{
Instance_public_key: address,
Bridge_wallet_address: bridgeCfg.StellarBridgeAccount,
Stellar_network: bridgeCfg.StellarNetwork,
Tfchain_url: bridgeCfg.TfchainURL,
}

log.Logger = log.Logger.With().Interface("source", sourceLogEntry).Logger()

sigs := make(chan os.Signal, 1)

signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

go func() {
log.Info().Msg("awaiting signal")
log.Debug().Msg("awaiting signal")
<-sigs
log.Info().Msg("shutting now")
log.Debug().Msg("shutting now")
cancel()
}()

if err = br.Start(ctx); err != nil && err != context.Canceled {
log.Fatal().Err(err).Msg("exited unexpectedly")
log.Fatal().
Err(err).
Str("event_action", "bridge_unexpectedly_exited").
Str("event_kind", "error").
Str("category", "availability").
Comment on lines +73 to +75
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be part of the error context somehow, rather than printed as strings? Maybe return some sort of `ErrorContext` that implements `error`; once an `ErrorContext` is received, it can be downcast, and the data extracted from it and printed.

The values of the category, event kind, and event action can then be predefined constants.

Msg("the bridge instance has exited unexpectedly")
}
log.Info().
Str("event_action", "bridge_stopped").
Str("event_kind", "event").
Str("category", "availability").
Msg("the bridge instance has stopped")
}
85 changes: 60 additions & 25 deletions bridge/tfchain_bridge/pkg/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package bridge

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/threefoldtech/tfchain_bridge/pkg"
"github.com/threefoldtech/tfchain_bridge/pkg/stellar"
Expand All @@ -24,20 +26,20 @@ type Bridge struct {
depositFee int64
}

func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, error) {
func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, string, error) {
subClient, err := subpkg.NewSubstrateClient(cfg.TfchainURL, cfg.TfchainSeed)
if err != nil {
return nil, err
return nil, "", err
}

blockPersistency, err := pkg.InitPersist(cfg.PersistencyFile)
if err != nil {
return nil, err
return nil, "", err
}

wallet, err := stellar.NewStellarWallet(ctx, &cfg.StellarConfig)
if err != nil {
return nil, err
return nil, "", err
}

if cfg.RescanBridgeAccount {
Expand All @@ -46,18 +48,18 @@ func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, error) {
// and mint accordingly
err = blockPersistency.SaveStellarCursor("0")
if err != nil {
return nil, err
return nil, "", err
}
err = blockPersistency.SaveHeight(0)
if err != nil {
return nil, err
return nil, "", err
}
}

// fetch the configured depositfee
depositFee, err := subClient.GetDepositFee()
if err != nil {
return nil, err
return nil, "", err
}

bridge := &Bridge{
Expand All @@ -67,39 +69,60 @@ func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, error) {
config: &cfg,
depositFee: depositFee,
}

return bridge, nil
// stat deposit fee?
return bridge, wallet.GetKeypair().Address(), nil
}

func (bridge *Bridge) Start(ctx context.Context) error {
log.Info().
Str("event_action", "bridge_started").
Str("event_kind", "event").
Str("category", "availability").
Dict("metadata", zerolog.Dict().
Bool("rescan_flag", bridge.config.RescanBridgeAccount).
Int64("deposit_fee", bridge.depositFee)).
Msg("the bridge instance has started")
height, err := bridge.blockPersistency.GetHeight()
if err != nil {
return errors.Wrap(err, "failed to get block height from persistency")
return errors.Wrap(err, "an error occurred while reading block height from persistency")
}

log.Info().Msg("starting stellar subscription...")
log.Debug().
Msg("The Stellar subscription is starting")
stellarSub := make(chan stellar.MintEventSubscription)
go func() {
defer close(stellarSub)
if err = bridge.wallet.StreamBridgeStellarTransactions(ctx, stellarSub, height.StellarCursor); err != nil {
log.Fatal().Msgf("failed to monitor bridge account %s", err.Error())
log.Fatal().
Err(err).
Str("event_action", "bridge_unexpectedly_exited").
Str("event_kind", "error").
Str("category", "availability").
Msg("the bridge instance has exited unexpectedly")
}
}()

log.Info().Msg("starting tfchain subscription...")
log.Debug().
Msg("The TFChain subscription is starting")
tfchainSub := make(chan subpkg.EventSubscription)
go func() {
defer close(tfchainSub)
if err := bridge.subClient.SubscribeTfchainBridgeEvents(ctx, tfchainSub); err != nil {
log.Fatal().Msgf("failed to subscribe to tfchain %s", err.Error())
log.Fatal().
Err(err).
Str("event_action", "bridge_unexpectedly_exited").
Str("event_kind", "error").
Str("category", "availability").
Msg("the bridge instance has exited unexpectedly")
}
}()
afterMinute := time.After(60 * time.Second)

for {
select {
case data := <-tfchainSub:
if data.Err != nil {
return errors.Wrap(err, "failed to process events")
return errors.Wrap(err, "failed to get tfchain events")
}
for _, withdrawCreatedEvent := range data.Events.WithdrawCreatedEvents {
err := bridge.handleWithdrawCreated(ctx, withdrawCreatedEvent)
Expand All @@ -108,13 +131,13 @@ func (bridge *Bridge) Start(ctx context.Context) error {
if errors.Is(err, pkg.ErrTransactionAlreadyBurned) || errors.Is(err, pkg.ErrTransactionAlreadyMinted) {
continue
}
return errors.Wrap(err, "failed to handle withdraw created")
return errors.Wrap(err, "an error occurred while handling WithdrawCreatedEvents")
}
}
for _, withdrawExpiredEvent := range data.Events.WithdrawExpiredEvents {
err := bridge.handleWithdrawExpired(ctx, withdrawExpiredEvent)
if err != nil {
return errors.Wrap(err, "failed to handle withdraw expired")
return errors.Wrap(err, "an error occurred while handling WithdrawExpiredEvents")
}
}
for _, withdawReadyEvent := range data.Events.WithdrawReadyEvents {
Expand All @@ -123,14 +146,13 @@ func (bridge *Bridge) Start(ctx context.Context) error {
if errors.Is(err, pkg.ErrTransactionAlreadyBurned) {
continue
}
return errors.Wrap(err, "failed to handle withdraw ready")
return errors.Wrap(err, "an error occurred while handling WithdrawReadyEvents")
}
log.Info().Uint64("ID", withdawReadyEvent.ID).Msg("withdraw processed")
}
for _, refundExpiredEvent := range data.Events.RefundExpiredEvents {
err := bridge.handleRefundExpired(ctx, refundExpiredEvent)
if err != nil {
return errors.Wrap(err, "failed to handle refund expired")
return errors.Wrap(err, "an error occurred while handling RefundExpiredEvents")
}
}
for _, refundReadyEvent := range data.Events.RefundReadyEvents {
Expand All @@ -139,13 +161,12 @@ func (bridge *Bridge) Start(ctx context.Context) error {
if errors.Is(err, pkg.ErrTransactionAlreadyRefunded) {
continue
}
return errors.Wrap(err, "failed to handle refund ready")
return errors.Wrap(err, "an error occurred while handling RefundReadyEvents")
}
log.Info().Str("hash", refundReadyEvent.Hash).Msg("refund processed")
}
case data := <-stellarSub:
if data.Err != nil {
return errors.Wrap(err, "failed to get mint events")
return errors.Wrap(err, "failed to get stellar payments")
}

for _, mEvent := range data.Events {
Expand All @@ -154,12 +175,26 @@ func (bridge *Bridge) Start(ctx context.Context) error {
if errors.Is(err, pkg.ErrTransactionAlreadyMinted) {
continue
}
return errors.Wrap(err, "failed to handle mint")
return errors.Wrap(err, "an error occurred while processing the payment received") // mint could be initiated already but there is a problem saving the cursor
}
log.Info().Str("hash", mEvent.Tx.Hash).Msg("mint processed")
}
time.Sleep(0)
case <-afterMinute:
balance, err := bridge.wallet.StatBridgeAccount()
if err != nil {
log.Logger.Warn().Err(err).Msgf("Can't retrieve the wallet balance at the moment")
}
log.Logger.Info().
Str("event_action", "wallet_balance").
Str("event_kind", "metric").
Str("category", "vault").
Dict("metadata", zerolog.Dict().
Str("tft", balance)).
Msgf("TFT Balance is %s", balance)
afterMinute = time.After(60 * time.Second)
case <-ctx.Done():
return ctx.Err()
}
time.Sleep(1 * time.Second)
}
}
Loading