diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index ed48ac276..da280ddf0 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -19,6 +19,7 @@ import java.net.URI; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -27,6 +28,7 @@ import java.util.function.Function; import org.apache.commons.beanutils.BeanUtilsBean; +import org.apache.commons.beanutils.ConvertUtils; import org.apache.commons.beanutils.ConvertUtilsBean; import org.apache.commons.beanutils.Converter; import org.apache.commons.beanutils.converters.ArrayConverter; @@ -197,6 +199,14 @@ public MultiLangDaemonConfiguration(BeanUtilsBean utilsBean, ConvertUtilsBean co this.utilsBean = utilsBean; this.convertUtilsBean = convertUtilsBean; + convertUtilsBean.register(new Converter() { + @Override + public T convert(Class type, Object value) { + Date date = new Date(Long.parseLong(value.toString()) * 1000L); + return type.cast(InitialPositionInStreamExtended.newInitialPositionAtTimestamp(date)); + } + }, InitialPositionInStreamExtended.class); + convertUtilsBean.register(new Converter() { @Override public T convert(Class type, Object value) { diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index 685d83dcc..031fc4272 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -27,15 +27,15 @@ import java.io.InputStream; import java.net.URI; import java.util.Arrays; +import java.util.Date; import java.util.HashSet; -import java.util.Optional; import java.util.Set; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import org.apache.commons.lang3.StringUtils; -import org.junit.Ignore; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.Rule; import org.junit.Test; @@ -45,11 +45,8 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.kinesis.common.InitialPositionInStream; -import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; @@ -93,6 +90,47 @@ public void testWithLongVariables() { assertEquals(config.getShardSyncIntervalMillis(), 500); } + @Test + public void testWithInitialPositionInStreamExtended() { + long epochTimeInSeconds = 1617406032; + MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "applicationName = app", + "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "initialPositionInStreamExtended = " + epochTimeInSeconds}, '\n')); + + assertEquals(config.getInitialPositionInStreamExtended().getTimestamp(), new Date(epochTimeInSeconds * 1000L)); + assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.AT_TIMESTAMP); + } + + @Test + public void testInvalidInitialPositionInStream() { + // AT_TIMESTAMP cannot be used as initialPositionInStream. If a user wants to specify AT_TIMESTAMP, + // they must specify the time with initialPositionInStreamExtended. + try { + getConfiguration(StringUtils.join(new String[] { "applicationName = app", + "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "initialPositionInStream = AT_TIMESTAMP"}, '\n')); + fail("Should have thrown when initialPositionInStream is set to AT_TIMESTAMP"); + } catch (Exception e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + assertTrue(rootCause instanceof IllegalArgumentException); + } + } + + @Test + public void testInvalidInitialPositionInStreamExtended() { + // initialPositionInStreamExtended takes a long value indicating seconds since epoch. If a non-long + // value is provided, the constructor should throw an IllegalArgumentException exception. + try { + getConfiguration(StringUtils.join(new String[] { "applicationName = app", + "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "initialPositionInStreamExtended = null"}, '\n')); + fail("Should have thrown when initialPositionInStreamExtended is set to null"); + } catch (Exception e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + assertTrue(rootCause instanceof IllegalArgumentException); + } + } + @Test public void testWithUnsupportedClientConfigurationVariables() { MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join( @@ -159,7 +197,7 @@ public void testWithSetVariables() { } @Test - public void testWithInitialPositionInStreamVariables() { + public void testWithInitialPositionInStreamTrimHorizon() { MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123", "initialPositionInStream = TriM_Horizon" }, '\n')); @@ -167,6 +205,15 @@ public void testWithInitialPositionInStreamVariables() { assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON); } + @Test + public void testWithInitialPositionInStreamLatest() { + MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a", + "applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123", + "initialPositionInStream = LateSt" }, '\n')); + + assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.LATEST); + } + @Test public void testSkippingNonKCLVariables() { MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",