Skip to content

Commit

Permalink
Add support for Amazon Kinesis Client 2.x (#62)
Browse files Browse the repository at this point in the history
Added support for Enhanced Fan-Out and KCL 2.x.

Added a new interface for record processors that matches the
interface, `ShardRecordProcessor`, from the Java KCL.

Removed the state machine implementation for a simpler implementation.

Added support for configuring logging to kcl-bootstrap using the
`--log-configuration` option.
  • Loading branch information
pfifer authored Mar 6, 2019
1 parent 15cec60 commit a2be81a
Show file tree
Hide file tree
Showing 34 changed files with 482 additions and 566 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
*.log
.idea
package-lock.json

43 changes: 17 additions & 26 deletions Gruntfile.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
/***
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://aws.amazon.com/asl/
http://aws.amazon.com/asl/
or in the "license" file accompanying this file. This file is distributed
on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing
permissions and limitations under the License.
***/
or in the "license" file accompanying this file. This file is distributed
on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing
permissions and limitations under the License.
***/

'use strict';

Expand Down Expand Up @@ -59,12 +59,6 @@ module.exports = function(grunt) {
},
src: ['build']
},
coverage: {
options: {
force: true
},
src: ['coverage']
},
doc: {
options: {
force: true
Expand All @@ -77,23 +71,20 @@ module.exports = function(grunt) {
test: {
options: {
reporter: 'spec',
require: ['test/unit_tests_bootstrap'],
clearRequireCache: true
clearRequireCache: true,
},
src: ['test/**/*_tests.js']
},
html: mochaCoverageOptions('html-cov', 'coverage/index.html'),
json: mochaCoverageOptions('json-cov', 'coverage/javascript.coverage.json'),
}
},

jsdoc : {
dist : {
jsdoc: {
dist: {
src: ['index.js', 'lib/**/*.js', 'README.md'],
jsdoc: './node_modules/grunt-jsdoc/node_modules/jsdoc/jsdoc',
jsdoc: 'node_modules/grunt-jsdoc/node_modules/jsdoc/jsdoc',
options: {
destination: 'doc',
configure: './conf/jsdoc.conf.json',
template: './node_modules/grunt-jsdoc/node_modules/ink-docstrap/template'
configure: 'conf/jsdoc.conf.json',
template: 'node_modules/ink-docstrap/template'
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Amazon Kinesis Client Library for Node.js
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
60 changes: 31 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ var recordProcessor = {
* array of records that are to be processed. Looks like -
* {"records":[<record>, <record>], "checkpointer":<Checkpointer>}
* where <record> format is specified above.
* @param {Checkpointer} processRecordsInput.checkpointer - A checkpointer
* which accepts a `string` or `null` sequence number and a
* callback.
* @param {callback} completeCallback - The callback that must be invoked
* once all records are processed and checkpoint (optional) is
* complete.
Expand Down Expand Up @@ -106,38 +103,43 @@ var recordProcessor = {
},

/**
* Called by KCL to indicate that this record processor should shut down.
* After shutdown operation is complete, there will not be any more calls to
* any other functions of this record processor. Note that reason
* could be either TERMINATE or ZOMBIE. If ZOMBIE, clients should not
* checkpoint because there is possibly another record processor which has
* acquired the lease for this shard. If TERMINATE, then
* `checkpointer.checkpoint()` should be called to checkpoint at the end of
* the shard so that this processor will be shut down and new processors
* will be created for the children of this shard.
*
* @param {object} shutdownInput - Shutdown information. Looks like -
* {"reason":"<TERMINATE|ZOMBIE>", "checkpointer":<Checkpointer>}
* @param {Checkpointer} shutdownInput.checkpointer - A checkpointer which
* accepts a `string` or `null` sequence number and a callback.
* @param {callback} completeCallback - The callback that must be invoked
* once shutdown-related operations are complete and checkpoint
* (optional) is complete.
*/
shutdown: function(shutdownInput, completeCallback) {
// Shutdown logic ...
* Called by the KCL to indicate that this record processor should shut down.
* After the lease lost operation is complete, there will not be any more calls to
* any other functions of this record processor. Clients should not attempt to
* checkpoint because the lease has been lost by this Worker.
*
* @param {object} leaseLostInput - Lease lost information.
* @param {callback} completeCallback - The callback must be invoked once lease
* lost operations are completed.
*/
leaseLost: function(leaseLostInput, completeCallback) {
// Lease lost logic ...
completeCallback();
},

if (shutdownInput.reason !== 'TERMINATE') {
completeCallback();
return;
}
/**
* Called by the KCL to indicate that this record processor should shutdown.
* After the shard ended operation is complete, there will not be any more calls to
* any other functions of this record processor. Clients are required to checkpoint
* at this time. This indicates that the current record processor has finished
* processing and new record processors for the children will be created.
*
* @param {object} shardEndedInput - ShardEnded information. Looks like -
* {"checkpointer": <Checpointer>}
* @param {callback} completeCallback - The callback must be invoked once shard
* ended operations are completed.
*/
shardEnded: function(shardEndedInput, completeCallback) {
// Shard end logic ...

// Since you are checkpointing, only call completeCallback once the checkpoint
// operation is complete.
shutdownInput.checkpointer.checkpoint(function(err) {
// In this example, regardless of error, we mark the shutdown operation
shardEndedInput.checkpointer.checkpoint(function(err) {
// In this example, regardless of the error, we mark the shutdown operation
// complete.
completeCallback();
});
completeCallback();
}
};

Expand Down
98 changes: 72 additions & 26 deletions bin/kcl-bootstrap
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env node

/***
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,59 +29,97 @@ var util = require('util');


var MAVEN_PACKAGE_LIST = [
getMavenPackageInfo('com.amazonaws', 'amazon-kinesis-client', '1.9.3'),
getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-dynamodb', '1.11.438'),
getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-s3', '1.11.438'),
getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-kms', '1.11.438'),
getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-core', '1.11.438'),
getMavenPackageInfo('org.apache.httpcomponents', 'httpclient', '4.5.5'),
getMavenPackageInfo('org.apache.httpcomponents', 'httpcore', '4.4.9'),
getMavenPackageInfo('commons-codec', 'commons-codec', '1.10'),
getMavenPackageInfo('software.amazon.ion', 'ion-java', '1.0.2'),
getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-databind', '2.6.7.1'),
getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-annotations', '2.6.0'),
getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-core', '2.6.7'),
getMavenPackageInfo('com.fasterxml.jackson.dataformat', 'jackson-dataformat-cbor', '2.6.7'),
getMavenPackageInfo('joda-time', 'joda-time', '2.8.1'),
getMavenPackageInfo('com.amazonaws', 'jmespath-java', '1.11.438'),
getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-kinesis', '1.11.438'),
getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-cloudwatch', '1.11.438'),
getMavenPackageInfo('software.amazon.kinesis', 'amazon-kinesis-client-multilang', '2.1.2'),
getMavenPackageInfo('software.amazon.kinesis', 'amazon-kinesis-client', '2.1.2'),
getMavenPackageInfo('software.amazon.awssdk', 'kinesis', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'aws-cbor-protocol', '2.4.0'),
getMavenPackageInfo('com.fasterxml.jackson.dataformat', 'jackson-dataformat-cbor', '2.9.8'),
getMavenPackageInfo('software.amazon.awssdk', 'aws-json-protocol', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'dynamodb', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'cloudwatch', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'netty-nio-client', '2.4.0'),
getMavenPackageInfo('io.netty', 'netty-codec-http', '4.1.32.Final'),
getMavenPackageInfo('io.netty', 'netty-codec-http2', '4.1.32.Final'),
getMavenPackageInfo('io.netty', 'netty-codec', '4.1.32.Final'),
getMavenPackageInfo('io.netty', 'netty-transport', '4.1.32.Final'),
getMavenPackageInfo('io.netty', 'netty-resolver', '4.1.32.Final'),
getMavenPackageInfo('io.netty', 'netty-common', '4.1.32.Final'),
getMavenPackageInfo('io.netty', 'netty-buffer', '4.1.32.Final'),
getMavenPackageInfo('io.netty', 'netty-handler', '4.1.32.Final'),
getMavenPackageInfo('io.netty', 'netty-transport-native-epoll', '4.1.32.Final'),
getMavenPackageInfo('io.netty', 'netty-transport-native-unix-common', '4.1.32.Final'),
getMavenPackageInfo('com.typesafe.netty', 'netty-reactive-streams-http', '2.0.0'),
getMavenPackageInfo('com.typesafe.netty', 'netty-reactive-streams', '2.0.0'),
getMavenPackageInfo('org.reactivestreams', 'reactive-streams', '1.0.2'),
getMavenPackageInfo('com.google.guava', 'guava', '26.0-jre'),
getMavenPackageInfo('com.google.code.findbugs', 'jsr305', '3.0.2'),
getMavenPackageInfo('org.checkerframework', 'checker-qual', '2.5.2'),
getMavenPackageInfo('com.google.errorprone', 'error_prone_annotations', '2.1.3'),
getMavenPackageInfo('com.google.j2objc', 'j2objc-annotations', '1.1'),
getMavenPackageInfo('org.codehaus.mojo', 'animal-sniffer-annotations', '1.14'),
getMavenPackageInfo('com.google.protobuf', 'protobuf-java', '2.6.1'),
getMavenPackageInfo('org.apache.commons', 'commons-lang3', '3.7'),
getMavenPackageInfo('commons-logging', 'commons-logging', '1.1.3')
getMavenPackageInfo('org.apache.commons', 'commons-lang3', '3.8.1'),
getMavenPackageInfo('org.slf4j', 'slf4j-api', '1.7.25'),
getMavenPackageInfo('io.reactivex.rxjava2', 'rxjava', '2.1.14'),
getMavenPackageInfo('software.amazon.awssdk', 'sts', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'aws-query-protocol', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'protocol-core', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'profiles', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'sdk-core', '2.4.0'),
getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-core', '2.9.8'),
getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-databind', '2.9.8'),
getMavenPackageInfo('software.amazon.awssdk', 'auth', '2.4.0'),
getMavenPackageInfo('software.amazon', 'flow', '1.7'),
getMavenPackageInfo('software.amazon.awssdk', 'http-client-spi', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'regions', '2.4.0'),
getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-annotations', '2.9.0'),
getMavenPackageInfo('software.amazon.awssdk', 'annotations', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'utils', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'aws-core', '2.4.0'),
getMavenPackageInfo('software.amazon.awssdk', 'apache-client', '2.4.0'),
getMavenPackageInfo('org.apache.httpcomponents', 'httpclient', '4.5.6'),
getMavenPackageInfo('commons-codec', 'commons-codec', '1.10'),
getMavenPackageInfo('org.apache.httpcomponents', 'httpcore', '4.4.10'),
getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-core', '1.11.477'),
getMavenPackageInfo('commons-logging', 'commons-logging', '1.1.3'),
getMavenPackageInfo('software.amazon.ion', 'ion-java', '1.0.2'),
getMavenPackageInfo('joda-time', 'joda-time', '2.8.1'),
getMavenPackageInfo('ch.qos.logback', 'logback-classic', '1.2.3'),
getMavenPackageInfo('ch.qos.logback', 'logback-core', '1.2.3'),
getMavenPackageInfo('com.beust', 'jcommander', '1.72'),
getMavenPackageInfo('commons-io', 'commons-io', '2.6'),
getMavenPackageInfo('org.apache.commons', 'commons-collections4', '4.2'),
getMavenPackageInfo('commons-beanutils', 'commons-beanutils', '1.9.3'),
getMavenPackageInfo('commons-collections', 'commons-collections', '3.2.2')
];

var DEFAULT_JAR_PATH = path.resolve(path.join(__dirname, '..', 'lib', 'jars'));
var MULTI_LANG_DAEMON_CLASS = 'com.amazonaws.services.kinesis.multilang.MultiLangDaemon';
var MULTI_LANG_DAEMON_CLASS = 'software.amazon.kinesis.multilang.MultiLangDaemon';
var MAX_HTTP_REDIRECT_FOLLOW = 3;


function bootstrap() {
var args = parseArguments();
downloadMavenPackages(MAVEN_PACKAGE_LIST, args.jarPath, function(err) {
if (err) {
errorExit(util.format('Unable to download Multi-Language Daemon jar files from maven: %s', err));
errorExit(util.format('Unable to download MultiLangDaemon jar files from maven: %s', err));
}
startKinesisClientLibraryApplication(args);
});
}

function parseArguments() {
program
.option('-p, --properties <properties file>', 'properties file with multi-language daemon options')
.option('-p, --properties [properties file]', 'properties file with multi-language daemon options')
.option('-l, --log-configuration [logback.xml]', 'logback.xml to be used with MultiLangDaemon for logging (optional)')
.option('-j, --java [java path]', 'path to java executable - defaults to using JAVA_HOME environment variable to get java path (optional)')
.option('-c, --jar-path [jar path]', 'path where all multi-language daemon jar files will be downloaded (optional)')
.option('-e, --execute', 'execute the KCL application')
.parse(process.argv);

var args = {
'properties': program.properties,
'logConfiguration': program.logConfiguration ? program.logConfiguration: null,
'java': (program.java ? program.java : (process.env.JAVA_HOME ? path.join(process.env.JAVA_HOME, 'bin', 'java') : null)),
'jarPath': (program.jarPath ? program.jarPath : DEFAULT_JAR_PATH),
'execute': program.execute
Expand All @@ -96,6 +134,9 @@ function parseArguments() {
if (!isFile(args.java)) {
invalidInvocationExit(program, 'Valid --java value is required or alternatively JAVA_HOME environment variable must be set.', true);
}
if (args.logCofiguration && !isFile(args.logConfiguration)) {
invalidInvocationExit(program, args.logConfiguration + ' file does not exists. Specify a valid --log-configuration value', true);
}
if (args.jarPath === DEFAULT_JAR_PATH) {
createDirectory(args.jarPath);
}
Expand All @@ -106,16 +147,21 @@ function parseArguments() {
}

function startKinesisClientLibraryApplication(options) {
var classpath = getClasspath(options).join(getPathDelimiter());
var classpath = '-cp ' + getClasspath(options).join(getPathDelimiter());
var java = options.java;
var args = ['-cp', classpath, MULTI_LANG_DAEMON_CLASS, options.properties];
var propertiesFile = '--properties-file ' + options.properties;
var logConfiguration = '';
if (options.logConfiguration) {
logConfiguration = '--log-configuration ' + options.logConfiguration;
}
var args = [classpath, MULTI_LANG_DAEMON_CLASS, propertiesFile, logConfiguration];
var cmd = java + ' ' + args.join(' ');

console.log("==========================================================");
console.log(cmd);
console.log("==========================================================");
if (options.execute) {
console.log("Starting Multi-Lang Daemon ...");
console.log("Starting MultiLangDaemon ...");
spawn(java, args, { stdio: 'inherit' });
}
}
Expand Down
2 changes: 1 addition & 1 deletion bin/kcl-bootstrap.bat
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@echo off

REM Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
REM Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.

REM Licensed under the Amazon Software License (the "License").
REM You may not use this file except in compliance with the License.
Expand Down
1 change: 1 addition & 0 deletions conf/.jshintrc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"eqnull" : true,
"strict" : true,
"node" : true,
"esversion" : 6,
"globals" : {
/* MOCHA */
"describe" : false,
Expand Down
2 changes: 1 addition & 1 deletion conf/jsdoc.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"outputSourcePath" : false,
"systemName" : "Amazon Kinesis Client Library in Node.js",
"footer" : "Amazon Kinesis Client Library in Node.js",
"copyright" : "Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.",
"copyright" : "Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.",
"navType" : "vertical",
"theme" : "cosmo",
"linenums" : true,
Expand Down
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/***
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
Expand Down
3 changes: 1 addition & 2 deletions lib/kcl/action_handler.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/***
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,6 @@ permissions and limitations under the License.

'use strict';


/**
* @fileoverview
* Marshals and unmarshals actions and delegates them back and forth between the I/O handler
Expand Down
3 changes: 1 addition & 2 deletions lib/kcl/checkpointer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/***
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,6 @@ permissions and limitations under the License.

'use strict';


/**
* @fileoverview
* Allows you to make checkpoint requests. A checkpoint marks a point in a shard until which all records are processed
Expand Down
Loading

0 comments on commit a2be81a

Please sign in to comment.