Skip to content

Commit

Permalink
Merge remote-tracking branch 'elastic/main' into feature/fb/parse-aws…
Browse files Browse the repository at this point in the history
…-vpc-flow-log-processor
  • Loading branch information
andrewkroh committed Nov 15, 2022
2 parents 1d35407 + a3eca5f commit 3a6e4cd
Show file tree
Hide file tree
Showing 18 changed files with 669 additions and 71 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ issues:
- text: "undefined"
linters:
- typecheck
- text: "undeclared name:"
linters:
- typecheck
- text: "imported but not used"
linters:
- typecheck
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456]
- Add `parse_aws_vpc_flow_log` processor. {pull}33656[33656]
- Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658]
- Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673]
- Add Common Expression Language input. {pull}31233[31233]

*Auditbeat*
Expand All @@ -176,6 +178,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add Data Granularity option to AWS module to allow for for fewer API calls of longer periods and keep small intervals. {issue}33133[33133] {pull}33166[33166]
- Update README file on how to run Metricbeat on Kubernetes. {pull}33308[33308]
- Add per-thread metrics to system_summary {pull}33614[33614]
- Add GCP CloudSQL metadata {pull}33066[33066]

*Packetbeat*

Expand Down
8 changes: 8 additions & 0 deletions libbeat/docs/shared-ssl-config.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,14 @@ present in the chain during the handshake, it will be added to the
`certificate_authorities` list and the handshake will continue
normaly.

To get the fingerprint from a CA certificate on a Unix-like
system, you can use the following command, where `ca.crt` is the
certificate.

[source]
------------------------
openssl x509 -fingerprint -sha256 -noout -in ./ca.crt | awk --field-separator="=" '{print $2}' | sed 's/://g'
------------------------

[discrete]
[[ssl-server-config]]
Expand Down
6 changes: 3 additions & 3 deletions testing/environments/snapshot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
version: '2.3'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0-62908926-SNAPSHOT
image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0-0c44a3fc-SNAPSHOT
# When extend is used it merges healthcheck.tests, see:
# https://github.com/docker/compose/issues/8962
# healthcheck:
Expand Down Expand Up @@ -31,7 +31,7 @@ services:
- "./docker/elasticsearch/users_roles:/usr/share/elasticsearch/config/users_roles"

logstash:
image: docker.elastic.co/logstash/logstash:8.6.0-62908926-SNAPSHOT
image: docker.elastic.co/logstash/logstash:8.6.0-0c44a3fc-SNAPSHOT
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9600/_node/stats"]
retries: 600
Expand All @@ -44,7 +44,7 @@ services:
- 5055:5055

kibana:
image: docker.elastic.co/kibana/kibana:8.6.0-62908926-SNAPSHOT
image: docker.elastic.co/kibana/kibana:8.6.0-0c44a3fc-SNAPSHOT
environment:
- "ELASTICSEARCH_USERNAME=kibana_system_user"
- "ELASTICSEARCH_PASSWORD=testing"
Expand Down
39 changes: 24 additions & 15 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,6 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}()
defer cancelInputCtx()

// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: inputContext.Cancelation,
ACKHandler: awscommon.NewEventACKHandler(),
})
if err != nil {
return fmt.Errorf("failed to create pipeline client: %w", err)
}
defer client.Close()

if in.config.QueueURL != "" {
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
if err != nil {
Expand All @@ -127,7 +117,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
in.awsConfig.Region = regionName

// Create SQS receiver and S3 notification processor.
receiver, err := in.createSQSReceiver(inputContext, client)
receiver, err := in.createSQSReceiver(inputContext, pipeline)
if err != nil {
return fmt.Errorf("failed to initialize sqs receiver: %w", err)
}
Expand All @@ -139,6 +129,21 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}

if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" {
// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: inputContext.Cancelation,
ACKHandler: awscommon.NewEventACKHandler(),
Processing: beat.ProcessingConfig{
// This input only produces events with basic types so normalization
// is not required.
EventNormalization: boolPtr(false),
},
})
if err != nil {
return fmt.Errorf("failed to create pipeline client: %w", err)
}
defer client.Close()

// Create S3 receiver and S3 notification processor.
poller, err := in.createS3Lister(inputContext, ctx, client, persistentStore, states)
if err != nil {
Expand All @@ -154,7 +159,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
return nil
}

func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsReader, error) {
func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*sqsReader, error) {
sqsAPI := &awsSQSAPI{
client: sqs.NewFromConfig(in.awsConfig, func(o *sqs.Options) {
if in.config.AWSConfig.FIPSEnabled {
Expand Down Expand Up @@ -192,8 +197,8 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe
if err != nil {
return nil, err
}
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler)

return sqsReader, nil
Expand Down Expand Up @@ -267,10 +272,11 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
if len(in.config.FileSelectors) == 0 {
fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
}
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors)
s3Poller := newS3Poller(log.Named("s3_poller"),
metrics,
s3API,
client,
s3EventHandlerFactory,
states,
persistentStore,
Expand Down Expand Up @@ -368,3 +374,6 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string {
}
return "unknown"
}

// boolPtr returns a pointer to b.
func boolPtr(b bool) *bool { return &b }
49 changes: 36 additions & 13 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,37 @@ func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager {
return c.pagerConstant
}

var _ beat.Pipeline = (*fakePipeline)(nil)

// fakePipeline returns new ackClients.
type fakePipeline struct{}

func (c *fakePipeline) ConnectWith(clientConfig beat.ClientConfig) (beat.Client, error) {
return &ackClient{}, nil
}

func (c *fakePipeline) Connect() (beat.Client, error) {
panic("Connect() is not implemented.")
}

var _ beat.Client = (*ackClient)(nil)

// ackClient is a fake beat.Client that ACKs the published messages.
type ackClient struct{}

func (c *ackClient) Close() error { return nil }

func (c *ackClient) Publish(event beat.Event) {
// Fake the ACK handling.
event.Private.(*awscommon.EventACKTracker).ACK()
}

func (c *ackClient) PublishAll(event []beat.Event) {
for _, e := range event {
c.Publish(e)
}
}

func makeBenchmarkConfig(t testing.TB) config {
cfg := conf.MustNewConfigFrom(`---
queue_url: foo
Expand All @@ -171,21 +202,13 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
metrics := newInputMetrics(metricRegistry, "test_id")
sqsAPI := newConstantSQS()
s3API := newConstantS3(t)
client := pubtest.NewChanClient(100)
defer close(client.Channel)
pipeline := &fakePipeline{}
conf := makeBenchmarkConfig(t)

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, s3EventHandlerFactory)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler)

go func() {
for event := range client.Channel {
// Fake the ACK handling that's not implemented in pubtest.
event.Private.(*awscommon.EventACKTracker).ACK()
}
}()

ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)

Expand Down Expand Up @@ -313,8 +336,8 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
return
}

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, config.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, client, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)

if err := s3Poller.Poll(ctx); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
Expand Down
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/aws/smithy-go/middleware"

"github.com/elastic/beats/v7/libbeat/beat"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -28,7 +29,7 @@ import (
// Run 'go generate' to create mocks that are used in tests.
//go:generate go install github.com/golang/mock/mockgen@v1.6.0
//go:generate mockgen -source=interfaces.go -destination=mock_interfaces_test.go -package awss3 -mock_names=sqsAPI=MockSQSAPI,sqsProcessor=MockSQSProcessor,s3API=MockS3API,s3Pager=MockS3Pager,s3ObjectHandlerFactory=MockS3ObjectHandlerFactory,s3ObjectHandler=MockS3ObjectHandler
//go:generate mockgen -destination=mock_publisher_test.go -package=awss3 -mock_names=Client=MockBeatClient github.com/elastic/beats/v7/libbeat/beat Client
//go:generate mockgen -destination=mock_publisher_test.go -package=awss3 -mock_names=Client=MockBeatClient,Pipeline=MockBeatPipeline github.com/elastic/beats/v7/libbeat/beat Client,Pipeline

// ------
// SQS interfaces
Expand Down Expand Up @@ -88,7 +89,7 @@ type s3ObjectHandlerFactory interface {
// Create returns a new s3ObjectHandler that can be used to process the
// specified S3 object. If the handler is not configured to process the
// given S3 object (based on key name) then it will return nil.
Create(ctx context.Context, log *logp.Logger, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler
Create(ctx context.Context, log *logp.Logger, client beat.Client, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler
}

type s3ObjectHandler interface {
Expand Down
9 changes: 5 additions & 4 deletions x-pack/filebeat/input/awss3/mock_interfaces_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 54 additions & 1 deletion x-pack/filebeat/input/awss3/mock_publisher_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3a6e4cd

Please sign in to comment.