Skip to content

Commit

Permalink
Integrate coreum parallel lib
Browse files Browse the repository at this point in the history
  • Loading branch information
dzmitryhil committed Nov 30, 2023
1 parent 6affe9b commit ecd4854
Show file tree
Hide file tree
Showing 15 changed files with 248 additions and 114 deletions.
2 changes: 1 addition & 1 deletion integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ replace (
)

require (
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd
github.com/CoreumFoundation/coreumbridge-xrpl/relayer v1.0.0
github.com/rubblelabs/ripple v0.0.0-20230908201244-7f73b1fe5e22
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b h1:nSNvOe9oRVl0Ijph3u/e1nZi7j4vGhYyTI3NVbPLdXI=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 h1:cEkMNqZamgrLwUAqjhRcoF0h3Z/AZY6mp20H8N0P/98=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0=
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd h1:NEwCGG9i6yjPY/avFTcrCDF16zvzIcvldnUR8AZtA7U=
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd/go.mod h1:XTqILFqH1e0GF1bYEnu/I0mElsfwH5OWfu2F5DACIjY=
github.com/CosmWasm/wasmd v0.41.0 h1:fmwxSbwb50zZDcBaayYFRLIaSFca+EFld1WOaQi49jg=
Expand Down
91 changes: 37 additions & 54 deletions integration-tests/processes/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ package processes_test

import (
"context"
"sync"
"fmt"
"strings"
"testing"
"time"

Expand All @@ -18,6 +19,7 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/require"

"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
"github.com/CoreumFoundation/coreum-tools/pkg/retry"
coreumapp "github.com/CoreumFoundation/coreum/v3/app"
coreumconfig "github.com/CoreumFoundation/coreum/v3/pkg/config"
Expand Down Expand Up @@ -56,18 +58,18 @@ func DefaultRunnerEnvConfig() RunnerEnvConfig {

// RunnerEnv is runner environment used for the integration tests.
type RunnerEnv struct {
Cfg RunnerEnvConfig
bridgeXRPLAddress rippledata.Account
ContractClient *coreum.ContractClient
Chains integrationtests.Chains
ContractOwner sdk.AccAddress
Runners []*runner.Runner
ProcessErrorsMu sync.RWMutex
ProcessErrors []error
Cfg RunnerEnvConfig
bridgeXRPLAddress rippledata.Account
ContractClient *coreum.ContractClient
Chains integrationtests.Chains
ContractOwner sdk.AccAddress
RunnersParallelGroup *parallel.Group
Runners []*runner.Runner
}

// NewRunnerEnv returns new instance of the RunnerEnv.
func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains integrationtests.Chains) *RunnerEnv {
ctx, cancel := context.WithCancel(ctx)
relayerCoreumAddresses := genCoreumRelayers(
ctx,
t,
Expand Down Expand Up @@ -136,57 +138,45 @@ func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains
}

runnerEnv := &RunnerEnv{
Cfg: cfg,
bridgeXRPLAddress: bridgeXRPLAddress,
ContractClient: contractClient,
Chains: chains,
ContractOwner: contractOwner,
Runners: runners,
ProcessErrorsMu: sync.RWMutex{},
ProcessErrors: make([]error, 0),
Cfg: cfg,
bridgeXRPLAddress: bridgeXRPLAddress,
ContractClient: contractClient,
Chains: chains,
ContractOwner: contractOwner,
RunnersParallelGroup: parallel.NewGroup(ctx),
Runners: runners,
}
t.Cleanup(func() {
runnerEnv.RequireNoErrors(t)
// we can cancel the context now and wait for the runner to stop gracefully
cancel()
err := runnerEnv.RunnersParallelGroup.Wait()
if err == nil || errors.Is(err, context.Canceled) {
return
}
// the client replies with that error in if the context is canceled at the time of the request,
// and the error is in the internal package, so we can't check the type
if strings.Contains(err.Error(), "context canceled") {
return
}

require.NoError(t, err, "Found unexpected runner process errors after the execution")
})

return runnerEnv
}

// StartAllRunnerProcesses starts all relayer processes.
func (r *RunnerEnv) StartAllRunnerProcesses(ctx context.Context, t *testing.T) {
errCh := make(chan error, len(r.Runners))
go func() {
for {
select {
case <-ctx.Done():
if !errors.Is(ctx.Err(), context.Canceled) {
r.ProcessErrorsMu.Lock()
r.ProcessErrors = append(r.ProcessErrors, ctx.Err())
r.ProcessErrorsMu.Unlock()
}
return
case err := <-errCh:
r.ProcessErrorsMu.Lock()
r.ProcessErrors = append(r.ProcessErrors, err)
r.ProcessErrorsMu.Unlock()
}
}
}()

for _, relayerRunner := range r.Runners {
go func(relayerRunner *runner.Runner) {
func (r *RunnerEnv) StartAllRunnerProcesses() {
for i := range r.Runners {
relayerRunner := r.Runners[i]
r.RunnersParallelGroup.Spawn(fmt.Sprintf("runner-%d", i), parallel.Exit, func(ctx context.Context) error {
// disable restart on error to handler unexpected errors
xrplTxObserverProcess := relayerRunner.Processes.XRPLTxObserver
xrplTxObserverProcess.IsRestartableOnError = false
xrplTxSubmitterProcess := relayerRunner.Processes.XRPLTxSubmitter
xrplTxSubmitterProcess.IsRestartableOnError = false

err := relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess)
if err != nil && !errors.Is(err, context.Canceled) {
t.Logf("Unexpected error on process start:%s", err)
errCh <- err
}
}(relayerRunner)
return relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess)
})
}
}

Expand Down Expand Up @@ -276,13 +266,6 @@ func (r *RunnerEnv) RegisterXRPLOriginatedToken(
return registeredXRPLToken
}

// RequireNoErrors check whether the runner err received runner errors.
func (r *RunnerEnv) RequireNoErrors(t *testing.T) {
r.ProcessErrorsMu.RLock()
defer r.ProcessErrorsMu.RUnlock()
require.Empty(t, r.ProcessErrors, "Found unexpected process errors after the execution")
}

// SendXRPLPaymentTx sends Payment transaction.
func (r *RunnerEnv) SendXRPLPaymentTx(
ctx context.Context,
Expand Down
14 changes: 7 additions & 7 deletions integration-tests/processes/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumAndBack(t *testing.T) {

envCfg := DefaultRunnerEnvConfig()
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(200))

coreumSender := chains.Coreum.GenAccount()
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithMaliciousRelayer(t *testing.
envCfg := DefaultRunnerEnvConfig()
envCfg.MaliciousRelayerNumber = 1
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(200))

coreumSender := chains.Coreum.GenAccount()
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithTicketsReallocation(t *testi
envCfg := DefaultRunnerEnvConfig()
envCfg.UsedTicketSequenceThreshold = 3
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(5))
sendingCount := 10

Expand Down Expand Up @@ -241,7 +241,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumWithDifferentAmountAndPartialAm
})

// start relayers
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// recover tickets so we can register tokens
runnerEnv.AllocateTickets(ctx, t, 200)

Expand Down Expand Up @@ -314,7 +314,7 @@ func TestRecoverXRPLOriginatedTokenRegistrationAndSendFromXRPLToCoreumAndBack(t

envCfg := DefaultRunnerEnvConfig()
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(200))

coreumSender := chains.Coreum.GenAccount()
Expand Down Expand Up @@ -422,7 +422,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithDifferentAmountsAnd
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)

// start relayers
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// recover tickets so we can register tokens
runnerEnv.AllocateTickets(ctx, t, 200)

Expand Down Expand Up @@ -546,7 +546,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithMaliciousRelayer(t
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)

// start relayers
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// recover tickets so we can register tokens
runnerEnv.AllocateTickets(ctx, t, 200)

Expand Down
10 changes: 5 additions & 5 deletions integration-tests/processes/ticket_allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestTicketsAllocationRecoveryWithAccountSequence(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate)

bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress)
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestTicketsAllocationRecoveryWithRejection(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// we don't fund the contract for the tickets allocation to let the chain reject the allocation transaction

bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress)
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestTicketsAllocationRecoveryWithInvalidAccountSequence(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate)

bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress)
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestTicketsAllocationRecoveryWithMaliciousRelayers(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()

chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate)

Expand Down Expand Up @@ -184,7 +184,7 @@ func TestTicketsReAllocationByTheXRPLTokenRegistration(t *testing.T) {
envCfg.UsedTicketSequenceThreshold = 3
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()

// allocate first five tickets
numberOfTicketsToAllocate := uint32(5)
Expand Down
2 changes: 1 addition & 1 deletion relayer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ replace (

require (
cosmossdk.io/math v1.1.2
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd
github.com/CosmWasm/wasmd v0.41.0
github.com/cosmos/cosmos-sdk v0.47.5
Expand Down
4 changes: 2 additions & 2 deletions relayer/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b h1:nSNvOe9oRVl0Ijph3u/e1nZi7j4vGhYyTI3NVbPLdXI=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 h1:cEkMNqZamgrLwUAqjhRcoF0h3Z/AZY6mp20H8N0P/98=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0=
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd h1:NEwCGG9i6yjPY/avFTcrCDF16zvzIcvldnUR8AZtA7U=
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd/go.mod h1:XTqILFqH1e0GF1bYEnu/I0mElsfwH5OWfu2F5DACIjY=
github.com/CosmWasm/wasmd v0.41.0 h1:fmwxSbwb50zZDcBaayYFRLIaSFca+EFld1WOaQi49jg=
Expand Down
15 changes: 14 additions & 1 deletion relayer/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@ import (
"context"

"go.uber.org/zap"

"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
)

//go:generate mockgen -destination=mock.go -package=logger . Logger
//go:generate mockgen -destination=mock.go -package=logger . Logger,ParallelLogger

// ParallelLogger is parallel logger interface used mostly for mocks.
type ParallelLogger interface {
parallel.Logger
}

// A Field is a marshaling operation used to add a key-value pair to a logger's context. Most fields are lazily
// marshaled, so it's inexpensive to add fields to disabled debug-level log statements.
Expand All @@ -27,6 +34,7 @@ type Logger interface {
Info(ctx context.Context, msg string, fields ...Field)
Warn(ctx context.Context, msg string, fields ...Field)
Error(ctx context.Context, msg string, fields ...Field)
ParallelLogger(ctx context.Context) ParallelLogger
}

// AnyField takes a key and an arbitrary value and chooses the best way to represent them as a field, falling back to a
Expand Down Expand Up @@ -55,6 +63,11 @@ func Uint64Field(key string, value uint64) Field {
return convertZapFieldToField(zap.Uint64(key, value))
}

// ByteStringField constructs a field with the given key and value.
func ByteStringField(key string, value []byte) Field {
return convertZapFieldToField(zap.ByteString(key, value))
}

// Error is shorthand for the common idiom NamedError("error", err).
func Error(err error) Field {
return convertZapFieldToField(zap.Error(err))
Expand Down
Loading

0 comments on commit ecd4854

Please sign in to comment.