feat(lambda-promtail): add cloudfront log file ingestion support (#9573)
**What this PR does / why we need it**:

This PR enables ingesting Cloudfront logs from log files stored in S3
(batch).
The current setup only supports streaming Cloudfront logs through AWS
Kinesis, whereas this PR implements the same flow as for VPC Flow logs,
Load Balancer logs, and Cloudtrail logs (S3 --> SQS (optional) -->
Lambda Promtail --> Loki).

**Special notes for your reviewer**:
+ The Cloudfront log file format is different from the already
implemented services, meaning we had to build yet another regex. AWS
never bothered making all services follow the same log file naming
convention, but the "good" thing is that it's now very unlikely they will
change it in the future.
+ The Cloudfront file name does not mention the AWS account
or the type of log it contains, so we have to infer the log type
from the filename format instead of looking for the exact string
"cloudfront" in the filename. This is why in `getLabels`, if no `type`
parameter is found in the regex, we use the key corresponding to the
name of the matching parser (see the sketch after this list).
+ I introduced a new `parserConfig` struct to group together several
parameters specific to a type of log (and avoid relying too much on map
key string matching and/or if statements for specific use cases).
+ I've been successfully running this code in several AWS environments
for days.
+ I corrected a typo from my previous PR #9497 (wrong PR number in
CHANGELOG.md).
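
For illustration, here is a minimal, self-contained sketch of the filename-based type fallback described above (simplified from the actual `getLabels` in `s3.go`; only the Cloudfront parser is shown, and error handling plus the other metadata labels are omitted):

```go
package main

import (
	"fmt"
	"regexp"
)

// Only the field needed for this sketch; the real parserConfig also carries
// the log type label, timestamp regex/format, header skip count, etc.
type parserConfig struct {
	filenameRegex *regexp.Regexp
}

var parsers = map[string]parserConfig{
	// Cloudfront keys look like <optional prefix>/<distribution ID>.YYYY-MM-DD-HH.<unique ID>.gz
	// and expose no "type" capture group.
	"cloudfront": {
		filenameRegex: regexp.MustCompile(`(?P<prefix>.*)\/(?P<src>[A-Z0-9]+)\.(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-(.+)`),
	},
}

func getLabels(key string) map[string]string {
	labels := map[string]string{"key": key}
	for parserType, p := range parsers {
		if !p.filenameRegex.MatchString(key) {
			continue
		}
		match := p.filenameRegex.FindStringSubmatch(key)
		for i, name := range p.filenameRegex.SubexpNames() {
			if i != 0 && name != "" {
				labels[name] = match[i]
			}
		}
		// The regex exported no "type" label, so fall back to the name of
		// the parser whose filename regex matched.
		if labels["type"] == "" {
			labels["type"] = parserType
		}
	}
	return labels
}

func main() {
	// Example key taken from the new unit test in s3_test.go.
	fmt.Println(getLabels("my/bucket/prefix/E2K2LNL5N3WR51.2022-07-18-12.a10a8496.gz"))
	// type=cloudfront, src=E2K2LNL5N3WR51, prefix=my/bucket/prefix, year=2022, month=07, day=18
}
```
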
**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [x] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [x] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)

---------

Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com>
CCOLLOT and MichelHollands authored Jun 9, 2023
1 parent c6fbff2 commit f239435
Showing 5 changed files with 194 additions and 47 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,8 @@

##### Enhancements

* [9573](https://github.com/grafana/loki/pull/9573) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudFront log ingestion.
* [9497](https://github.com/grafana/loki/pull/9497) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudTrail log ingestion.
* [8886](https://github.com/grafana/loki/pull/8886) **MichelHollands**: Add new logql template function `unixToTime`
* [8067](https://github.com/grafana/loki/pull/9497) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudTrail log ingestion.
* [9515](https://github.com/grafana/loki/pull/9515) **MichelHollands**: Fix String() on vector aggregation LogQL expressions that contain `without ()`.
7 changes: 4 additions & 3 deletions docs/sources/clients/lambda-promtail/_index.md
@@ -109,9 +109,10 @@ This workflow allows ingesting AWS loadbalancer logs stored on S3 to Loki.

This workflow allows ingesting AWS Cloudtrail logs stored on S3 to Loki.

### Cloudfront real-time logs

Cloudfront [real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) can be sent to a Kinesis data stream. The data stream can be mapped to be an [event source](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) for lambda-promtail to deliver the logs to Loki.
### Cloudfront logs
Cloudfront logs can be ingested in batches or streamed in real time to Loki:
+ Logging can be activated on a Cloudfront distribution with an S3 bucket as the destination. In this case, the workflow is the same as for the other services (VPC Flow logs, Loadbalancer logs, Cloudtrail logs); an example of the resulting labels is shown below.
+ Cloudfront [real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) can be sent to a Kinesis data stream. The data stream can be mapped as an [event source](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) for lambda-promtail to deliver the logs to Loki.
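
For the S3 (batch) workflow, note that Cloudfront log file names contain neither the AWS account ID nor a service name, so lambda-promtail infers the log type from the file name format and uses the key prefix in place of an account ID for the owner label. As a rough illustration, a hypothetical object key such as `example-prefix/EMLARXS9EXAMPLE.2019-11-14-20.RT4KCN4SGK9.gz` would produce labels along these lines:

```
__aws_log_type="s3_cloudfront"
__aws_s3_cloudfront="EMLARXS9EXAMPLE"       # distribution ID, parsed from the file name
__aws_s3_cloudfront_owner="example-prefix"  # key prefix, used in place of an account ID
```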

### Triggering Lambda-Promtail via SQS
For AWS services that support sending messages to SQS (for example, S3 with an S3 Notification to SQS), events can be processed through an [SQS queue using a lambda trigger](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) instead of directly configuring the source service to trigger lambda. Lambda-promtail will retrieve the nested events from the SQS messages' body and process them as if they came directly from the source service.
117 changes: 77 additions & 40 deletions tools/lambda-promtail/lambda-promtail/s3.go
@@ -21,11 +21,27 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
)

type parserConfig struct {
// value to use for __aws_log_type label
logTypeLabel string
// regex matching the filename and extracting labels from it
filenameRegex *regexp.Regexp
// regex that extracts the timestamp from the log sample
timestampRegex *regexp.Regexp
// time format to use to convert the timestamp to time.Time
timestampFormat string
// how many lines or JSON tokens to skip at the beginning of the file
skipHeaderCount int
// key of the metadata label to use as the value for the __aws_<logType>_owner label
ownerLabelKey string
}

const (
FLOW_LOG_TYPE string = "vpcflowlogs"
LB_LOG_TYPE string = "elasticloadbalancing"
CLOUDTRAIL_LOG_TYPE string = "CloudTrail"
CLOUDTRAIL_DIGEST_LOG_TYPE string = "CloudTrail-Digest"
CLOUDFRONT_LOG_TYPE string = "cloudfront"
)

var (
@@ -40,15 +56,45 @@ var (
// CloudTrail
// source: https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-log-file-examples.html#cloudtrail-log-filename-format
// example: 111122223333_CloudTrail_us-east-2_20150801T0210Z_Mu0KsOhtH1ar15ZZ.json.gz
defaultFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
cloudtrailFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)
filenameRegexes = map[string]*regexp.Regexp{
FLOW_LOG_TYPE: defaultFilenameRegex,
LB_LOG_TYPE: defaultFilenameRegex,
CLOUDTRAIL_LOG_TYPE: cloudtrailFilenameRegex,
// CloudFront
// source https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#AccessLogsFileNaming
// example: example-prefix/EMLARXS9EXAMPLE.2019-11-14-20.RT4KCN4SGK9.gz
defaultFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
defaultTimestampRegex = regexp.MustCompile(`\w+ (?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z)`)
cloudtrailFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)
cloudfrontFilenameRegex = regexp.MustCompile(`(?P<prefix>.*)\/(?P<src>[A-Z0-9]+)\.(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-(.+)`)
cloudfrontTimestampRegex = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+\s\d+:\d+:\d+)`)
parsers = map[string]parserConfig{
FLOW_LOG_TYPE: {
logTypeLabel: "s3_vpc_flow",
filenameRegex: defaultFilenameRegex,
ownerLabelKey: "account_id",
timestampRegex: defaultTimestampRegex,
timestampFormat: time.RFC3339,
skipHeaderCount: 1,
},
LB_LOG_TYPE: {
logTypeLabel: "s3_lb",
filenameRegex: defaultFilenameRegex,
ownerLabelKey: "account_id",
timestampFormat: time.RFC3339,
timestampRegex: defaultTimestampRegex,
},
CLOUDTRAIL_LOG_TYPE: {
logTypeLabel: "s3_cloudtrail",
ownerLabelKey: "account_id",
skipHeaderCount: 3,
filenameRegex: cloudtrailFilenameRegex,
},
CLOUDFRONT_LOG_TYPE: {
logTypeLabel: "s3_cloudfront",
filenameRegex: cloudfrontFilenameRegex,
ownerLabelKey: "prefix",
timestampRegex: cloudfrontTimestampRegex,
timestampFormat: "2006-01-02\x0915:04:05",
skipHeaderCount: 2,
},
}
// regex that extracts the timestamp (RFC3339) from message log
timestampRegex = regexp.MustCompile(`\w+ (?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z)`)
)

func getS3Client(ctx context.Context, region string) (*s3.Client, error) {
@@ -68,32 +114,24 @@ }
}

func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser) error {
parser, ok := parsers[labels["type"]]
if !ok {
if labels["type"] == CLOUDTRAIL_DIGEST_LOG_TYPE {
return nil
}
return fmt.Errorf("could not find parser for type %s", labels["type"])
}
gzreader, err := gzip.NewReader(obj)
if err != nil {
return err
}

scanner := bufio.NewScanner(gzreader)

skipHeader := false
logType := ""
switch labels["type"] {
case FLOW_LOG_TYPE:
skipHeader = true
logType = "s3_vpc_flow"
case LB_LOG_TYPE:
logType = "s3_lb"
case CLOUDTRAIL_LOG_TYPE:
logType = "s3_cloudtrail"
case CLOUDTRAIL_DIGEST_LOG_TYPE:
// do not ingest digest files' content
return nil
}

ls := model.LabelSet{
model.LabelName("__aws_log_type"): model.LabelValue(logType),
model.LabelName(fmt.Sprintf("__aws_%s", logType)): model.LabelValue(labels["src"]),
model.LabelName(fmt.Sprintf("__aws_%s_owner", logType)): model.LabelValue(labels["account_id"]),
model.LabelName("__aws_log_type"): model.LabelValue(parser.logTypeLabel),
model.LabelName(fmt.Sprintf("__aws_%s", parser.logTypeLabel)): model.LabelValue(labels["src"]),
model.LabelName(fmt.Sprintf("__aws_%s_owner", parser.logTypeLabel)): model.LabelValue(labels[parser.ownerLabelKey]),
}

ls = applyExtraLabels(ls)
@@ -102,7 +140,7 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
if labels["type"] == CLOUDTRAIL_LOG_TYPE {
records := make(chan Record)
jsonStream := NewJSONStream(records)
go jsonStream.Start(gzreader, 3)
go jsonStream.Start(gzreader, parser.skipHeaderCount)
// Stream json file
for record := range jsonStream.records {
if record.Error != nil {
Expand All @@ -123,17 +161,17 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
for scanner.Scan() {
log_line := scanner.Text()
lineCount++
if lineCount == 1 && skipHeader {
if lineCount <= parser.skipHeaderCount {
continue
}
if printLogLine {
fmt.Println(log_line)
}

match := timestampRegex.FindStringSubmatch(log_line)
timestamp := time.Now()
match := parser.timestampRegex.FindStringSubmatch(log_line)
if len(match) > 0 {
timestamp, err = time.Parse(time.RFC3339, match[1])
timestamp, err = time.Parse(parser.timestampFormat, match[1])
if err != nil {
return err
}
@@ -151,24 +189,23 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
}

func getLabels(record events.S3EventRecord) (map[string]string, error) {

labels := make(map[string]string)

labels["key"] = record.S3.Object.Key
labels["bucket"] = record.S3.Bucket.Name
labels["bucket_owner"] = record.S3.Bucket.OwnerIdentity.PrincipalID
labels["bucket_region"] = record.AWSRegion
var matchingExp *regexp.Regexp
var matchingType *string
for key, exp := range filenameRegexes {
if exp.MatchString(labels["key"]) {
matchingExp = exp
for key, p := range parsers {
if p.filenameRegex.MatchString(labels["key"]) {
matchingType = aws.String(key)
}
}
match := matchingExp.FindStringSubmatch(labels["key"])
for i, name := range matchingExp.SubexpNames() {
if i != 0 && name != "" {
labels[name] = match[i]
match := p.filenameRegex.FindStringSubmatch(labels["key"])
for i, name := range p.filenameRegex.SubexpNames() {
if i != 0 && name != "" {
labels[name] = match[i]
}
}
}
}
if labels["type"] == "" {
115 changes: 111 additions & 4 deletions tools/lambda-promtail/lambda-promtail/s3_test.go
@@ -89,6 +89,39 @@ func Test_getLabels(t *testing.T) {
},
wantErr: false,
},
{
name: "cloudtrail_digest_logs",
args: args{
record: events.S3EventRecord{
AWSRegion: "us-east-1",
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: "cloudtrail_digest_logs_test",
OwnerIdentity: events.S3UserIdentity{
PrincipalID: "test",
},
},
Object: events.S3Object{
Key: "my-bucket/AWSLogs/123456789012/CloudTrail-Digest/us-east-1/2022/01/24/123456789012_CloudTrail-Digest_us-east-1_20220124T0000Z_4jhzXFO2Jlvu2b3y.json.gz",
},
},
},
},
want: map[string]string{
"account_id": "123456789012",
"bucket": "cloudtrail_digest_logs_test",
"bucket_owner": "test",
"bucket_region": "us-east-1",
"day": "24",
"key": "my-bucket/AWSLogs/123456789012/CloudTrail-Digest/us-east-1/2022/01/24/123456789012_CloudTrail-Digest_us-east-1_20220124T0000Z_4jhzXFO2Jlvu2b3y.json.gz",
"month": "01",
"region": "us-east-1",
"src": "4jhzXFO2Jlvu2b3y",
"type": CLOUDTRAIL_DIGEST_LOG_TYPE,
"year": "2022",
},
wantErr: false,
},
{
name: "cloudtrail_logs",
args: args{
@@ -122,6 +155,38 @@
},
wantErr: false,
},
{
name: "s3_cloudfront",
args: args{
record: events.S3EventRecord{
AWSRegion: "us-east-1",
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: "cloudfront_logs_test",
OwnerIdentity: events.S3UserIdentity{
PrincipalID: "test",
},
},
Object: events.S3Object{
Key: "my/bucket/prefix/E2K2LNL5N3WR51.2022-07-18-12.a10a8496.gz",
},
},
},
},
want: map[string]string{
"bucket": "cloudfront_logs_test",
"bucket_owner": "test",
"bucket_region": "us-east-1",
"day": "18",
"key": "my/bucket/prefix/E2K2LNL5N3WR51.2022-07-18-12.a10a8496.gz",
"month": "07",
"prefix": "my/bucket/prefix",
"src": "E2K2LNL5N3WR51",
"type": CLOUDFRONT_LOG_TYPE,
"year": "2022",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -149,6 +214,7 @@ func Test_parseS3Log(t *testing.T) {
name string
args args
wantErr bool
expectedLen int
expectedStream string
}{
{
@@ -165,6 +231,7 @@
"account_id": "123456789",
},
},
expectedLen: 1,
expectedStream: `{__aws_log_type="s3_vpc_flow", __aws_s3_vpc_flow="source", __aws_s3_vpc_flow_owner="123456789"}`,
wantErr: false,
},
@@ -182,6 +249,7 @@
"account_id": "123456789",
},
},
expectedLen: 1,
expectedStream: `{__aws_log_type="s3_lb", __aws_s3_lb="source", __aws_s3_lb_owner="123456789"}`,
wantErr: false,
},
@@ -199,9 +267,46 @@
"account_id": "123456789",
},
},
expectedLen: 1,
expectedStream: `{__aws_log_type="s3_cloudtrail", __aws_s3_cloudtrail="source", __aws_s3_cloudtrail_owner="123456789"}`,
wantErr: false,
},
{
name: "cloudtrail_digest_logs",
args: args{
batchSize: 131072, // Set large enough we don't try and send to promtail
filename: "../testdata/cloudtrail-log-file.json.gz",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
"type": CLOUDTRAIL_DIGEST_LOG_TYPE,
"src": "source",
"account_id": "123456789",
},
},
expectedLen: 0,
expectedStream: ``,
wantErr: false,
},
{
name: "cloudfrontlogs",
args: args{
batchSize: 131072, // Set large enough we don't try and send to promtail
filename: "../testdata/cloudfront.log.gz",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
"type": CLOUDFRONT_LOG_TYPE,
"src": "DISTRIBUTIONID",
"prefix": "path/to/file",
},
},
expectedLen: 1,
expectedStream: `{__aws_log_type="s3_cloudfront", __aws_s3_cloudfront="DISTRIBUTIONID", __aws_s3_cloudfront_owner="path/to/file"}`,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -214,10 +319,12 @@ func Test_parseS3Log(t *testing.T) {
if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj); (err != nil) != tt.wantErr {
t.Errorf("parseS3Log() error = %v, wantErr %v", err, tt.wantErr)
}
require.Len(t, tt.args.b.streams, 1)
stream, ok := tt.args.b.streams[tt.expectedStream]
require.True(t, ok, "batch does not contain stream: %s", tt.expectedStream)
require.NotNil(t, stream)
require.Len(t, tt.args.b.streams, tt.expectedLen)
if tt.expectedStream != "" {
stream, ok := tt.args.b.streams[tt.expectedStream]
require.True(t, ok, "batch does not contain stream: %s", tt.expectedStream)
require.NotNil(t, stream)
}
})
}
}
Binary file added tools/lambda-promtail/testdata/cloudfront.log.gz
Binary file not shown.
