Skip to content

Commit

Permalink
fix(consensus): node stalled after client has stopped (#1001)
Browse files Browse the repository at this point in the history
* fix(consensus): stopped abci client is not recoverable

* test(consensus): ensure client stopped error causes panic

* doc(abciclient): document ErrClientStopped

* test(consensus): apply rabbit feedback
  • Loading branch information
lklimek authored Dec 11, 2024
1 parent 7681ea4 commit a740370
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 1 deletion.
7 changes: 6 additions & 1 deletion abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type socketClient struct {

var _ Client = (*socketClient)(nil)

var (
// ErrClientStopped is returned when client wasn't started yet or it was terminated with an error.
ErrClientStopped = errors.New("client has stopped")
)

// NewSocketClient creates a new socket client, which connects to a given
// address. If mustConnect is true, the client will return an error upon start
// if it fails to connect.
Expand Down Expand Up @@ -234,7 +239,7 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {

func (cli *socketClient) doRequest(ctx context.Context, req *types.Request) (*types.Response, error) {
if !cli.IsRunning() {
return nil, errors.New("client has stopped")
return nil, ErrClientStopped
}

reqres := makeReqRes(ctx, req)
Expand Down
81 changes: 81 additions & 0 deletions internal/consensus/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

abciclient "github.com/dashpay/tenderdash/abci/client"
clientmocks "github.com/dashpay/tenderdash/abci/client/mocks"
"github.com/dashpay/tenderdash/abci/example/kvstore"
abci "github.com/dashpay/tenderdash/abci/types"
abcimocks "github.com/dashpay/tenderdash/abci/types/mocks"
Expand All @@ -22,6 +24,7 @@ import (
"github.com/dashpay/tenderdash/internal/mempool"
tmpubsub "github.com/dashpay/tenderdash/internal/pubsub"
tmquery "github.com/dashpay/tenderdash/internal/pubsub/query"
sm "github.com/dashpay/tenderdash/internal/state"
sf "github.com/dashpay/tenderdash/internal/state/test/factory"
"github.com/dashpay/tenderdash/internal/test/factory"
tmbytes "github.com/dashpay/tenderdash/libs/bytes"
Expand Down Expand Up @@ -3272,6 +3275,84 @@ func TestStateTryAddCommitCallsProcessProposal(t *testing.T) {
assert.NoError(t, err)
}

// TestStateTryAddCommitPanicsOnClientError ensures that
// given ABCI client that errors on ProcessProposal,
// when new block is about to be processed,
// then the TryAddCommitEvent causes a panic.
func TestStateTryAddCommitPanicsOnClientError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
config := configSetup(t)

// setup some node and commit
genDoc, privVals := factory.RandGenesisDoc(1, factory.ConsensusParams())
logger := consensusLogger(t)

state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)

// Create a panicking app
app := clientmocks.NewClient(t)
app.On("ProcessProposal", mock.Anything, mock.Anything).
Return(&abci.ResponseProcessProposal{}, abciclient.ErrClientStopped).
Once()

// create a new consensus state
proTxHash, err := privVals[0].GetProTxHash(ctx)
require.NoError(t, err)
ctx = dash.ContextWithProTxHash(ctx, proTxHash)
consensusState := newStateWithConfig(ctx, t, logger, config, state, privVals[0], app)

stateData := consensusState.GetStateData()

// create proposal and a block to be processed
block, err := sf.MakeBlock(state, 1, &types.Commit{}, kvstore.ProtocolVersion)
require.NoError(t, err)
require.NotZero(t, block.Version.App)
block.CoreChainLockedHeight = 1

commit, err := factory.MakeCommit(
ctx,
block.BlockID(nil),
block.Height,
0,
stateData.Votes.Precommits(0),
state.Validators,
privVals,
)
require.NoError(t, err)

proposal := types.NewProposal(
block.Height,
block.CoreChainLockedHeight,
0,
-1,
commit.BlockID,
block.Time)

parts, err := block.MakePartSet(999999999)
require.NoError(t, err)

// emulate that the node has received proposal and block
peerID := stateData.Validators.Proposer().NodeAddress.NodeID
stateData.Proposal = proposal
stateData.ProposalBlock = block
stateData.ProposalBlockParts = parts
stateData.updateRoundStep(commit.Round, cstypes.RoundStepPrevote)

// invoke the TryAddCommitEvent to see if it will panic
ctx = msgInfoWithCtx(ctx, msgInfo{
Msg: &CommitMessage{commit},
PeerID: peerID,
ReceiveTime: time.Time{},
})
assert.PanicsWithError(t,
"ABCI client stopped, Tenderdash needs to be restarted: ProcessProposal abci method: client has stopped",
func() {
_ = consensusState.ctrl.Dispatch(ctx, &TryAddCommitEvent{Commit: commit, PeerID: peerID}, &stateData)
})
}

// TestStateTimestamp_ProposalMatch tests that a validator prevotes a
// proposed block if the timestamp in the block matches the timestamp in the
// corresponding proposal message.
Expand Down
6 changes: 6 additions & 0 deletions internal/consensus/state_try_add_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package consensus

import (
"context"
"errors"
"fmt"

abciclient "github.com/dashpay/tenderdash/abci/client"
"github.com/dashpay/tenderdash/dash"
cstypes "github.com/dashpay/tenderdash/internal/consensus/types"
"github.com/dashpay/tenderdash/libs/log"
Expand Down Expand Up @@ -108,6 +110,10 @@ func (cs *TryAddCommitAction) verifyCommit(ctx context.Context, stateData *State
// We have a correct block, let's process it before applying the commit
err = cs.blockExec.ensureProcess(ctx, &stateData.RoundState, commit.Round)
if err != nil {
if errors.Is(err, abciclient.ErrClientStopped) {
// this is a non-recoverable error in current architecture
panic(fmt.Errorf("ABCI client stopped, Tenderdash needs to be restarted: %w", err))
}
return false, fmt.Errorf("unable to process proposal: %w", err)
}
err = cs.blockExec.validate(ctx, stateData)
Expand Down

0 comments on commit a740370

Please sign in to comment.