Skip to content

Commit

Permalink
Callback Logs Cleanup Pt. 2 (#552)
Browse files Browse the repository at this point in the history
  • Loading branch information
sampocs authored Jan 10, 2023
1 parent 8ee30c2 commit ac45bc4
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 104 deletions.
23 changes: 19 additions & 4 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,26 @@ func LogWithHostZone(chainId string, s string, a ...any) string {
}

// Returns a log string with a chain Id and callback as a prefix
// Ex:
// | COSMOSHUB-4 | DELEGATE CALLBACK | string
func LogCallbackWithHostZone(chainId string, callbackId string, s string, a ...any) string {
// callbackType is either ICACALLBACK or ICQCALLBACK
// Format:
// | CHAIN-ID | {CALLBACK_ID} {CALLBACK_TYPE} | string
func logCallbackWithHostZone(chainId string, callbackId string, callbackType string, s string, a ...any) string {
msg := fmt.Sprintf(s, a...)
return fmt.Sprintf("| %-13s | %s CALLBACK | %s", strings.ToUpper(chainId), strings.ToUpper(callbackId), msg)
return fmt.Sprintf("| %-13s | %s %s | %s", strings.ToUpper(chainId), strings.ToUpper(callbackId), callbackType, msg)
}

// Returns a log string with a chain Id and icacallback as a prefix
// Ex:
// | COSMOSHUB-4 | DELEGATE ICACALLBACK | string
func LogICACallbackWithHostZone(chainId string, callbackId string, s string, a ...any) string {
return logCallbackWithHostZone(chainId, callbackId, "ICACALLBACK", s, a...)
}

// Returns a log string with a chain Id and icqcallback as a prefix
// Ex:
// | COSMOSHUB-4 | WITHDRAWALBALANCE ICQCALLBACK | string
func LogICQCallbackWithHostZone(chainId string, callbackId string, s string, a ...any) string {
return logCallbackWithHostZone(chainId, callbackId, "ICQCALLBACK", s, a...)
}

// Returns a log header string with a dash padding on either side
Expand Down
3 changes: 0 additions & 3 deletions x/stakeibc/keeper/delegation_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package keeper_test

import (
"fmt"
"testing"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -24,8 +23,6 @@ func TestDelegationGet(t *testing.T) {
keeper, ctx := keepertest.StakeibcKeeper(t)
expected := createTestDelegation(keeper, ctx)
actual, found := keeper.GetDelegation(ctx)
fmt.Println("actual", actual)
fmt.Println("expected", expected)
require.True(t, found)

// We use (gogoproto.nullable) for Int so when encode an empty delegation,
Expand Down
2 changes: 1 addition & 1 deletion x/stakeibc/keeper/host_zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (k Keeper) GetAllHostZone(ctx sdk.Context) (list []types.HostZone) {
func (k Keeper) AddDelegationToValidator(ctx sdk.Context, hostZone types.HostZone, validatorAddress string, amount sdk.Int, callbackId string) (success bool) {
for _, validator := range hostZone.Validators {
if validator.Address == validatorAddress {
k.Logger(ctx).Info(utils.LogCallbackWithHostZone(hostZone.ChainId, callbackId,
k.Logger(ctx).Info(utils.LogICACallbackWithHostZone(hostZone.ChainId, callbackId,
" Validator %s, Current Delegation: %v, Delegation Change: %v", validator.Address, validator.DelegationAmt, amount))

if amount.GTE(sdk.ZeroInt()) {
Expand Down
46 changes: 33 additions & 13 deletions x/stakeibc/keeper/icacallbacks_claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package keeper
import (
"fmt"

"github.com/Stride-Labs/stride/v4/utils"
"github.com/Stride-Labs/stride/v4/x/icacallbacks"
icacallbackstypes "github.com/Stride-Labs/stride/v4/x/icacallbacks/types"
recordstypes "github.com/Stride-Labs/stride/v4/x/records/types"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/golang/protobuf/proto" //nolint:staticcheck
)

// Marshal claim callback args
func (k Keeper) MarshalClaimCallbackArgs(ctx sdk.Context, claimCallback types.ClaimCallback) ([]byte, error) {
out, err := proto.Marshal(&claimCallback)
if err != nil {
Expand All @@ -23,6 +25,7 @@ func (k Keeper) MarshalClaimCallbackArgs(ctx sdk.Context, claimCallback types.Cl
return out, nil
}

// Unmarshalls claim callback arguments into a ClaimCallback struct
func (k Keeper) UnmarshalClaimCallbackArgs(ctx sdk.Context, claimCallback []byte) (*types.ClaimCallback, error) {
unmarshalledDelegateCallback := types.ClaimCallback{}
if err := proto.Unmarshal(claimCallback, &unmarshalledDelegateCallback); err != nil {
Expand All @@ -32,63 +35,80 @@ func (k Keeper) UnmarshalClaimCallbackArgs(ctx sdk.Context, claimCallback []byte
return &unmarshalledDelegateCallback, nil
}

// ICA Callback after claiming unbonded tokens
// If successful:
// * Removes the user redemption record
// If timeout/failure:
// * Reverts pending flag in the user redemption record so the claim can be re-tried
func ClaimCallback(k Keeper, ctx sdk.Context, packet channeltypes.Packet, ack *channeltypes.Acknowledgement, args []byte) error {
// deserialize the args
// Fetch callback args
claimCallback, err := k.UnmarshalClaimCallbackArgs(ctx, args)
if err != nil {
return err
return sdkerrors.Wrapf(types.ErrUnmarshalFailure, fmt.Sprintf("Unable to unmarshal claim callback args: %s", err.Error()))
}
k.Logger(ctx).Info(fmt.Sprintf("ClaimCallback %v", claimCallback))
chainId := claimCallback.ChainId
k.Logger(ctx).Info(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Claim,
"Starting claim callback for Redemption Record: %s", claimCallback.UserRedemptionRecordId))

// Grab the associated user redemption record
userRedemptionRecord, found := k.RecordsKeeper.GetUserRedemptionRecord(ctx, claimCallback.GetUserRedemptionRecordId())
if !found {
return sdkerrors.Wrapf(types.ErrRecordNotFound, "user redemption record not found %s", claimCallback.GetUserRedemptionRecordId())
}

// handle timeout
// Check for timeout (ack nil)
// If the ICA timed out, update the redemption record so the user can retry the claim
if ack == nil {
k.Logger(ctx).Error(fmt.Sprintf("ClaimCallback timeout, ack is nil, packet %v", packet))
// after a timeout, a user should be able to retry the claim
k.Logger(ctx).Error(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Claim,
"TIMEOUT (ack is nil), Packet: %+v", packet))

userRedemptionRecord.ClaimIsPending = false
k.RecordsKeeper.SetUserRedemptionRecord(ctx, userRedemptionRecord)
return nil
}

// Check for a failed transaction (ack error)
// Upon failure, update the redemption record to allow the user to retry the claim
txMsgData, err := icacallbacks.GetTxMsgData(ctx, *ack, k.Logger(ctx))
if err != nil {
k.Logger(ctx).Error(fmt.Sprintf("failed to unmarshal txMsgData, packet %v", packet))
k.Logger(ctx).Error(fmt.Sprintf("ClaimCallback txMsgData could not be parsed, packet %v", packet))
return sdkerrors.Wrap(icacallbackstypes.ErrTxMsgData, err.Error())
}

k.Logger(ctx).Info("ClaimCallback executing", "packet", packet, "txMsgData", txMsgData, "args", args)
// handle failed tx on host chain
if len(txMsgData.Data) == 0 {
k.Logger(ctx).Error(fmt.Sprintf("ClaimCallback failed, packet %v", packet))
k.Logger(ctx).Error(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Claim,
"ICA TX FAILED (ack is empty / ack error), Packet: %+v", packet))

// after an error, a user should be able to retry the claim
userRedemptionRecord.ClaimIsPending = false
k.RecordsKeeper.SetUserRedemptionRecord(ctx, userRedemptionRecord)
return nil
}

// claim successfully processed
// remove the record and decrement the hzu
k.Logger(ctx).Info(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Claim, "SUCCESS, Packet: %+v", packet))

// Upon success, remove the record and decrement the unbonded amount on the host zone unbonding record
k.RecordsKeeper.RemoveUserRedemptionRecord(ctx, claimCallback.GetUserRedemptionRecordId())
err = k.DecrementHostZoneUnbonding(ctx, userRedemptionRecord, *claimCallback)
if err != nil {
k.Logger(ctx).Error(fmt.Sprintf("ClaimCallback failed (DecrementHostZoneUnbonding), packet %v, err: %s", packet, err.Error()))
return err
}

k.Logger(ctx).Info(fmt.Sprintf("[CLAIM] success on %s", userRedemptionRecord.GetHostZoneId()))
return nil
}

// After a user claims their unbonded tokens, the claim amount is decremented from the corresponding host zone unbonding record
func (k Keeper) DecrementHostZoneUnbonding(ctx sdk.Context, userRedemptionRecord recordstypes.UserRedemptionRecord, callbackArgs types.ClaimCallback) error {
// fetch the hzu associated with the user unbonding record
hostZoneUnbonding, found := k.RecordsKeeper.GetHostZoneUnbondingByChainId(ctx, callbackArgs.EpochNumber, callbackArgs.ChainId)
if !found {
return sdkerrors.Wrapf(types.ErrRecordNotFound, "host zone unbonding not found %s", callbackArgs.ChainId)
}

// decrement the hzu by the amount claimed
hostZoneUnbonding.NativeTokenAmount = hostZoneUnbonding.NativeTokenAmount.Sub(userRedemptionRecord.Amount)

// save the updated hzu on the epoch unbonding record
epochUnbondingRecord, success := k.RecordsKeeper.AddHostZoneToEpochUnbondingRecord(ctx, callbackArgs.EpochNumber, callbackArgs.ChainId, hostZoneUnbonding)
if !success {
Expand Down
4 changes: 2 additions & 2 deletions x/stakeibc/keeper/icacallbacks_claim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *KeeperTestSuite) SetupClaimCallback() ClaimCallbackTestCase {
// after a user calls ClaimUndelegatedTokens, the record is set to claimIsPending = true
// to prevent double claims
ClaimIsPending: true,
Amount: sdk.ZeroInt(),
Amount: sdk.ZeroInt(),
}
recordId2 := recordtypes.UserRedemptionRecordKeyFormatter(HostChainId, epochNumber, "other_sender")
userRedemptionRecord2 := recordtypes.UserRedemptionRecord{
Expand Down Expand Up @@ -185,7 +185,7 @@ func (s *KeeperTestSuite) TestClaimCallback_WrongCallbackArgs() {
invalidArgs := tc.validArgs

err := stakeibckeeper.ClaimCallback(s.App.StakeibcKeeper, s.Ctx, invalidArgs.packet, invalidArgs.ack, []byte("random bytes"))
s.Require().EqualError(err, "unexpected EOF")
s.Require().EqualError(err, "Unable to unmarshal claim callback args: unexpected EOF: unable to unmarshal data structure")
}

func (s *KeeperTestSuite) TestClaimCallback_RecordNotFound() {
Expand Down
14 changes: 7 additions & 7 deletions x/stakeibc/keeper/icacallbacks_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func DelegateCallback(k Keeper, ctx sdk.Context, packet channeltypes.Packet, ack
// Deserialize the callback args
delegateCallback, err := k.UnmarshalDelegateCallbackArgs(ctx, args)
if err != nil {
return err
return sdkerrors.Wrapf(types.ErrUnmarshalFailure, fmt.Sprintf("Unable to unmarshal delegate callback args: %s", err.Error()))
}
chainId := delegateCallback.HostZoneId
k.Logger(ctx).Info(utils.LogCallbackWithHostZone(chainId, ICACallbackID_Delegate,
"Starting callback for Deposit Record: %d", delegateCallback.DepositRecordId))
k.Logger(ctx).Info(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Delegate,
"Starting delegate callback for Deposit Record: %d", delegateCallback.DepositRecordId))

// Confirm chainId and deposit record Id exist
hostZone, found := k.GetHostZone(ctx, chainId)
Expand All @@ -67,9 +67,9 @@ func DelegateCallback(k Keeper, ctx sdk.Context, packet channeltypes.Packet, ack
}

// Check for timeout (ack nil)
// No need to reset the deposit record status since it will get revertted when the channel is restored
// No need to reset the deposit record status since it will get reverted when the channel is restored
if ack == nil {
k.Logger(ctx).Error(utils.LogCallbackWithHostZone(chainId, ICACallbackID_Delegate,
k.Logger(ctx).Error(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Delegate,
"TIMEOUT (ack is nil), Packet: %+v", packet))
return nil
}
Expand All @@ -82,7 +82,7 @@ func DelegateCallback(k Keeper, ctx sdk.Context, packet channeltypes.Packet, ack
return sdkerrors.Wrap(icacallbackstypes.ErrTxMsgData, err.Error())
}
if len(txMsgData.Data) == 0 {
k.Logger(ctx).Error(utils.LogCallbackWithHostZone(chainId, ICACallbackID_Delegate,
k.Logger(ctx).Error(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Delegate,
"ICA TX FAILED (ack is empty / ack error), Packet: %+v", packet))

// Reset deposit record status
Expand All @@ -91,7 +91,7 @@ func DelegateCallback(k Keeper, ctx sdk.Context, packet channeltypes.Packet, ack
return nil
}

k.Logger(ctx).Info(utils.LogCallbackWithHostZone(chainId, ICACallbackID_Delegate, "SUCCESS, Packet: %+v", packet))
k.Logger(ctx).Info(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Delegate, "SUCCESS, Packet: %+v", packet))

// Update delegations on the host zone
for _, splitDelegation := range delegateCallback.SplitDelegations {
Expand Down
2 changes: 1 addition & 1 deletion x/stakeibc/keeper/icacallbacks_delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (s *KeeperTestSuite) TestDelegateCallback_WrongCallbackArgs() {
invalidArgs := tc.validArgs

err := stakeibckeeper.DelegateCallback(s.App.StakeibcKeeper, s.Ctx, invalidArgs.packet, invalidArgs.ack, []byte("random bytes"))
s.Require().EqualError(err, "unexpected EOF")
s.Require().EqualError(err, "Unable to unmarshal delegate callback args: unexpected EOF: unable to unmarshal data structure")
s.checkDelegateStateIfCallbackFailed(tc)
}

Expand Down
75 changes: 45 additions & 30 deletions x/stakeibc/keeper/icacallbacks_rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package keeper
import (
"fmt"

"github.com/Stride-Labs/stride/v4/utils"
"github.com/Stride-Labs/stride/v4/x/icacallbacks"
icacallbackstypes "github.com/Stride-Labs/stride/v4/x/icacallbacks/types"
"github.com/Stride-Labs/stride/v4/x/stakeibc/types"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/golang/protobuf/proto" //nolint:staticcheck
)

// Marshalls rebalance callback arguments
func (k Keeper) MarshalRebalanceCallbackArgs(ctx sdk.Context, rebalanceCallback types.RebalanceCallback) ([]byte, error) {
out, err := proto.Marshal(&rebalanceCallback)
if err != nil {
Expand All @@ -22,6 +24,7 @@ func (k Keeper) MarshalRebalanceCallbackArgs(ctx sdk.Context, rebalanceCallback
return out, nil
}

// Unmarshalls rebalance callback arguments into a RebalanceCallback struct
func (k Keeper) UnmarshalRebalanceCallbackArgs(ctx sdk.Context, rebalanceCallback []byte) (*types.RebalanceCallback, error) {
unmarshalledRebalanceCallback := types.RebalanceCallback{}
if err := proto.Unmarshal(rebalanceCallback, &unmarshalledRebalanceCallback); err != nil {
Expand All @@ -31,63 +34,75 @@ func (k Keeper) UnmarshalRebalanceCallbackArgs(ctx sdk.Context, rebalanceCallbac
return &unmarshalledRebalanceCallback, nil
}

// ICA Callback after rebalance validators on a host zone
// If successful:
// * Updates relevant validator delegations on the host zone struct
// If timeout/failure:
// * Does nothing
func RebalanceCallback(k Keeper, ctx sdk.Context, packet channeltypes.Packet, ack *channeltypes.Acknowledgement, args []byte) error {
k.Logger(ctx).Info("RebalanceCallback executing", "packet", packet)
// Fetch callback args
rebalanceCallback, err := k.UnmarshalRebalanceCallbackArgs(ctx, args)
if err != nil {
return sdkerrors.Wrapf(types.ErrUnmarshalFailure, fmt.Sprintf("Unable to unmarshal rebalance callback args: %s", err.Error()))
}
chainId := rebalanceCallback.HostZoneId
k.Logger(ctx).Info(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Rebalance, "Starting rebalance callback"))

// Check for timeout (ack nil)
// No action is necessary on a timeout
if ack == nil {
// timeout
k.Logger(ctx).Error(fmt.Sprintf("RebalanceCallback timeout, ack is nil, packet %v", packet))
k.Logger(ctx).Error(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Rebalance,
"TIMEOUT (ack is nil), Packet: %+v", packet))
return nil
}

// Check for a failed transaction (ack error)
// No action is necessary on a failure
txMsgData, err := icacallbacks.GetTxMsgData(ctx, *ack, k.Logger(ctx))
if err != nil {
k.Logger(ctx).Error(fmt.Sprintf("failed to fetch txMsgData, packet %v", packet))
k.Logger(ctx).Error(fmt.Sprintf("RebalanceCallback failed to fetch txMsgData, packet %v", packet))
return sdkerrors.Wrap(icacallbackstypes.ErrTxMsgData, err.Error())
}

if len(txMsgData.Data) == 0 {
// failed transaction
k.Logger(ctx).Error(fmt.Sprintf("RebalanceCallback tx failed, ack is empty (ack error), packet %v", packet))
k.Logger(ctx).Error(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Rebalance,
"ICA TX FAILED (ack is empty / ack error), Packet: %+v", packet))
return nil
}

// deserialize the args
rebalanceCallback, err := k.UnmarshalRebalanceCallbackArgs(ctx, args)
if err != nil {
errMsg := fmt.Sprintf("Unable to unmarshal rebalance callback args | %s", err.Error())
k.Logger(ctx).Error(errMsg)
return sdkerrors.Wrapf(types.ErrUnmarshalFailure, errMsg)
}
k.Logger(ctx).Info(fmt.Sprintf("RebalanceCallback %v", rebalanceCallback))
hostZone := rebalanceCallback.GetHostZoneId()
zone, found := k.GetHostZone(ctx, hostZone)
k.Logger(ctx).Info(utils.LogICACallbackWithHostZone(chainId, ICACallbackID_Rebalance, "SUCCESS, Packet: %+v", packet))

// Confirm the host zone exists
hostZone, found := k.GetHostZone(ctx, chainId)
if !found {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "host zone not found %s", hostZone)
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "host zone not found %s", chainId)
}

// update the host zone
rebalancings := rebalanceCallback.GetRebalancings()
// assemble a map from validatorAddress -> validator
// Assemble a map from validatorAddress -> validator
valAddrMap := make(map[string]*types.Validator)
for _, val := range zone.GetValidators() {
valAddrMap[val.GetAddress()] = val
for _, val := range hostZone.Validators {
valAddrMap[val.Address] = val
}
for _, rebalancing := range rebalancings {
srcValidator := rebalancing.GetSrcValidator()
dstValidator := rebalancing.GetDstValidator()
amt := rebalancing.Amt

// For each re-delegation transaction, update the relevant validators on the host zone
for _, rebalancing := range rebalanceCallback.Rebalancings {
srcValidator := rebalancing.SrcValidator
dstValidator := rebalancing.DstValidator

// Decrement the total delegation from the source validator
if _, valFound := valAddrMap[srcValidator]; valFound {
valAddrMap[srcValidator].DelegationAmt = valAddrMap[srcValidator].DelegationAmt.Sub(amt)
valAddrMap[srcValidator].DelegationAmt = valAddrMap[srcValidator].DelegationAmt.Sub(rebalancing.Amt)
} else {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "validator not found %s", srcValidator)
}

// Increment the total delegation for the destination validator
if _, valFound := valAddrMap[dstValidator]; valFound {
valAddrMap[dstValidator].DelegationAmt = valAddrMap[dstValidator].DelegationAmt.Add(amt)
valAddrMap[dstValidator].DelegationAmt = valAddrMap[dstValidator].DelegationAmt.Add(rebalancing.Amt)
} else {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "validator not found %s", dstValidator)
}
}
k.SetHostZone(ctx, zone)
k.SetHostZone(ctx, hostZone)

return nil
}
2 changes: 1 addition & 1 deletion x/stakeibc/keeper/icacallbacks_rebalance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s *KeeperTestSuite) TestRebalanceCallback_WrongCallbackArgs() {
invalidArgs := tc.validArgs

err := stakeibckeeper.RebalanceCallback(s.App.StakeibcKeeper, s.Ctx, invalidArgs.packet, invalidArgs.ack, []byte("random bytes"))
s.Require().EqualError(err, "Unable to unmarshal rebalance callback args | unexpected EOF: unable to unmarshal data structure")
s.Require().EqualError(err, "Unable to unmarshal rebalance callback args: unexpected EOF: unable to unmarshal data structure")
s.checkDelegationStateIfCallbackFailed()
}

Expand Down
Loading

0 comments on commit ac45bc4

Please sign in to comment.