Commit

Fix hash calculation for Timestamp in HiveBucketing to be Hive Compatible
kewang1024 committed Jun 12, 2024
1 parent 3d06a01 commit c2a94ce
Showing 16 changed files with 178 additions and 71 deletions.
@@ -46,6 +46,7 @@ public class HiveCommonClientConfig
private boolean readNullMaskedParquetEncryptedValueEnabled;
private boolean useParquetColumnNames;
private boolean zstdJniDecompressionEnabled;
private boolean legacyTimestampBucketing;

public NodeSelectionStrategy getNodeSelectionStrategy()
{
@@ -284,4 +285,17 @@ public HiveCommonClientConfig setZstdJniDecompressionEnabled(boolean zstdJniDecompressionEnabled)
this.zstdJniDecompressionEnabled = zstdJniDecompressionEnabled;
return this;
}

public boolean isLegacyTimestampBucketing()
{
return legacyTimestampBucketing;
}

@Config("hive.legacy-timestamp-bucketing")
@ConfigDescription("Use legacy timestamp bucketing algorithm (which is not Hive compatible) for table bucketed by timestamp type.")
public HiveCommonClientConfig setLegacyTimestampBucketing(boolean legacyTimestampBucketing)
{
this.legacyTimestampBucketing = legacyTimestampBucketing;
return this;
}
}
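
The flag defaults to false, so deployments get Hive-compatible timestamp hashing unless the legacy behavior is explicitly re-enabled. For orientation (not part of this diff), the catalog-level switch would go in the Hive connector's properties file; the path and surrounding entries below are conventional placeholders, not something this commit shows:

    # etc/catalog/hive.properties (conventional location, shown for illustration)
    connector.name=hive-hadoop2
    hive.metastore.uri=thrift://metastore.example.com:9083
    hive.legacy-timestamp-bucketing=true
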
@@ -60,6 +60,7 @@ public class HiveCommonSessionProperties
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_USE_COLUMN_NAMES = "parquet_use_column_names";
public static final String READ_MASKED_VALUE_ENABLED = "read_null_masked_parquet_encrypted_value_enabled";
public static final String LEGACY_TIMESTAMP_BUCKETING = "legacy_timestamp_bucketing";
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
@@ -176,6 +177,11 @@ public HiveCommonSessionProperties(HiveCommonClientConfig hiveCommonClientConfig)
READ_MASKED_VALUE_ENABLED,
"Return null when access is denied for an encrypted parquet column",
hiveCommonClientConfig.getReadNullMaskedParquetEncryptedValue(),
false),
booleanProperty(
LEGACY_TIMESTAMP_BUCKETING,
"Use legacy timestamp bucketing algorithm (which is not Hive compatible) for table bucketed by timestamp type.",
hiveCommonClientConfig.isLegacyTimestampBucketing(),
false));
}

@@ -286,6 +292,11 @@ public static boolean getReadNullMaskedParquetEncryptedValue(ConnectorSession session)
return session.getProperty(READ_MASKED_VALUE_ENABLED, Boolean.class);
}

public static boolean isLegacyTimestampBucketing(ConnectorSession session)
{
return session.getProperty(LEGACY_TIMESTAMP_BUCKETING, Boolean.class);
}

public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, String description, DataSize defaultValue, boolean hidden)
{
return new PropertyMetadata<>(
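
The matching session property lets a single query opt back into the old hashing without a server restart. A sketch, assuming the catalog is named hive (the prefix is the catalog name, which this diff does not show):

    SET SESSION hive.legacy_timestamp_bucketing = true;
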
@@ -47,7 +47,8 @@ public void testDefaults()
.setZstdJniDecompressionEnabled(false)
.setParquetBatchReaderVerificationEnabled(false)
.setParquetBatchReadOptimizationEnabled(false)
.setReadNullMaskedParquetEncryptedValue(false));
.setReadNullMaskedParquetEncryptedValue(false)
.setLegacyTimestampBucketing(false));
}

@Test
@@ -72,6 +73,7 @@ public void testExplicitPropertyMappings()
.put("hive.enable-parquet-batch-reader-verification", "true")
.put("hive.parquet-batch-read-optimization-enabled", "true")
.put("hive.read-null-masked-parquet-encrypted-value-enabled", "true")
.put("hive.legacy-timestamp-bucketing", "true")
.build();

HiveCommonClientConfig expected = new HiveCommonClientConfig()
@@ -92,7 +94,8 @@
.setZstdJniDecompressionEnabled(true)
.setParquetBatchReaderVerificationEnabled(true)
.setParquetBatchReadOptimizationEnabled(true)
.setReadNullMaskedParquetEncryptedValue(true);
.setReadNullMaskedParquetEncryptedValue(true)
.setLegacyTimestampBucketing(true);

ConfigAssertions.assertFullMapping(properties, expected);
}
@@ -22,14 +22,22 @@ public class BucketAdaptation
private final int tableBucketCount;
private final int partitionBucketCount;
private final int bucketToKeep;
private final boolean useLegacyTimestampBucketing;

public BucketAdaptation(int[] bucketColumnIndices, List<HiveType> bucketColumnHiveTypes, int tableBucketCount, int partitionBucketCount, int bucketToKeep)
public BucketAdaptation(
int[] bucketColumnIndices,
List<HiveType> bucketColumnHiveTypes,
int tableBucketCount,
int partitionBucketCount,
int bucketToKeep,
boolean useLegacyTimestampBucketing)
{
this.bucketColumnIndices = bucketColumnIndices;
this.bucketColumnHiveTypes = bucketColumnHiveTypes;
this.tableBucketCount = tableBucketCount;
this.partitionBucketCount = partitionBucketCount;
this.bucketToKeep = bucketToKeep;
this.useLegacyTimestampBucketing = useLegacyTimestampBucketing;
}

public int[] getBucketColumnIndices()
Expand All @@ -56,4 +64,9 @@ public int getBucketToKeep()
{
return bucketToKeep;
}

public boolean useLegacyTimestampBucketing()
{
return useLegacyTimestampBucketing;
}
}
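
BucketAdaptation is a plain carrier: it now threads the flag from wherever the split is planned down to the readers that validate bucket membership. A hypothetical construction site, sketching how a caller might wire the session value through (the column index, HIVE_TIMESTAMP key type, and bucket counts are illustrative, not taken from this commit):

    BucketAdaptation adaptation = new BucketAdaptation(
            new int[] {0},                          // position of the bucket column in each row
            ImmutableList.of(HiveType.HIVE_TIMESTAMP),
            32,                                     // table bucket count
            8,                                      // partition bucket count
            3,                                      // bucket this split should keep
            isLegacyTimestampBucketing(session));   // session property added above
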
@@ -41,6 +41,7 @@ public class HiveBucketAdapterRecordCursor
private final int bucketToKeep;

private final Object[] scratch;
private final boolean useLegacyTimestampBucketing;

public HiveBucketAdapterRecordCursor(
int[] bucketColumnIndices,
@@ -49,7 +50,8 @@ public HiveBucketAdapterRecordCursor(
int partitionBucketCount,
int bucketToKeep,
TypeManager typeManager,
RecordCursor delegate)
RecordCursor delegate,
boolean useLegacyTimestampBucketing)
{
this.bucketColumnIndices = requireNonNull(bucketColumnIndices, "bucketColumnIndices is null");
this.delegate = requireNonNull(delegate, "delegate is null");
@@ -67,6 +69,7 @@
this.bucketToKeep = bucketToKeep;

this.scratch = new Object[bucketColumnHiveTypes.size()];
this.useLegacyTimestampBucketing = useLegacyTimestampBucketing;
}

@Override
@@ -121,7 +124,7 @@ else if (javaType == Block.class) {
throw new UnsupportedOperationException("unknown java type");
}
}
int bucket = HiveBucketing.getHiveBucket(tableBucketCount, typeInfoList, scratch);
int bucket = HiveBucketing.getHiveBucket(tableBucketCount, typeInfoList, scratch, useLegacyTimestampBucketing);
if ((bucket - bucketToKeep) % partitionBucketCount != 0) {
throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format(
"A row that is supposed to be in bucket %s is encountered. Only rows in bucket %s (modulo %s) are expected",
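
The flag ultimately selects between two hash functions for timestamp keys. The HiveBucketing diff itself is not loaded on this page, so the contrast below is a self-contained sketch reconstructed from Hive's TimestampWritable.hashCode contract, with assumed method names and an assumed epoch-millisecond input representation:

    // Sketch: Hive packs (seconds << 30 | nanos) into one long, then folds it to an int.
    static int hashTimestampHiveCompatible(long epochMillis)
    {
        long seconds = Math.floorDiv(epochMillis, 1000L);            // whole seconds since epoch
        long nanos = Math.floorMod(epochMillis, 1000L) * 1_000_000L; // sub-second part as nanos
        long bits = (seconds << 30) | nanos;
        return (int) ((bits >>> 32) ^ bits);
    }

    // Sketch: the legacy path hashed the raw millisecond value like a BIGINT,
    // which disagrees with Hive's bucket assignment for the same timestamp.
    static int hashTimestampLegacy(long epochMillis)
    {
        return (int) ((epochMillis >>> 32) ^ epochMillis);
    }
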
@@ -36,26 +36,30 @@ public class HiveBucketFunction
private final BucketFunctionType bucketFunctionType;
private final Optional<List<TypeInfo>> typeInfos;
private final Optional<List<Type>> types;
private final boolean useLegacyTimestampBucketing;

public static HiveBucketFunction createHiveCompatibleBucketFunction(
int bucketCount,
List<HiveType> hiveTypes)
List<HiveType> hiveTypes,
boolean useLegacyTimestampBucketing)
{
return new HiveBucketFunction(bucketCount, HIVE_COMPATIBLE, Optional.of(hiveTypes), Optional.empty());
return new HiveBucketFunction(bucketCount, HIVE_COMPATIBLE, Optional.of(hiveTypes), Optional.empty(), useLegacyTimestampBucketing);
}

public static HiveBucketFunction createPrestoNativeBucketFunction(
int bucketCount,
List<Type> types)
List<Type> types,
boolean useLegacyTimestampBucketing)
{
return new HiveBucketFunction(bucketCount, PRESTO_NATIVE, Optional.empty(), Optional.of(types));
return new HiveBucketFunction(bucketCount, PRESTO_NATIVE, Optional.empty(), Optional.of(types), useLegacyTimestampBucketing);
}

private HiveBucketFunction(
int bucketCount,
BucketFunctionType bucketFunctionType,
Optional<List<HiveType>> hiveTypes,
Optional<List<Type>> types)
Optional<List<Type>> types,
boolean useLegacyTimestampBucketing)
{
this.bucketCount = bucketCount;
this.bucketFunctionType = requireNonNull(bucketFunctionType, "bucketFunctionType is null");
Expand All @@ -66,14 +70,15 @@ private HiveBucketFunction(
.map(HiveType::getTypeInfo)
.collect(toImmutableList()));
this.types = requireNonNull(types, "types is null");
this.useLegacyTimestampBucketing = useLegacyTimestampBucketing;
}

@Override
public int getBucket(Page page, int position)
{
switch (bucketFunctionType) {
case HIVE_COMPATIBLE:
return getHiveBucket(bucketCount, typeInfos.get(), page, position);
return getHiveBucket(bucketCount, typeInfos.get(), page, position, useLegacyTimestampBucketing);
case PRESTO_NATIVE:
return HiveBucketing.getBucket(bucketCount, types.get(), page, position);
default:
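
Whichever hash is chosen, the final bucket follows Hive's convention of a non-negative modulo; a one-line sketch of that step (assumed from Hive's ObjectInspectorUtils.getBucketNumber, not visible in this diff):

    int bucket = (hashCode & Integer.MAX_VALUE) % bucketCount;  // clear the sign bit, then wrap
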
(Diffs for the remaining changed files did not load.)
