[native] Implement bucket conversion for Hive splits
When the bucket count of a table changes over time, there are legitimate cases
in which multiple buckets end up in the same file. In such cases the query
planner sets a bucket conversion on these splits, and in Velox we apply an
extra filter to keep only the rows that belong to the requested bucket number.
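
As a rough illustration of the idea behind that extra filter (a hypothetical sketch, not the Velox implementation): a file written under the old bucket count can contain rows that map to several different buckets under the new count, so the reader re-buckets each row with the new count and keeps only the rows whose bucket matches the one requested by the split. The helper names below are made up; the bucket function assumes Hive's usual convention of (hash & INT32_MAX) % bucketCount.

// Hypothetical sketch of the bucket-conversion filter (not Velox code).
#include <cstdint>
#include <vector>

// Hive-style bucket assignment: non-negative hash modulo the bucket count.
int32_t computeBucket(int64_t hash, int32_t bucketCount) {
  return static_cast<int32_t>((hash & 0x7FFFFFFF) % bucketCount);
}

// Returns the row numbers that survive the bucket-conversion filter.
std::vector<int64_t> filterRowsForBucket(
    const std::vector<int64_t>& rowHashes, // hash of the bucket columns, per row
    int32_t tableBucketCount,              // new (current) bucket count
    int32_t requestedBucket) {             // bucket number requested by the split
  std::vector<int64_t> selected;
  for (int64_t row = 0; row < static_cast<int64_t>(rowHashes.size()); ++row) {
    if (computeBucket(rowHashes[row], tableBucketCount) == requestedBucket) {
      selected.push_back(row);
    }
  }
  return selected;
}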
Yuhta committed Jun 18, 2024
1 parent debdc5b commit 0596214
Showing 2 changed files with 88 additions and 27 deletions.
@@ -1028,6 +1028,41 @@ toHiveBucketProperty(
       sortedBy);
 }
 
+std::unique_ptr<velox::connector::hive::HiveColumnHandle>
+toVeloxHiveColumnHandle(
+    const protocol::ColumnHandle* column,
+    const TypeParser& typeParser) {
+  auto* hiveColumn = dynamic_cast<const protocol::HiveColumnHandle*>(column);
+  VELOX_CHECK_NOT_NULL(
+      hiveColumn, "Unexpected column handle type {}", column->_type);
+  velox::type::fbhive::HiveTypeParser hiveTypeParser;
+  // TODO(spershin): Should we pass something different than 'typeSignature'
+  // to 'hiveType' argument of the 'HiveColumnHandle' constructor?
+  return std::make_unique<velox::connector::hive::HiveColumnHandle>(
+      hiveColumn->name,
+      toHiveColumnType(hiveColumn->columnType),
+      stringToType(hiveColumn->typeSignature, typeParser),
+      hiveTypeParser.parse(hiveColumn->hiveType),
+      toRequiredSubfields(hiveColumn->requiredSubfields));
+}
+
+velox::connector::hive::HiveBucketConversion toVeloxBucketConversion(
+    const protocol::BucketConversion& bucketConversion) {
+  velox::connector::hive::HiveBucketConversion veloxBucketConversion;
+  // Current table bucket count (new).
+  veloxBucketConversion.tableBucketCount = bucketConversion.tableBucketCount;
+  // Partition bucket count (old).
+  veloxBucketConversion.partitionBucketCount =
+      bucketConversion.partitionBucketCount;
+  TypeParser typeParser;
+  for (const auto& column : bucketConversion.bucketColumnHandles) {
+    // Columns used as bucket input.
+    veloxBucketConversion.bucketColumnHandles.push_back(
+        toVeloxHiveColumnHandle(&column, typeParser));
+  }
+  return veloxBucketConversion;
+}
+
 velox::connector::hive::iceberg::FileContent toVeloxFileContent(
     const presto::protocol::FileContent content) {
   if (content == protocol::FileContent::DATA) {
@@ -1075,39 +1110,35 @@ HivePrestoToVeloxConnector::toVeloxSplit(
   infoColumns.insert(
       {"$file_modified_time",
        std::to_string(hiveSplit->fileSplit.fileModifiedTime)});
-  return std::make_unique<velox::connector::hive::HiveConnectorSplit>(
-      catalogId,
-      hiveSplit->fileSplit.path,
-      toVeloxFileFormat(hiveSplit->storage.storageFormat),
-      hiveSplit->fileSplit.start,
-      hiveSplit->fileSplit.length,
-      partitionKeys,
-      hiveSplit->tableBucketNumber
-          ? std::optional<int>(*hiveSplit->tableBucketNumber)
-          : std::nullopt,
-      customSplitInfo,
-      extraFileInfo,
-      serdeParameters,
-      hiveSplit->splitWeight,
-      infoColumns);
+  auto veloxSplit =
+      std::make_unique<velox::connector::hive::HiveConnectorSplit>(
+          catalogId,
+          hiveSplit->fileSplit.path,
+          toVeloxFileFormat(hiveSplit->storage.storageFormat),
+          hiveSplit->fileSplit.start,
+          hiveSplit->fileSplit.length,
+          partitionKeys,
+          hiveSplit->tableBucketNumber
+              ? std::optional<int>(*hiveSplit->tableBucketNumber)
+              : std::nullopt,
+          customSplitInfo,
+          extraFileInfo,
+          serdeParameters,
+          hiveSplit->splitWeight,
+          infoColumns);
+  if (hiveSplit->bucketConversion) {
+    VELOX_CHECK_NOT_NULL(hiveSplit->tableBucketNumber);
+    veloxSplit->bucketConversion =
+        toVeloxBucketConversion(*hiveSplit->bucketConversion);
+  }
+  return veloxSplit;
 }
 
 std::unique_ptr<velox::connector::ColumnHandle>
 HivePrestoToVeloxConnector::toVeloxColumnHandle(
     const protocol::ColumnHandle* column,
     const TypeParser& typeParser) const {
-  auto hiveColumn = dynamic_cast<const protocol::HiveColumnHandle*>(column);
-  VELOX_CHECK_NOT_NULL(
-      hiveColumn, "Unexpected column handle type {}", column->_type);
-  velox::type::fbhive::HiveTypeParser hiveTypeParser;
-  // TODO(spershin): Should we pass something different than 'typeSignature'
-  // to 'hiveType' argument of the 'HiveColumnHandle' constructor?
-  return std::make_unique<velox::connector::hive::HiveColumnHandle>(
-      hiveColumn->name,
-      toHiveColumnType(hiveColumn->columnType),
-      stringToType(hiveColumn->typeSignature, typeParser),
-      hiveTypeParser.parse(hiveColumn->hiveType),
-      toRequiredSubfields(hiveColumn->requiredSubfields));
+  return toVeloxHiveColumnHandle(column, typeParser);
 }
 
 std::unique_ptr<velox::connector::ConnectorTableHandle>
@@ -135,3 +135,33 @@ TEST_F(PrestoToVeloxSplitTest, serdeParameters) {
           dwio::common::SerDeOptions::kMapKeyDelim),
       "|");
 }
+
+TEST_F(PrestoToVeloxSplitTest, bucketConversion) {
+  auto scheduledSplit = makeHiveScheduledSplit();
+  auto& hiveSplit =
+      static_cast<protocol::HiveSplit&>(*scheduledSplit.split.connectorSplit);
+  hiveSplit.tableBucketNumber = std::make_shared<int>(42);
+  hiveSplit.bucketConversion = std::make_shared<protocol::BucketConversion>();
+  hiveSplit.bucketConversion->tableBucketCount = 4096;
+  hiveSplit.bucketConversion->partitionBucketCount = 512;
+  auto& column = hiveSplit.bucketConversion->bucketColumnHandles.emplace_back();
+  column.name = "c0";
+  column.hiveType = "bigint";
+  column.typeSignature = "bigint";
+  column.columnType = protocol::ColumnType::REGULAR;
+  auto veloxSplit = toVeloxSplit(scheduledSplit);
+  const auto& veloxHiveSplit =
+      static_cast<const connector::hive::HiveConnectorSplit&>(
+          *veloxSplit.connectorSplit);
+  ASSERT_TRUE(veloxHiveSplit.bucketConversion.has_value());
+  ASSERT_EQ(veloxHiveSplit.bucketConversion->tableBucketCount, 4096);
+  ASSERT_EQ(veloxHiveSplit.bucketConversion->partitionBucketCount, 512);
+  ASSERT_EQ(veloxHiveSplit.bucketConversion->bucketColumnHandles.size(), 1);
+  auto& veloxColumn = veloxHiveSplit.bucketConversion->bucketColumnHandles[0];
+  ASSERT_EQ(veloxColumn->name(), "c0");
+  ASSERT_EQ(*veloxColumn->dataType(), *BIGINT());
+  ASSERT_EQ(*veloxColumn->hiveType(), *BIGINT());
+  ASSERT_EQ(
+      veloxColumn->columnType(),
+      connector::hive::HiveColumnHandle::ColumnType::kRegular);
+}
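
As an informal cross-check of the values used in this test (a worked example, not part of the change): with the old partition layout at 512 buckets and the new table layout at 4096, the rows of new bucket 42 sit in old file bucket 42 % 512 = 42, so the split carries tableBucketNumber 42 together with both counts, and the extra filter keeps only the rows that still hash to bucket 42 under the 4096-bucket function. A minimal sketch of that mapping, assuming the old bucket count divides the new one:

// Hypothetical illustration; valid when partitionBucketCount divides
// tableBucketCount, so that hash % oldCount == (hash % newCount) % oldCount.
#include <cassert>
#include <cstdint>

int32_t oldFileBucket(int32_t newBucket, int32_t partitionBucketCount) {
  return newBucket % partitionBucketCount;
}

int main() {
  assert(oldFileBucket(/*newBucket=*/42, /*partitionBucketCount=*/512) == 42);
  return 0;
}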
