Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
gks9128 authored Feb 22, 2023
2 parents 7ab75c9 + 3de0d50 commit 08d454d
Show file tree
Hide file tree
Showing 18 changed files with 370 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fixing system tests not returning expected content encoding for azure blob storage input. {pull}34412[34412]
- [Azure Logs] Fix authentication_processing_details parsing in sign-in logs. {issue}34330[34330] {pull}34478[34478]
- Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550]
- Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327]

*Heartbeat*

Expand Down
20 changes: 10 additions & 10 deletions metricbeat/module/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,35 +79,35 @@ type State struct {
}

// GetInfo returns the data for the Beat's / endpoint.
func GetInfo(m *MetricSet) (*Info, error) {
func GetInfo(m *MetricSet) (Info, error) {
content, err := fetchPath(m.HTTP, "/")
if err != nil {
return nil, err
return Info{}, err
}

info := &Info{}
info := Info{}
err = json.Unmarshal(content, &info)
if err != nil {
return nil, err
return Info{}, err
}

return info, nil
}

// GetState returns the data for the Beat's /state endpoint.
func GetState(m *MetricSet) (*State, error) {
func GetState(m *MetricSet) (State, error) {
content, err := fetchPath(m.HTTP, "/state")
if err != nil {
return nil, err
return State{}, err
}

info := &State{}
err = json.Unmarshal(content, &info)
state := State{}
err = json.Unmarshal(content, &state)
if err != nil {
return nil, err
return State{}, err
}

return info, nil
return state, nil
}

func fetchPath(httpHelper *helper.HTTP, path string) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/beat/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return err
}

return eventMapping(r, *info, content, m.XPackEnabled)
return eventMapping(r, info, content, m.XPackEnabled)
}
2 changes: 1 addition & 1 deletion metricbeat/module/beat/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return err
}

return eventMapping(r, *info, clusterUUID, content, m.XPackEnabled)
return eventMapping(r, info, clusterUUID, content, m.XPackEnabled)
}

func (m *MetricSet) getClusterUUID() (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/ccr/ccr.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return err
}

return eventsMapping(r, *info, content, m.XPackEnabled)
return eventsMapping(r, info, content, m.XPackEnabled)
}

func (m *MetricSet) checkCCRAvailability(currentElasticsearchVersion *version.V) (message string, err error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return err
}

return eventMapping(r, m.HTTP, *info, content, m.XPackEnabled)
return eventMapping(r, m.HTTP, info, content, m.XPackEnabled)
}
9 changes: 4 additions & 5 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,16 @@ func getMasterName(http *helper.HTTP, uri string) (string, error) {
}

// GetInfo returns the data for the Elasticsearch / endpoint.
func GetInfo(http *helper.HTTP, uri string) (*Info, error) {

func GetInfo(http *helper.HTTP, uri string) (Info, error) {
content, err := fetchPath(http, uri, "/", "")
if err != nil {
return nil, err
return Info{}, err
}

info := &Info{}
info := Info{}
err = json.Unmarshal(content, &info)
if err != nil {
return nil, err
return Info{}, err
}

return info, nil
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/enrich/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return err
}

return eventsMapping(r, *info, content, m.XPackEnabled)
return eventsMapping(r, info, content, m.XPackEnabled)
}

func (m *MetricSet) checkEnrichAvailability(currentElasticsearchVersion *version.V) (message string, err error) {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return err
}

return eventsMapping(r, m.HTTP, *info, content, m.XPackEnabled)
return eventsMapping(r, m.HTTP, info, content, m.XPackEnabled)
}

func (m *MetricSet) updateServicePath(esVersion version.V) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,5 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return err
}

return eventsMapping(r, *info, content, m.XPackEnabled)
return eventsMapping(r, info, content, m.XPackEnabled)
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,5 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return errors.Wrap(err, "failed to get info from Elasticsearch")
}

return eventMapping(r, *info, content, m.XPackEnabled)
return eventMapping(r, info, content, m.XPackEnabled)
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,5 @@ func (m *IngestMetricSet) Fetch(report mb.ReporterV2) error {
m.fetchCounter++ // It's fine if this overflows, it's only used for modulo
sampleProcessors := m.fetchCounter%m.sampleProcessorsEveryN == 0
m.Logger().Debugf("Sampling ingest_pipeline processor stats: %v", sampleProcessors)
return eventsMapping(report, *info, content, m.XPackEnabled, sampleProcessors)
return eventsMapping(report, info, content, m.XPackEnabled, sampleProcessors)
}
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/ml_job/ml_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,5 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return err
}

return eventsMapping(r, *info, content, m.XPackEnabled)
return eventsMapping(r, info, content, m.XPackEnabled)
}
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return errors.Wrap(err, "failed to get info from Elasticsearch")
}

return eventsMapping(r, *info, content, m.XPackEnabled)
return eventsMapping(r, info, content, m.XPackEnabled)
}
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/node_stats/node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return err
}

return eventsMapping(r, m.MetricSet, *info, content, m.XPackEnabled)
return eventsMapping(r, m.MetricSet, info, content, m.XPackEnabled)
}

func (m *MetricSet) updateServiceURI() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return err
}

return eventsMapping(r, *info, content, m.XPackEnabled)
return eventsMapping(r, info, content, m.XPackEnabled)
}
96 changes: 73 additions & 23 deletions x-pack/filebeat/input/cometd/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ package cometd
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -23,6 +27,9 @@ import (

const (
inputName = "cometd"

// retryInterval is the minimum duration between pub/sub client retries.
retryInterval = 30 * time.Second
)

// Run starts the input worker then returns. Only the first invocation
Expand All @@ -37,44 +44,90 @@ func (in *cometdInput) Run() {
defer in.workerWg.Done()
defer in.workerCancel()
in.b = bay.Bayeux{}
in.creds, err = bay.GetSalesforceCredentials(in.authParams)
if err != nil {
in.log.Errorw("not able to get access token", "error", err)
return
}
if err := in.run(); err != nil {
in.log.Errorw("got error while running input", "error", err)
return

rt := rate.NewLimiter(rate.Every(retryInterval), 1)

for in.workerCtx.Err() == nil {
// Rate limit.
if err := rt.Wait(in.workerCtx); err != nil {
continue
}

// Creating a new channel for cometd input.
in.msgCh = make(chan bay.MaybeMsg, 1)

in.creds, err = bay.GetSalesforceCredentials(in.authParams)
if err != nil {
in.log.Errorw("not able to get access token", "error", err)
continue
}

if err := in.run(); err != nil {
if in.workerCtx.Err() == nil {
in.log.Errorw("Restarting failed CometD input worker.", "error", err)
continue
}

// Log any non-cancellation error before stopping.
if !errors.Is(err, context.Canceled) {
in.log.Errorw("got error while running input", "error", err)
}
}
}
}()
})
}

func (in *cometdInput) run() error {
in.msgCh = in.b.Channel(in.workerCtx, in.msgCh, "-1", *in.creds, in.config.ChannelName)
ctx, cancel := context.WithCancel(in.workerCtx)
defer cancel()
// Ticker with 5 seconds to avoid log too many warnings
ticker := time.NewTicker(5 * time.Second)
in.msgCh = in.b.Channel(ctx, in.msgCh, "-1", *in.creds, in.config.ChannelName)
for e := range in.msgCh {
if e.Failed() {
return fmt.Errorf("error collecting events: %w", e.Err)
// if err bayeux library returns recoverable error, do not close input.
// instead continue with connection warning
if !strings.Contains(e.Error(), "trying again") {
return fmt.Errorf("error collecting events: %w", e.Err)
}
// log warning every 5 seconds only to avoid to many unnecessary logs
select {
case <-ticker.C:
in.log.Errorw("Retrying...! facing issue while collecting data from CometD", "error", e.Error())
default:
}
} else if !e.Msg.Successful {
var event event
// To handle the last response where the object received was empty
if e.Msg.Data.Payload == nil {
return nil
}

var msg []byte
var err error
// Convert json.RawMessage response to []byte
msg, err := e.Msg.Data.Payload.MarshalJSON()
if err != nil {
return fmt.Errorf("JSON error: %w", err)
if e.Msg.Data.Payload != nil {
msg, err = e.Msg.Data.Payload.MarshalJSON()
if err != nil {
in.log.Errorw("invalid JSON", "error", err)
continue
}
} else if e.Msg.Data.Object != nil {
msg, err = e.Msg.Data.Object.MarshalJSON()
if err != nil {
in.log.Errorw("invalid JSON", "error", err)
continue
}
} else {
// To handle the last response where the object received was empty
return nil
}

// Extract event IDs from json.RawMessage
err = json.Unmarshal(e.Msg.Data.Payload, &event)
err = json.Unmarshal(msg, &event)
if err != nil {
return fmt.Errorf("error while parsing JSON: %w", err)
in.log.Errorw("error while parsing JSON", "error", err)
continue
}
if ok := in.outlet.OnEvent(makeEvent(event.EventId, e.Msg.Channel, string(msg))); !ok {
in.log.Debug("OnEvent returned false. Stopping input worker.")
cancel()
return fmt.Errorf("error ingesting data to elasticsearch")
}
}
Expand Down Expand Up @@ -133,9 +186,6 @@ func NewInput(
authParams: authParams,
}

// Creating a new channel for cometd input.
in.msgCh = make(chan bay.MaybeMsg, 1)

// Build outlet for events.
in.outlet, err = connector.Connect(cfg)
if err != nil {
Expand Down
Loading

0 comments on commit 08d454d

Please sign in to comment.