Add input for Cloudwatch logs via Kinesis #13317

Merged

Conversation

kvch
Contributor

@kvch kvch commented Aug 22, 2019

A new experimental input is added to Functionbeat to read Cloudwatch logs coming through Kinesis.

I have tested the input manually.

Configuration

The configuration is similar to the existing Kinesis function. It has two additional options, base64_encoded and compressed. If both options are set, base64 decoding takes place first, followed by gzip decompression. If your data is only gzipped, set only the compressed option. (A rough Go sketch of this decode order follows the configuration example below.)

  # Create a function that accepts Cloudwatch logs from Kinesis streams.
  - name: cloudwatch-logs-kinesis
    enabled: false
    type: cloudwatch_logs_kinesis

    # Description of the function to help identify it when you run multiple functions.
    description: "lambda function for Cloudwatch logs in Kinesis events"

    # Set base64_encoded if your data is base64 encoded.
    #base64_encoded: false

    # Set compressed if your data is compressed with gzip.
    #compressed: true

    # Concurrency is the reserved number of instances for that function.
    # Default is 5.
    #
    # Note: There is a hard limit of 1000 functions of any kind per account.
    #concurrency: 5

    # The maximum memory allocated for this function. The configured size must be a multiple of 64MiB.
    # There is a hard limit of 3008MiB for each function. Default is 128MiB.
    #memory_size: 128MiB

    # Dead letter queue configuration; this must be set to an ARN pointing to an SQS queue.
    #dead_letter_config.target_arn:

    # Execution role of the function.
    #role: arn:aws:iam::123456789012:role/MyFunction

    # Connect to private resources in an Amazon VPC.
    #virtual_private_cloud:
    #  security_group_ids: []
    #  subnet_ids: []

    # Optional fields that you can specify to add additional information to the
    # output. Fields can be scalar values, arrays, dictionaries, or any nested
    # combination of these.
    #fields:
    #  env: staging

    # Define custom processors for this function.
    #processors:
    #  - decode_json_fields:
    #      fields: ["message"]
    #      process_array: false
    #      max_depth: 1
    #      target: ""
    #      overwrite_keys: false

    # List of Kinesis streams.
    triggers:
        # ARN of the Kinesis stream.
      - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:stream/myevents

        # batch_size is the number of events read in a batch.
        # Default is 10.
        #batch_size: 100

        # Starting position is where to start reading events from the Kinesis stream.
        # Default is trim_horizon.
        #starting_position: "trim_horizon"
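
The decode order described above (base64 decode first, then gzip decompression) can be sketched roughly in Go. This is illustrative only, not the actual Functionbeat implementation; the decodeKinesisData helper name is made up.

  import (
      "bytes"
      "compress/gzip"
      "encoding/base64"
      "io/ioutil"
  )

  // decodeKinesisData is a hypothetical helper illustrating the decode order:
  // base64 decode first (if base64_encoded is set), then gzip decompression
  // (if compressed is set).
  func decodeKinesisData(data []byte, base64Encoded, compressed bool) ([]byte, error) {
      if base64Encoded {
          decoded := make([]byte, base64.StdEncoding.DecodedLen(len(data)))
          n, err := base64.StdEncoding.Decode(decoded, data)
          if err != nil {
              return nil, err
          }
          data = decoded[:n]
      }

      if compressed {
          r, err := gzip.NewReader(bytes.NewReader(data))
          if err != nil {
              return nil, err
          }
          defer r.Close()

          decompressed, err := ioutil.ReadAll(r)
          if err != nil {
              return nil, err
          }
          data = decompressed
      }

      return data, nil
  }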

TODO

  • better PR description
  • more tests

return nil, err
}
kinesisData = outBuf.Bytes()
}
Contributor

@ph ph Aug 22, 2019

@kvch we need this #13291 PR in that release.

I thought the content was base64 encoded, but in your code this doesn't appear to be the case? How did you test it? Other than that, it looks good.

Contributor Author

@kvch kvch Aug 23, 2019

Yes, that is what I expected as well. (That's why the development took longer than expected.) But when I tested it with a Kinesis stream receiving events from a log stream, I got gzipped data. I tried to find documentation, and all I found was this:

You can use CloudWatch Logs subscription feature to stream data from CloudWatch Logs to Kinesis Data Firehose. All log events from CloudWatch Logs are already compressed in gzip format, so you should keep Firehose’s compression configuration as uncompressed to avoid double-compression.

But this is about Data Firehose and I used Data Streams, so I assume it applies to all Kinesis services.

I will spend a bit more time finding out more about compression in Kinesis and Cloudwatch.

Contributor Author

As a compromise, I added a new option, base64_encoded, so we support both what I observed with Kinesis and what the documentation describes.
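
For example (hypothetical values), a function whose records are both base64 encoded and gzipped would enable both options:

  - name: cloudwatch-logs-kinesis
    enabled: true
    type: cloudwatch_logs_kinesis
    base64_encoded: true
    compressed: true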

@kvch
Contributor Author

kvch commented Aug 23, 2019

Failing tests are unrelated.

Contributor

@ph ph left a comment

Changes LGTM. We will need to add a follow-up docs PR, but let's get this merged for FF (feature freeze).

@kvch kvch force-pushed the feature-functionbeat-cloudwatch-through-kinesis branch from dfab075 to 10fc0af on August 27, 2019 14:18
@kvch
Contributor Author

kvch commented Aug 27, 2019

Failing tests are unrelated.

@kvch kvch merged commit fa17a55 into elastic:master Aug 27, 2019
@acchen97

@kvch @dedemorton When going through the documentation, I found it a bit difficult to see what inputs Functionbeat has. Perhaps I'm missing something, but I only see them in the functionbeat.reference.yml section. Would it make sense for us to have a dedicated section that enumerates the available function input types?

@dedemorton
Contributor

dedemorton commented Sep 22, 2019

@acchen97 Yeah, now that we have more than a couple of inputs, it's probably worth breaking them out into separate sections like we do for Filebeat. So we might have:

Configure functions
   Cloudwatch input
   Sqs input
   Kinesis input

Is that what you had in mind?

(We can also change Configure functions to Configure inputs, but I'm not sure if we want to do that.)

@acchen97

@dedemorton yep, that's what I was thinking (enhanced list below). I think the current function terminology is probably fine. /cc @urso

Configure functions
   Cloudwatch Logs
   Cloudwatch Logs Kinesis
   SQS
   Kinesis

@ravinaik1312
Contributor

Does this solve #12442? If I understand the code and the discussion correctly, I think it should: multiple records from the array of Cloudwatch events are indeed transformed into individual documents in ES.

@ravinaik1312
Contributor

I just did a quick test with cloudwatch-logs-kinesis in the Functionbeat configuration and it doesn't look like it splits the records into individual documents in ES.

@ppf2
Member

ppf2 commented Apr 21, 2020

We will work on updating the documentation. The type value for this new feature is actually type: cloudwatch_logs_kinesis :)
