Skip to content

Commit

Permalink
fix: Negative nano handling (#65)
Browse files Browse the repository at this point in the history
* fix: negative nano handling

* test: adding unit tests and refactor code

* test case changed

* refactoring and docs added
  • Loading branch information
Vaishnavi190900 authored Feb 7, 2025
1 parent 727d83e commit 8c0add8
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 2 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ plugins {
}

group 'com.gotocompany'
version '0.10.6'
version '0.10.7'

repositories {
mavenLocal()
Expand Down
16 changes: 15 additions & 1 deletion docs/reference/configuration/maxcompute.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,4 +346,18 @@ The format of this config is `key1=value1,key2=value2,...`. Further documentatio

* Example value: `table.format.version=2`
* Type: `optional`
* Default value: ``
* Default value: ``

## SINK_MAXCOMPUTE_NANO_HANDLING_ENABLED

This configuration controls how out-of-range nanosecond (`nanos`) values in timestamp fields are handled. It is enabled (`true`) by default.

If it is enabled:
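1. If nanos is less than 0, it is set to 0.
2. If nanos exceeds its valid range (> 999,999,999), the whole seconds it contains are added to the seconds value and only the remainder is kept as nanos. For example, seconds `2500` with nanos `1,500,000,000` becomes seconds `2501` with nanos `500,000,000`.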

If it is disabled and nanos is outside the valid range (0 to 999,999,999), a `java.time.DateTimeException` is thrown and firehose will crash.

* Example value: `true`
* Type: `optional`
* Default value: `true`
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,8 @@ public interface MaxComputeSinkConfig extends Config {
@DefaultValue("")
Map<String, String> getTableProperties();

/**
 * Tells whether out-of-range nanos values should be corrected before a timestamp is built.
 * Bound to the {@code SINK_MAXCOMPUTE_NANO_HANDLING_ENABLED} key and defaults to {@code true}.
 *
 * @return {@code true} when nano handling is enabled, {@code false} otherwise
 */
@Key("SINK_MAXCOMPUTE_NANO_HANDLING_ENABLED")
@DefaultValue("true")
boolean isNanoHandlingEnabled();

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
public class LocalDateTimeValidator {

private static final long DAYS_IN_YEAR = 365L;
private static final int NANOS_IN_ONE_SECOND = 1_000_000_000;

private final TemporalAmount maxPastEventTimeDifference;
private final TemporalAmount maxFutureEventTimeDifference;
Expand All @@ -23,6 +24,7 @@ public class LocalDateTimeValidator {
private final String tablePartitionKey;
private final int maxPastYearEventTimeDifference;
private final int maxFutureYearEventTimeDifference;
private final boolean isNanoHandlingEnabled;

public LocalDateTimeValidator(MaxComputeSinkConfig maxComputeSinkConfig) {
this.maxPastEventTimeDifference = Duration.ofDays(maxComputeSinkConfig.getMaxPastYearEventTimeDifference() * DAYS_IN_YEAR);
Expand All @@ -34,9 +36,18 @@ public LocalDateTimeValidator(MaxComputeSinkConfig maxComputeSinkConfig) {
this.tablePartitionKey = maxComputeSinkConfig.getTablePartitionKey();
this.maxPastYearEventTimeDifference = maxComputeSinkConfig.getMaxPastYearEventTimeDifference();
this.maxFutureYearEventTimeDifference = maxComputeSinkConfig.getMaxFutureYearEventTimeDifference();
this.isNanoHandlingEnabled = maxComputeSinkConfig.isNanoHandlingEnabled();
}

public LocalDateTime parseAndValidate(long seconds, int nanos, String fieldName, boolean isRootLevel) {
if (isNanoHandlingEnabled) {
if (nanos < 0) {
nanos = 0;
} else if (nanos >= NANOS_IN_ONE_SECOND) {
seconds += nanos / NANOS_IN_ONE_SECOND;
nanos = nanos % NANOS_IN_ONE_SECOND;
}
}
Instant instant = Instant.now();
ZoneOffset zoneOffset = zoneId.getRules().getOffset(instant);
LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(seconds, nanos, zoneOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -237,4 +238,113 @@ public void shouldSkipDifferenceValidationWhenIsNotRootLevel() {
.isEqualTo(expectedLocalDateTime);
}

/**
 * A timestamp whose nanos value is already in range must be converted to the
 * equivalent UTC {@link LocalDateTime} without any adjustment.
 */
@Test
public void shouldConvertPayloadWithValidNanos() {
    Timestamp inRangeTimestamp = Timestamp.newBuilder().setSeconds(2500).setNanos(100).build();
    TestMaxComputeTypeInfo.TestRoot root = TestMaxComputeTypeInfo.TestRoot.newBuilder()
            .setTimestampField(inRangeTimestamp)
            .build();
    ProtoPayload payload = new ProtoPayload(
            descriptor.getFields().get(3),
            root.getField(descriptor.getFields().get(3)),
            true);

    Object converted = timestampNtzProtobufMaxComputeConverter.convertSingularPayload(payload);

    // Seconds and nanos are expected to pass through untouched.
    LocalDateTime expected = LocalDateTime.ofEpochSecond(
            inRangeTimestamp.getSeconds(), inRangeTimestamp.getNanos(), ZoneOffset.UTC);
    assertThat(converted).isEqualTo(expected);
}

/**
 * With nano handling enabled, a negative nanos value is expected to be converted
 * as if it were 0 nanos.
 */
@Test
public void shouldHandleNegativeNanosWhenEnabled() {
    LocalDateTime minTimestamp = LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME);
    LocalDateTime maxTimestamp = LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME);
    MaxComputeSinkConfig sinkConfig = Mockito.mock(MaxComputeSinkConfig.class);
    when(sinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC"));
    when(sinkConfig.getValidMinTimestamp()).thenReturn(minTimestamp);
    when(sinkConfig.getValidMaxTimestamp()).thenReturn(maxTimestamp);
    when(sinkConfig.isNanoHandlingEnabled()).thenReturn(true);
    timestampNtzProtobufMaxComputeConverter = new TimestampNTZProtobufMaxComputeConverter(sinkConfig);

    Timestamp negativeNanosTimestamp = Timestamp.newBuilder().setSeconds(2500).setNanos(-500).build();
    TestMaxComputeTypeInfo.TestRoot root = TestMaxComputeTypeInfo.TestRoot.newBuilder()
            .setTimestampField(negativeNanosTimestamp)
            .build();
    ProtoPayload payload = new ProtoPayload(
            descriptor.getFields().get(3),
            root.getField(descriptor.getFields().get(3)),
            true);

    Object converted = timestampNtzProtobufMaxComputeConverter.convertSingularPayload(payload);

    // The seconds stay at 2500 while the negative nanos collapse to 0.
    LocalDateTime expected = LocalDateTime.ofEpochSecond(2500, 0, ZoneOffset.UTC);
    assertThat(converted).isEqualTo(expected);
}

/**
 * With nano handling disabled, converting a timestamp that carries negative nanos
 * is expected to fail with a {@link java.time.DateTimeException}.
 */
@Test(expected = java.time.DateTimeException.class)
public void shouldThrowExceptionForNegativeNanosWhenDisabled() {
    LocalDateTime minTimestamp = LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME);
    LocalDateTime maxTimestamp = LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME);
    MaxComputeSinkConfig sinkConfig = Mockito.mock(MaxComputeSinkConfig.class);
    when(sinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC"));
    when(sinkConfig.getValidMinTimestamp()).thenReturn(minTimestamp);
    when(sinkConfig.getValidMaxTimestamp()).thenReturn(maxTimestamp);
    when(sinkConfig.isNanoHandlingEnabled()).thenReturn(false);
    timestampNtzProtobufMaxComputeConverter = new TimestampNTZProtobufMaxComputeConverter(sinkConfig);

    Timestamp negativeNanosTimestamp = Timestamp.newBuilder().setSeconds(2500).setNanos(-500).build();
    TestMaxComputeTypeInfo.TestRoot root = TestMaxComputeTypeInfo.TestRoot.newBuilder()
            .setTimestampField(negativeNanosTimestamp)
            .build();
    ProtoPayload payload = new ProtoPayload(
            descriptor.getFields().get(3),
            root.getField(descriptor.getFields().get(3)),
            true);

    // No assertion: the test passes only if this call throws.
    timestampNtzProtobufMaxComputeConverter.convertSingularPayload(payload);
}

/**
 * With nano handling disabled, converting a timestamp whose nanos equal one full
 * second (1_000_000_000) is expected to fail with a {@link java.time.DateTimeException}.
 */
@Test(expected = java.time.DateTimeException.class)
public void shouldThrowExceptionForExcessNanosWhenDisabled() {
    LocalDateTime minTimestamp = LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME);
    LocalDateTime maxTimestamp = LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME);
    MaxComputeSinkConfig sinkConfig = Mockito.mock(MaxComputeSinkConfig.class);
    when(sinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC"));
    when(sinkConfig.getValidMinTimestamp()).thenReturn(minTimestamp);
    when(sinkConfig.getValidMaxTimestamp()).thenReturn(maxTimestamp);
    when(sinkConfig.isNanoHandlingEnabled()).thenReturn(false);
    timestampNtzProtobufMaxComputeConverter = new TimestampNTZProtobufMaxComputeConverter(sinkConfig);

    Timestamp excessNanosTimestamp = Timestamp.newBuilder().setSeconds(2500).setNanos(1_000_000_000).build();
    TestMaxComputeTypeInfo.TestRoot root = TestMaxComputeTypeInfo.TestRoot.newBuilder()
            .setTimestampField(excessNanosTimestamp)
            .build();
    ProtoPayload payload = new ProtoPayload(
            descriptor.getFields().get(3),
            root.getField(descriptor.getFields().get(3)),
            true);

    // No assertion: the test passes only if this call throws.
    timestampNtzProtobufMaxComputeConverter.convertSingularPayload(payload);
}

/**
 * With nano handling enabled, nanos above one second are expected to roll over:
 * 2500s + 1_500_000_000ns must be converted as 2501s + 500_000_000ns.
 */
@Test
public void shouldConvertExcessNanosToSecondsWhenEnabled() {
    LocalDateTime minTimestamp = LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME);
    LocalDateTime maxTimestamp = LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME);
    MaxComputeSinkConfig sinkConfig = Mockito.mock(MaxComputeSinkConfig.class);
    when(sinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC"));
    when(sinkConfig.getValidMinTimestamp()).thenReturn(minTimestamp);
    when(sinkConfig.getValidMaxTimestamp()).thenReturn(maxTimestamp);
    when(sinkConfig.isNanoHandlingEnabled()).thenReturn(true);
    timestampNtzProtobufMaxComputeConverter = new TimestampNTZProtobufMaxComputeConverter(sinkConfig);

    Timestamp excessNanosTimestamp = Timestamp.newBuilder().setSeconds(2500).setNanos(1_500_000_000).build();
    TestMaxComputeTypeInfo.TestRoot root = TestMaxComputeTypeInfo.TestRoot.newBuilder()
            .setTimestampField(excessNanosTimestamp)
            .build();
    ProtoPayload payload = new ProtoPayload(
            descriptor.getFields().get(3),
            root.getField(descriptor.getFields().get(3)),
            true);

    Object converted = timestampNtzProtobufMaxComputeConverter.convertSingularPayload(payload);

    // One whole second is carried over; half a second remains as nanos.
    LocalDateTime expected = LocalDateTime.ofEpochSecond(2501, 500000000, ZoneOffset.UTC);
    assertThat(converted).isEqualTo(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -230,4 +231,113 @@ public void shouldSkipDifferenceValidationWhenIsNotRootLevel() {

assertThat(result).isEqualTo(expectedTimestamp);
}

/**
 * With nano handling enabled, nanos above one second are expected to roll over into
 * the seconds: 2500s + 1500000000ns must be converted as 2501s + 500000000ns.
 */
@Test
public void shouldConvertExcessNanosToSecondsWhenEnabled() {
    LocalDateTime minTimestamp = LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME);
    LocalDateTime maxTimestamp = LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME);
    MaxComputeSinkConfig sinkConfig = Mockito.mock(MaxComputeSinkConfig.class);
    when(sinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC"));
    when(sinkConfig.isTablePartitioningEnabled()).thenReturn(true);
    when(sinkConfig.getValidMinTimestamp()).thenReturn(minTimestamp);
    when(sinkConfig.getValidMaxTimestamp()).thenReturn(maxTimestamp);
    when(sinkConfig.isNanoHandlingEnabled()).thenReturn(true);
    timestampProtobufMaxComputeConverter = new TimestampProtobufMaxComputeConverter(sinkConfig);

    Timestamp excessNanosTimestamp = Timestamp.newBuilder().setSeconds(2500).setNanos(1500000000).build();
    TestMaxComputeTypeInfo.TestRoot root = TestMaxComputeTypeInfo.TestRoot.newBuilder()
            .setTimestampField(excessNanosTimestamp)
            .build();
    ProtoPayload payload = new ProtoPayload(
            descriptor.getFields().get(3),
            root.getField(descriptor.getFields().get(3)),
            true);

    // One whole second is carried over; half a second remains as nanos.
    LocalDateTime rolledOver = LocalDateTime.ofEpochSecond(
            excessNanosTimestamp.getSeconds() + 1, 500000000, ZoneOffset.UTC);
    java.sql.Timestamp expected = java.sql.Timestamp.valueOf(rolledOver);

    Object converted = timestampProtobufMaxComputeConverter.convertSingularPayload(payload);

    assertThat(converted).isEqualTo(expected);
}

/**
 * With nano handling disabled, converting a timestamp whose nanos equal one full
 * second (1000000000) is expected to fail with a {@link java.time.DateTimeException}.
 */
@Test(expected = java.time.DateTimeException.class)
public void shouldThrowExceptionForExcessNanosWhenDisabled() {
    LocalDateTime minTimestamp = LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME);
    LocalDateTime maxTimestamp = LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME);
    MaxComputeSinkConfig sinkConfig = Mockito.mock(MaxComputeSinkConfig.class);
    when(sinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC"));
    when(sinkConfig.isTablePartitioningEnabled()).thenReturn(false);
    when(sinkConfig.getValidMinTimestamp()).thenReturn(minTimestamp);
    when(sinkConfig.getValidMaxTimestamp()).thenReturn(maxTimestamp);
    when(sinkConfig.isNanoHandlingEnabled()).thenReturn(false);
    timestampProtobufMaxComputeConverter = new TimestampProtobufMaxComputeConverter(sinkConfig);

    Timestamp excessNanosTimestamp = Timestamp.newBuilder().setSeconds(2500).setNanos(1000000000).build();
    TestMaxComputeTypeInfo.TestRoot root = TestMaxComputeTypeInfo.TestRoot.newBuilder()
            .setTimestampField(excessNanosTimestamp)
            .build();
    ProtoPayload payload = new ProtoPayload(
            descriptor.getFields().get(3),
            root.getField(descriptor.getFields().get(3)),
            true);

    // No assertion: the test passes only if this call throws.
    timestampProtobufMaxComputeConverter.convertSingularPayload(payload);
}

/**
 * With nano handling disabled, converting a timestamp that carries negative nanos
 * is expected to fail with a {@link java.time.DateTimeException}.
 */
@Test(expected = java.time.DateTimeException.class)
public void shouldThrowExceptionForNegativeNanosWhenDisabled() {
    LocalDateTime minTimestamp = LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME);
    LocalDateTime maxTimestamp = LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME);
    MaxComputeSinkConfig sinkConfig = Mockito.mock(MaxComputeSinkConfig.class);
    when(sinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC"));
    when(sinkConfig.isTablePartitioningEnabled()).thenReturn(false);
    when(sinkConfig.getValidMinTimestamp()).thenReturn(minTimestamp);
    when(sinkConfig.getValidMaxTimestamp()).thenReturn(maxTimestamp);
    when(sinkConfig.isNanoHandlingEnabled()).thenReturn(false);
    timestampProtobufMaxComputeConverter = new TimestampProtobufMaxComputeConverter(sinkConfig);

    Timestamp negativeNanosTimestamp = Timestamp.newBuilder().setSeconds(2500).setNanos(-1).build();
    TestMaxComputeTypeInfo.TestRoot root = TestMaxComputeTypeInfo.TestRoot.newBuilder()
            .setTimestampField(negativeNanosTimestamp)
            .build();
    ProtoPayload payload = new ProtoPayload(
            descriptor.getFields().get(3),
            root.getField(descriptor.getFields().get(3)),
            true);

    // No assertion: the test passes only if this call throws.
    timestampProtobufMaxComputeConverter.convertSingularPayload(payload);
}

/**
 * With nano handling enabled, a negative nanos value is expected to be converted
 * as if it were 0 nanos, leaving the seconds unchanged.
 */
@Test
public void shouldHandleNegativeNanosWhenEnabled() {
    LocalDateTime minTimestamp = LocalDateTime.parse("1970-01-01T00:00:00", DateTimeFormatter.ISO_DATE_TIME);
    LocalDateTime maxTimestamp = LocalDateTime.parse("9999-01-01T23:59:59", DateTimeFormatter.ISO_DATE_TIME);
    MaxComputeSinkConfig sinkConfig = Mockito.mock(MaxComputeSinkConfig.class);
    when(sinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC"));
    when(sinkConfig.isTablePartitioningEnabled()).thenReturn(true);
    when(sinkConfig.getValidMinTimestamp()).thenReturn(minTimestamp);
    when(sinkConfig.getValidMaxTimestamp()).thenReturn(maxTimestamp);
    when(sinkConfig.isNanoHandlingEnabled()).thenReturn(true);
    timestampProtobufMaxComputeConverter = new TimestampProtobufMaxComputeConverter(sinkConfig);

    // Half a second of negative nanoseconds.
    Timestamp negativeNanosTimestamp = Timestamp.newBuilder().setSeconds(2500).setNanos(-500000000).build();
    TestMaxComputeTypeInfo.TestRoot root = TestMaxComputeTypeInfo.TestRoot.newBuilder()
            .setTimestampField(negativeNanosTimestamp)
            .build();
    ProtoPayload payload = new ProtoPayload(
            descriptor.getFields().get(3),
            root.getField(descriptor.getFields().get(3)),
            true);

    LocalDateTime clamped = LocalDateTime.ofEpochSecond(negativeNanosTimestamp.getSeconds(), 0, ZoneOffset.UTC);
    java.sql.Timestamp expected = java.sql.Timestamp.valueOf(clamped);

    Object converted = timestampProtobufMaxComputeConverter.convertSingularPayload(payload);

    assertThat(converted).isEqualTo(expected);
}

/**
 * A timestamp whose nanos value is already in range must be converted to the
 * matching {@link java.sql.Timestamp} without any adjustment.
 */
@Test
public void shouldConvertPayloadWithValidNanos() {
    Timestamp inRangeTimestamp = Timestamp.newBuilder().setSeconds(2500).setNanos(500000000).build();
    TestMaxComputeTypeInfo.TestRoot root = TestMaxComputeTypeInfo.TestRoot.newBuilder()
            .setTimestampField(inRangeTimestamp)
            .build();
    ProtoPayload payload = new ProtoPayload(
            descriptor.getFields().get(3),
            root.getField(descriptor.getFields().get(3)),
            true);

    // Seconds and nanos are expected to pass through untouched.
    LocalDateTime unchanged = LocalDateTime.ofEpochSecond(
            inRangeTimestamp.getSeconds(), inRangeTimestamp.getNanos(), ZoneOffset.UTC);
    java.sql.Timestamp expected = java.sql.Timestamp.valueOf(unchanged);

    Object converted = timestampProtobufMaxComputeConverter.convertSingularPayload(payload);

    assertThat(converted).isEqualTo(expected);
}

}

0 comments on commit 8c0add8

Please sign in to comment.