Skip to content

Commit

Permalink
[chore][receiver/zookeeper] Refactor to separate connection/request f…
Browse files Browse the repository at this point in the history
…rom metric processing (open-telemetry#22752)

[chore][receiver/zookeeper] Refactor to separate connection/request logic from metric processing
  • Loading branch information
djaglowski authored and Caleb-Hurshman committed Jul 6, 2023
1 parent 4874814 commit 3e8e07c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 17 deletions.
37 changes: 24 additions & 13 deletions receiver/zookeeperreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,45 +77,57 @@ func (z *zookeeperMetricsScraper) scrape(ctx context.Context) (pmetric.Metrics,
var ctxWithTimeout context.Context
ctxWithTimeout, z.cancel = context.WithTimeout(ctx, z.config.Timeout)

response, err := z.runCommand(ctxWithTimeout, "mntr")
if err != nil {
return pmetric.NewMetrics(), err
}

return z.processMntr(response)
}

func (z *zookeeperMetricsScraper) runCommand(ctx context.Context, command string) ([]string, error) {
conn, err := z.config.Dial()
if err != nil {
z.logger.Error("failed to establish connection",
zap.String("endpoint", z.config.Endpoint),
zap.Error(err),
)
return pmetric.NewMetrics(), err
return nil, err
}
defer func() {
if closeErr := z.closeConnection(conn); closeErr != nil {
z.logger.Warn("failed to shutdown connection", zap.Error(closeErr))
}
}()

deadline, ok := ctxWithTimeout.Deadline()
deadline, ok := ctx.Deadline()
if ok {
if err := z.setConnectionDeadline(conn, deadline); err != nil {
if err = z.setConnectionDeadline(conn, deadline); err != nil {
z.logger.Warn("failed to set deadline on connection", zap.Error(err))
}
}

return z.getResourceMetrics(conn)
}

func (z *zookeeperMetricsScraper) getResourceMetrics(conn net.Conn) (pmetric.Metrics, error) {
scanner, err := z.sendCmd(conn, mntrCommand)
scanner, err := z.sendCmd(conn, command)
if err != nil {
z.logger.Error("failed to send command",
zap.Error(err),
zap.String("command", mntrCommand),
zap.String("command", command),
)
return pmetric.NewMetrics(), err
return nil, err
}

var response []string
for scanner.Scan() {
response = append(response, scanner.Text())
}
return response, nil
}

func (z *zookeeperMetricsScraper) processMntr(response []string) (pmetric.Metrics, error) {
creator := newMetricCreator(z.mb)
now := pcommon.NewTimestampFromTime(time.Now())
resourceOpts := make([]metadata.ResourceMetricsOption, 0, 2)
for scanner.Scan() {
line := scanner.Text()
for _, line := range response {
parts := zookeeperFormatRE.FindStringSubmatch(line)
if len(parts) != 3 {
z.logger.Warn("unexpected line in response",
Expand Down Expand Up @@ -155,7 +167,6 @@ func (z *zookeeperMetricsScraper) getResourceMetrics(conn net.Conn) (pmetric.Met

// Generate computed metrics
creator.generateComputedMetrics(z.logger, now)

return z.mb.Emit(resourceOpts...), nil
}

Expand Down
8 changes: 4 additions & 4 deletions receiver/zookeeperreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ func TestZookeeperMetricsScraperScrape(t *testing.T) {
name: "Error closing connection",
mockedZKOutputSourceFilename: "mntr-3.4.14",
expectedLogs: []logMsg{
{
msg: "metric computation failed",
level: zapcore.DebugLevel,
},
{
msg: "failed to shutdown connection",
level: zapcore.WarnLevel,
},
{
msg: "metric computation failed",
level: zapcore.DebugLevel,
},
},
expectedMetricsFilename: "error-closing-connection",
expectedResourceAttributes: map[string]string{
Expand Down

0 comments on commit 3e8e07c

Please sign in to comment.