Skip to content

Commit

Permalink
DAOS-15956 control: Limit race between taking leadership and join (#1…
Browse files Browse the repository at this point in the history
…4541)

In very limited situations, it is possible to encounter a race condition
between an MS replica stepping up as leader and engines beginning to
join the system. In that case, the system fabric provider property may
not yet be set, causing the engine join to fail.

This change shrinks the gap where the race can occur. It also returns
a retryable error on the small chance that this scenario is hit.

Signed-off-by: Kris Jacque <kris.jacque@intel.com>
  • Loading branch information
kjacque authored Jun 17, 2024
1 parent f825add commit 2dcf4ed
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 29 deletions.
10 changes: 8 additions & 2 deletions src/control/server/mgmt_svc.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2018-2023 Intel Corporation.
// (C) Copyright 2018-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -135,7 +135,13 @@ func (svc *mgmtSvc) checkLeaderRequest(req proto.Message) error {
if err := svc.checkSystemRequest(unwrapped); err != nil {
return err
}
return svc.sysdb.CheckLeader()

if err := svc.sysdb.CheckLeader(); err != nil {
return err
}

svc.sysdb.WaitForLeaderStepUp()
return nil
}

// checkReplicaRequest performs sanity-checking on a request that must
Expand Down
6 changes: 5 additions & 1 deletion src/control/server/mgmt_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,11 @@ func (svc *mgmtSvc) checkReqFabricProvider(req *mgmtpb.JoinReq, peerAddr *net.TC

sysProv, err := svc.getFabricProvider()
if err != nil {
return errors.Wrapf(err, "fetching system fabric provider")
if system.IsErrSystemAttrNotFound(err) {
svc.log.Debugf("error fetching system fabric provider: %s", err.Error())
return system.ErrLeaderStepUpInProgress
}
return errors.Wrap(err, "fetching system fabric provider")
}

if joinProv != sysProv {
Expand Down
15 changes: 14 additions & 1 deletion src/control/server/mgmt_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2508,8 +2508,21 @@ func TestMgmtSvc_checkReqFabricProvider(t *testing.T) {
joinURI: "tcp://10.10.10.10",
expErr: &system.ErrNotReplica{},
},
"prop not set": {
getSvc: func(t *testing.T, l logging.Logger) *mgmtSvc {
ms := mgmtSystemTestSetup(t, l, system.Members{}, []*control.HostResponse{})
if err := ms.setFabricProviders(""); err != nil {
t.Fatal(err)
}
return ms
},
joinProv: "tcp",
joinURI: "tcp://10.10.10.10",
expErr: system.ErrLeaderStepUpInProgress,
},
"success": {
joinURI: "tcp://10.10.10.10",
joinProv: "tcp",
joinURI: "tcp://10.10.10.10",
},
"does not match": {
joinProv: "verbs",
Expand Down
10 changes: 6 additions & 4 deletions src/control/system/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
)

var (
ErrEmptyGroupMap = errors.New("empty group map (all ranks excluded?)")
ErrRaftUnavail = errors.New("raft service unavailable (not started yet?)")
ErrUninitialized = errors.New("system is uninitialized (storage format required?)")
ErrEmptyGroupMap = errors.New("empty group map (all ranks excluded?)")
ErrRaftUnavail = errors.New("raft service unavailable (not started yet?)")
ErrUninitialized = errors.New("system is uninitialized (storage format required?)")
ErrLeaderStepUpInProgress = errors.New("leader step-up in progress (try again)")
)

// IsNotReady is a convenience function for checking if an error
Expand All @@ -37,7 +38,8 @@ func IsUnavailable(err error) bool {
if err == nil {
return false
}
return strings.Contains(errors.Cause(err).Error(), ErrRaftUnavail.Error())
cause := errors.Cause(err).Error()
return strings.Contains(cause, ErrRaftUnavail.Error()) || strings.Contains(cause, ErrLeaderStepUpInProgress.Error())
}

// IsEmptyGroupMap returns a boolean indicating whether or not the
Expand Down
93 changes: 93 additions & 0 deletions src/control/system/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//
// (C) Copyright 2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//

package system

import (
"testing"

"github.com/pkg/errors"

"github.com/daos-stack/daos/src/control/common/test"
)

func TestSystem_Errors_IsNotReady(t *testing.T) {
for name, tc := range map[string]struct {
err error
expResult bool
}{
"nil": {},
"uninitialized": {
err: ErrUninitialized,
expResult: true,
},
"raft not available": {
err: ErrRaftUnavail,
expResult: true,
},
"leadership transfer in progress": {
err: ErrLeaderStepUpInProgress,
expResult: true,
},
"something else": {
err: errors.New("something is wrong"),
},
} {
t.Run(name, func(t *testing.T) {
test.AssertEqual(t, tc.expResult, IsNotReady(tc.err), "")
})
}
}

func TestSystem_Errors_IsUninitialized(t *testing.T) {
for name, tc := range map[string]struct {
err error
expResult bool
}{
"nil": {},
"uninitialized": {
err: ErrUninitialized,
expResult: true,
},
"unavailable not uninitialized": {
err: ErrRaftUnavail,
},
"something else": {
err: errors.New("something is wrong"),
},
} {
t.Run(name, func(t *testing.T) {
test.AssertEqual(t, tc.expResult, IsUninitialized(tc.err), "")
})
}
}

func TestSystem_Errors_IsUnavailable(t *testing.T) {
for name, tc := range map[string]struct {
err error
expResult bool
}{
"nil": {},
"raft not available": {
err: ErrRaftUnavail,
expResult: true,
},
"leadership transfer in progress": {
err: ErrLeaderStepUpInProgress,
expResult: true,
},
"uninitialized not unavailable": {
err: ErrUninitialized,
},
"something else": {
err: errors.New("something is wrong"),
},
} {
t.Run(name, func(t *testing.T) {
test.AssertEqual(t, tc.expResult, IsUnavailable(tc.err), "")
})
}
}
45 changes: 28 additions & 17 deletions src/control/system/raft/database.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -88,6 +88,7 @@ type (
log logging.Logger
cfg *DatabaseConfig
initialized atm.Bool
steppingUp atm.Bool
replicaAddr *net.TCPAddr
raftTransport raft.Transport
raft syncRaft
Expand Down Expand Up @@ -521,26 +522,36 @@ func (db *Database) monitorLeadershipState(parent context.Context) {
}

db.log.Debugf("node %s gained MS leader state", db.replicaAddr)
if err := db.Barrier(); err != nil {
db.log.Errorf("raft Barrier() failed: %s", err)
if err = db.ResignLeadership(err); err != nil {
db.log.Errorf("raft ResignLeadership() failed: %s", err)
}
continue // restart the monitoring loop
}

var gainedCtx context.Context
gainedCtx, cancelGainedCtx = context.WithCancel(parent)
for _, fn := range db.onLeadershipGained {
if err := fn(gainedCtx); err != nil {
db.log.Errorf("failure in onLeadershipGained callback: %s", err)
cancelGainedCtx()
if err = db.ResignLeadership(err); err != nil {
db.log.Errorf("raft ResignLeadership() failed: %s", err)
}
break // break out of the inner loop; restart the monitoring loop
}
db.stepUp(gainedCtx, cancelGainedCtx)
}
}
}

func (db *Database) stepUp(ctx context.Context, cancel context.CancelFunc) {
db.steppingUp.SetTrue()
defer db.steppingUp.SetFalse()

if err := db.Barrier(); err != nil {
db.log.Errorf("raft Barrier() failed: %s", err)
if err = db.ResignLeadership(err); err != nil {
db.log.Errorf("raft ResignLeadership() failed: %s", err)
}
return // restart the monitoring loop
}

for i, fn := range db.onLeadershipGained {
db.log.Tracef("executing onLeadershipGained[%d]", i)

if err := fn(ctx); err != nil {
db.log.Errorf("failure in onLeadershipGained callback: %s", err)
cancel()
if err = db.ResignLeadership(err); err != nil {
db.log.Errorf("raft ResignLeadership() failed: %s", err)
}
return
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/control/system/raft/mocks.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -31,6 +31,7 @@ type (
ServerAddress raft.ServerAddress
State raft.RaftState
LeadershipTransferErr error
BarrierReturn raft.Future
}
mockRaftService struct {
cfg mockRaftServiceConfig
Expand Down Expand Up @@ -85,7 +86,10 @@ func (mrs *mockRaftService) State() raft.RaftState {
}

func (mrs *mockRaftService) Barrier(time.Duration) raft.Future {
return &mockRaftFuture{}
if mrs.cfg.BarrierReturn == nil {
return &mockRaftFuture{}
}
return mrs.cfg.BarrierReturn
}

func newMockRaftService(cfg *mockRaftServiceConfig, fsm raft.FSM) *mockRaftService {
Expand Down
15 changes: 13 additions & 2 deletions src/control/system/raft/raft.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2022 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -148,10 +148,21 @@ func (db *Database) Barrier() error {
db.log.Errorf("lost leadership during Barrier(): %s", err)
return errNotSysLeader(svc, db)
}
return err
if err != nil {
return err
}
return nil
})
}

// WaitForLeaderStepUp waits for all OnLeadershipGained functions to finish executing.
func (db *Database) WaitForLeaderStepUp() {
for db.steppingUp.IsTrue() {
// short interval to keep this polling loop from consuming too many cycles
time.Sleep(10 * time.Millisecond)
}
}

// ShutdownRaft signals that the raft implementation should shut down
// and release any resources it is holding. Blocks until the shutdown
// is complete.
Expand Down
Loading

0 comments on commit 2dcf4ed

Please sign in to comment.