From 8c0add8c02335437ffa7e544491c1780de2c325f Mon Sep 17 00:00:00 2001 From: Vaishnavi190900 <152475034+Vaishnavi190900@users.noreply.github.com> Date: Fri, 7 Feb 2025 18:21:30 +0530 Subject: [PATCH] fix: Negative nano handling (#65) * fix: negative nano handling * test: adding unit tests and refactor code * test case changed * refactoring and docs added --- build.gradle | 2 +- docs/reference/configuration/maxcompute.md | 16 ++- .../depot/config/MaxComputeSinkConfig.java | 4 + .../util/LocalDateTimeValidator.java | 11 ++ ...ampNTZProtobufMaxComputeConverterTest.java | 110 ++++++++++++++++++ ...estampProtobufMaxComputeConverterTest.java | 110 ++++++++++++++++++ 6 files changed, 251 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index f5058bda..5e620777 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,7 @@ plugins { } group 'com.gotocompany' -version '0.10.6' +version '0.10.7' repositories { mavenLocal() diff --git a/docs/reference/configuration/maxcompute.md b/docs/reference/configuration/maxcompute.md index d0fcbad2..8114b564 100644 --- a/docs/reference/configuration/maxcompute.md +++ b/docs/reference/configuration/maxcompute.md @@ -346,4 +346,18 @@ The format of this config is `key1=value1,key2=value2,...`. Further documentatio * Example value: `table.format.version=2` * Type: `optional` -* Default value: `` \ No newline at end of file +* Default value: `` + +## SINK_MAXCOMPUTE_NANO_HANDLING_ENABLED + +This configuration is used to handle nano data values. Its default value is true. + +If it is enabled: +1. If nano is less than 0, it will be set to 0. +2. If nano exceeds its range (> 999,999,999), the extra value will be added to the seconds. + +If it is disabled and nano is outside the default range, firehose will crash. + +* Example value: `true` +* Type: `optional` +* Default value: `true` \ No newline at end of file diff --git a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java index de64353b..75c326a6 100644 --- a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java +++ b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java @@ -192,4 +192,8 @@ public interface MaxComputeSinkConfig extends Config { @DefaultValue("") Map getTableProperties(); + @Key("SINK_MAXCOMPUTE_NANO_HANDLING_ENABLED") + @DefaultValue("true") + boolean isNanoHandlingEnabled(); + } diff --git a/src/main/java/com/gotocompany/depot/maxcompute/util/LocalDateTimeValidator.java b/src/main/java/com/gotocompany/depot/maxcompute/util/LocalDateTimeValidator.java index 9256c336..b530930b 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/util/LocalDateTimeValidator.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/util/LocalDateTimeValidator.java @@ -13,6 +13,7 @@ public class LocalDateTimeValidator { private static final long DAYS_IN_YEAR = 365L; + private static final int NANOS_IN_ONE_SECOND = 1_000_000_000; private final TemporalAmount maxPastEventTimeDifference; private final TemporalAmount maxFutureEventTimeDifference; @@ -23,6 +24,7 @@ public class LocalDateTimeValidator { private final String tablePartitionKey; private final int maxPastYearEventTimeDifference; private final int maxFutureYearEventTimeDifference; + private final boolean isNanoHandlingEnabled; public LocalDateTimeValidator(MaxComputeSinkConfig maxComputeSinkConfig) { this.maxPastEventTimeDifference = Duration.ofDays(maxComputeSinkConfig.getMaxPastYearEventTimeDifference() * DAYS_IN_YEAR); @@ -34,9 +36,18 @@ public LocalDateTimeValidator(MaxComputeSinkConfig maxComputeSinkConfig) { this.tablePartitionKey = maxComputeSinkConfig.getTablePartitionKey(); this.maxPastYearEventTimeDifference = maxComputeSinkConfig.getMaxPastYearEventTimeDifference(); this.maxFutureYearEventTimeDifference = maxComputeSinkConfig.getMaxFutureYearEventTimeDifference(); + this.isNanoHandlingEnabled = maxComputeSinkConfig.isNanoHandlingEnabled(); } public LocalDateTime parseAndValidate(long seconds, int nanos, String fieldName, boolean isRootLevel) { + if (isNanoHandlingEnabled) { + if (nanos < 0) { + nanos = 0; + } else if (nanos >= NANOS_IN_ONE_SECOND) { + seconds += nanos / NANOS_IN_ONE_SECOND; + nanos = nanos % NANOS_IN_ONE_SECOND; + } + } Instant instant = Instant.now(); ZoneOffset zoneOffset = zoneId.getRules().getOffset(instant); LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(seconds, nanos, zoneOffset); diff --git a/src/test/java/com/gotocompany/depot/maxcompute/converter/TimestampNTZProtobufMaxComputeConverterTest.java b/src/test/java/com/gotocompany/depot/maxcompute/converter/TimestampNTZProtobufMaxComputeConverterTest.java index 696b2afc..dad2b29e 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/converter/TimestampNTZProtobufMaxComputeConverterTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/converter/TimestampNTZProtobufMaxComputeConverterTest.java @@ -15,6 +15,7 @@ import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.List; @@ -237,4 +238,113 @@ public void shouldSkipDifferenceValidationWhenIsNotRootLevel() { .isEqualTo(expectedLocalDateTime); } + @Test + public void shouldConvertPayloadWithValidNanos() { + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(2500) + .setNanos(100) + .build(); + TestMaxComputeTypeInfo.TestRoot message = TestMaxComputeTypeInfo.TestRoot.newBuilder() + .setTimestampField(timestamp) + .build(); + LocalDateTime expectedLocalDateTime = LocalDateTime.ofEpochSecond( + timestamp.getSeconds(), timestamp.getNanos(), ZoneOffset.UTC); + + Object result = timestampNtzProtobufMaxComputeConverter.convertSingularPayload( + new ProtoPayload(descriptor.getFields().get(3), message.getField(descriptor.getFields().get(3)), true)); + + assertThat(result).isEqualTo(expectedLocalDateTime); + } + + @Test + public void shouldHandleNegativeNanosWhenEnabled() { + MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); + when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); + when(maxComputeSinkConfig.getValidMinTimestamp()).thenReturn(LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.getValidMaxTimestamp()).thenReturn(LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.isNanoHandlingEnabled()).thenReturn(true); + + timestampNtzProtobufMaxComputeConverter = new TimestampNTZProtobufMaxComputeConverter(maxComputeSinkConfig); + + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(2500) + .setNanos(-500) + .build(); + TestMaxComputeTypeInfo.TestRoot message = TestMaxComputeTypeInfo.TestRoot.newBuilder() + .setTimestampField(timestamp) + .build(); + + Object result = timestampNtzProtobufMaxComputeConverter.convertSingularPayload( + new ProtoPayload(descriptor.getFields().get(3), message.getField(descriptor.getFields().get(3)), true)); + + assertThat(result).isEqualTo(LocalDateTime.ofEpochSecond(2500, 0, ZoneOffset.UTC)); + } + + @Test(expected = java.time.DateTimeException.class) + public void shouldThrowExceptionForNegativeNanosWhenDisabled() { + MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); + when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); + when(maxComputeSinkConfig.getValidMinTimestamp()).thenReturn(LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.getValidMaxTimestamp()).thenReturn(LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.isNanoHandlingEnabled()).thenReturn(false); + + timestampNtzProtobufMaxComputeConverter = new TimestampNTZProtobufMaxComputeConverter(maxComputeSinkConfig); + + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(2500) + .setNanos(-500) + .build(); + TestMaxComputeTypeInfo.TestRoot message = TestMaxComputeTypeInfo.TestRoot.newBuilder() + .setTimestampField(timestamp) + .build(); + + timestampNtzProtobufMaxComputeConverter.convertSingularPayload( + new ProtoPayload(descriptor.getFields().get(3), message.getField(descriptor.getFields().get(3)), true)); + } + + @Test(expected = java.time.DateTimeException.class) + public void shouldThrowExceptionForExcessNanosWhenDisabled() { + MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); + when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); + when(maxComputeSinkConfig.getValidMinTimestamp()).thenReturn(LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.getValidMaxTimestamp()).thenReturn(LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.isNanoHandlingEnabled()).thenReturn(false); + + timestampNtzProtobufMaxComputeConverter = new TimestampNTZProtobufMaxComputeConverter(maxComputeSinkConfig); + + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(2500) + .setNanos(1_000_000_000) + .build(); + TestMaxComputeTypeInfo.TestRoot message = TestMaxComputeTypeInfo.TestRoot.newBuilder() + .setTimestampField(timestamp) + .build(); + + timestampNtzProtobufMaxComputeConverter.convertSingularPayload( + new ProtoPayload(descriptor.getFields().get(3), message.getField(descriptor.getFields().get(3)), true)); + } + + @Test + public void shouldConvertExcessNanosToSecondsWhenEnabled() { + MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); + when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); + when(maxComputeSinkConfig.getValidMinTimestamp()).thenReturn(LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.getValidMaxTimestamp()).thenReturn(LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.isNanoHandlingEnabled()).thenReturn(true); + + timestampNtzProtobufMaxComputeConverter = new TimestampNTZProtobufMaxComputeConverter(maxComputeSinkConfig); + + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(2500) + .setNanos(1_500_000_000) + .build(); + TestMaxComputeTypeInfo.TestRoot message = TestMaxComputeTypeInfo.TestRoot.newBuilder() + .setTimestampField(timestamp) + .build(); + + Object result = timestampNtzProtobufMaxComputeConverter.convertSingularPayload( + new ProtoPayload(descriptor.getFields().get(3), message.getField(descriptor.getFields().get(3)), true)); + + assertThat(result).isEqualTo(LocalDateTime.ofEpochSecond(2501, 500000000, ZoneOffset.UTC)); + } } diff --git a/src/test/java/com/gotocompany/depot/maxcompute/converter/TimestampProtobufMaxComputeConverterTest.java b/src/test/java/com/gotocompany/depot/maxcompute/converter/TimestampProtobufMaxComputeConverterTest.java index 04a1485f..715d01a3 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/converter/TimestampProtobufMaxComputeConverterTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/converter/TimestampProtobufMaxComputeConverterTest.java @@ -16,6 +16,7 @@ import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.List; @@ -230,4 +231,113 @@ public void shouldSkipDifferenceValidationWhenIsNotRootLevel() { assertThat(result).isEqualTo(expectedTimestamp); } + + @Test + public void shouldConvertExcessNanosToSecondsWhenEnabled() { + MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); + when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); + when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(true); + when(maxComputeSinkConfig.getValidMinTimestamp()).thenReturn(LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.getValidMaxTimestamp()).thenReturn(LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.isNanoHandlingEnabled()).thenReturn(true); + timestampProtobufMaxComputeConverter = new TimestampProtobufMaxComputeConverter(maxComputeSinkConfig); + + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(2500) + .setNanos(1500000000) + .build(); + TestMaxComputeTypeInfo.TestRoot message = TestMaxComputeTypeInfo.TestRoot.newBuilder() + .setTimestampField(timestamp) + .build(); + + java.sql.Timestamp expectedTimestamp = java.sql.Timestamp.valueOf(LocalDateTime.ofEpochSecond( + timestamp.getSeconds() + 1, 500000000, ZoneOffset.UTC)); + Object result = timestampProtobufMaxComputeConverter.convertSingularPayload(new ProtoPayload(descriptor.getFields().get(3), message.getField(descriptor.getFields().get(3)), true)); + assertThat(result).isEqualTo(expectedTimestamp); + } + + @Test(expected = java.time.DateTimeException.class) + public void shouldThrowExceptionForExcessNanosWhenDisabled() { + MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); + when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); + when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(false); + when(maxComputeSinkConfig.getValidMinTimestamp()).thenReturn(LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.getValidMaxTimestamp()).thenReturn(LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.isNanoHandlingEnabled()).thenReturn(false); + timestampProtobufMaxComputeConverter = new TimestampProtobufMaxComputeConverter(maxComputeSinkConfig); + + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(2500) + .setNanos(1000000000) + .build(); + TestMaxComputeTypeInfo.TestRoot message = TestMaxComputeTypeInfo.TestRoot.newBuilder() + .setTimestampField(timestamp) + .build(); + + timestampProtobufMaxComputeConverter.convertSingularPayload(new ProtoPayload(descriptor.getFields().get(3), message.getField(descriptor.getFields().get(3)), true)); + } + + @Test(expected = java.time.DateTimeException.class) + public void shouldThrowExceptionForNegativeNanosWhenDisabled() { + MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); + when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); + when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(false); + when(maxComputeSinkConfig.getValidMinTimestamp()).thenReturn(LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.getValidMaxTimestamp()).thenReturn(LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.isNanoHandlingEnabled()).thenReturn(false); + timestampProtobufMaxComputeConverter = new TimestampProtobufMaxComputeConverter(maxComputeSinkConfig); + + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(2500) + .setNanos(-1) + .build(); + TestMaxComputeTypeInfo.TestRoot message = TestMaxComputeTypeInfo.TestRoot.newBuilder() + .setTimestampField(timestamp) + .build(); + + timestampProtobufMaxComputeConverter.convertSingularPayload(new ProtoPayload(descriptor.getFields().get(3), message.getField(descriptor.getFields().get(3)), true)); + } + + @Test + public void shouldHandleNegativeNanosWhenEnabled() { + MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); + when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); + when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(true); + when(maxComputeSinkConfig.getValidMinTimestamp()).thenReturn(LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.getValidMaxTimestamp()).thenReturn(LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME)); + when(maxComputeSinkConfig.isNanoHandlingEnabled()).thenReturn(true); + timestampProtobufMaxComputeConverter = new TimestampProtobufMaxComputeConverter(maxComputeSinkConfig); + + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(2500) + .setNanos(-500000000) // Negative nanoseconds + .build(); + TestMaxComputeTypeInfo.TestRoot message = TestMaxComputeTypeInfo.TestRoot.newBuilder() + .setTimestampField(timestamp) + .build(); + + java.sql.Timestamp expectedTimestamp = java.sql.Timestamp.valueOf(LocalDateTime.ofEpochSecond( + timestamp.getSeconds(), 0, ZoneOffset.UTC)); + + Object result = timestampProtobufMaxComputeConverter.convertSingularPayload(new ProtoPayload(descriptor.getFields().get(3), message.getField(descriptor.getFields().get(3)), true)); + + assertThat(result).isEqualTo(expectedTimestamp); + } + + @Test + public void shouldConvertPayloadWithValidNanos() { + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(2500) + .setNanos(500000000) + .build(); + TestMaxComputeTypeInfo.TestRoot message = TestMaxComputeTypeInfo.TestRoot.newBuilder() + .setTimestampField(timestamp) + .build(); + java.sql.Timestamp expectedTimestamp = java.sql.Timestamp.valueOf(LocalDateTime.ofEpochSecond( + timestamp.getSeconds(), timestamp.getNanos(), ZoneOffset.UTC)); + + Object result = timestampProtobufMaxComputeConverter.convertSingularPayload(new ProtoPayload(descriptor.getFields().get(3), message.getField(descriptor.getFields().get(3)), true)); + assertThat(result).isEqualTo(expectedTimestamp); + } + }