Skip to content

Commit

Permalink
Add more logs for the processes
Browse files Browse the repository at this point in the history
  • Loading branch information
dzmitryhil committed Feb 8, 2024
1 parent 0de542e commit e678431
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 105 deletions.
116 changes: 62 additions & 54 deletions relayer/processes/coreum_to_xrpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,58 +90,61 @@ func NewCoreumToXRPLProcess(
}

// Start starts the process.
func (s *CoreumToXRPLProcess) Start(ctx context.Context) error {
func (p *CoreumToXRPLProcess) Start(ctx context.Context) error {
p.log.Info(ctx, "Starting Coreum to XRPL process")
for {
select {
case <-ctx.Done():
return errors.WithStack(ctx.Err())
default:
if err := s.processPendingOperations(ctx); err != nil && !errors.Is(err, context.Canceled) {
if err := p.processPendingOperations(ctx); err != nil && !errors.Is(err, context.Canceled) {
return errors.Wrap(err, "failed to process pending operations")
}
if !s.cfg.RepeatRecentScan {
s.log.Info(ctx, "Process repeating is disabled, process is finished")
if !p.cfg.RepeatRecentScan {
p.log.Info(ctx, "Process repeating is disabled, process is finished")
return nil
}
s.log.Info(ctx, "Waiting before the next execution", zap.String("delay", s.cfg.RepeatDelay.String()))
p.log.Info(ctx, "Waiting before the next execution", zap.String("delay", p.cfg.RepeatDelay.String()))
select {
case <-ctx.Done():
return errors.WithStack(ctx.Err())
case <-time.After(s.cfg.RepeatDelay):
case <-time.After(p.cfg.RepeatDelay):
}
}
}
}

func (s *CoreumToXRPLProcess) processPendingOperations(ctx context.Context) error {
operations, err := s.contractClient.GetPendingOperations(ctx)
func (p *CoreumToXRPLProcess) processPendingOperations(ctx context.Context) error {
operations, err := p.contractClient.GetPendingOperations(ctx)
if err != nil {
return err
}
if len(operations) == 0 {
p.log.Debug(ctx, "No pending operations to process")
return nil
}
p.log.Info(ctx, "Processing pending operations", zap.Int("count", len(operations)))

bridgeSigners, err := s.getBridgeSigners(ctx)
bridgeSigners, err := p.getBridgeSigners(ctx)
if err != nil {
return err
}

for _, operation := range operations {
if err := s.signOrSubmitOperation(ctx, operation, bridgeSigners); err != nil {
if err := p.signOrSubmitOperation(ctx, operation, bridgeSigners); err != nil {
return err
}
}

return nil
}

func (s *CoreumToXRPLProcess) getBridgeSigners(ctx context.Context) (BridgeSigners, error) {
xrplWeights, xrplWeightsQuorum, err := s.getBridgeXRPLSignerAccountsWithWeights(ctx)
func (p *CoreumToXRPLProcess) getBridgeSigners(ctx context.Context) (BridgeSigners, error) {
xrplWeights, xrplWeightsQuorum, err := p.getBridgeXRPLSignerAccountsWithWeights(ctx)
if err != nil {
return BridgeSigners{}, err
}
contractConfig, err := s.contractClient.GetContractConfig(ctx)
contractConfig, err := p.contractClient.GetContractConfig(ctx)
if err != nil {
return BridgeSigners{}, err
}
Expand Down Expand Up @@ -179,10 +182,10 @@ func (s *CoreumToXRPLProcess) getBridgeSigners(ctx context.Context) (BridgeSigne
}, nil
}

func (s *CoreumToXRPLProcess) getBridgeXRPLSignerAccountsWithWeights(
func (p *CoreumToXRPLProcess) getBridgeXRPLSignerAccountsWithWeights(
ctx context.Context,
) (map[rippledata.Account]uint16, uint32, error) {
accountInfo, err := s.xrplRPCClient.AccountInfo(ctx, s.cfg.BridgeXRPLAddress)
accountInfo, err := p.xrplRPCClient.AccountInfo(ctx, p.cfg.BridgeXRPLAddress)
if err != nil {
return nil, 0, err
}
Expand All @@ -200,34 +203,39 @@ func (s *CoreumToXRPLProcess) getBridgeXRPLSignerAccountsWithWeights(
return accountWights, weightsQuorum, nil
}

func (s *CoreumToXRPLProcess) signOrSubmitOperation(
func (p *CoreumToXRPLProcess) signOrSubmitOperation(
ctx context.Context,
operation coreum.Operation,
bridgeSigners BridgeSigners,
) error {
valid, err := s.preValidateOperation(ctx, operation)
valid, err := p.preValidateOperation(ctx, operation)
if err != nil {
return err
}
if !valid {
s.log.Info(ctx, "Operation is invalid", zap.Any("operation", operation))
p.log.Warn(ctx, "Operation is invalid", zap.Any("operation", operation))
return nil
}
p.log.Debug(
ctx,
"Pre-validation of the operation passed, operation is valid",
zap.Any("operation", operation),
)

tx, quorumIsReached, err := s.buildSubmittableTransaction(ctx, operation, bridgeSigners)
tx, quorumIsReached, err := p.buildSubmittableTransaction(ctx, operation, bridgeSigners)
if err != nil {
return err
}
if !quorumIsReached {
return s.registerTxSignature(ctx, operation)
return p.registerTxSignature(ctx, operation)
}

txRes, err := s.xrplRPCClient.Submit(ctx, tx)
txRes, err := p.xrplRPCClient.Submit(ctx, tx)
if err != nil {
return errors.Wrapf(err, "failed to submit transaction:%+v", tx)
}
if txRes.EngineResult.Success() {
s.log.Info(
p.log.Info(
ctx,
"XRPL multi-sign transaction has been successfully submitted",
zap.String("txHash", strings.ToUpper(tx.GetHash().String())),
Expand All @@ -237,7 +245,7 @@ func (s *CoreumToXRPLProcess) signOrSubmitOperation(
}
// These codes indicate that the transaction failed, but it was applied to a ledger to apply the transaction cost.
if strings.HasPrefix(txRes.EngineResult.String(), xrpl.TecTxResultPrefix) {
s.log.Info(
p.log.Info(
ctx,
fmt.Sprintf(
"The transaction has been sent, but will be reverted, code:%s, description:%s",
Expand All @@ -249,13 +257,13 @@ func (s *CoreumToXRPLProcess) signOrSubmitOperation(

switch txRes.EngineResult.String() {
case xrpl.TefNOTicketTxResult, xrpl.TefPastSeqTxResult:
s.log.Debug(
p.log.Debug(
ctx,
"Transaction has been already submitted",
)
return nil
case xrpl.TelInsufFeeP:
s.log.Warn(
p.log.Warn(
ctx,
"The Fee from the transaction is not high enough to meet the server's current transaction cost requirement.",
)
Expand All @@ -266,7 +274,7 @@ func (s *CoreumToXRPLProcess) signOrSubmitOperation(
}
}

func (s *CoreumToXRPLProcess) buildSubmittableTransaction(
func (p *CoreumToXRPLProcess) buildSubmittableTransaction(
ctx context.Context,
operation coreum.Operation,
bridgeSigners BridgeSigners,
Expand All @@ -277,12 +285,12 @@ func (s *CoreumToXRPLProcess) buildSubmittableTransaction(
for _, signature := range operation.Signatures {
xrplAcc, ok := bridgeSigners.CoreumToXRPLAccount[signature.RelayerCoreumAddress.String()]
if !ok {
s.log.Warn(ctx, "Found unknown signer", zap.String("coreumAddress", signature.RelayerCoreumAddress.String()))
p.log.Warn(ctx, "Found unknown signer", zap.String("coreumAddress", signature.RelayerCoreumAddress.String()))
continue
}
xrplPubKey, ok := bridgeSigners.XRPLPubKeys[xrplAcc]
if !ok {
s.log.Warn(
p.log.Warn(
ctx,
"Found XRPL signer address without pub key in the contract",
zap.String("xrplAddress", xrplAcc.String()),
Expand All @@ -291,12 +299,12 @@ func (s *CoreumToXRPLProcess) buildSubmittableTransaction(
}
xrplAccWeight, ok := bridgeSigners.XRPLWeights[xrplAcc]
if !ok {
s.log.Warn(ctx, "Found XRPL signer address without weight", zap.String("xrplAddress", xrplAcc.String()))
p.log.Warn(ctx, "Found XRPL signer address without weight", zap.String("xrplAddress", xrplAcc.String()))
continue
}
var txSignature rippledata.VariableLength
if err := txSignature.UnmarshalText([]byte(signature.Signature)); err != nil {
s.log.Error(
p.log.Error(
ctx,
"Failed to unmarshal tx signature",
zap.Error(err),
Expand All @@ -312,7 +320,7 @@ func (s *CoreumToXRPLProcess) buildSubmittableTransaction(
SigningPubKey: &xrplPubKey,
},
}
tx, err := s.buildXRPLTxFromOperation(operation)
tx, err := p.buildXRPLTxFromOperation(operation)
if err != nil {
return nil, false, err
}
Expand All @@ -321,7 +329,7 @@ func (s *CoreumToXRPLProcess) buildSubmittableTransaction(
}
isValid, _, err := rippledata.CheckMultiSignature(tx)
if err != nil {
s.log.Error(
p.log.Error(
ctx,
"failed to check transaction signature, err:%s, signer:%+v",
zap.Error(err),
Expand All @@ -330,7 +338,7 @@ func (s *CoreumToXRPLProcess) buildSubmittableTransaction(
continue
}
if !isValid {
s.log.Error(
p.log.Error(
ctx,
"Invalid tx signature",
zap.Error(err),
Expand All @@ -351,7 +359,7 @@ func (s *CoreumToXRPLProcess) buildSubmittableTransaction(
return nil, false, nil
}
// build tx one more time to be sure that it is not affected
tx, err := s.buildXRPLTxFromOperation(operation)
tx, err := p.buildXRPLTxFromOperation(operation)
if err != nil {
return nil, false, err
}
Expand All @@ -365,11 +373,11 @@ func (s *CoreumToXRPLProcess) buildSubmittableTransaction(
// preValidateOperation checks if the operation is valid, and it makes sense to submit the corresponding transaction
// or the operation should be canceled with the `invalid` state. For now the main purpose of the function is to filter
// out the `AllocateTickets` operation with the invalid sequence.
func (s *CoreumToXRPLProcess) preValidateOperation(ctx context.Context, operation coreum.Operation) (bool, error) {
func (p *CoreumToXRPLProcess) preValidateOperation(ctx context.Context, operation coreum.Operation) (bool, error) {
// no need to check if the current relayer has already provided the signature
// this check prevents the state when relayer votes and then changes its vote because of different current state
for _, signature := range operation.Signatures {
if signature.RelayerCoreumAddress.String() == s.cfg.RelayerCoreumAddress.String() {
if signature.RelayerCoreumAddress.String() == p.cfg.RelayerCoreumAddress.String() {
return true, nil
}
}
Expand All @@ -381,15 +389,15 @@ func (s *CoreumToXRPLProcess) preValidateOperation(ctx context.Context, operatio
return true, nil
}

bridgeXRPLAccInfo, err := s.xrplRPCClient.AccountInfo(ctx, s.cfg.BridgeXRPLAddress)
bridgeXRPLAccInfo, err := p.xrplRPCClient.AccountInfo(ctx, p.cfg.BridgeXRPLAddress)
if err != nil {
return false, err
}
// sequence is valid
if *bridgeXRPLAccInfo.AccountData.Sequence == operation.AccountSequence {
return true, nil
}
s.log.Info(
p.log.Info(
ctx,
"Invalid bridge account sequence",
zap.Uint32("expected", *bridgeXRPLAccInfo.AccountData.Sequence),
Expand All @@ -403,37 +411,37 @@ func (s *CoreumToXRPLProcess) preValidateOperation(ctx context.Context, operatio
// tx with the ticket number
},
}
s.log.Info(ctx, "Sending invalid tx evidence")
_, err = s.contractClient.SendXRPLTicketsAllocationTransactionResultEvidence(ctx, s.cfg.RelayerCoreumAddress, evidence)
p.log.Info(ctx, "Sending invalid tx evidence")
_, err = p.contractClient.SendXRPLTicketsAllocationTransactionResultEvidence(ctx, p.cfg.RelayerCoreumAddress, evidence)
if err == nil {
return false, nil
}
if IsExpectedEvidenceSubmissionError(err) {
s.log.Debug(ctx, "Received expected evidence submission error", zap.String("errText", err.Error()))
p.log.Debug(ctx, "Received expected evidence submission error", zap.String("errText", err.Error()))
return false, nil
}

return false, nil
}

func (s *CoreumToXRPLProcess) registerTxSignature(ctx context.Context, operation coreum.Operation) error {
tx, err := s.buildXRPLTxFromOperation(operation)
func (p *CoreumToXRPLProcess) registerTxSignature(ctx context.Context, operation coreum.Operation) error {
tx, err := p.buildXRPLTxFromOperation(operation)
if err != nil {
return err
}
signer, err := s.xrplSigner.MultiSign(tx, s.cfg.XRPLTxSignerKeyName)
signer, err := p.xrplSigner.MultiSign(tx, p.cfg.XRPLTxSignerKeyName)
if err != nil {
return errors.Wrapf(err, "failed to sign transaction, keyName:%s", s.cfg.XRPLTxSignerKeyName)
return errors.Wrapf(err, "failed to sign transaction, keyName:%s", p.cfg.XRPLTxSignerKeyName)
}
_, err = s.contractClient.SaveSignature(
_, err = p.contractClient.SaveSignature(
ctx,
s.cfg.RelayerCoreumAddress,
p.cfg.RelayerCoreumAddress,
operation.GetOperationID(),
operation.Version,
signer.Signer.TxnSignature.String(),
)
if err == nil {
s.log.Info(
p.log.Info(
ctx,
"Signature registered for the operation",
zap.String("signature", signer.Signer.TxnSignature.String()),
Expand All @@ -445,7 +453,7 @@ func (s *CoreumToXRPLProcess) registerTxSignature(ctx context.Context, operation
coreum.IsPendingOperationNotFoundError(err) ||
coreum.IsOperationVersionMismatchError(err) ||
coreum.IsBridgeHaltedError(err) {
s.log.Debug(
p.log.Debug(
ctx,
"Received expected evidence error on saving signature",
zap.String("errText", err.Error()),
Expand All @@ -457,16 +465,16 @@ func (s *CoreumToXRPLProcess) registerTxSignature(ctx context.Context, operation
return errors.Wrap(err, "failed to register transaction signature")
}

func (s *CoreumToXRPLProcess) buildXRPLTxFromOperation(operation coreum.Operation) (MultiSignableTransaction, error) {
func (p *CoreumToXRPLProcess) buildXRPLTxFromOperation(operation coreum.Operation) (MultiSignableTransaction, error) {
switch {
case isAllocateTicketsOperation(operation):
return BuildTicketCreateTxForMultiSigning(s.cfg.BridgeXRPLAddress, operation)
return BuildTicketCreateTxForMultiSigning(p.cfg.BridgeXRPLAddress, operation)
case isTrustSetOperation(operation):
return BuildTrustSetTxForMultiSigning(s.cfg.BridgeXRPLAddress, operation)
return BuildTrustSetTxForMultiSigning(p.cfg.BridgeXRPLAddress, operation)
case isCoreumToXRPLTransferOperation(operation):
return BuildCoreumToXRPLXRPLOriginatedTokenTransferPaymentTxForMultiSigning(s.cfg.BridgeXRPLAddress, operation)
return BuildCoreumToXRPLXRPLOriginatedTokenTransferPaymentTxForMultiSigning(p.cfg.BridgeXRPLAddress, operation)
case isRotateKeysOperation(operation):
return BuildSignerListSetTxForMultiSigning(s.cfg.BridgeXRPLAddress, operation)
return BuildSignerListSetTxForMultiSigning(p.cfg.BridgeXRPLAddress, operation)
default:
return nil, errors.Errorf("failed to process operation, unable to determine operation type, operation:%+v", operation)
}
Expand Down
Loading

0 comments on commit e678431

Please sign in to comment.