Skip to content

Commit

Permalink
Refactor add_cloud_metadata to handle ECS fields easier (#26438) (#26556
Browse files Browse the repository at this point in the history
)

(cherry picked from commit f8bb3a2)

Co-authored-by: Chris Mark <chrismarkou92@gmail.com>
  • Loading branch information
mergify[bot] and ChrsMark committed Jun 29, 2021
1 parent cf89c45 commit 6452107
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 50 deletions.
41 changes: 12 additions & 29 deletions libbeat/processors/add_cloud_metadata/add_cloud_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,49 +118,32 @@ func (p *addCloudMetadata) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}

// If cloud key exists in event already and overwrite flag is set to false, this processor will not overwrite the
// cloud fields. For example aws module writes cloud.instance.* to events already, with overwrite=false,
// add_cloud_metadata should not overwrite these fields with new values.
if !p.initData.overwrite {
cloudValue, _ := event.GetValue("cloud")
if cloudValue != nil {
err := p.extractECSMeta(event, meta)
if err != nil {
return nil, err
}
return event, nil
}
}

err := p.extractECSMeta(event, meta)
err := p.addMeta(event, meta)
if err != nil {
return nil, err
}
_, err = event.PutValue("cloud", meta)
return event, err
}

func (p *addCloudMetadata) String() string {
return "add_cloud_metadata=" + p.getMeta().String()
}

func (p *addCloudMetadata) extractECSMeta(event *beat.Event, meta common.MapStr) error {
// handle ECS fields first
if !p.initData.overwrite {
orchestratorValue, _ := event.GetValue("orchestrator")
if orchestratorValue != nil {
meta.Delete("orchestrator")
return nil
func (p *addCloudMetadata) addMeta(event *beat.Event, meta common.MapStr) error {
for key, metaVal := range meta {
// If key exists in event already and overwrite flag is set to false, this processor will not overwrite the
// meta fields. For example aws module writes cloud.instance.* to events already, with overwrite=false,
// add_cloud_metadata should not overwrite these fields with new values.
if !p.initData.overwrite {
v, _ := event.GetValue(key)
if v != nil {
continue
}
}
}
orchestratorFields, err := meta.GetValue("orchestrator")
if err == nil {
_, err = event.PutValue("orchestrator", orchestratorFields)
_, err := event.PutValue(key, metaVal)
if err != nil {
return err
}
}
meta.Delete("orchestrator")

return nil
}
2 changes: 1 addition & 1 deletion libbeat/processors/add_cloud_metadata/http_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (f *httpMetadataFetcher) fetchMetadata(ctx context.Context, client http.Cli

// Apply schema.
res.metadata = f.conv(res.metadata)
res.metadata["provider"] = f.provider
res.metadata.Put("cloud.provider", f.provider)

return res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var alibabaCloudMetadataFetcher = provider{
m["service"] = common.MapStr{
"name": "ECS",
}
return common.MapStr(m)
return common.MapStr{"cloud": m}
}

urls, err := getMetadataURLs(c, ecsMetadataHost, []string{
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/add_cloud_metadata/provider_aws_ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var ec2MetadataFetcher = provider{
"account": s.Object{"id": c.Str("accountId")},
"image": s.Object{"id": c.Str("imageId")},
}.Apply(m)
return out
return common.MapStr{"cloud": out}
}

fetcher, err := newMetadataFetcher(config, "aws", nil, metadataHost, ec2Schema, ec2InstanceIdentityURI)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/add_cloud_metadata/provider_azure_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var azureVMMetadataFetcher = provider{
},
"region": c.Str("location"),
}.Apply(m)
return out
return common.MapStr{"cloud": out}
}

fetcher, err := newMetadataFetcher(config, "azure", azHeaders, metadataHost, azSchema, azMetadataURI)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var doMetadataFetcher = provider{
"name": c.Str("serviceName"),
},
}.Apply(m)
return out
return common.MapStr{"cloud": out}
}
doMetadataURI := "/metadata/v1.json"

Expand Down
32 changes: 18 additions & 14 deletions libbeat/processors/add_cloud_metadata/provider_google_gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,23 @@ var gceMetadataFetcher = provider{
gceMetadataURI := "/computeMetadata/v1/?recursive=true&alt=json"
gceHeaders := map[string]string{"Metadata-Flavor": "Google"}
gceSchema := func(m map[string]interface{}) common.MapStr {
out := common.MapStr{
cloud := common.MapStr{
"service": common.MapStr{
"name": "GCE",
},
}
meta := common.MapStr{}

trimLeadingPath := func(key string) {
v, err := out.GetValue(key)
v, err := cloud.GetValue(key)
if err != nil {
return
}
p, ok := v.(string)
if !ok {
return
}
out.Put(key, path.Base(p))
cloud.Put(key, path.Base(p))
}

if instance, ok := m["instance"].(map[string]interface{}); ok {
Expand All @@ -77,6 +78,10 @@ var gceMetadataFetcher = provider{
"type": c.Str("machineType"),
},
"availability_zone": c.Str("zone"),
}.ApplyTo(cloud, instance)
trimLeadingPath("machine.type")
trimLeadingPath("availability_zone")
s.Schema{
"orchestrator": s.Object{
"cluster": c.Dict(
"attributes",
Expand All @@ -85,29 +90,27 @@ var gceMetadataFetcher = provider{
"kubeconfig": c.Str("kubeconfig"),
}),
},
}.ApplyTo(out, instance)
trimLeadingPath("machine.type")
trimLeadingPath("availability_zone")
}.ApplyTo(meta, instance)
}

if kubeconfig, err := out.GetValue("orchestrator.cluster.kubeconfig"); err == nil {
if kubeconfig, err := meta.GetValue("orchestrator.cluster.kubeconfig"); err == nil {
kubeConfig, ok := kubeconfig.(string)
if !ok {
out.Delete("orchestrator.cluster.kubeconfig")
meta.Delete("orchestrator.cluster.kubeconfig")
}
cc := &KubeConfig{}
err := yaml.Unmarshal([]byte(kubeConfig), cc)
if err != nil {
out.Delete("orchestrator.cluster.kubeconfig")
meta.Delete("orchestrator.cluster.kubeconfig")
}
if len(cc.Clusters) > 0 {
if cc.Clusters[0].Cluster.Server != "" {
out.Delete("orchestrator.cluster.kubeconfig")
out.Put("orchestrator.cluster.url", cc.Clusters[0].Cluster.Server)
meta.Delete("orchestrator.cluster.kubeconfig")
meta.Put("orchestrator.cluster.url", cc.Clusters[0].Cluster.Server)
}
}
} else {
out.Delete("orchestrator")
meta.Delete("orchestrator")
}

if project, ok := m["project"].(map[string]interface{}); ok {
Expand All @@ -118,10 +121,11 @@ var gceMetadataFetcher = provider{
"account": s.Object{
"id": c.Str("projectId"),
},
}.ApplyTo(out, project)
}.ApplyTo(cloud, project)
}

return out
meta.DeepUpdate(common.MapStr{"cloud": cloud})
return meta
}

fetcher, err := newMetadataFetcher(config, provider, gceHeaders, metadataHost, gceSchema, gceMetadataURI)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func buildOpenstackNovaCreate(scheme string) func(provider string, c *common.Con
m["service"] = common.MapStr{
"name": "Nova",
}
return common.MapStr(m)
return common.MapStr{"cloud": m}
}

urls, err := getMetadataURLsWithScheme(c, scheme, metadataHost, []string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var qcloudMetadataFetcher = provider{
m["service"] = common.MapStr{
"name": "CVM",
}
return common.MapStr(m)
return common.MapStr{"cloud": m}
}

urls, err := getMetadataURLs(c, qcloudMetadataHost, []string{
Expand Down

0 comments on commit 6452107

Please sign in to comment.