Skip to content

Commit

Permalink
DAOS-8331 client: Export client metrics via agent
Browse files Browse the repository at this point in the history
Adds new agent config parameters and code to
optionally export client metrics in Prometheus
format.

Example daos_agent.yml updates:
  telemetry_port: 9192 # export on port 9192
  telemetry_retain: 5m # retain metrics for 5 minutes
                       # after client exit

Change-Id: I77864682cc19fa4c33f326d879e20704ef57a7ea
Required-githooks: true
Signed-off-by: Michael MacDonald <mjmac@google.com>
  • Loading branch information
mjmac committed Jan 4, 2024
1 parent 6434de4 commit 7a6fb84
Show file tree
Hide file tree
Showing 23 changed files with 1,820 additions and 728 deletions.
17 changes: 13 additions & 4 deletions src/client/api/metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ bool daos_client_metric_retain;
int
dc_tm_init(void)
{
int metrics_tag;
pid_t pid;
int rc;
struct d_tm_node_t *started_at;
int metrics_tag;
pid_t pid;
int rc;

d_getenv_bool(DAOS_CLIENT_METRICS_ENV, &daos_client_metric);
if (!daos_client_metric)
Expand All @@ -55,14 +56,22 @@ dc_tm_init(void)
pid = getpid();
D_INFO("INIT %s/%u metrics\n", dc_jobid, pid);

/** create new shmem space for per-pool metrics */
/** create new shmem space for per-client metrics */
rc = d_tm_add_ephemeral_dir(NULL, MAX_IDS_SIZE(INIT_JOB_NUM), "%s/%u",
dc_jobid, pid);
if (rc != 0) {
DL_ERROR(rc, "add metric %s/%u failed.\n", dc_jobid, pid);

Check warning on line 63 in src/client/api/metrics.c

View workflow job for this annotation

GitHub Actions / Logging macro checking

check-return, Line contains too many newlines
D_GOTO(out, rc);
}

rc = d_tm_add_metric(&started_at, D_TM_TIMESTAMP, "Timestamp of client startup", NULL,
"%s/%u/%s", dc_jobid, pid, "started_at");
if (rc != 0) {
DL_ERROR(rc, "add metric %s/%u/started_at failed.", dc_jobid, pid);
D_GOTO(out, rc);
}

d_tm_record_timestamp(started_at);
out:
if (rc)
d_tm_fini();
Expand Down
13 changes: 12 additions & 1 deletion src/control/cmd/daos_agent/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -55,6 +55,13 @@ type Config struct {
DisableAutoEvict bool `yaml:"disable_auto_evict,omitempty"`
ExcludeFabricIfaces common.StringSet `yaml:"exclude_fabric_ifaces,omitempty"`
FabricInterfaces []*NUMAFabricConfig `yaml:"fabric_ifaces,omitempty"`
TelemetryPort int `yaml:"telemetry_port,omitempty"`
TelemetryRetain time.Duration `yaml:"telemetry_retain,omitempty"`
}

// TelemetryEnabled returns true if client telemetry collection/export is enabled.
func (c *Config) TelemetryEnabled() bool {
return c.TelemetryPort > 0
}

// NUMAFabricConfig defines a list of fabric interfaces that belong to a NUMA
Expand Down Expand Up @@ -89,6 +96,10 @@ func LoadConfig(cfgPath string) (*Config, error) {
return nil, fmt.Errorf("invalid system name: %q", cfg.SystemName)
}

if cfg.TelemetryRetain > 0 && cfg.TelemetryPort == 0 {
return nil, errors.New("telemetry_retain requires telemetry_port")
}

return cfg, nil
}

Expand Down
75 changes: 59 additions & 16 deletions src/control/cmd/daos_agent/infocache.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand All @@ -8,6 +8,7 @@ package main

import (
"context"
"fmt"
"net"
"strings"
"sync"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/daos-stack/daos/src/control/lib/control"
"github.com/daos-stack/daos/src/control/lib/hardware"
"github.com/daos-stack/daos/src/control/lib/hardware/hwprov"
"github.com/daos-stack/daos/src/control/lib/telemetry"
"github.com/daos-stack/daos/src/control/logging"
)

Expand All @@ -36,17 +38,20 @@ type fabricScanFn func(ctx context.Context, providers ...string) (*NUMAFabric, e
// NewInfoCache creates a new InfoCache with appropriate parameters set.
func NewInfoCache(ctx context.Context, log logging.Logger, client control.UnaryInvoker, cfg *Config) *InfoCache {
ic := &InfoCache{
log: log,
ignoreIfaces: cfg.ExcludeFabricIfaces,
client: client,
cache: cache.NewItemCache(log),
getAttachInfo: control.GetAttachInfo,
fabricScan: getFabricScanFn(log, cfg, hwprov.DefaultFabricScanner(log)),
netIfaces: net.Interfaces,
devClassGetter: hwprov.DefaultNetDevClassProvider(log),
devStateGetter: hwprov.DefaultNetDevStateProvider(log),
log: log,
ignoreIfaces: cfg.ExcludeFabricIfaces,
client: client,
cache: cache.NewItemCache(log),
getAttachInfoCb: control.GetAttachInfo,
fabricScan: getFabricScanFn(log, cfg, hwprov.DefaultFabricScanner(log)),
netIfaces: net.Interfaces,
devClassGetter: hwprov.DefaultNetDevClassProvider(log),
devStateGetter: hwprov.DefaultNetDevStateProvider(log),
}

ic.clientTelemetryEnabled.Store(cfg.TelemetryEnabled())
ic.clientTelemetryRetain.Store(cfg.TelemetryRetain > 0)

if cfg.DisableCache {
ic.DisableAttachInfoCache()
ic.DisableFabricCache()
Expand Down Expand Up @@ -198,12 +203,14 @@ type InfoCache struct {
cache *cache.ItemCache
fabricCacheDisabled atm.Bool
attachInfoCacheDisabled atm.Bool
clientTelemetryEnabled atm.Bool
clientTelemetryRetain atm.Bool

getAttachInfo getAttachInfoFn
fabricScan fabricScanFn
netIfaces func() ([]net.Interface, error)
devClassGetter hardware.NetDevClassProvider
devStateGetter hardware.NetDevStateProvider
getAttachInfoCb getAttachInfoFn
fabricScan fabricScanFn
netIfaces func() ([]net.Interface, error)
devClassGetter hardware.NetDevClassProvider
devStateGetter hardware.NetDevStateProvider

client control.UnaryInvoker
attachInfoRefresh time.Duration
Expand Down Expand Up @@ -292,6 +299,41 @@ func (c *InfoCache) EnableStaticFabricCache(ctx context.Context, nf *NUMAFabric)
c.EnableFabricCache()
}

func (c *InfoCache) getAttachInfo(ctx context.Context, rpcClient control.UnaryInvoker, req *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
if c == nil {
return nil, errors.New("InfoCache is nil")
}
if c.getAttachInfoCb == nil {
return nil, errors.New("getAttachInfoFn is nil")
}

resp, err := c.getAttachInfoCb(ctx, rpcClient, req)
if err != nil {
return nil, err
}
c.adjustAttachInfo(resp)
return resp, nil
}

// adjustAttachInfo performs any necessary adjustments to the attach info
// before returning it.
func (c *InfoCache) adjustAttachInfo(resp *control.GetAttachInfoResp) {
if c == nil || resp == nil {
return
}

if c.clientTelemetryEnabled.IsTrue() {
resp.ClientNetHint.EnvVars = append(resp.ClientNetHint.EnvVars,
fmt.Sprintf("%s=1", telemetry.ClientMetricsEnabledEnv),
)
if c.clientTelemetryRetain.IsTrue() {
resp.ClientNetHint.EnvVars = append(resp.ClientNetHint.EnvVars,
fmt.Sprintf("%s=1", telemetry.ClientMetricsRetainEnv),
)
}
}
}

// GetAttachInfo fetches the attach info from the cache, and refreshes if necessary.
func (c *InfoCache) GetAttachInfo(ctx context.Context, sys string) (*control.GetAttachInfoResp, error) {
if c == nil {
Expand All @@ -308,7 +350,8 @@ func (c *InfoCache) GetAttachInfo(ctx context.Context, sys string) (*control.Get
}
createItem := func() (cache.Item, error) {
c.log.Debugf("cache miss for %s", sysAttachInfoKey(sys))
return newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo), nil
cai := newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo)
return cai, nil
}

item, release, err := c.cache.GetOrCreate(ctx, sysAttachInfoKey(sys), createItem)
Expand Down
86 changes: 68 additions & 18 deletions src/control/cmd/daos_agent/infocache_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand All @@ -8,20 +8,23 @@ package main

import (
"context"
"fmt"
"net"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"

"github.com/daos-stack/daos/src/control/build"
"github.com/daos-stack/daos/src/control/common"
"github.com/daos-stack/daos/src/control/common/test"
"github.com/daos-stack/daos/src/control/lib/cache"
"github.com/daos-stack/daos/src/control/lib/control"
"github.com/daos-stack/daos/src/control/lib/hardware"
"github.com/daos-stack/daos/src/control/lib/telemetry"
"github.com/daos-stack/daos/src/control/logging"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"
)

type testInfoCacheParams struct {
Expand All @@ -32,6 +35,8 @@ type testInfoCacheParams struct {
mockNetDevStateGetter hardware.NetDevStateProvider
disableFabricCache bool
disableAttachInfoCache bool
enableClientTelemetry bool
retainClientTelemetry bool
ctlInvoker control.Invoker
cachedItems []cache.Item
}
Expand All @@ -43,16 +48,19 @@ func newTestInfoCache(t *testing.T, log logging.Logger, params testInfoCachePara
}

ic := &InfoCache{
log: log,
getAttachInfo: params.mockGetAttachInfo,
fabricScan: params.mockScanFabric,
devClassGetter: params.mockNetDevClassGetter,
devStateGetter: params.mockNetDevStateGetter,
netIfaces: params.mockNetIfaces,
client: params.ctlInvoker,
cache: c,
log: log,
getAttachInfoCb: params.mockGetAttachInfo,
fabricScan: params.mockScanFabric,
devClassGetter: params.mockNetDevClassGetter,
devStateGetter: params.mockNetDevStateGetter,
netIfaces: params.mockNetIfaces,
client: params.ctlInvoker,
cache: c,
}

ic.clientTelemetryEnabled.Store(params.enableClientTelemetry)
ic.clientTelemetryRetain.Store(params.retainClientTelemetry)

if ic.netIfaces == nil {
ic.netIfaces = func() ([]net.Interface, error) {
return []net.Interface{
Expand Down Expand Up @@ -714,6 +722,14 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
NetDevClass: uint32(hardware.Ether),
},
}
telemEnabledResp := copyGetAttachInfoResp(ctlResp)
telemEnabledResp.ClientNetHint.EnvVars = append(telemEnabledResp.ClientNetHint.EnvVars,
fmt.Sprintf("%s=1", telemetry.ClientMetricsEnabledEnv),
)
telemRetainedResp := copyGetAttachInfoResp(telemEnabledResp)
telemRetainedResp.ClientNetHint.EnvVars = append(telemRetainedResp.ClientNetHint.EnvVars,
fmt.Sprintf("%s=1", telemetry.ClientMetricsRetainEnv),
)

for name, tc := range map[string]struct {
getInfoCache func(logging.Logger) *InfoCache
Expand All @@ -734,7 +750,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
disableAttachInfoCache: true,
})
},
remoteResp: ctlResp,
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: ctlResp,
expRemote: true,
},
Expand All @@ -748,11 +764,45 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
expErr: errors.New("mock remote"),
expRemote: true,
},
"cache disabled; client telemetry enabled": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{
disableAttachInfoCache: true,
enableClientTelemetry: true,
})
},
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: telemEnabledResp,
expRemote: true,
},
"cache enabled; client telemetry enabled": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{
enableClientTelemetry: true,
})
},
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: telemEnabledResp,
expRemote: true,
expCached: true,
},
"cache enabled; client telemetry enabled; client telemetry retained": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{
enableClientTelemetry: true,
retainClientTelemetry: true,
})
},
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: telemRetainedResp,
expRemote: true,
expCached: true,
},
"enabled but empty": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{})
},
remoteResp: ctlResp,
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: ctlResp,
expRemote: true,
expCached: true,
Expand All @@ -772,7 +822,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
fetch: func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
return nil, errors.New("shouldn't call cached remote")
},
lastResponse: ctlResp,
lastResponse: copyGetAttachInfoResp(ctlResp),
cacheItem: cacheItem{lastCached: time.Now()},
system: "test",
})
Expand All @@ -790,7 +840,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
fetch: func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
return nil, errors.New("shouldn't call cached remote")
},
lastResponse: ctlResp,
lastResponse: copyGetAttachInfoResp(ctlResp),
cacheItem: cacheItem{lastCached: time.Now()},
system: build.DefaultSystemName,
})
Expand All @@ -814,7 +864,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
return ic
},
system: "somethingelse",
remoteResp: ctlResp,
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: ctlResp,
expCached: true,
expRemote: true,
Expand All @@ -831,7 +881,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {

calledRemote := false
if ic != nil {
ic.getAttachInfo = func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
ic.getAttachInfoCb = func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
calledRemote = true
return tc.remoteResp, tc.remoteErr
}
Expand Down
12 changes: 11 additions & 1 deletion src/control/cmd/daos_agent/start.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -127,6 +127,16 @@ func (cmd *startCmd) Execute(_ []string) error {
}
cmd.Debugf("dRPC socket server started: %s", time.Since(drpcSrvStart))

if cmd.cfg.TelemetryEnabled() {
telemetryStart := time.Now()
shutdown, err := startPrometheusExporter(ctx, cmd, cmd.cfg)
if err != nil {
return errors.Wrap(err, "unable to start prometheus exporter")
}
defer shutdown()
cmd.Debugf("telemetry exporter started: %s", time.Since(telemetryStart))
}

cmd.Debugf("startup complete in %s", time.Since(startedAt))
cmd.Infof("%s (pid %d) listening on %s", versionString(), os.Getpid(), sockPath)
if err := systemd.Ready(); err != nil && err != systemd.ErrSdNotifyNoSocket {
Expand Down
Loading

0 comments on commit 7a6fb84

Please sign in to comment.