Skip to content

Commit

Permalink
DAOS-14707 control: Allow multiple agents simultaneously (#13465)
Browse files Browse the repository at this point in the history
This allows for the case where multiple agents are used to allow
a client to connect to multiple systems. The agent log prints a
NOTICE-level message to indicate the presence of other daos_agent
processes on startup.

Signed-off-by: Kris Jacque <kris.jacque@intel.com>
  • Loading branch information
kjacque committed Dec 19, 2023
1 parent e9b4104 commit 70cf75c
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 54 deletions.
5 changes: 2 additions & 3 deletions src/control/cmd/daos_agent/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type startCmd struct {

func (cmd *startCmd) Execute(_ []string) error {
if err := common.CheckDupeProcess(); err != nil {
return err
cmd.Notice(err.Error())
}

cmd.Infof("Starting %s (pid %d)", versionString(), os.Getpid())
Expand Down Expand Up @@ -123,8 +123,7 @@ func (cmd *startCmd) Execute(_ []string) error {
drpcSrvStart := time.Now()
err = drpcServer.Start(hwlocCtx)
if err != nil {
cmd.Errorf("Unable to start socket server on %s: %v", sockPath, err)
return err
return errors.Wrap(err, "unable to start dRPC server")
}
cmd.Debugf("dRPC socket server started: %s", time.Since(drpcSrvStart))

Expand Down
39 changes: 31 additions & 8 deletions src/control/drpc/drpc_server.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2018-2022 Intel Corporation.
// (C) Copyright 2018-2023 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -87,28 +87,51 @@ func (d *DomainSocketServer) Listen(ctx context.Context) {

// Start sets up the dRPC server socket and kicks off the listener goroutine.
func (d *DomainSocketServer) Start(ctx context.Context) error {
// Just in case an old socket file is still lying around
if err := syscall.Unlink(d.sockFile); err != nil && !os.IsNotExist(err) {
return errors.Wrapf(err, "Unable to unlink %s", d.sockFile)
if d == nil {
return errors.New("DomainSocketServer is nil")
}

addr := &net.UnixAddr{Name: d.sockFile, Net: "unixpacket"}
if err := d.checkExistingSocket(ctx, addr); err != nil {
return err
}

lis, err := net.ListenUnix("unixpacket", addr)
if err != nil {
return errors.Wrapf(err, "Unable to listen on unix socket %s", d.sockFile)
return errors.Wrapf(err, "unable to listen on unix socket %s", d.sockFile)
}
d.listener = lis

// The only writer should be the I/O Engines which should be running as the same user as
// daos_server process.
if err := os.Chmod(d.sockFile, d.sockFileMode); err != nil {
return errors.Wrapf(err, "Unable to set permissions on %s", d.sockFile)
return errors.Wrapf(err, "unable to set permissions on %s", d.sockFile)
}

go d.Listen(ctx)
return nil
}

// checkExistingSocket probes the socket address by dialing it before the server
// attempts to listen on it.
//
// Outcomes:
//   - The dial succeeds: something is already listening, so the probe connection
//     is closed and FaultSocketFileInUse is returned for d.sockFile.
//   - The dial fails with ENOENT: there is no socket file, nothing to clean up.
//   - The dial fails with ECONNREFUSED: the file exists but nobody is listening,
//     so it is unlinked. A failure to unlink (other than the file already being
//     gone) is returned wrapped.
//   - Any other dial error is returned to the caller unchanged.
//
// The ctx parameter is not used by the current implementation.
func (d *DomainSocketServer) checkExistingSocket(ctx context.Context, addr *net.UnixAddr) error {
	probe, dialErr := net.DialUnix("unixpacket", nil, addr)

	switch {
	case dialErr == nil:
		// Another server answered on this socket; only the probe is torn down.
		_ = probe.Close()
		return FaultSocketFileInUse(d.sockFile)

	case errors.Is(dialErr, syscall.ENOENT):
		return nil

	case errors.Is(dialErr, syscall.ECONNREFUSED):
		// Stale socket file with no listener behind it - safe to remove.
		rmErr := syscall.Unlink(addr.Name)
		if rmErr == nil || os.IsNotExist(rmErr) {
			return nil
		}
		return errors.Wrap(rmErr, "unlink old socket file")

	default:
		return dialErr
	}
}

// RegisterRPCModule takes a Module and associates it with the given
// DomainSocketServer so it can be used to process incoming dRPC calls.
func (d *DomainSocketServer) RegisterRPCModule(mod Module) {
Expand Down
155 changes: 112 additions & 43 deletions src/control/drpc/drpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"google.golang.org/protobuf/proto"

"github.com/daos-stack/daos/src/control/common/test"
"github.com/daos-stack/daos/src/control/fault"
"github.com/daos-stack/daos/src/control/fault/code"
"github.com/daos-stack/daos/src/control/logging"
)

Expand Down Expand Up @@ -144,52 +146,119 @@ func TestNewDomainSocketServer(t *testing.T) {
test.AssertEqual(t, dss.sockFile, expectedSock, "wrong sockfile")
}

func TestServer_Start_CantUnlinkSocket(t *testing.T) {
log, buf := logging.NewTestLogger(t.Name())
defer test.ShowBufferOnFailure(t, buf)

tmpDir, tmpCleanup := test.CreateTestDir(t)
defer tmpCleanup()

path := filepath.Join(tmpDir, "test.sock")

// Forbid searching the directory
if err := os.Chmod(tmpDir, 0000); err != nil {
t.Fatalf("Couldn't change permissions on dir: %v", err)
func TestDrpc_DomainSocketServer_Start(t *testing.T) {
sockPath := func(dir string) string {
return filepath.Join(dir, "test.sock")
}
defer func() {
_ = os.Chmod(tmpDir, 0700)
}()

dss, _ := NewDomainSocketServer(log, path, testFileMode)

err := dss.Start(test.Context(t))

test.CmpErr(t, errors.New("unlink"), err)
}

func TestServer_Start_CantListen(t *testing.T) {
log, buf := logging.NewTestLogger(t.Name())
defer test.ShowBufferOnFailure(t, buf)

tmpDir, tmpCleanup := test.CreateTestDir(t)
defer tmpCleanup()

path := filepath.Join(tmpDir, "test.sock")

// Forbid writing the directory
if err := os.Chmod(tmpDir, 0500); err != nil {
t.Fatalf("Couldn't change permissions on dir: %v", err)
for name, tc := range map[string]struct {
nilServer bool
setup func(t *testing.T, dir string) func()
expErr error
}{
"nil": {
nilServer: true,
expErr: errors.New("nil"),
},
"unused existing socket file": {
setup: func(t *testing.T, dir string) func() {
t.Helper()

f, err := os.Create(sockPath(dir))
if err != nil {
t.Fatal(err)
}
_ = f.Close()
return func() {}
},
},
"can't unlink old socket file": {
setup: func(t *testing.T, dir string) func() {
t.Helper()

sockFile := sockPath(dir)
f, err := os.Create(sockFile)
if err != nil {
t.Fatal(err)
}
_ = f.Close()

if err := os.Chmod(dir, 0500); err != nil {
t.Fatalf("Couldn't change permissions on dir: %v", err)
}
return func() {
_ = os.Chmod(dir, 0700)
}
},
expErr: errors.New("unlink"),
},
"socket file in use": {
setup: func(t *testing.T, dir string) func() {
t.Helper()
log, buf := logging.NewTestLogger(t.Name())
defer test.ShowBufferOnFailure(t, buf)

other, err := NewDomainSocketServer(log, sockPath(dir), testFileMode)
if err != nil {
t.Fatalf("can't create first server: %s", err.Error())
}

err = other.Start(test.Context(t))
if err != nil {
t.Fatalf("can't start up first server: %s", err.Error())
}

// NB: The started server is shut down when the test context is canceled.
return func() {}
},
expErr: FaultSocketFileInUse(""),
},
"listen fails": {
setup: func(t *testing.T, dir string) func() {
t.Helper()

if err := os.Chmod(dir, 0500); err != nil {
t.Fatalf("Couldn't change permissions on dir: %v", err)
}
return func() {
_ = os.Chmod(dir, 0700)
}
},
expErr: errors.New("listen"),
},
"success": {},
} {
t.Run(name, func(t *testing.T) {
log, buf := logging.NewTestLogger(t.Name())
defer test.ShowBufferOnFailure(t, buf)

tmpDir, tmpCleanup := test.CreateTestDir(t)
defer tmpCleanup()

if tc.setup != nil {
teardown := tc.setup(t, tmpDir)
defer teardown()
}

// Test hack - make sure the right path is included in the fault message for comparison
if fault.IsFaultCode(tc.expErr, code.SocketFileInUse) {
tc.expErr = FaultSocketFileInUse(sockPath(tmpDir))
}

var err error
var dss *DomainSocketServer
if !tc.nilServer {
dss, err = NewDomainSocketServer(log, sockPath(tmpDir), testFileMode)
if err != nil {
t.Fatal(err)
}
}

err = dss.Start(test.Context(t))

test.CmpErr(t, tc.expErr, err)
})
}
defer func() {
_ = os.Chmod(tmpDir, 0700)
}()

dss, _ := NewDomainSocketServer(log, path, testFileMode)

err := dss.Start(test.Context(t))

test.CmpErr(t, errors.New("listen"), err)
}

func TestServer_RegisterModule(t *testing.T) {
Expand Down
27 changes: 27 additions & 0 deletions src/control/drpc/fault.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//
// (C) Copyright 2023 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//

package drpc

import (
"fmt"

"github.com/daos-stack/daos/src/control/fault"
"github.com/daos-stack/daos/src/control/fault/code"
)

// FaultSocketFileInUse builds the fault returned when the dRPC socket file at the
// given path was already in use at the time the dRPC server tried to start. The
// path is embedded in the fault's description.
func FaultSocketFileInUse(path string) *fault.Fault {
	const (
		reason     = "dRPC socket file already in use"
		resolution = "If another process is using the socket file, configure a different socket directory. " +
			"Otherwise, delete the existing socket file and try again."
	)

	f := new(fault.Fault)
	f.Domain = "drpc"
	f.Code = code.SocketFileInUse
	f.Description = fmt.Sprintf("Configured dRPC socket file '%s' is already in use.", path)
	f.Reason = reason
	f.Resolution = resolution
	return f
}
1 change: 1 addition & 0 deletions src/control/fault/code/codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
PrivilegedHelperNotPrivileged
PrivilegedHelperNotAvailable
PrivilegedHelperRequestFailed
SocketFileInUse
)

// generic storage fault codes
Expand Down
6 changes: 6 additions & 0 deletions utils/test_memcheck.supp
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,9 @@
...
fun:mdb_txn_commit
}
{
go_runtime_syscall_param
Memcheck:Param
write(buf)
fun:runtime/internal/syscall.Syscall6
}

0 comments on commit 70cf75c

Please sign in to comment.