Sending Data to an Amazon Kinesis Data Firehose Delivery Stream
Kinesis Agent for Microsoft Windows
- Case 1: Linux agent
- Case 2: Windows agent
- Prepare your system by following the prerequisites guide. This example uses an EC2 instance running Amazon Linux.
- Credentials: follow the guide to set up your AWS credentials, then verify them:
aws sts get-caller-identity
- Create the S3 bucket and the Kinesis Data Firehose delivery stream kinsis-firehose-s3
- S3 bucket prefix
rawdata/iot_firehose/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
- S3 bucket error prefix
rawdata/iot_firehose/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/!{firehose:error-output-type}
- Buffer hints: 5 MB buffer size or 60-second buffer interval
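To see which S3 key prefix the expressions above resolve to, here is a minimal Python sketch. It assumes that Firehose evaluates `!{timestamp:...}` in UTC and it only handles the pattern letters used in this guide. The function name `expand_prefix` is hypothetical, and the sample timestamp is taken from the agent log shown later in this guide.

```python
import re
from datetime import datetime, timezone

# Only the pattern letters used in this guide are mapped (an assumption of
# this sketch); Firehose itself accepts Java DateTimeFormatter patterns.
_JAVA_TO_STRFTIME = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}


def expand_prefix(prefix: str, arrival: datetime) -> str:
    """Expand !{timestamp:...} expressions for a given arrival time."""
    arrival_utc = arrival.astimezone(timezone.utc)  # timestamps are evaluated in UTC

    def replace(match):
        pattern = match.group(1)
        if pattern not in _JAVA_TO_STRFTIME:
            raise ValueError(f"unsupported pattern in this sketch: {pattern}")
        return arrival_utc.strftime(_JAVA_TO_STRFTIME[pattern])

    # Other namespaces such as !{firehose:...} are left untouched.
    return re.sub(r"!\{timestamp:([^}]+)\}", replace, prefix)


PREFIX = "rawdata/iot_firehose/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
example = expand_prefix(PREFIX, datetime(2021, 1, 29, 3, 4, 21, tzinfo=timezone.utc))
print(example)  # rawdata/iot_firehose/year=2021/month=01/day=29/
```

The resulting `year=/month=/day=` segments are Hive-style partition folders, which is what lets the Athena table later in this guide discover partitions.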
- Download and Install the Agent
sudo yum install -y aws-kinesis-agent
- Configure and Start the Agent
- Create or edit /etc/aws-kinesis/agent.json. This example uses the Ningxia region, cn-northwest-1:
{
"cloudwatch.emitMetrics": true,
"cloudwatch.endpoint": "monitoring.cn-northwest-1.amazonaws.com.cn",
"firehose.endpoint": "firehose.cn-northwest-1.amazonaws.com.cn",
"flows": [
{
"filePattern": "/tmp/iot-app.log*",
"deliveryStream": "iot-data-collector"
}
]
}
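Before starting the agent, it can help to sanity-check the configuration file. The following is a minimal sketch, not part of the agent itself: the helper name `check_agent_config` is hypothetical, and it only checks the two flow keys used in this guide (`filePattern` and `deliveryStream`).

```python
import glob
import json


def check_agent_config(path: str) -> list:
    """Return a list of human-readable problems found in an agent.json file."""
    with open(path, encoding="utf-8") as handle:
        config = json.load(handle)  # raises ValueError on malformed JSON

    problems = []
    flows = config.get("flows", [])
    if not flows:
        problems.append("no flows configured")
    for index, flow in enumerate(flows):
        pattern = flow.get("filePattern")
        if not pattern:
            problems.append(f"flow {index}: missing filePattern")
        elif not glob.glob(pattern):
            # The agent has nothing to tail until a matching file exists.
            problems.append(f"flow {index}: no files match {pattern}")
        if not flow.get("deliveryStream"):
            problems.append(f"flow {index}: missing deliveryStream")
    return problems


# Example call on the instance:
#   check_agent_config("/etc/aws-kinesis/agent.json")
```

An empty list means the file parsed and every flow has a delivery stream and at least one matching log file.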
- Start the agent manually
sudo service aws-kinesis-agent start
Agent activity is logged in /var/log/aws-kinesis-agent/aws-kinesis-agent.log.
tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
2021-01-29 03:04:21.538+0000 (Agent STARTING) com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Starting tailer for file fh:iot-data-collector:/tmp/iot-app.log*
2021-01-29 03:04:21.561+0000 (FileTailer[fh:iot-data-collector:/tmp/iot-app.log*]) com.amazon.kinesis.streaming.agent.tailing.FirehoseParser [INFO] FirehoseParser[fh:iot-data-collector:/tmp/iot-app.log*]: Opening /tmp/iot-app.log for parsing.
2021-01-29 03:04:21.570+0000 (Agent STARTING) com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Startup completed in 37 ms.
2021-01-29 03:04:51.573+0000 (Agent.MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Progress: 30 records parsed (43790 bytes), and 0 records sent successfully to destinations. Uptime: 30041ms
.....
2021-01-29 03:05:51.574+0000 (cw-metrics-publisher) com.amazon.kinesis.streaming.agent.metrics.CWPublisherRunnable [INFO] Successfully published 5 datums.
2021-01-29 03:05:51.574+0000 (FileTailer[fh:iot-data-collector:/tmp/iot-app.log*].MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.tailing.FileTailer [INFO] FileTailer[fh:iot-data-collector:/tmp/iot-app.log*]: Tailer Progress: Tailer has parsed 150 records (64840 bytes), transformed 0 records, skipped 0 records, and has successfully sent 122 records to destination.
2021-01-29 03:05:51.576+0000 (Agent.MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Progress: 150 records parsed (64840 bytes), and 122 records sent successfully to destinations. Uptime: 90040ms
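The "Progress" lines above show how many records were parsed and how many were sent. As a rough sketch, the gap between the two can be extracted with a regular expression. The pattern is written against the log lines shown in this guide only, and the function name `backlog` is hypothetical.

```python
import re

# Matches the agent-level Progress line shown in the log excerpt above.
PROGRESS = re.compile(
    r"Progress: (?P<parsed>\d+) records parsed \((?P<bytes>\d+) bytes\), "
    r"and (?P<sent>\d+) records sent successfully to destinations"
)


def backlog(log_line: str):
    """Return parsed-minus-sent for a Progress line, or None for other lines."""
    match = PROGRESS.search(log_line)
    if match is None:
        return None
    return int(match.group("parsed")) - int(match.group("sent"))


line = (
    "2021-01-29 03:05:51.576+0000 (Agent.MetricsEmitter RUNNING) "
    "com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Progress: 150 records parsed "
    "(64840 bytes), and 122 records sent successfully to destinations. Uptime: 90040ms"
)
print(backlog(line))  # 28
```

A backlog that keeps growing across successive Progress lines suggests a delivery problem, for example missing permissions or a wrong endpoint.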
(Optional) Configure the agent to start on system startup:
sudo chkconfig aws-kinesis-agent on
Use Python 3 to generate the dummy logs:
python scripts/dummy-logs.py
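The contents of scripts/dummy-logs.py are not shown in this guide. As an illustration only, a generator along these lines would produce suitable input: one JSON object per line, with field names taken from the Athena table defined later in this guide. The alert messages, device names, and the function names `make_record` and `write_logs` are all made up for this sketch.

```python
import json
import random
from datetime import datetime, timezone


def make_record(rng: random.Random) -> dict:
    """Build one illustrative IoT alert record."""
    critical = rng.randint(0, 1)
    return {
        "Critical": critical,
        "AlertMessage": "temperature above threshold" if critical else "heartbeat",
        "AlertCount": rng.randint(1, 10),
        "Device": f"device-{rng.randint(1, 5):03d}",
        # "yyyy-MM-dd HH:mm:ss" style, so the value can be read as a timestamp
        "EventTime": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
    }


def write_logs(path: str, count: int, seed: int = 0) -> None:
    """Append `count` records to `path`, one JSON object per line."""
    rng = random.Random(seed)
    with open(path, "a", encoding="utf-8") as handle:
        for _ in range(count):
            handle.write(json.dumps(make_record(rng)) + "\n")


# Example call matching the filePattern used above:
#   write_logs("/tmp/iot-app.log", 30)
```

Appending rather than overwriting keeps the file growing, which is what the agent's tailer expects.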
Stream JSON Log Files to Amazon S3 Using Kinesis Agent for Windows
- Prepare your system by following the prerequisites guide. This example uses an EC2 instance running Windows Server 2019.
Check that Microsoft .NET Framework 4.6 or later is installed:
[System.Version](
(Get-ChildItem 'HKLM:\SOFTWARE\Microsoft\NET Framework Setup\NDP' -recurse `
| Get-ItemProperty -Name Version -ErrorAction SilentlyContinue `
| Where-Object { ($_.PSChildName -match 'Full') } `
| Select-Object Version | Sort-Object -Property Version -Descending)[0]).Version
- Configure AWS services. For the China regions, replace arn:aws with arn:aws-cn in every ARN.
- Configure IAM Policies and Roles
- Create the Amazon S3 Bucket
- Create the Kinesis Data Firehose Delivery Stream
- S3 bucket prefix
rawdata/iot_firehose/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
- S3 bucket error prefix
rawdata/iot_firehose/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/!{firehose:error-output-type}
- Buffer hints: 5 MB buffer size or 60-second buffer interval
- On the instance, use Windows Server Manager to disable Microsoft Internet Explorer Enhanced Security Configuration for users and administrators. For more information, see How To Turn Off Internet Explorer Enhanced Security Configuration on the Microsoft TechNet website.
- Installing Kinesis Agent for Windows
- Install from AWS Systems Manager (global regions)
- Run a PowerShell script (China regions), as shown below
- Install the agent using one of the following installation types:
# Install using MSI (Recommended)
# Download the MSI from https://s3-us-west-2.amazonaws.com/kinesis-agent-windows/downloads/index.html, then run:
msiexec /i AWSKinesisTap.1.1.216.4.msi /q
# Install using PowerShell
Invoke-Expression ((New-Object System.Net.WebClient).DownloadString('https://s3-us-west-2.amazonaws.com/kinesis-agent-windows/downloads/InstallKinesisAgent.ps1'))
# Install locally
PowerShell.exe -File ".\InstallKinesisAgent.ps1"
PowerShell.exe -File ".\InstallKinesisAgent.ps1" -version "version"
- Configure %PROGRAMFILES%\Amazon\AWSKinesisTap\appsettings.json
{
"Sources": [
{
"Id": "JsonLogSource",
"SourceType": "DirectorySource",
"RecordParser": "SingleLineJson",
"Directory": "C:\\LogSource\\",
"FileNameFilter": "*.log",
"InitialPosition": 0
}
],
"Sinks": [
{
"Id": "FirehoseLogStream",
"SinkType": "KinesisFirehose",
"StreamName": "iot-data-collector",
"Region": "cn-northwest-1",
"Format": "json",
"ObjectDecoration": "ComputerName={ComputerName};DT={timestamp:yyyy-MM-dd HH:mm:ss}"
}
],
"Pipes": [
{
"Id": "JsonLogSourceToFirehoseLogStream",
"SourceRef": "JsonLogSource",
"SinkRef": "FirehoseLogStream"
}
]
}
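In the configuration above, each pipe connects a source to a sink by Id, so a typo in SourceRef or SinkRef leaves the pipe pointing at nothing. The following is a minimal sketch of a cross-reference check. The function name `unresolved_pipe_refs` is hypothetical, and the sample settings are a trimmed copy of the file above.

```python
import json


def unresolved_pipe_refs(settings: dict) -> list:
    """Return a message for every pipe reference that matches no declared Id."""
    source_ids = {source["Id"] for source in settings.get("Sources", [])}
    sink_ids = {sink["Id"] for sink in settings.get("Sinks", [])}
    problems = []
    for pipe in settings.get("Pipes", []):
        if pipe.get("SourceRef") not in source_ids:
            problems.append(f'{pipe.get("Id")}: unknown SourceRef {pipe.get("SourceRef")}')
        if pipe.get("SinkRef") not in sink_ids:
            problems.append(f'{pipe.get("Id")}: unknown SinkRef {pipe.get("SinkRef")}')
    return problems


# Trimmed copy of the appsettings.json shown above.
settings = json.loads("""
{
  "Sources": [{"Id": "JsonLogSource", "SourceType": "DirectorySource"}],
  "Sinks": [{"Id": "FirehoseLogStream", "SinkType": "KinesisFirehose"}],
  "Pipes": [{"Id": "JsonLogSourceToFirehoseLogStream",
             "SourceRef": "JsonLogSource", "SinkRef": "FirehoseLogStream"}]
}
""")
print(unresolved_pipe_refs(settings))  # []
```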
- Update the AWSKinesisTap.exe.config file in the %PROGRAMFILES%\Amazon\AWSKinesisTap directory to specify the name of the AWS profile. For more details, see Sink Security Configuration.
<configuration>
<appSettings>
<add key="AWSProfileName" value="development"/>
<add key="AWSProfilesLocation" value="C:\Users\USERNAME\.aws\credentials"/>
</appSettings>
</configuration>
- To send traffic through a proxy, add a proxy element:
<configuration>
<aws>
<proxy
host="proxy"
port="3128" />
</aws>
<appSettings>
<add key="AWSProfileName" value="development"/>
<add key="AWSProfilesLocation" value="C:\Users\USERNAME\.aws\credentials"/>
</appSettings>
</configuration>
- Create the directory C:\LogSource, then create C:\LogSource\windows-iot-app.log with the dummy log script:
python scripts/dummy-logs-windows.py
- Start Agent
# To start the agent:
Start-Service -Name AWSKinesisTap
# You can make sure the agent is running:
Get-Service -Name AWSKinesisTap
Status Name DisplayName
------ ---- -----------
Running AWSKinesisTap Amazon Kinesis Agent for Microsoft ...
# To stop the agent:
Stop-Service -Name AWSKinesisTap
- Viewing the Amazon Kinesis Agent for Windows log file
The agent writes its logs to C:\ProgramData\Amazon\AWSKinesisTap\logs\KinesisTap.log.
CREATE EXTERNAL TABLE iotlogs (
Critical int,
AlertMessage string,
AlertCount int,
Device string,
EventTime timestamp,
ComputerName string,
DT timestamp
)
PARTITIONED BY(year string, month string, day string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://ray-datalake-lab/rawdata/iot_firehose/';
MSCK REPAIR TABLE iotlogs;
SELECT * FROM "blogdb"."iotlogs" limit 10;
SELECT * FROM "blogdb"."iotlogs" WHERE Critical = 1
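MSCK REPAIR TABLE finds partitions by reading Hive-style name=value folders under the table location, which is why the Firehose prefix in this guide uses year=, month=, and day=. The sketch below shows that mapping for a single object key. The object name at the end of the key and the function name `partition_values` are made up for illustration.

```python
def partition_values(key: str, table_root: str = "rawdata/iot_firehose/") -> dict:
    """Extract Hive-style partition values from an S3 object key."""
    relative = key[len(table_root):] if key.startswith(table_root) else key
    values = {}
    for segment in relative.split("/")[:-1]:  # the last segment is the object name
        name, separator, value = segment.partition("=")
        if separator:
            values[name] = value
    return values


key = "rawdata/iot_firehose/year=2021/month=01/day=29/example-object"
print(partition_values(key))  # {'year': '2021', 'month': '01', 'day': '29'}
```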
Kinesis Agent does not currently support compressed gz or zip files. The tracking issue is awslabs/amazon-kinesis-agent#37. In the source code, these files are ignored: https://github.com/awslabs/amazon-kinesis-agent/blob/master/src/com/amazon/kinesis/streaming/agent/tailing/SourceFile.java#L124
- As a workaround, package the files as rar and configure Kinesis Agent as below. However, Kinesis Agent base64-encodes the stream, so you need to decode the objects in the S3 bucket before using them.
{
"Sources": [
{
"Id": "JsonLogSource",
"SourceType": "DirectorySource",
"RecordParser": "SingleLine",
"Directory": "C:\\LogSource\\",
"InitialPosition": 0
}
],
"Sinks": [
{
"Id": "FirehoseLogStream",
"SinkType": "KinesisFirehose",
"StreamName": "iot-data-collector",
"Region": "cn-northwest-1",
"ObjectDecoration": "ComputerName={ComputerName};DT={timestamp:yyyy-MM-dd HH:mm:ss}"
}
],
"Pipes": [
{
"Id": "JsonLogSourceToFirehoseLogStream",
"SourceRef": "JsonLogSource",
"SinkRef": "FirehoseLogStream"
}
],
"SelfUpdate": 0 //minutes
}
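The decode step mentioned above can be sketched as follows. This assumes the delivered object holds one base64 string per line. Check the layout of your own objects first, because it is not confirmed in this guide. The function name `decode_lines` is hypothetical.

```python
import base64


def decode_lines(encoded_text: str) -> list:
    """Decode each non-empty base64 line back to the original bytes."""
    decoded = []
    for line in encoded_text.splitlines():
        line = line.strip()
        if line:
            # validate=True rejects characters outside the base64 alphabet
            decoded.append(base64.b64decode(line, validate=True))
    return decoded


# Round-trip check with made-up content.
sample = base64.b64encode(b"first record").decode("ascii") + "\n"
sample += base64.b64encode(b"second record").decode("ascii") + "\n"
print(decode_lines(sample))  # [b'first record', b'second record']
```

The decoded bytes can then be written back out and unpacked with the rar tool of your choice.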