Skip to content

Commit

Permalink
[Metricbeat][elasticsearch] Use replica number in doc ID for shard me…
Browse files Browse the repository at this point in the history
…tricset (#33457)

* [Metricbeat][elasticsearch] Use replica number in doc ID for shard metricset

* "Fix" lint warnings

* Fix lint errors
  • Loading branch information
miltonhultgren authored Nov 16, 2022
1 parent b961f36 commit a5ffacc
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions metricbeat/module/elasticsearch/shard/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (
"github.com/elastic/beats/v7/metricbeat/helper/elastic"
"github.com/elastic/elastic-agent-libs/mapstr"

"fmt"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

s "github.com/elastic/beats/v7/libbeat/common/schema"
c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface"
Expand Down Expand Up @@ -61,45 +62,45 @@ func eventsMapping(r mb.ReporterV2, content []byte, isXpack bool) error {
stateData := &stateStruct{}
err := json.Unmarshal(content, stateData)
if err != nil {
return errors.Wrap(err, "failure parsing Elasticsearch Cluster State API response")
return fmt.Errorf("failure parsing Elasticsearch Cluster State API response: %w", err)
}

var errs multierror.Errors
for _, index := range stateData.RoutingTable.Indices {
for _, shards := range index.Shards {
for _, shard := range shards {
for i, shard := range shards {
event := mb.Event{
ModuleFields: mapstr.M{},
}

event.ModuleFields.Put("cluster.state.id", stateData.StateID)
event.ModuleFields.Put("cluster.stats.state.state_uuid", stateData.StateID)
event.ModuleFields.Put("cluster.id", stateData.ClusterID)
event.ModuleFields.Put("cluster.name", stateData.ClusterName)
_, _ = event.ModuleFields.Put("cluster.state.id", stateData.StateID)
_, _ = event.ModuleFields.Put("cluster.stats.state.state_uuid", stateData.StateID)
_, _ = event.ModuleFields.Put("cluster.id", stateData.ClusterID)
_, _ = event.ModuleFields.Put("cluster.name", stateData.ClusterName)

fields, err := schema.Apply(shard)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure applying shard schema"))
errs = append(errs, fmt.Errorf("failure applying shard schema: %w", err))
continue
}

// Handle node field: could be string or null
err = elasticsearch.PassThruField("node", shard, fields)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure passing through node field"))
errs = append(errs, fmt.Errorf("failure passing through node field: %w", err))
continue
}

// Handle relocating_node field: could be string or null
err = elasticsearch.PassThruField("relocating_node", shard, fields)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure passing through relocating_node field"))
errs = append(errs, fmt.Errorf("failure passing through relocating_node field: %w", err))
continue
}

event.ID, err = generateHashForEvent(stateData.StateID, fields)
event.ID, err = generateHashForEvent(stateData.StateID, fields, i)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure getting event ID"))
errs = append(errs, fmt.Errorf("failure getting event ID: %w", err))
continue
}

Expand All @@ -110,28 +111,28 @@ func eventsMapping(r mb.ReporterV2, content []byte, isXpack bool) error {
continue
}
if nodeID != nil { // shard has not been allocated yet
event.ModuleFields.Put("node.id", nodeID)
_, _ = event.ModuleFields.Put("node.id", nodeID)
delete(fields, "node")

sourceNode, err := getSourceNode(nodeID.(string), stateData)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure getting source node information"))
errs = append(errs, fmt.Errorf("failure getting source node information: %w", err))
continue
}
event.ModuleFields.Put("node.name", sourceNode["name"])
event.MetricSetFields.Put("source_node", sourceNode)
_, _ = event.ModuleFields.Put("node.name", sourceNode["name"])
_, _ = event.MetricSetFields.Put("source_node", sourceNode)
}

event.ModuleFields.Put("index.name", fields["index"])
_, _ = event.ModuleFields.Put("index.name", fields["index"])
delete(fields, "index")

event.MetricSetFields.Put("number", fields["shard"])
_, _ = event.MetricSetFields.Put("number", fields["shard"])
delete(event.MetricSetFields, "shard")

delete(event.MetricSetFields, "relocating_node")
relocatingNode := fields["relocating_node"]
event.MetricSetFields.Put("relocating_node.name", relocatingNode)
event.MetricSetFields.Put("relocating_node.id", relocatingNode)
_, _ = event.MetricSetFields.Put("relocating_node.name", relocatingNode)
_, _ = event.MetricSetFields.Put("relocating_node.id", relocatingNode)

// xpack.enabled in config using standalone metricbeat writes to `.monitoring` instead of `metricbeat-*`
// When using Agent, the index name is overwritten anyways.
Expand Down Expand Up @@ -160,7 +161,10 @@ func getSourceNode(nodeID string, stateData *stateStruct) (mapstr.M, error) {
}, nil
}

func generateHashForEvent(stateID string, shard mapstr.M) (string, error) {
// Note: This function may generate duplicate IDs, but those will be dropped since libbeat
// ignores the 409 status code
// https://github.com/elastic/beats/blob/main/libbeat/outputs/elasticsearch/client.go#L396
func generateHashForEvent(stateID string, shard mapstr.M, index int) (string, error) {
var nodeID string
if shard["node"] == nil {
nodeID = "_na"
Expand All @@ -181,7 +185,7 @@ func generateHashForEvent(stateID string, shard mapstr.M) (string, error) {
if !ok {
return "", elastic.MakeErrorForMissingField("shard", elastic.Elasticsearch)
}
shardNumberStr := strconv.FormatInt(shardNumberInt, 10)
shardNumberStr := "s" + strconv.FormatInt(shardNumberInt, 10)

isPrimary, ok := shard["primary"].(bool)
if !ok {
Expand All @@ -191,7 +195,7 @@ func generateHashForEvent(stateID string, shard mapstr.M) (string, error) {
if isPrimary {
shardType = "p"
} else {
shardType = "r"
shardType = "r" + strconv.Itoa(index)
}

return stateID + ":" + nodeID + ":" + indexName + ":" + shardNumberStr + ":" + shardType, nil
Expand Down

0 comments on commit a5ffacc

Please sign in to comment.