From f2394355d81f33d7a5cec6490cc4523619e596e8 Mon Sep 17 00:00:00 2001
From: Christophe Collot <52134228+CCOLLOT@users.noreply.github.com>
Date: Fri, 9 Jun 2023 15:00:31 +0200
Subject: [PATCH] feat(lambda-promtail): add cloudfront log file ingestion
 support (#9573)

**What this PR does / why we need it**:
This PR enables ingesting logs from CloudFront log files stored in S3 (batch). The current setup only supports streaming CloudFront logs through AWS Kinesis, whereas this PR implements the same flow as for VPC Flow logs, Load Balancer logs, and CloudTrail logs (S3 --> SQS (optional) --> Lambda Promtail --> Loki).

**Special notes for your reviewer**:
+ The CloudFront log file naming format is different from that of the already implemented services, meaning we had to build yet another regex. AWS never bothered making all services follow the same log file naming convention, but the "good" thing is that it's now very unlikely to change in the future.
+ The CloudFront file name contains no mention of the AWS account or the type of log it contains, which means we have to infer the log type from the filename format instead of looking for the exact string "cloudfront" in the filename. This is why, in `getLabels`, if no `type` parameter is captured by the regex, we use the key corresponding to the name of the matching parser.
+ I introduced a new `parser` struct to group together several parameters specific to a type of log (and avoid relying too much on map key string matching and/or if statements for specific use cases).
+ I've been successfully running this code in several AWS environments for days.
+ I corrected a typo from my previous PR #9497 (wrong PR number in `CHANGELOG.md`).

**Checklist**
- [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [x] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md`
- [x] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213)

---------

Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com>
---
 CHANGELOG.md                                  |   2 +
 .../sources/clients/lambda-promtail/_index.md |   7 +-
 tools/lambda-promtail/lambda-promtail/s3.go   | 117 ++++++++++++------
 .../lambda-promtail/s3_test.go                | 115 ++++++++++++++++-
 .../testdata/cloudfront.log.gz                | Bin 0 -> 781 bytes
 5 files changed, 194 insertions(+), 47 deletions(-)
 create mode 100644 tools/lambda-promtail/testdata/cloudfront.log.gz

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 93f4818827397..3add6541557b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@

 ##### Enhancements

+* [9573](https://github.com/grafana/loki/pull/9573) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudFront log ingestion.
+* [9497](https://github.com/grafana/loki/pull/9497) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudTrail log ingestion.
 * [8886](https://github.com/grafana/loki/pull/8886) **MichelHollands**: Add new logql template function `unixToTime`
-* [8067](https://github.com/grafana/loki/pull/9497) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudTrail log ingestion.
 * [9515](https://github.com/grafana/loki/pull/9515) **MichelHollands**: Fix String() on vector aggregation LogQL expressions that contain `without ()`.

diff --git a/docs/sources/clients/lambda-promtail/_index.md b/docs/sources/clients/lambda-promtail/_index.md
index a42494700ce5a..f3fff1bcea110 100644
--- a/docs/sources/clients/lambda-promtail/_index.md
+++ b/docs/sources/clients/lambda-promtail/_index.md
@@ -109,9 +109,10 @@ This workflow allows ingesting AWS loadbalancer logs stored on S3 to Loki.

 This workflow allows ingesting AWS Cloudtrail logs stored on S3 to Loki.

-### Cloudfront real-time logs
-
-Cloudfront [real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) can be sent to a Kinesis data stream. The data stream can be mapped to be an [event source](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) for lambda-promtail to deliver the logs to Loki.
+### Cloudfront logs
+Cloudfront logs can be either batched or streamed in real time to Loki:
++ Logging can be activated on a Cloudfront distribution with an S3 bucket as the destination. In this case, the workflow is the same as for other services (VPC Flow logs, Loadbalancer logs, Cloudtrail logs).
++ Cloudfront [real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) can be sent to a Kinesis data stream. The data stream can be mapped to be an [event source](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) for lambda-promtail to deliver the logs to Loki.

 ### Triggering Lambda-Promtail via SQS
 For AWS services supporting sending messages to SQS (for example, S3 with an S3 Notification to SQS), events can be processed through an [SQS queue using a lambda trigger](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) instead of directly configuring the source service to trigger lambda. Lambda-promtail will retrieve the nested events from the SQS messages' body and process them as if they came directly from the source service.
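
**Reviewer sketch (illustrative, not part of the patch):** the `s3.go` diff below selects a parser by matching the S3 object key against per-service filename regexes. Here is a minimal standalone sketch of how the new CloudFront regex decomposes a key into labels; the sample key is taken from the test cases, and since the CloudFront pattern captures no `type` group, `getLabels` falls back to the matching parser's map key (`"cloudfront"`):

```go
package main

import (
	"fmt"
	"regexp"
)

// Copy of the CloudFront filename regex introduced in s3.go below.
// Naming convention: <optional prefix>/<distribution ID>.<YYYY-MM-DD-HH>.<unique ID>.gz
var cloudfrontFilenameRegex = regexp.MustCompile(`(?P<prefix>.*)\/(?P<src>[A-Z0-9]+)\.(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-(.+)`)

func main() {
	// Sample object key borrowed from the s3_test.go cases in this patch.
	key := "my/bucket/prefix/E2K2LNL5N3WR51.2022-07-18-12.a10a8496.gz"

	match := cloudfrontFilenameRegex.FindStringSubmatch(key)
	if match == nil {
		fmt.Println("no parser matched")
		return
	}
	// Promote every named capture group to a label, as getLabels does.
	labels := map[string]string{}
	for i, name := range cloudfrontFilenameRegex.SubexpNames() {
		if i != 0 && name != "" {
			labels[name] = match[i]
		}
	}
	fmt.Println(labels)
	// Output: map[day:18 month:07 prefix:my/bucket/prefix src:E2K2LNL5N3WR51 year:2022]
}
```

Because the key embeds neither an account ID nor a service name, the `prefix` capture doubles as the owner label (hence `ownerLabelKey: "prefix"` in the parser config).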
diff --git a/tools/lambda-promtail/lambda-promtail/s3.go b/tools/lambda-promtail/lambda-promtail/s3.go
index 93f3d8e9d89cc..bb7a94d032e46 100644
--- a/tools/lambda-promtail/lambda-promtail/s3.go
+++ b/tools/lambda-promtail/lambda-promtail/s3.go
@@ -21,11 +21,27 @@ import (
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 )

+type parserConfig struct {
+	// value to use for the __aws_log_type label
+	logTypeLabel string
+	// regex matching the filename and exporting labels from it
+	filenameRegex *regexp.Regexp
+	// regex that extracts the timestamp from the log sample
+	timestampRegex *regexp.Regexp
+	// time format to use to convert the timestamp to time.Time
+	timestampFormat string
+	// how many lines (or JSON tokens) to skip at the beginning of the file
+	skipHeaderCount int
+	// key of the metadata label to use as a value for the __aws_<logTypeLabel>_owner label
+	ownerLabelKey string
+}
+
 const (
 	FLOW_LOG_TYPE              string = "vpcflowlogs"
 	LB_LOG_TYPE                string = "elasticloadbalancing"
 	CLOUDTRAIL_LOG_TYPE        string = "CloudTrail"
 	CLOUDTRAIL_DIGEST_LOG_TYPE string = "CloudTrail-Digest"
+	CLOUDFRONT_LOG_TYPE        string = "cloudfront"
 )

 var (
@@ -40,15 +56,45 @@ var (
 	// CloudTrail
 	// source: https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-log-file-examples.html#cloudtrail-log-filename-format
 	// example: 111122223333_CloudTrail_us-east-2_20150801T0210Z_Mu0KsOhtH1ar15ZZ.json.gz
-	defaultFilenameRegex    = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
-	cloudtrailFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)
-	filenameRegexes         = map[string]*regexp.Regexp{
-		FLOW_LOG_TYPE:       defaultFilenameRegex,
-		LB_LOG_TYPE:         defaultFilenameRegex,
-		CLOUDTRAIL_LOG_TYPE: cloudtrailFilenameRegex,
+	// CloudFront
+	// source: https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#AccessLogsFileNaming
+	// example: example-prefix/EMLARXS9EXAMPLE.2019-11-14-20.RT4KCN4SGK9.gz
+	defaultFilenameRegex     = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
+	defaultTimestampRegex    = regexp.MustCompile(`\w+ (?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z)`)
+	cloudtrailFilenameRegex  = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)
+	cloudfrontFilenameRegex  = regexp.MustCompile(`(?P<prefix>.*)\/(?P<src>[A-Z0-9]+)\.(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-(.+)`)
+	cloudfrontTimestampRegex = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+\s\d+:\d+:\d+)`)
+	parsers = map[string]parserConfig{
+		FLOW_LOG_TYPE: {
+			logTypeLabel:    "s3_vpc_flow",
+			filenameRegex:   defaultFilenameRegex,
+			ownerLabelKey:   "account_id",
+			timestampRegex:  defaultTimestampRegex,
+			timestampFormat: time.RFC3339,
+			skipHeaderCount: 1,
+		},
+		LB_LOG_TYPE: {
+			logTypeLabel:    "s3_lb",
+			filenameRegex:   defaultFilenameRegex,
+			ownerLabelKey:   "account_id",
+			timestampFormat: time.RFC3339,
+			timestampRegex:  defaultTimestampRegex,
+		},
+		CLOUDTRAIL_LOG_TYPE: {
+			logTypeLabel:    "s3_cloudtrail",
+			ownerLabelKey:   "account_id",
+			skipHeaderCount: 3,
+			filenameRegex:   cloudtrailFilenameRegex,
+		},
+		CLOUDFRONT_LOG_TYPE: {
logTypeLabel: "s3_cloudfront", + filenameRegex: cloudfrontFilenameRegex, + ownerLabelKey: "prefix", + timestampRegex: cloudfrontTimestampRegex, + timestampFormat: "2006-01-02\x0915:04:05", + skipHeaderCount: 2, + }, } - // regex that extracts the timestamp (RFC3339) from message log - timestampRegex = regexp.MustCompile(`\w+ (?P\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z)`) ) func getS3Client(ctx context.Context, region string) (*s3.Client, error) { @@ -68,6 +114,13 @@ func getS3Client(ctx context.Context, region string) (*s3.Client, error) { } func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser) error { + parser, ok := parsers[labels["type"]] + if !ok { + if labels["type"] == CLOUDTRAIL_DIGEST_LOG_TYPE { + return nil + } + return fmt.Errorf("could not find parser for type %s", labels["type"]) + } gzreader, err := gzip.NewReader(obj) if err != nil { return err @@ -75,25 +128,10 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io. scanner := bufio.NewScanner(gzreader) - skipHeader := false - logType := "" - switch labels["type"] { - case FLOW_LOG_TYPE: - skipHeader = true - logType = "s3_vpc_flow" - case LB_LOG_TYPE: - logType = "s3_lb" - case CLOUDTRAIL_LOG_TYPE: - logType = "s3_cloudtrail" - case CLOUDTRAIL_DIGEST_LOG_TYPE: - // do not ingest digest files' content - return nil - } - ls := model.LabelSet{ - model.LabelName("__aws_log_type"): model.LabelValue(logType), - model.LabelName(fmt.Sprintf("__aws_%s", logType)): model.LabelValue(labels["src"]), - model.LabelName(fmt.Sprintf("__aws_%s_owner", logType)): model.LabelValue(labels["account_id"]), + model.LabelName("__aws_log_type"): model.LabelValue(parser.logTypeLabel), + model.LabelName(fmt.Sprintf("__aws_%s", parser.logTypeLabel)): model.LabelValue(labels["src"]), + model.LabelName(fmt.Sprintf("__aws_%s_owner", parser.logTypeLabel)): model.LabelValue(labels[parser.ownerLabelKey]), } ls = applyExtraLabels(ls) @@ -102,7 +140,7 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io. if labels["type"] == CLOUDTRAIL_LOG_TYPE { records := make(chan Record) jsonStream := NewJSONStream(records) - go jsonStream.Start(gzreader, 3) + go jsonStream.Start(gzreader, parser.skipHeaderCount) // Stream json file for record := range jsonStream.records { if record.Error != nil { @@ -123,17 +161,17 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io. for scanner.Scan() { log_line := scanner.Text() lineCount++ - if lineCount == 1 && skipHeader { + if lineCount <= parser.skipHeaderCount { continue } if printLogLine { fmt.Println(log_line) } - match := timestampRegex.FindStringSubmatch(log_line) timestamp := time.Now() + match := parser.timestampRegex.FindStringSubmatch(log_line) if len(match) > 0 { - timestamp, err = time.Parse(time.RFC3339, match[1]) + timestamp, err = time.Parse(parser.timestampFormat, match[1]) if err != nil { return err } @@ -151,24 +189,23 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io. 
 }

 func getLabels(record events.S3EventRecord) (map[string]string, error) {
+
 	labels := make(map[string]string)

 	labels["key"] = record.S3.Object.Key
 	labels["bucket"] = record.S3.Bucket.Name
 	labels["bucket_owner"] = record.S3.Bucket.OwnerIdentity.PrincipalID
 	labels["bucket_region"] = record.AWSRegion
-	var matchingExp *regexp.Regexp
 	var matchingType *string
-	for key, exp := range filenameRegexes {
-		if exp.MatchString(labels["key"]) {
-			matchingExp = exp
+	for key, p := range parsers {
+		if p.filenameRegex.MatchString(labels["key"]) {
 			matchingType = aws.String(key)
-		}
-	}
-	match := matchingExp.FindStringSubmatch(labels["key"])
-	for i, name := range matchingExp.SubexpNames() {
-		if i != 0 && name != "" {
-			labels[name] = match[i]
+			match := p.filenameRegex.FindStringSubmatch(labels["key"])
+			for i, name := range p.filenameRegex.SubexpNames() {
+				if i != 0 && name != "" {
+					labels[name] = match[i]
+				}
+			}
 		}
 	}
 	if labels["type"] == "" {

diff --git a/tools/lambda-promtail/lambda-promtail/s3_test.go b/tools/lambda-promtail/lambda-promtail/s3_test.go
index 83f1161b6d956..18f808825799b 100644
--- a/tools/lambda-promtail/lambda-promtail/s3_test.go
+++ b/tools/lambda-promtail/lambda-promtail/s3_test.go
@@ -89,6 +89,39 @@ func Test_getLabels(t *testing.T) {
 			},
 			wantErr: false,
 		},
+		{
+			name: "cloudtrail_digest_logs",
+			args: args{
+				record: events.S3EventRecord{
+					AWSRegion: "us-east-1",
+					S3: events.S3Entity{
+						Bucket: events.S3Bucket{
+							Name: "cloudtrail_digest_logs_test",
+							OwnerIdentity: events.S3UserIdentity{
+								PrincipalID: "test",
+							},
+						},
+						Object: events.S3Object{
+							Key: "my-bucket/AWSLogs/123456789012/CloudTrail-Digest/us-east-1/2022/01/24/123456789012_CloudTrail-Digest_us-east-1_20220124T0000Z_4jhzXFO2Jlvu2b3y.json.gz",
+						},
+					},
+				},
+			},
+			want: map[string]string{
+				"account_id":    "123456789012",
+				"bucket":        "cloudtrail_digest_logs_test",
+				"bucket_owner":  "test",
+				"bucket_region": "us-east-1",
+				"day":           "24",
+				"key":           "my-bucket/AWSLogs/123456789012/CloudTrail-Digest/us-east-1/2022/01/24/123456789012_CloudTrail-Digest_us-east-1_20220124T0000Z_4jhzXFO2Jlvu2b3y.json.gz",
+				"month":         "01",
+				"region":        "us-east-1",
+				"src":           "4jhzXFO2Jlvu2b3y",
+				"type":          CLOUDTRAIL_DIGEST_LOG_TYPE,
+				"year":          "2022",
+			},
+			wantErr: false,
+		},
 		{
 			name: "cloudtrail_logs",
 			args: args{
@@ -122,6 +155,38 @@ func Test_getLabels(t *testing.T) {
 			},
 			wantErr: false,
 		},
+		{
+			name: "s3_cloudfront",
+			args: args{
+				record: events.S3EventRecord{
+					AWSRegion: "us-east-1",
+					S3: events.S3Entity{
+						Bucket: events.S3Bucket{
+							Name: "cloudfront_logs_test",
+							OwnerIdentity: events.S3UserIdentity{
+								PrincipalID: "test",
+							},
+						},
+						Object: events.S3Object{
+							Key: "my/bucket/prefix/E2K2LNL5N3WR51.2022-07-18-12.a10a8496.gz",
+						},
+					},
+				},
+			},
+			want: map[string]string{
+				"bucket":        "cloudfront_logs_test",
+				"bucket_owner":  "test",
+				"bucket_region": "us-east-1",
+				"day":           "18",
+				"key":           "my/bucket/prefix/E2K2LNL5N3WR51.2022-07-18-12.a10a8496.gz",
+				"month":         "07",
+				"prefix":        "my/bucket/prefix",
+				"src":           "E2K2LNL5N3WR51",
+				"type":          CLOUDFRONT_LOG_TYPE,
+				"year":          "2022",
+			},
+			wantErr: false,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -149,6 +214,7 @@ func Test_parseS3Log(t *testing.T) {
 		name           string
 		args           args
 		wantErr        bool
+		expectedLen    int
 		expectedStream string
 	}{
 		{
@@ -165,6 +231,7 @@ func Test_parseS3Log(t *testing.T) {
 					"account_id": "123456789",
 				},
 			},
+			expectedLen:    1,
 			expectedStream: `{__aws_log_type="s3_vpc_flow", __aws_s3_vpc_flow="source", __aws_s3_vpc_flow_owner="123456789"}`,
 			wantErr:        false,
 		},
@@ -182,6 +249,7 @@ func Test_parseS3Log(t *testing.T) {
 					"account_id": "123456789",
 				},
 			},
+			expectedLen:    1,
 			expectedStream: `{__aws_log_type="s3_lb", __aws_s3_lb="source", __aws_s3_lb_owner="123456789"}`,
 			wantErr:        false,
 		},
@@ -199,9 +267,46 @@ func Test_parseS3Log(t *testing.T) {
 					"account_id": "123456789",
 				},
 			},
+			expectedLen:    1,
 			expectedStream: `{__aws_log_type="s3_cloudtrail", __aws_s3_cloudtrail="source", __aws_s3_cloudtrail_owner="123456789"}`,
 			wantErr:        false,
 		},
+		{
+			name: "cloudtrail_digest_logs",
+			args: args{
+				batchSize: 131072, // Set large enough we don't try and send to promtail
+				filename:  "../testdata/cloudtrail-log-file.json.gz",
+				b: &batch{
+					streams: map[string]*logproto.Stream{},
+				},
+				labels: map[string]string{
+					"type":       CLOUDTRAIL_DIGEST_LOG_TYPE,
+					"src":        "source",
+					"account_id": "123456789",
+				},
+			},
+			expectedLen:    0,
+			expectedStream: ``,
+			wantErr:        false,
+		},
+		{
+			name: "cloudfrontlogs",
+			args: args{
+				batchSize: 131072, // Set large enough we don't try and send to promtail
+				filename:  "../testdata/cloudfront.log.gz",
+				b: &batch{
+					streams: map[string]*logproto.Stream{},
+				},
+				labels: map[string]string{
+					"type":   CLOUDFRONT_LOG_TYPE,
+					"src":    "DISTRIBUTIONID",
+					"prefix": "path/to/file",
+				},
+			},
+			expectedLen:    1,
+			expectedStream: `{__aws_log_type="s3_cloudfront", __aws_s3_cloudfront="DISTRIBUTIONID", __aws_s3_cloudfront_owner="path/to/file"}`,
+			wantErr:        false,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -214,10 +319,12 @@ func Test_parseS3Log(t *testing.T) {
 			if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj); (err != nil) != tt.wantErr {
 				t.Errorf("parseS3Log() error = %v, wantErr %v", err, tt.wantErr)
 			}
-			require.Len(t, tt.args.b.streams, 1)
-			stream, ok := tt.args.b.streams[tt.expectedStream]
-			require.True(t, ok, "batch does not contain stream: %s", tt.expectedStream)
-			require.NotNil(t, stream)
+			require.Len(t, tt.args.b.streams, tt.expectedLen)
+			if tt.expectedStream != "" {
+				stream, ok := tt.args.b.streams[tt.expectedStream]
+				require.True(t, ok, "batch does not contain stream: %s", tt.expectedStream)
+				require.NotNil(t, stream)
+			}
 		})
 	}
 }

diff --git a/tools/lambda-promtail/testdata/cloudfront.log.gz b/tools/lambda-promtail/testdata/cloudfront.log.gz
new file mode 100644
index 0000000000000000000000000000000000000000..c37a63d55a6c4f68d4905fdd32b955e41fd91530
GIT binary patch
literal 781
zcmV+o1M>VIiwFpt8gFC(17mD&b!298Z*FuhY;R`()l^+?+d2$=_I`zdVL%H^vL)L|
zn_&-WmL)}!ty_|&7zPxo&BSVD%ai0Z*{{D;vfl3E;(lOngJ7AYNFMTt_RDK9R_eMJ
zG8(^l`Cfu5Y%eT?gV4ze!X1HP14QXuIP?o`PVV*&Y?u?-gt;XZxKbA=os6}0r{2^W
zNvwm)J6k7QYyI(OSc3uMJx-Pu4C!y6eh&E7g4z4Cfz}TaPM?qs*j72>_6V;FyLn-WG~mXLz4cc
zD>&SQ5%uPPjLH90p)OwVm?tDo3GW1Px5u*{V?qC7z!({@fN{>VG++tk38O5djGqUi
z;UXxKmTz}@twqWmqq)*;u{H=vYj8o-7+A|)WOE}MNAGRG<2WdtYiuuys=a6$iluC1
zy`hD!gsf?=`FXqt<)F!D!mKyb~Z
zwZV3BDZj{+#J96#d%gOQbRU4RJk5|ijb@}e5{c_FhxFI)f&DC`E!94CS-oE`0
zmJWObEY4UEQ+9riB1NKOR5*M6b?2L-S%3qv54~Bzx<01Jx@00#;&iEM_
z4Q6CM?(?j3EO)$ETt(Q_p@Ecj(sQ5xj2};tdnJ!wXml^O;&?}R)2x#mLjHXsc$Z-<
zzd8_3N7~_M^Lx+3J@4++VEFO
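
**A note on the CloudFront timestamp handling for reviewers:** CloudFront writes the event date and time as two tab-separated fields, which is why the new `timestampFormat` is the Go reference layout `2006-01-02` + `\x09` (a literal tab) + `15:04:05`, and why `skipHeaderCount` is 2 (CloudFront files start with `#Version` and `#Fields` header lines). A minimal standalone sketch of the extraction path in `parseS3Log`, using a hypothetical, heavily truncated log line:

```go
package main

import (
	"fmt"
	"regexp"
	"time"
)

// Copies of the CloudFront timestamp regex and layout from s3.go above.
// \s in the regex tolerates the tab between the date and time fields,
// and \x09 in the layout is that same literal tab.
var (
	cloudfrontTimestampRegex  = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+\s\d+:\d+:\d+)`)
	cloudfrontTimestampFormat = "2006-01-02\x0915:04:05"
)

func main() {
	// Hypothetical, heavily truncated CloudFront log line; real lines are
	// tab-separated and carry many more fields after these.
	logLine := "2022-07-18\t12:00:02\tLHR3-C1\t913\t203.0.113.7\tGET"

	timestamp := time.Now() // fallback when no timestamp matches, as in parseS3Log
	if match := cloudfrontTimestampRegex.FindStringSubmatch(logLine); len(match) > 0 {
		ts, err := time.Parse(cloudfrontTimestampFormat, match[1])
		if err != nil {
			panic(err)
		}
		timestamp = ts
	}
	fmt.Println(timestamp.UTC()) // 2022-07-18 12:00:02 +0000 UTC
}
```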