diff --git a/integration-tests/go.mod b/integration-tests/go.mod index a1248819..e4f6ad2e 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -20,7 +20,7 @@ replace ( ) require ( - github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b + github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd github.com/CoreumFoundation/coreumbridge-xrpl/relayer v1.0.0 github.com/rubblelabs/ripple v0.0.0-20230908201244-7f73b1fe5e22 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 9f1dc0a4..86b05c68 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -222,8 +222,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b h1:nSNvOe9oRVl0Ijph3u/e1nZi7j4vGhYyTI3NVbPLdXI= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 h1:cEkMNqZamgrLwUAqjhRcoF0h3Z/AZY6mp20H8N0P/98= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd h1:NEwCGG9i6yjPY/avFTcrCDF16zvzIcvldnUR8AZtA7U= github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd/go.mod h1:XTqILFqH1e0GF1bYEnu/I0mElsfwH5OWfu2F5DACIjY= github.com/CosmWasm/wasmd v0.41.0 
h1:fmwxSbwb50zZDcBaayYFRLIaSFca+EFld1WOaQi49jg= diff --git a/integration-tests/processes/env_test.go b/integration-tests/processes/env_test.go index 3cc81fce..532e9723 100644 --- a/integration-tests/processes/env_test.go +++ b/integration-tests/processes/env_test.go @@ -5,7 +5,8 @@ package processes_test import ( "context" - "sync" + "fmt" + "strings" "testing" "time" @@ -18,6 +19,7 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/require" + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreum-tools/pkg/retry" coreumapp "github.com/CoreumFoundation/coreum/v3/app" coreumconfig "github.com/CoreumFoundation/coreum/v3/pkg/config" @@ -56,18 +58,18 @@ func DefaultRunnerEnvConfig() RunnerEnvConfig { // RunnerEnv is runner environment used for the integration tests. type RunnerEnv struct { - Cfg RunnerEnvConfig - bridgeXRPLAddress rippledata.Account - ContractClient *coreum.ContractClient - Chains integrationtests.Chains - ContractOwner sdk.AccAddress - Runners []*runner.Runner - ProcessErrorsMu sync.RWMutex - ProcessErrors []error + Cfg RunnerEnvConfig + bridgeXRPLAddress rippledata.Account + ContractClient *coreum.ContractClient + Chains integrationtests.Chains + ContractOwner sdk.AccAddress + RunnersParallelGroup *parallel.Group + Runners []*runner.Runner } // NewRunnerEnv returns new instance of the RunnerEnv. 
func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains integrationtests.Chains) *RunnerEnv { + ctx, cancel := context.WithCancel(ctx) relayerCoreumAddresses := genCoreumRelayers( ctx, t, @@ -136,57 +138,45 @@ func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains } runnerEnv := &RunnerEnv{ - Cfg: cfg, - bridgeXRPLAddress: bridgeXRPLAddress, - ContractClient: contractClient, - Chains: chains, - ContractOwner: contractOwner, - Runners: runners, - ProcessErrorsMu: sync.RWMutex{}, - ProcessErrors: make([]error, 0), + Cfg: cfg, + bridgeXRPLAddress: bridgeXRPLAddress, + ContractClient: contractClient, + Chains: chains, + ContractOwner: contractOwner, + RunnersParallelGroup: parallel.NewGroup(ctx), + Runners: runners, } t.Cleanup(func() { - runnerEnv.RequireNoErrors(t) + // we can cancel the context now and wait for the runner to stop gracefully + cancel() + err := runnerEnv.RunnersParallelGroup.Wait() + if err == nil || errors.Is(err, context.Canceled) { + return + } + // the client replies with that error if the context is canceled at the time of the request, + // and the error is in the internal package, so we can't check the type + if strings.Contains(err.Error(), "context canceled") { + return + } + + require.NoError(t, err, "Found unexpected runner process errors after the execution") }) return runnerEnv } // StartAllRunnerProcesses starts all relayer processes. 
-func (r *RunnerEnv) StartAllRunnerProcesses(ctx context.Context, t *testing.T) { - errCh := make(chan error, len(r.Runners)) - go func() { - for { - select { - case <-ctx.Done(): - if !errors.Is(ctx.Err(), context.Canceled) { - r.ProcessErrorsMu.Lock() - r.ProcessErrors = append(r.ProcessErrors, ctx.Err()) - r.ProcessErrorsMu.Unlock() - } - return - case err := <-errCh: - r.ProcessErrorsMu.Lock() - r.ProcessErrors = append(r.ProcessErrors, err) - r.ProcessErrorsMu.Unlock() - } - } - }() - - for _, relayerRunner := range r.Runners { - go func(relayerRunner *runner.Runner) { +func (r *RunnerEnv) StartAllRunnerProcesses() { + for i := range r.Runners { + relayerRunner := r.Runners[i] + r.RunnersParallelGroup.Spawn(fmt.Sprintf("runner-%d", i), parallel.Exit, func(ctx context.Context) error { // disable restart on error to handler unexpected errors xrplTxObserverProcess := relayerRunner.Processes.XRPLTxObserver xrplTxObserverProcess.IsRestartableOnError = false xrplTxSubmitterProcess := relayerRunner.Processes.XRPLTxSubmitter xrplTxSubmitterProcess.IsRestartableOnError = false - - err := relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess) - if err != nil && !errors.Is(err, context.Canceled) { - t.Logf("Unexpected error on process start:%s", err) - errCh <- err - } - }(relayerRunner) + return relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess) + }) } } @@ -276,13 +266,6 @@ func (r *RunnerEnv) RegisterXRPLOriginatedToken( return registeredXRPLToken } -// RequireNoErrors check whether the runner err received runner errors. -func (r *RunnerEnv) RequireNoErrors(t *testing.T) { - r.ProcessErrorsMu.RLock() - defer r.ProcessErrorsMu.RUnlock() - require.Empty(t, r.ProcessErrors, "Found unexpected process errors after the execution") -} - // SendXRPLPaymentTx sends Payment transaction. 
func (r *RunnerEnv) SendXRPLPaymentTx( ctx context.Context, diff --git a/integration-tests/processes/send_test.go b/integration-tests/processes/send_test.go index fb4c46e6..c573a383 100644 --- a/integration-tests/processes/send_test.go +++ b/integration-tests/processes/send_test.go @@ -31,7 +31,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumAndBack(t *testing.T) { envCfg := DefaultRunnerEnvConfig() runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(200)) coreumSender := chains.Coreum.GenAccount() @@ -95,7 +95,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithMaliciousRelayer(t *testing. envCfg := DefaultRunnerEnvConfig() envCfg.MaliciousRelayerNumber = 1 runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(200)) coreumSender := chains.Coreum.GenAccount() @@ -148,7 +148,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithTicketsReallocation(t *testi envCfg := DefaultRunnerEnvConfig() envCfg.UsedTicketSequenceThreshold = 3 runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(5)) sendingCount := 10 @@ -241,7 +241,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumWithDifferentAmountAndPartialAm }) // start relayers - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // recover tickets so we can register tokens runnerEnv.AllocateTickets(ctx, t, 200) @@ -314,7 +314,7 @@ func TestRecoverXRPLOriginatedTokenRegistrationAndSendFromXRPLToCoreumAndBack(t envCfg := DefaultRunnerEnvConfig() runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(200)) 
coreumSender := chains.Coreum.GenAccount() @@ -422,7 +422,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithDifferentAmountsAnd runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) // start relayers - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // recover tickets so we can register tokens runnerEnv.AllocateTickets(ctx, t, 200) @@ -546,7 +546,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithMaliciousRelayer(t runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) // start relayers - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // recover tickets so we can register tokens runnerEnv.AllocateTickets(ctx, t, 200) diff --git a/integration-tests/processes/ticket_allocation_test.go b/integration-tests/processes/ticket_allocation_test.go index 3168cb8c..dac41133 100644 --- a/integration-tests/processes/ticket_allocation_test.go +++ b/integration-tests/processes/ticket_allocation_test.go @@ -25,7 +25,7 @@ func TestTicketsAllocationRecoveryWithAccountSequence(t *testing.T) { require.NoError(t, err) require.Empty(t, availableTickets) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate) bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress) @@ -57,7 +57,7 @@ func TestTicketsAllocationRecoveryWithRejection(t *testing.T) { require.NoError(t, err) require.Empty(t, availableTickets) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // we don't fund the contract for the tickets allocation to let the chain reject the allocation transaction bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress) @@ -90,7 +90,7 @@ func TestTicketsAllocationRecoveryWithInvalidAccountSequence(t *testing.T) { require.NoError(t, err) require.Empty(t, 
availableTickets) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate) bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress) @@ -153,7 +153,7 @@ func TestTicketsAllocationRecoveryWithMaliciousRelayers(t *testing.T) { require.NoError(t, err) require.Empty(t, availableTickets) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate) @@ -184,7 +184,7 @@ func TestTicketsReAllocationByTheXRPLTokenRegistration(t *testing.T) { envCfg.UsedTicketSequenceThreshold = 3 runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // allocate first five tickets numberOfTicketsToAllocate := uint32(5) diff --git a/relayer/go.mod b/relayer/go.mod index 7130d3b4..6cd8d9a3 100644 --- a/relayer/go.mod +++ b/relayer/go.mod @@ -20,7 +20,7 @@ replace ( require ( cosmossdk.io/math v1.1.2 - github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b + github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd github.com/CosmWasm/wasmd v0.41.0 github.com/cosmos/cosmos-sdk v0.47.5 diff --git a/relayer/go.sum b/relayer/go.sum index 9f1dc0a4..86b05c68 100644 --- a/relayer/go.sum +++ b/relayer/go.sum @@ -222,8 +222,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod 
h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b h1:nSNvOe9oRVl0Ijph3u/e1nZi7j4vGhYyTI3NVbPLdXI= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 h1:cEkMNqZamgrLwUAqjhRcoF0h3Z/AZY6mp20H8N0P/98= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd h1:NEwCGG9i6yjPY/avFTcrCDF16zvzIcvldnUR8AZtA7U= github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd/go.mod h1:XTqILFqH1e0GF1bYEnu/I0mElsfwH5OWfu2F5DACIjY= github.com/CosmWasm/wasmd v0.41.0 h1:fmwxSbwb50zZDcBaayYFRLIaSFca+EFld1WOaQi49jg= diff --git a/relayer/logger/logger.go b/relayer/logger/logger.go index c85405b5..0abff8b9 100644 --- a/relayer/logger/logger.go +++ b/relayer/logger/logger.go @@ -4,9 +4,16 @@ import ( "context" "go.uber.org/zap" + + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" ) -//go:generate mockgen -destination=mock.go -package=logger . Logger +//go:generate mockgen -destination=mock.go -package=logger . Logger,ParallelLogger + +// ParallelLogger is parallel logger interface used mostly for mocks. +type ParallelLogger interface { + parallel.Logger +} // A Field is a marshaling operation used to add a key-value pair to a logger's context. Most fields are lazily // marshaled, so it's inexpensive to add fields to disabled debug-level log statements. 
@@ -27,6 +34,7 @@ type Logger interface { Info(ctx context.Context, msg string, fields ...Field) Warn(ctx context.Context, msg string, fields ...Field) Error(ctx context.Context, msg string, fields ...Field) + ParallelLogger(ctx context.Context) ParallelLogger } // AnyField takes a key and an arbitrary value and chooses the best way to represent them as a field, falling back to a @@ -55,6 +63,11 @@ func Uint64Field(key string, value uint64) Field { return convertZapFieldToField(zap.Uint64(key, value)) } +// ByteStringField constructs a field with the given key and value. +func ByteStringField(key string, value []byte) Field { + return convertZapFieldToField(zap.ByteString(key, value)) +} + // Error is shorthand for the common idiom NamedError("error", err). func Error(err error) Field { return convertZapFieldToField(zap.Error(err)) diff --git a/relayer/logger/mock.go b/relayer/logger/mock.go index 25a7fe0d..4639e892 100644 --- a/relayer/logger/mock.go +++ b/relayer/logger/mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger (interfaces: Logger) +// Source: github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger (interfaces: Logger,ParallelLogger) // Package logger is a generated GoMock package. package logger @@ -8,6 +8,7 @@ import ( context "context" reflect "reflect" + parallel "github.com/CoreumFoundation/coreum-tools/pkg/parallel" gomock "github.com/golang/mock/gomock" ) @@ -85,6 +86,20 @@ func (mr *MockLoggerMockRecorder) Info(arg0, arg1 interface{}, arg2 ...interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockLogger)(nil).Info), varargs...) } +// ParallelLogger mocks base method. 
+func (m *MockLogger) ParallelLogger(arg0 context.Context) ParallelLogger { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ParallelLogger", arg0) + ret0, _ := ret[0].(ParallelLogger) + return ret0 +} + +// ParallelLogger indicates an expected call of ParallelLogger. +func (mr *MockLoggerMockRecorder) ParallelLogger(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParallelLogger", reflect.TypeOf((*MockLogger)(nil).ParallelLogger), arg0) +} + // Warn mocks base method. func (m *MockLogger) Warn(arg0 context.Context, arg1 string, arg2 ...Field) { m.ctrl.T.Helper() @@ -101,3 +116,50 @@ func (mr *MockLoggerMockRecorder) Warn(arg0, arg1 interface{}, arg2 ...interface varargs := append([]interface{}{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Warn", reflect.TypeOf((*MockLogger)(nil).Warn), varargs...) } + +// MockParallelLogger is a mock of ParallelLogger interface. +type MockParallelLogger struct { + ctrl *gomock.Controller + recorder *MockParallelLoggerMockRecorder +} + +// MockParallelLoggerMockRecorder is the mock recorder for MockParallelLogger. +type MockParallelLoggerMockRecorder struct { + mock *MockParallelLogger +} + +// NewMockParallelLogger creates a new mock instance. +func NewMockParallelLogger(ctrl *gomock.Controller) *MockParallelLogger { + mock := &MockParallelLogger{ctrl: ctrl} + mock.recorder = &MockParallelLoggerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockParallelLogger) EXPECT() *MockParallelLoggerMockRecorder { + return m.recorder +} + +// Debug mocks base method. +func (m *MockParallelLogger) Debug(arg0 string, arg1 int64, arg2 parallel.OnExit, arg3 string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Debug", arg0, arg1, arg2, arg3) +} + +// Debug indicates an expected call of Debug. 
+func (mr *MockParallelLoggerMockRecorder) Debug(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Debug", reflect.TypeOf((*MockParallelLogger)(nil).Debug), arg0, arg1, arg2, arg3) +} + +// Error mocks base method. +func (m *MockParallelLogger) Error(arg0 string, arg1 int64, arg2 parallel.OnExit, arg3 string, arg4 error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Error", arg0, arg1, arg2, arg3, arg4) +} + +// Error indicates an expected call of Error. +func (mr *MockParallelLoggerMockRecorder) Error(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockParallelLogger)(nil).Error), arg0, arg1, arg2, arg3, arg4) +} diff --git a/relayer/logger/zap.go b/relayer/logger/zap.go index c6807190..d142e391 100644 --- a/relayer/logger/zap.go +++ b/relayer/logger/zap.go @@ -2,6 +2,7 @@ package logger import ( "context" + "fmt" "strings" "github.com/pkg/errors" @@ -9,6 +10,7 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/tracing" ) @@ -20,6 +22,59 @@ const ( tracingProcessFieldName = "process" ) +var _ ParallelLogger = &ParallelZapLogger{} + +// ParallelZapLogger is parallel zap logger. +type ParallelZapLogger struct { + ctx context.Context //nolint:containedctx // the design depends on the parallel logger design where the ctx is set similar + zapLog *ZapLogger +} + +// NewParallelZapLogger return new instance of the ParallelZapLogger. +func NewParallelZapLogger(ctx context.Context, zapLog *ZapLogger) *ParallelZapLogger { + return &ParallelZapLogger{ + ctx: ctx, + zapLog: zapLog, + } +} + +// Debug prints debug log. 
+func (p *ParallelZapLogger) Debug(name string, id int64, onExit parallel.OnExit, message string) { + p.zapLog.Named(name).Debug( + p.ctx, message, + Int64Field("id", id), + StringField("onExit", onExit.String()), + ) +} + +// Error prints error log. +func (p *ParallelZapLogger) Error(name string, id int64, onExit parallel.OnExit, message string, err error) { + // the context canceled is not an error + if errors.Is(err, context.Canceled) { + return + } + var panicErr parallel.ErrPanic + if errors.As(err, &panicErr) { + p.zapLog.Named(name).Error( + p.ctx, + message, + Int64Field("id", id), + StringField("onExit", onExit.String()), + StringField("value", fmt.Sprint(panicErr.Value)), + ByteStringField("stack", panicErr.Stack), + Error(err), + ) + return + } + p.zapLog.Named(name).Error( + p.ctx, + message, + Int64Field("id", id), + StringField("onExit", onExit.String()), + Error(err), + ) +} + // ZapLoggerConfig is ZapLogger config. type ZapLoggerConfig struct { Level string @@ -77,28 +132,39 @@ func NewZapLogger(cfg ZapLoggerConfig) (*ZapLogger, error) { // Debug logs a message at DebugLevel. The message includes any fields passed at the log site, as well as any fields // accumulated on the logger. -func (z ZapLogger) Debug(ctx context.Context, msg string, fields ...Field) { +func (z *ZapLogger) Debug(ctx context.Context, msg string, fields ...Field) { z.zapLogger.Debug(msg, filedToZapField(ctx, fields...)...) } // Info logs a message at InfoLevel. The message includes any fields passed at the log site, as well as any fields // accumulated on the logger. -func (z ZapLogger) Info(ctx context.Context, msg string, fields ...Field) { +func (z *ZapLogger) Info(ctx context.Context, msg string, fields ...Field) { z.zapLogger.Info(msg, filedToZapField(ctx, fields...)...) } // Warn logs a message at WarnLevel. The message includes any fields passed at the log site, as well as any fields // accumulated on the logger. 
-func (z ZapLogger) Warn(ctx context.Context, msg string, fields ...Field) { +func (z *ZapLogger) Warn(ctx context.Context, msg string, fields ...Field) { z.zapLogger.Warn(msg, filedToZapField(ctx, fields...)...) } // Error logs a message at ErrorLevel. The message includes any fields passed at the log site, as well as any fields // accumulated on the logger. -func (z ZapLogger) Error(ctx context.Context, msg string, fields ...Field) { +func (z *ZapLogger) Error(ctx context.Context, msg string, fields ...Field) { z.zapLogger.Error(msg, filedToZapField(ctx, fields...)...) } +// Named adds a new path segment to the logger's name. Segments are joined by +// periods. By default, Loggers are unnamed. +func (z *ZapLogger) Named(name string) *ZapLogger { + return NewZapLoggerFromLogger(z.zapLogger.Named(name)) +} + +// ParallelLogger returns parallel zap logger. +func (z *ZapLogger) ParallelLogger(ctx context.Context) ParallelLogger { + return NewParallelZapLogger(ctx, z) +} + func filedToZapField(ctx context.Context, fields ...Field) []zap.Field { zapFields := lo.Map(fields, func(filed Field, _ int) zap.Field { return zap.Field{ diff --git a/relayer/processes/processor.go b/relayer/processes/processor.go index 79f763c1..7d5b226d 100644 --- a/relayer/processes/processor.go +++ b/relayer/processes/processor.go @@ -2,10 +2,10 @@ package processes import ( "context" - "sync" "github.com/pkg/errors" + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/tracing" ) @@ -47,46 +47,40 @@ func (p *Processor) StartProcesses(ctx context.Context, processes ...ProcessWith } } - wg := sync.WaitGroup{} - wg.Add(len(processes)) - for _, process := range processes { - go func(process ProcessWithOptions) { - // set process name to the context + pg := parallel.NewGroup(ctx, parallel.WithGroupLogger(p.log.ParallelLogger(ctx))) + for i := range processes { + process := 
processes[i] + pg.Spawn(process.Name, parallel.Continue, func(ctx context.Context) error { ctx = tracing.WithTracingProcess(ctx, process.Name) - defer wg.Done() - defer func() { - if r := recover(); r != nil { - p.log.Error(ctx, "Received panic during the process execution", logger.Error(errors.Errorf("%s", r))) - if !process.IsRestartableOnError { - p.log.Warn(ctx, "The process is not auto-restartable on error") - return - } - p.log.Info(ctx, "Restarting process after the panic") - p.startProcessWithRestartOnError(ctx, process) - } - }() - p.startProcessWithRestartOnError(ctx, process) - }(process) + return p.startProcessWithRestartOnError(ctx, process) + }) } - wg.Wait() - return nil + return pg.Wait() } -func (p *Processor) startProcessWithRestartOnError(ctx context.Context, process ProcessWithOptions) { +func (p *Processor) startProcessWithRestartOnError(ctx context.Context, process ProcessWithOptions) error { for { - if err := process.Process.Start(ctx); err != nil { + // spawn one independent task to handle the panics properly + err := parallel.Run(ctx, func(ctx context.Context, spawnFn parallel.SpawnFn) error { + spawnFn(process.Name, parallel.Continue, func(ctx context.Context) error { + return process.Process.Start(ctx) + }) + return nil + }, parallel.WithGroupLogger(p.log.ParallelLogger(ctx))) + + if err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return + return nil } p.log.Error(ctx, "Received unexpected error from the process", logger.Error(err)) if !process.IsRestartableOnError { p.log.Warn(ctx, "The process is not auto-restartable on error") - break + return err } p.log.Info(ctx, "Restarting process after the error") } else { - return + return nil } } } diff --git a/relayer/processes/processor_test.go b/relayer/processes/processor_test.go index 27d66a92..f3b96f76 100644 --- a/relayer/processes/processor_test.go +++ b/relayer/processes/processor_test.go @@ -92,6 +92,7 @@ func 
TestProcessor_StartProcesses(t *testing.T) { } }, logErrorsCount: 1, + wantErr: true, }, { name: "singe_process_with_error_restartable", @@ -157,6 +158,7 @@ func TestProcessor_StartProcesses(t *testing.T) { } }, logErrorsCount: 1, + wantErr: true, }, { name: "singe_process_with_panic_restartable", @@ -234,6 +236,12 @@ func TestProcessor_StartProcesses(t *testing.T) { ctrl := gomock.NewController(t) logMock := logger.NewAnyLogMock(ctrl) + + parallelLoggerMock := logger.NewMockParallelLogger(ctrl) + logMock.EXPECT().ParallelLogger(gomock.Any()).Return(parallelLoggerMock).AnyTimes() + parallelLoggerMock.EXPECT().Debug(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + parallelLoggerMock.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + if tt.logErrorsCount > 0 { logMock.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).Times(tt.logErrorsCount) } diff --git a/relayer/processes/xrpl_tx_observer.go b/relayer/processes/xrpl_tx_observer.go index 8dfdce23..cde0ddcb 100644 --- a/relayer/processes/xrpl_tx_observer.go +++ b/relayer/processes/xrpl_tx_observer.go @@ -75,7 +75,7 @@ func (o *XRPLTxObserver) Start(ctx context.Context) error { if errors.Is(err, context.Canceled) { o.log.Warn(ctx, "Context canceled during the XRPL tx processing", logger.StringField("error", err.Error())) } else { - o.log.Error(ctx, "Failed to process XRPL tx", logger.Error(err)) + return errors.Wrapf(err, "failed to process XRPL tx, txHash:%s", tx.GetHash().String()) } } } diff --git a/relayer/processes/xrpl_tx_submitter.go b/relayer/processes/xrpl_tx_submitter.go index a603b923..cc8b4726 100644 --- a/relayer/processes/xrpl_tx_submitter.go +++ b/relayer/processes/xrpl_tx_submitter.go @@ -98,7 +98,7 @@ func (s *XRPLTxSubmitter) Start(ctx context.Context) error { return errors.WithStack(ctx.Err()) default: if err := s.processPendingOperations(ctx); err != nil && !errors.Is(err, context.Canceled) { - s.log.Error(ctx, "Failed to 
process pending operations", logger.Error(err)) + return errors.Wrap(err, "failed to process pending operations") } if !s.cfg.RepeatRecentScan { s.log.Info(ctx, "Process repeating is disabled, process is finished") diff --git a/relayer/xrpl/scanner.go b/relayer/xrpl/scanner.go index 97ee40b4..1f7cb252 100644 --- a/relayer/xrpl/scanner.go +++ b/relayer/xrpl/scanner.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" rippledata "github.com/rubblelabs/ripple/data" + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreum-tools/pkg/retry" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger" ) @@ -67,21 +68,28 @@ func NewAccountScanner(cfg AccountScannerConfig, log logger.Logger, rpcTxProvide // ScanTxs subscribes on rpc account transactions and continuously scans the recent and historical transactions. func (s *AccountScanner) ScanTxs(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { s.log.Info(ctx, "Subscribing xrpl scanner", logger.AnyField("config", s.cfg)) + + if !s.cfg.RecentScanEnabled && !s.cfg.FullScanEnabled { + return errors.Errorf("both recent and full scans are disabled") + } + + pg := parallel.NewGroup(ctx, parallel.WithGroupLogger(s.log.ParallelLogger(ctx))) if s.cfg.RecentScanEnabled { currentLedgerRes, err := s.rpcTxProvider.LedgerCurrent(ctx) if err != nil { return err } currentLedger := currentLedgerRes.LedgerCurrentIndex - go s.scanRecentHistory(ctx, currentLedger, ch) + pg.Spawn("recent-history-scanner", parallel.Continue, func(ctx context.Context) error { + s.scanRecentHistory(ctx, currentLedger, ch) + return nil + }) } - if s.cfg.FullScanEnabled { - go s.scanFullHistory(ctx, ch) - } - - if !s.cfg.RecentScanEnabled && !s.cfg.FullScanEnabled { - return errors.Errorf("both recent and full scans are disabled") + pg.Spawn("full-history-scanner", parallel.Continue, func(ctx context.Context) error { + s.scanFullHistory(ctx, ch) + return nil + }) } return nil