Skip to content

Commit

Permalink
Merge branch 'master' into Nasf-Fan/DAOS-15914_1
Browse files Browse the repository at this point in the history
  • Loading branch information
Nasf-Fan committed Jun 18, 2024
2 parents 3501278 + 2dcf4ed commit cbff806
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 29 deletions.
10 changes: 8 additions & 2 deletions src/control/server/mgmt_svc.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2018-2023 Intel Corporation.
// (C) Copyright 2018-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -135,7 +135,13 @@ func (svc *mgmtSvc) checkLeaderRequest(req proto.Message) error {
if err := svc.checkSystemRequest(unwrapped); err != nil {
return err
}
return svc.sysdb.CheckLeader()

if err := svc.sysdb.CheckLeader(); err != nil {
return err
}

svc.sysdb.WaitForLeaderStepUp()
return nil
}

// checkReplicaRequest performs sanity-checking on a request that must
Expand Down
6 changes: 5 additions & 1 deletion src/control/server/mgmt_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,11 @@ func (svc *mgmtSvc) checkReqFabricProvider(req *mgmtpb.JoinReq, peerAddr *net.TC

sysProv, err := svc.getFabricProvider()
if err != nil {
return errors.Wrapf(err, "fetching system fabric provider")
if system.IsErrSystemAttrNotFound(err) {
svc.log.Debugf("error fetching system fabric provider: %s", err.Error())
return system.ErrLeaderStepUpInProgress
}
return errors.Wrap(err, "fetching system fabric provider")
}

if joinProv != sysProv {
Expand Down
15 changes: 14 additions & 1 deletion src/control/server/mgmt_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2508,8 +2508,21 @@ func TestMgmtSvc_checkReqFabricProvider(t *testing.T) {
joinURI: "tcp://10.10.10.10",
expErr: &system.ErrNotReplica{},
},
"prop not set": {
getSvc: func(t *testing.T, l logging.Logger) *mgmtSvc {
ms := mgmtSystemTestSetup(t, l, system.Members{}, []*control.HostResponse{})
if err := ms.setFabricProviders(""); err != nil {
t.Fatal(err)
}
return ms
},
joinProv: "tcp",
joinURI: "tcp://10.10.10.10",
expErr: system.ErrLeaderStepUpInProgress,
},
"success": {
joinURI: "tcp://10.10.10.10",
joinProv: "tcp",
joinURI: "tcp://10.10.10.10",
},
"does not match": {
joinProv: "verbs",
Expand Down
10 changes: 6 additions & 4 deletions src/control/system/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
)

var (
ErrEmptyGroupMap = errors.New("empty group map (all ranks excluded?)")
ErrRaftUnavail = errors.New("raft service unavailable (not started yet?)")
ErrUninitialized = errors.New("system is uninitialized (storage format required?)")
ErrEmptyGroupMap = errors.New("empty group map (all ranks excluded?)")
ErrRaftUnavail = errors.New("raft service unavailable (not started yet?)")
ErrUninitialized = errors.New("system is uninitialized (storage format required?)")
ErrLeaderStepUpInProgress = errors.New("leader step-up in progress (try again)")
)

// IsNotReady is a convenience function for checking if an error
Expand All @@ -37,7 +38,8 @@ func IsUnavailable(err error) bool {
if err == nil {
return false
}
return strings.Contains(errors.Cause(err).Error(), ErrRaftUnavail.Error())
cause := errors.Cause(err).Error()
return strings.Contains(cause, ErrRaftUnavail.Error()) || strings.Contains(cause, ErrLeaderStepUpInProgress.Error())
}

// IsEmptyGroupMap returns a boolean indicating whether or not the
Expand Down
93 changes: 93 additions & 0 deletions src/control/system/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//
// (C) Copyright 2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//

package system

import (
"testing"

"github.com/pkg/errors"

"github.com/daos-stack/daos/src/control/common/test"
)

func TestSystem_Errors_IsNotReady(t *testing.T) {
for name, tc := range map[string]struct {
err error
expResult bool
}{
"nil": {},
"uninitialized": {
err: ErrUninitialized,
expResult: true,
},
"raft not available": {
err: ErrRaftUnavail,
expResult: true,
},
"leadership transfer in progress": {
err: ErrLeaderStepUpInProgress,
expResult: true,
},
"something else": {
err: errors.New("something is wrong"),
},
} {
t.Run(name, func(t *testing.T) {
test.AssertEqual(t, tc.expResult, IsNotReady(tc.err), "")
})
}
}

func TestSystem_Errors_IsUninitialized(t *testing.T) {
for name, tc := range map[string]struct {
err error
expResult bool
}{
"nil": {},
"uninitialized": {
err: ErrUninitialized,
expResult: true,
},
"unavailable not uninitialized": {
err: ErrRaftUnavail,
},
"something else": {
err: errors.New("something is wrong"),
},
} {
t.Run(name, func(t *testing.T) {
test.AssertEqual(t, tc.expResult, IsUninitialized(tc.err), "")
})
}
}

func TestSystem_Errors_IsUnavailable(t *testing.T) {
for name, tc := range map[string]struct {
err error
expResult bool
}{
"nil": {},
"raft not available": {
err: ErrRaftUnavail,
expResult: true,
},
"leadership transfer in progress": {
err: ErrLeaderStepUpInProgress,
expResult: true,
},
"uninitialized not unavailable": {
err: ErrUninitialized,
},
"something else": {
err: errors.New("something is wrong"),
},
} {
t.Run(name, func(t *testing.T) {
test.AssertEqual(t, tc.expResult, IsUnavailable(tc.err), "")
})
}
}
45 changes: 28 additions & 17 deletions src/control/system/raft/database.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -88,6 +88,7 @@ type (
log logging.Logger
cfg *DatabaseConfig
initialized atm.Bool
steppingUp atm.Bool
replicaAddr *net.TCPAddr
raftTransport raft.Transport
raft syncRaft
Expand Down Expand Up @@ -521,26 +522,36 @@ func (db *Database) monitorLeadershipState(parent context.Context) {
}

db.log.Debugf("node %s gained MS leader state", db.replicaAddr)
if err := db.Barrier(); err != nil {
db.log.Errorf("raft Barrier() failed: %s", err)
if err = db.ResignLeadership(err); err != nil {
db.log.Errorf("raft ResignLeadership() failed: %s", err)
}
continue // restart the monitoring loop
}

var gainedCtx context.Context
gainedCtx, cancelGainedCtx = context.WithCancel(parent)
for _, fn := range db.onLeadershipGained {
if err := fn(gainedCtx); err != nil {
db.log.Errorf("failure in onLeadershipGained callback: %s", err)
cancelGainedCtx()
if err = db.ResignLeadership(err); err != nil {
db.log.Errorf("raft ResignLeadership() failed: %s", err)
}
break // break out of the inner loop; restart the monitoring loop
}
db.stepUp(gainedCtx, cancelGainedCtx)
}
}
}

func (db *Database) stepUp(ctx context.Context, cancel context.CancelFunc) {
db.steppingUp.SetTrue()
defer db.steppingUp.SetFalse()

if err := db.Barrier(); err != nil {
db.log.Errorf("raft Barrier() failed: %s", err)
if err = db.ResignLeadership(err); err != nil {
db.log.Errorf("raft ResignLeadership() failed: %s", err)
}
return // restart the monitoring loop
}

for i, fn := range db.onLeadershipGained {
db.log.Tracef("executing onLeadershipGained[%d]", i)

if err := fn(ctx); err != nil {
db.log.Errorf("failure in onLeadershipGained callback: %s", err)
cancel()
if err = db.ResignLeadership(err); err != nil {
db.log.Errorf("raft ResignLeadership() failed: %s", err)
}
return
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/control/system/raft/mocks.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -31,6 +31,7 @@ type (
ServerAddress raft.ServerAddress
State raft.RaftState
LeadershipTransferErr error
BarrierReturn raft.Future
}
mockRaftService struct {
cfg mockRaftServiceConfig
Expand Down Expand Up @@ -85,7 +86,10 @@ func (mrs *mockRaftService) State() raft.RaftState {
}

func (mrs *mockRaftService) Barrier(time.Duration) raft.Future {
return &mockRaftFuture{}
if mrs.cfg.BarrierReturn == nil {
return &mockRaftFuture{}
}
return mrs.cfg.BarrierReturn
}

func newMockRaftService(cfg *mockRaftServiceConfig, fsm raft.FSM) *mockRaftService {
Expand Down
15 changes: 13 additions & 2 deletions src/control/system/raft/raft.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2022 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -148,10 +148,21 @@ func (db *Database) Barrier() error {
db.log.Errorf("lost leadership during Barrier(): %s", err)
return errNotSysLeader(svc, db)
}
return err
if err != nil {
return err
}
return nil
})
}

// WaitForLeaderStepUp waits for all OnLeadershipGained functions to finish executing.
func (db *Database) WaitForLeaderStepUp() {
for db.steppingUp.IsTrue() {
// short interval to keep this polling loop from consuming too many cycles
time.Sleep(10 * time.Millisecond)
}
}

// ShutdownRaft signals that the raft implementation should shut down
// and release any resources it is holding. Blocks until the shutdown
// is complete.
Expand Down
Loading

0 comments on commit cbff806

Please sign in to comment.