Skip to content

Commit

Permalink
DAOS-8331 client: Export client metrics via agent
Browse files Browse the repository at this point in the history
Adds new agent config parameters and code to
optionally export client metrics in Prometheus
format.

Example daos_agent.yml updates:
  telemetry_port: 9192 # export on port 9192
  telemetry_retain: 5m # retain metrics for 5 minutes
                       # after client exit

Run-GHA: true
Change-Id: I77864682cc19fa4c33f326d879e20704ef57a7ea
Required-githooks: true
Signed-off-by: Michael MacDonald <mjmac@google.com>
  • Loading branch information
mjmac committed Feb 21, 2024
1 parent 11c411f commit 9894532
Show file tree
Hide file tree
Showing 27 changed files with 1,893 additions and 741 deletions.
17 changes: 13 additions & 4 deletions src/client/api/metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ bool daos_client_metric_retain;
int
dc_tm_init(void)
{
int metrics_tag;
pid_t pid;
int rc;
struct d_tm_node_t *started_at;
int metrics_tag;
pid_t pid;
int rc;

d_getenv_bool(DAOS_CLIENT_METRICS_ENABLE, &daos_client_metric);
if (!daos_client_metric)
Expand Down Expand Up @@ -64,10 +65,18 @@ dc_tm_init(void)
rc = d_tm_add_ephemeral_dir(NULL, MAX_IDS_SIZE(INIT_JOB_NUM), "%s/%u",
dc_jobid, pid);
if (rc != 0) {
DL_ERROR(rc, "add metric %s/%u failed.\n", dc_jobid, pid);
DL_ERROR(rc, "add metric %s/%u failed.", dc_jobid, pid);
D_GOTO(out, rc);
}

rc = d_tm_add_metric(&started_at, D_TM_TIMESTAMP, "Timestamp of client startup", NULL,
"%s/%u/%s", dc_jobid, pid, "started_at");
if (rc != 0) {
DL_ERROR(rc, "add metric %s/%u/started_at failed.", dc_jobid, pid);
D_GOTO(out, rc);
}

d_tm_record_timestamp(started_at);
out:
if (rc)
d_tm_fini();
Expand Down
1 change: 1 addition & 0 deletions src/common/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ COMMON_FILES = ['debug.c', 'mem.c', 'fail_loc.c', 'lru.c',
'cipher.c', 'cipher_isal.c', 'qat.c', 'fault_domain.c',
'tls.c', 'metrics.c']


def build_daos_common(denv, client):
""" Building non-pmem version for client's common lib"""
benv = denv.Clone()
Expand Down
13 changes: 12 additions & 1 deletion src/control/cmd/daos_agent/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -54,6 +54,13 @@ type Config struct {
DisableAutoEvict bool `yaml:"disable_auto_evict,omitempty"`
ExcludeFabricIfaces common.StringSet `yaml:"exclude_fabric_ifaces,omitempty"`
FabricInterfaces []*NUMAFabricConfig `yaml:"fabric_ifaces,omitempty"`
TelemetryPort int `yaml:"telemetry_port,omitempty"`
TelemetryRetain time.Duration `yaml:"telemetry_retain,omitempty"`
}

// TelemetryEnabled reports whether client telemetry collection/export is
// switched on, which is the case exactly when a positive telemetry port
// has been configured.
func (c *Config) TelemetryEnabled() bool {
	// A zero (unset) or negative port leaves telemetry off.
	port := c.TelemetryPort
	return port >= 1
}

// NUMAFabricConfig defines a list of fabric interfaces that belong to a NUMA
Expand Down Expand Up @@ -88,6 +95,10 @@ func LoadConfig(cfgPath string) (*Config, error) {
return nil, fmt.Errorf("invalid system name: %s", cfg.SystemName)
}

if cfg.TelemetryRetain > 0 && cfg.TelemetryPort == 0 {
return nil, errors.New("telemetry_retain requires telemetry_port")
}

return cfg, nil
}

Expand Down
75 changes: 59 additions & 16 deletions src/control/cmd/daos_agent/infocache.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand All @@ -8,6 +8,7 @@ package main

import (
"context"
"fmt"
"net"
"strings"
"sync"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/daos-stack/daos/src/control/lib/control"
"github.com/daos-stack/daos/src/control/lib/hardware"
"github.com/daos-stack/daos/src/control/lib/hardware/hwprov"
"github.com/daos-stack/daos/src/control/lib/telemetry"
"github.com/daos-stack/daos/src/control/logging"
)

Expand All @@ -36,17 +38,20 @@ type fabricScanFn func(ctx context.Context, providers ...string) (*NUMAFabric, e
// NewInfoCache creates a new InfoCache with appropriate parameters set.
func NewInfoCache(ctx context.Context, log logging.Logger, client control.UnaryInvoker, cfg *Config) *InfoCache {
ic := &InfoCache{
log: log,
ignoreIfaces: cfg.ExcludeFabricIfaces,
client: client,
cache: cache.NewItemCache(log),
getAttachInfo: control.GetAttachInfo,
fabricScan: getFabricScanFn(log, cfg, hwprov.DefaultFabricScanner(log)),
netIfaces: net.Interfaces,
devClassGetter: hwprov.DefaultNetDevClassProvider(log),
devStateGetter: hwprov.DefaultNetDevStateProvider(log),
log: log,
ignoreIfaces: cfg.ExcludeFabricIfaces,
client: client,
cache: cache.NewItemCache(log),
getAttachInfoCb: control.GetAttachInfo,
fabricScan: getFabricScanFn(log, cfg, hwprov.DefaultFabricScanner(log)),
netIfaces: net.Interfaces,
devClassGetter: hwprov.DefaultNetDevClassProvider(log),
devStateGetter: hwprov.DefaultNetDevStateProvider(log),
}

ic.clientTelemetryEnabled.Store(cfg.TelemetryEnabled())
ic.clientTelemetryRetain.Store(cfg.TelemetryRetain > 0)

if cfg.DisableCache {
ic.DisableAttachInfoCache()
ic.DisableFabricCache()
Expand Down Expand Up @@ -198,12 +203,14 @@ type InfoCache struct {
cache *cache.ItemCache
fabricCacheDisabled atm.Bool
attachInfoCacheDisabled atm.Bool
clientTelemetryEnabled atm.Bool
clientTelemetryRetain atm.Bool

getAttachInfo getAttachInfoFn
fabricScan fabricScanFn
netIfaces func() ([]net.Interface, error)
devClassGetter hardware.NetDevClassProvider
devStateGetter hardware.NetDevStateProvider
getAttachInfoCb getAttachInfoFn
fabricScan fabricScanFn
netIfaces func() ([]net.Interface, error)
devClassGetter hardware.NetDevClassProvider
devStateGetter hardware.NetDevStateProvider

client control.UnaryInvoker
attachInfoRefresh time.Duration
Expand Down Expand Up @@ -292,6 +299,41 @@ func (c *InfoCache) EnableStaticFabricCache(ctx context.Context, nf *NUMAFabric)
c.EnableFabricCache()
}

// getAttachInfo invokes the configured attach info callback and, when the
// callback succeeds, passes the response through adjustAttachInfo before
// handing it back to the caller. An error is returned if the receiver or the
// callback is nil, or if the callback itself fails.
func (c *InfoCache) getAttachInfo(ctx context.Context, rpcClient control.UnaryInvoker, req *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
	// Cases are evaluated in order, so the callback field is only read
	// once the receiver is known to be non-nil.
	switch {
	case c == nil:
		return nil, errors.New("InfoCache is nil")
	case c.getAttachInfoCb == nil:
		return nil, errors.New("getAttachInfoFn is nil")
	}

	attachInfo, fetchErr := c.getAttachInfoCb(ctx, rpcClient, req)
	if fetchErr != nil {
		return nil, fetchErr
	}

	c.adjustAttachInfo(attachInfo)
	return attachInfo, nil
}

// adjustAttachInfo modifies the supplied attach info response in place
// before it is returned to the caller. When client telemetry is enabled, the
// client metrics enable variable is appended to the response's client
// environment; when retention is also enabled, the retain variable is
// appended after it. A nil receiver or nil response is a no-op.
func (c *InfoCache) adjustAttachInfo(resp *control.GetAttachInfoResp) {
	if c == nil || resp == nil {
		return
	}
	if !c.clientTelemetryEnabled.IsTrue() {
		return
	}

	// Collect the additions first so the response is extended in one step;
	// the retain variable is only meaningful alongside the enable variable.
	telemVars := []string{fmt.Sprintf("%s=1", telemetry.ClientMetricsEnabledEnv)}
	if c.clientTelemetryRetain.IsTrue() {
		telemVars = append(telemVars, fmt.Sprintf("%s=1", telemetry.ClientMetricsRetainEnv))
	}

	resp.ClientNetHint.EnvVars = append(resp.ClientNetHint.EnvVars, telemVars...)
}

// GetAttachInfo fetches the attach info from the cache, and refreshes if necessary.
func (c *InfoCache) GetAttachInfo(ctx context.Context, sys string) (*control.GetAttachInfoResp, error) {
if c == nil {
Expand All @@ -308,7 +350,8 @@ func (c *InfoCache) GetAttachInfo(ctx context.Context, sys string) (*control.Get
}
createItem := func() (cache.Item, error) {
c.log.Debugf("cache miss for %s", sysAttachInfoKey(sys))
return newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo), nil
cai := newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo)
return cai, nil
}

item, release, err := c.cache.GetOrCreate(ctx, sysAttachInfoKey(sys), createItem)
Expand Down
86 changes: 68 additions & 18 deletions src/control/cmd/daos_agent/infocache_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand All @@ -8,20 +8,23 @@ package main

import (
"context"
"fmt"
"net"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"

"github.com/daos-stack/daos/src/control/build"
"github.com/daos-stack/daos/src/control/common"
"github.com/daos-stack/daos/src/control/common/test"
"github.com/daos-stack/daos/src/control/lib/cache"
"github.com/daos-stack/daos/src/control/lib/control"
"github.com/daos-stack/daos/src/control/lib/hardware"
"github.com/daos-stack/daos/src/control/lib/telemetry"
"github.com/daos-stack/daos/src/control/logging"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"
)

type testInfoCacheParams struct {
Expand All @@ -32,6 +35,8 @@ type testInfoCacheParams struct {
mockNetDevStateGetter hardware.NetDevStateProvider
disableFabricCache bool
disableAttachInfoCache bool
enableClientTelemetry bool
retainClientTelemetry bool
ctlInvoker control.Invoker
cachedItems []cache.Item
}
Expand All @@ -43,16 +48,19 @@ func newTestInfoCache(t *testing.T, log logging.Logger, params testInfoCachePara
}

ic := &InfoCache{
log: log,
getAttachInfo: params.mockGetAttachInfo,
fabricScan: params.mockScanFabric,
devClassGetter: params.mockNetDevClassGetter,
devStateGetter: params.mockNetDevStateGetter,
netIfaces: params.mockNetIfaces,
client: params.ctlInvoker,
cache: c,
log: log,
getAttachInfoCb: params.mockGetAttachInfo,
fabricScan: params.mockScanFabric,
devClassGetter: params.mockNetDevClassGetter,
devStateGetter: params.mockNetDevStateGetter,
netIfaces: params.mockNetIfaces,
client: params.ctlInvoker,
cache: c,
}

ic.clientTelemetryEnabled.Store(params.enableClientTelemetry)
ic.clientTelemetryRetain.Store(params.retainClientTelemetry)

if ic.netIfaces == nil {
ic.netIfaces = func() ([]net.Interface, error) {
return []net.Interface{
Expand Down Expand Up @@ -714,6 +722,14 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
NetDevClass: uint32(hardware.Ether),
},
}
telemEnabledResp := copyGetAttachInfoResp(ctlResp)
telemEnabledResp.ClientNetHint.EnvVars = append(telemEnabledResp.ClientNetHint.EnvVars,
fmt.Sprintf("%s=1", telemetry.ClientMetricsEnabledEnv),
)
telemRetainedResp := copyGetAttachInfoResp(telemEnabledResp)
telemRetainedResp.ClientNetHint.EnvVars = append(telemRetainedResp.ClientNetHint.EnvVars,
fmt.Sprintf("%s=1", telemetry.ClientMetricsRetainEnv),
)

for name, tc := range map[string]struct {
getInfoCache func(logging.Logger) *InfoCache
Expand All @@ -734,7 +750,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
disableAttachInfoCache: true,
})
},
remoteResp: ctlResp,
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: ctlResp,
expRemote: true,
},
Expand All @@ -748,11 +764,45 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
expErr: errors.New("mock remote"),
expRemote: true,
},
"cache disabled; client telemetry enabled": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{
disableAttachInfoCache: true,
enableClientTelemetry: true,
})
},
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: telemEnabledResp,
expRemote: true,
},
"cache enabled; client telemetry enabled": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{
enableClientTelemetry: true,
})
},
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: telemEnabledResp,
expRemote: true,
expCached: true,
},
"cache enabled; client telemetry enabled; client telemetry retained": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{
enableClientTelemetry: true,
retainClientTelemetry: true,
})
},
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: telemRetainedResp,
expRemote: true,
expCached: true,
},
"enabled but empty": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{})
},
remoteResp: ctlResp,
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: ctlResp,
expRemote: true,
expCached: true,
Expand All @@ -772,7 +822,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
fetch: func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
return nil, errors.New("shouldn't call cached remote")
},
lastResponse: ctlResp,
lastResponse: copyGetAttachInfoResp(ctlResp),
cacheItem: cacheItem{lastCached: time.Now()},
system: "test",
})
Expand All @@ -790,7 +840,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
fetch: func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
return nil, errors.New("shouldn't call cached remote")
},
lastResponse: ctlResp,
lastResponse: copyGetAttachInfoResp(ctlResp),
cacheItem: cacheItem{lastCached: time.Now()},
system: build.DefaultSystemName,
})
Expand All @@ -814,7 +864,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
return ic
},
system: "somethingelse",
remoteResp: ctlResp,
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: ctlResp,
expCached: true,
expRemote: true,
Expand All @@ -831,7 +881,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {

calledRemote := false
if ic != nil {
ic.getAttachInfo = func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
ic.getAttachInfoCb = func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
calledRemote = true
return tc.remoteResp, tc.remoteErr
}
Expand Down
Loading

0 comments on commit 9894532

Please sign in to comment.