From 2dcf4edf242a8032223061da220db6fbd9b8a813 Mon Sep 17 00:00:00 2001 From: Kris Jacque Date: Mon, 17 Jun 2024 15:48:42 -0600 Subject: [PATCH] DAOS-15956 control: Limit race between taking leadership and join (#14541) In very limited situations, it is possible to encounter a race condition between an MS replica stepping up as leader and engines beginning to join the system. In that case, the system fabric provider property may not yet be set, causing the engine join to fail. This change shrinks the gap where the race can occur. It also returns a retryable error on the small chance that this scenario is hit. Signed-off-by: Kris Jacque --- src/control/server/mgmt_svc.go | 10 ++- src/control/server/mgmt_system.go | 6 +- src/control/server/mgmt_system_test.go | 15 +++- src/control/system/errors.go | 10 ++- src/control/system/errors_test.go | 93 +++++++++++++++++++++++ src/control/system/raft/database.go | 45 ++++++----- src/control/system/raft/mocks.go | 8 +- src/control/system/raft/raft.go | 15 +++- src/control/system/raft/raft_test.go | 101 +++++++++++++++++++++++++ 9 files changed, 274 insertions(+), 29 deletions(-) create mode 100644 src/control/system/errors_test.go create mode 100644 src/control/system/raft/raft_test.go diff --git a/src/control/server/mgmt_svc.go b/src/control/server/mgmt_svc.go index 35792c44249..aac5358736b 100644 --- a/src/control/server/mgmt_svc.go +++ b/src/control/server/mgmt_svc.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2018-2023 Intel Corporation. +// (C) Copyright 2018-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -135,7 +135,13 @@ func (svc *mgmtSvc) checkLeaderRequest(req proto.Message) error { if err := svc.checkSystemRequest(unwrapped); err != nil { return err } - return svc.sysdb.CheckLeader() + + if err := svc.sysdb.CheckLeader(); err != nil { + return err + } + + svc.sysdb.WaitForLeaderStepUp() + return nil } // checkReplicaRequest performs sanity-checking on a request that must diff --git a/src/control/server/mgmt_system.go b/src/control/server/mgmt_system.go index a458a4524ba..64dda030023 100644 --- a/src/control/server/mgmt_system.go +++ b/src/control/server/mgmt_system.go @@ -277,7 +277,11 @@ func (svc *mgmtSvc) checkReqFabricProvider(req *mgmtpb.JoinReq, peerAddr *net.TC sysProv, err := svc.getFabricProvider() if err != nil { - return errors.Wrapf(err, "fetching system fabric provider") + if system.IsErrSystemAttrNotFound(err) { + svc.log.Debugf("error fetching system fabric provider: %s", err.Error()) + return system.ErrLeaderStepUpInProgress + } + return errors.Wrap(err, "fetching system fabric provider") } if joinProv != sysProv { diff --git a/src/control/server/mgmt_system_test.go b/src/control/server/mgmt_system_test.go index 1b04a1e4b4c..7a7d6ec0d1e 100644 --- a/src/control/server/mgmt_system_test.go +++ b/src/control/server/mgmt_system_test.go @@ -2508,8 +2508,21 @@ func TestMgmtSvc_checkReqFabricProvider(t *testing.T) { joinURI: "tcp://10.10.10.10", expErr: &system.ErrNotReplica{}, }, + "prop not set": { + getSvc: func(t *testing.T, l logging.Logger) *mgmtSvc { + ms := mgmtSystemTestSetup(t, l, system.Members{}, []*control.HostResponse{}) + if err := ms.setFabricProviders(""); err != nil { + t.Fatal(err) + } + return ms + }, + joinProv: "tcp", + joinURI: "tcp://10.10.10.10", + expErr: system.ErrLeaderStepUpInProgress, + }, "success": { - joinURI: "tcp://10.10.10.10", + joinProv: "tcp", + joinURI: "tcp://10.10.10.10", }, "does not match": { joinProv: "verbs", diff --git a/src/control/system/errors.go b/src/control/system/errors.go index f074ac97550..c7253e186eb 100644 --- a/src/control/system/errors.go +++ b/src/control/system/errors.go @@ -20,9 +20,10 @@ import ( ) var ( - ErrEmptyGroupMap = errors.New("empty group map (all ranks excluded?)") - ErrRaftUnavail = errors.New("raft service unavailable (not started yet?)") - ErrUninitialized = errors.New("system is uninitialized (storage format required?)") + ErrEmptyGroupMap = errors.New("empty group map (all ranks excluded?)") + ErrRaftUnavail = errors.New("raft service unavailable (not started yet?)") + ErrUninitialized = errors.New("system is uninitialized (storage format required?)") + ErrLeaderStepUpInProgress = errors.New("leader step-up in progress (try again)") ) // IsNotReady is a convenience function for checking if an error @@ -37,7 +38,8 @@ func IsUnavailable(err error) bool { if err == nil { return false } - return strings.Contains(errors.Cause(err).Error(), ErrRaftUnavail.Error()) + cause := errors.Cause(err).Error() + return strings.Contains(cause, ErrRaftUnavail.Error()) || strings.Contains(cause, ErrLeaderStepUpInProgress.Error()) } // IsEmptyGroupMap returns a boolean indicating whether or not the diff --git a/src/control/system/errors_test.go b/src/control/system/errors_test.go new file mode 100644 index 00000000000..d2ea4eda1ab --- /dev/null +++ b/src/control/system/errors_test.go @@ -0,0 +1,93 @@ +// +// (C) Copyright 2024 Intel Corporation. +// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// + +package system + +import ( + "testing" + + "github.com/pkg/errors" + + "github.com/daos-stack/daos/src/control/common/test" +) + +func TestSystem_Errors_IsNotReady(t *testing.T) { + for name, tc := range map[string]struct { + err error + expResult bool + }{ + "nil": {}, + "uninitialized": { + err: ErrUninitialized, + expResult: true, + }, + "raft not available": { + err: ErrRaftUnavail, + expResult: true, + }, + "leadership transfer in progress": { + err: ErrLeaderStepUpInProgress, + expResult: true, + }, + "something else": { + err: errors.New("something is wrong"), + }, + } { + t.Run(name, func(t *testing.T) { + test.AssertEqual(t, tc.expResult, IsNotReady(tc.err), "") + }) + } +} + +func TestSystem_Errors_IsUninitialized(t *testing.T) { + for name, tc := range map[string]struct { + err error + expResult bool + }{ + "nil": {}, + "uninitialized": { + err: ErrUninitialized, + expResult: true, + }, + "unavailable not uninitialized": { + err: ErrRaftUnavail, + }, + "something else": { + err: errors.New("something is wrong"), + }, + } { + t.Run(name, func(t *testing.T) { + test.AssertEqual(t, tc.expResult, IsUninitialized(tc.err), "") + }) + } +} + +func TestSystem_Errors_IsUnavailable(t *testing.T) { + for name, tc := range map[string]struct { + err error + expResult bool + }{ + "nil": {}, + "raft not available": { + err: ErrRaftUnavail, + expResult: true, + }, + "leadership transfer in progress": { + err: ErrLeaderStepUpInProgress, + expResult: true, + }, + "uninitialized not unavailable": { + err: ErrUninitialized, + }, + "something else": { + err: errors.New("something is wrong"), + }, + } { + t.Run(name, func(t *testing.T) { + test.AssertEqual(t, tc.expResult, IsUnavailable(tc.err), "") + }) + } +} diff --git a/src/control/system/raft/database.go b/src/control/system/raft/database.go index 818f709e5b6..9b4723a3826 100644 --- a/src/control/system/raft/database.go +++ b/src/control/system/raft/database.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2020-2023 Intel Corporation. +// (C) Copyright 2020-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -88,6 +88,7 @@ type ( log logging.Logger cfg *DatabaseConfig initialized atm.Bool + steppingUp atm.Bool replicaAddr *net.TCPAddr raftTransport raft.Transport raft syncRaft @@ -521,26 +522,36 @@ func (db *Database) monitorLeadershipState(parent context.Context) { } db.log.Debugf("node %s gained MS leader state", db.replicaAddr) - if err := db.Barrier(); err != nil { - db.log.Errorf("raft Barrier() failed: %s", err) - if err = db.ResignLeadership(err); err != nil { - db.log.Errorf("raft ResignLeadership() failed: %s", err) - } - continue // restart the monitoring loop - } var gainedCtx context.Context gainedCtx, cancelGainedCtx = context.WithCancel(parent) - for _, fn := range db.onLeadershipGained { - if err := fn(gainedCtx); err != nil { - db.log.Errorf("failure in onLeadershipGained callback: %s", err) - cancelGainedCtx() - if err = db.ResignLeadership(err); err != nil { - db.log.Errorf("raft ResignLeadership() failed: %s", err) - } - break // break out of the inner loop; restart the monitoring loop - } + db.stepUp(gainedCtx, cancelGainedCtx) + } + } +} + +func (db *Database) stepUp(ctx context.Context, cancel context.CancelFunc) { + db.steppingUp.SetTrue() + defer db.steppingUp.SetFalse() + + if err := db.Barrier(); err != nil { + db.log.Errorf("raft Barrier() failed: %s", err) + if err = db.ResignLeadership(err); err != nil { + db.log.Errorf("raft ResignLeadership() failed: %s", err) + } + return // restart the monitoring loop + } + + for i, fn := range db.onLeadershipGained { + db.log.Tracef("executing onLeadershipGained[%d]", i) + + if err := fn(ctx); err != nil { + db.log.Errorf("failure in onLeadershipGained callback: %s", err) + cancel() + if err = db.ResignLeadership(err); err != nil { + db.log.Errorf("raft ResignLeadership() failed: %s", err) } + return } } } diff --git a/src/control/system/raft/mocks.go b/src/control/system/raft/mocks.go index fef8f2eb61f..ef82a3b95b0 100644 --- a/src/control/system/raft/mocks.go +++ b/src/control/system/raft/mocks.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2020-2023 Intel Corporation. +// (C) Copyright 2020-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -31,6 +31,7 @@ type ( ServerAddress raft.ServerAddress State raft.RaftState LeadershipTransferErr error + BarrierReturn raft.Future } mockRaftService struct { cfg mockRaftServiceConfig @@ -85,7 +86,10 @@ func (mrs *mockRaftService) State() raft.RaftState { } func (mrs *mockRaftService) Barrier(time.Duration) raft.Future { - return &mockRaftFuture{} + if mrs.cfg.BarrierReturn == nil { + return &mockRaftFuture{} + } + return mrs.cfg.BarrierReturn } func newMockRaftService(cfg *mockRaftServiceConfig, fsm raft.FSM) *mockRaftService { diff --git a/src/control/system/raft/raft.go b/src/control/system/raft/raft.go index 22906cb2073..ef30c226aa8 100644 --- a/src/control/system/raft/raft.go +++ b/src/control/system/raft/raft.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2020-2022 Intel Corporation. +// (C) Copyright 2020-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -148,10 +148,21 @@ func (db *Database) Barrier() error { db.log.Errorf("lost leadership during Barrier(): %s", err) return errNotSysLeader(svc, db) } - return err + if err != nil { + return err + } + return nil }) } +// WaitForLeaderStepUp waits for all OnLeadershipGained functions to finish executing. +func (db *Database) WaitForLeaderStepUp() { + for db.steppingUp.IsTrue() { + // short interval to keep this polling loop from consuming too many cycles + time.Sleep(10 * time.Millisecond) + } +} + // ShutdownRaft signals that the raft implementation should shut down // and release any resources it is holding. Blocks until the shutdown // is complete. diff --git a/src/control/system/raft/raft_test.go b/src/control/system/raft/raft_test.go new file mode 100644 index 00000000000..f294a1785e0 --- /dev/null +++ b/src/control/system/raft/raft_test.go @@ -0,0 +1,101 @@ +// +// (C) Copyright 2024 Intel Corporation. +// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// + +package raft + +import ( + "testing" + "time" + + "github.com/hashicorp/raft" + "github.com/pkg/errors" + + "github.com/daos-stack/daos/src/control/common/test" + "github.com/daos-stack/daos/src/control/logging" + "github.com/daos-stack/daos/src/control/system" +) + +func TestRaft_Database_Barrier(t *testing.T) { + for name, tc := range map[string]struct { + raftSvcCfg *mockRaftServiceConfig + expErr error + }{ + "raft svc barrier failed": { + raftSvcCfg: &mockRaftServiceConfig{ + BarrierReturn: &mockRaftFuture{ + err: errors.New("mock error"), + }, + }, + expErr: errors.New("mock error"), + }, + "raft svc leadership error": { + raftSvcCfg: &mockRaftServiceConfig{ + BarrierReturn: &mockRaftFuture{ + err: raft.ErrNotLeader, + }, + }, + expErr: &system.ErrNotLeader{}, + }, + "success": { + raftSvcCfg: &mockRaftServiceConfig{}, + }, + } { + t.Run(name, func(t *testing.T) { + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + db, err := NewDatabase(log, &DatabaseConfig{}) + if err != nil { + t.Fatal(err) + } + db.raft.setSvc(newMockRaftService(tc.raftSvcCfg, (*fsm)(db))) + db.initialized.SetTrue() + + err = db.Barrier() + test.CmpErr(t, tc.expErr, err) + }) + } +} + +func TestRaft_Database_WaitForLeaderStepUp(t *testing.T) { + for name, tc := range map[string]struct { + stepUpDelay time.Duration + }{ + "no delay": {}, + "10 ms": { + stepUpDelay: 10 * time.Millisecond, + }, + "25 ms": { + stepUpDelay: 25 * time.Millisecond, + }, + "500 ms": { + stepUpDelay: 500 * time.Millisecond, + }, + } { + t.Run(name, func(t *testing.T) { + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + db := MockDatabase(t, log) + db.initialized.SetTrue() + db.steppingUp.SetTrue() + fullDelay := tc.stepUpDelay + 10*time.Millisecond + go func() { + time.Sleep(fullDelay) + db.steppingUp.SetFalse() + }() + + start := time.Now() + db.WaitForLeaderStepUp() + duration := time.Since(start) + + test.AssertTrue(t, duration >= tc.stepUpDelay, "") + + // Wiggle room for the total duration: 10ms for loop interval + a little more for potentially slow machines + test.AssertTrue(t, duration <= tc.stepUpDelay+50*time.Millisecond, "") + }) + } +}