Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for leftSemiProject join in nested loop join #12172

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ VectorPtr newConstantFromString(
.thenOrThrow(folly::identity, [&](const Status& status) {
VELOX_USER_FAIL("{}", status.message());
});
if constexpr (kind == TypeKind::TIMESTAMP) {
copy.toGMT(Timestamp::defaultTimezone());
}
// if constexpr (kind == TypeKind::TIMESTAMP) {
// copy.toGMT(Timestamp::defaultTimezone());
// }
return std::make_shared<ConstantVector<T>>(
pool, size, false, type, std::move(copy));
}
Expand Down Expand Up @@ -341,12 +341,23 @@ std::vector<TypePtr> SplitReader::adaptColumns(
childSpec->columnType() == common::ScanSpec::ColumnType::kRegular) {
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
if (!fileTypeIdx.has_value()) {
// Column is missing. Most likely due to schema evolution.
VELOX_CHECK(tableSchema, "Unable to resolve column '{}'", fieldName);
childSpec->setConstantValue(BaseVector::createNullConstant(
tableSchema->findChild(fieldName),
1,
connectorQueryCtx_->memoryPool()));
// If field name exists in the user-specified output type, set the
// column as null constant. Related PR:
// https://github.com/facebookincubator/velox/pull/6427.
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
if (outputTypeIdx.has_value()) {
childSpec->setConstantValue(BaseVector::createNullConstant(
readerOutputType_->childAt(outputTypeIdx.value()),
1,
connectorQueryCtx_->memoryPool()));
} else {
// Column is missing. Most likely due to schema evolution.
VELOX_CHECK(tableSchema);
childSpec->setConstantValue(BaseVector::createNullConstant(
tableSchema->findChild(fieldName),
1,
connectorQueryCtx_->memoryPool()));
}
} else {
// Column no longer missing, reset constant value set on the spec.
childSpec->setConstantValue(nullptr);
Expand Down
20 changes: 19 additions & 1 deletion velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1504,7 +1504,24 @@ NestedLoopJoinNode::NestedLoopJoinNode(

auto leftType = sources_[0]->outputType();
auto rightType = sources_[1]->outputType();
for (const auto& name : outputType_->names()) {

auto numOutputColumms = outputType_->size();
if (core::isLeftSemiProjectJoin(joinType) || core::isRightSemiProjectJoin(joinType)) {
--numOutputColumms;
VELOX_CHECK_EQ(outputType_->childAt(numOutputColumms), BOOLEAN());
const auto& name = outputType_->nameOf(numOutputColumms);
VELOX_CHECK(
!leftType->containsChild(name),
"Match column '{}' cannot be present in left source.",
name);
VELOX_CHECK(
!rightType->containsChild(name),
"Match column '{}' cannot be present in right source.",
name);
}

for (auto i = 0; i < numOutputColumms; ++i) {
auto name = outputType_->nameOf(i);
const bool leftContains = leftType->containsChild(name);
const bool rightContains = rightType->containsChild(name);
VELOX_USER_CHECK(
Expand Down Expand Up @@ -1538,6 +1555,7 @@ bool NestedLoopJoinNode::isSupported(core::JoinType joinType) {
case core::JoinType::kLeft:
case core::JoinType::kRight:
case core::JoinType::kFull:
case core::JoinType::kLeftSemiProject:
return true;

default:
Expand Down
4 changes: 1 addition & 3 deletions velox/dwio/common/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ void SelectiveStructColumnReaderBase::read(
}

const auto& childSpecs = scanSpec_->children();
VELOX_CHECK(!childSpecs.empty());
for (size_t i = 0; i < childSpecs.size(); ++i) {
const auto& childSpec = childSpecs[i];
VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str());
Expand Down Expand Up @@ -464,13 +463,12 @@ bool SelectiveStructColumnReaderBase::isChildMissing(
fileType_->type()->kind() !=
TypeKind::MAP && // If this is the case it means this is a flat map,
// so it can't have "missing" fields.
childSpec.channel() >= fileType_->size());
!fileType_->containsChild(childSpec.fieldName()));
}

void SelectiveStructColumnReaderBase::getValues(
const RowSet& rows,
VectorPtr* result) {
VELOX_CHECK(!scanSpec_->children().empty());
VELOX_CHECK_NOT_NULL(
*result, "SelectiveStructColumnReaderBase expects a non-null result");
VELOX_CHECK(
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/common/TypeWithId.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {
return childAt(type_->as<velox::TypeKind::ROW>().getChildIdx(name));
}

bool containsChild(const std::string& name) const {
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
return type_->as<velox::TypeKind::ROW>().containsChild(name);
}

const std::vector<std::shared_ptr<const TypeWithId>>& getChildren() const {
return children_;
}
Expand Down
173 changes: 161 additions & 12 deletions velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,17 @@ void SelectiveDecimalColumnReader<DataT>::seekToRowGroup(int64_t index) {

template <typename DataT>
template <bool kDense>
void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
vector_size_t numRows = rows.back() + 1;
void SelectiveDecimalColumnReader<DataT>::readHelper(
common::Filter* filter,
RowSet rows) {
ExtractToReader extractValues(this);
common::AlwaysTrue filter;
common::AlwaysTrue alwaysTrue;
DirectRleColumnVisitor<
int64_t,
common::AlwaysTrue,
decltype(extractValues),
kDense>
visitor(filter, this, rows, extractValues);
visitor(alwaysTrue, this, rows, extractValues);

// decode scale stream
if (version_ == velox::dwrf::RleVersion_1) {
Expand All @@ -96,46 +97,194 @@ void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
// reset numValues_ before reading values
numValues_ = 0;
valueSize_ = sizeof(DataT);
vector_size_t numRows = rows.back() + 1;
ensureValuesCapacity<DataT>(numRows);

// decode value stream
facebook::velox::dwio::common::
ColumnVisitor<DataT, common::AlwaysTrue, decltype(extractValues), kDense>
valueVisitor(filter, this, rows, extractValues);
valueVisitor(alwaysTrue, this, rows, extractValues);
decodeWithVisitor<DirectDecoder<true>>(valueDecoder_.get(), valueVisitor);
readOffset_ += numRows;

// Fill decimals before applying filter.
fillDecimals();

const auto rawNulls = nullsInReadRange_
? (kDense ? nullsInReadRange_->as<uint64_t>() : rawResultNulls_)
: nullptr;
// Process filter.
process(filter, rows, rawNulls);
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processNulls(
bool isNull,
const RowSet& rows,
const uint64_t* rawNulls) {
if (!rawNulls) {
return;
}
returnReaderNulls_ = false;
anyNulls_ = !isNull;
allNull_ = isNull;

auto rawDecimal = values_->asMutable<DataT>();
auto rawScale = scaleBuffer_->asMutable<int64_t>();

vector_size_t idx = 0;
if (isNull) {
for (vector_size_t i = 0; i < numValues_; i++) {
if (bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
idx++;
}
}
} else {
for (vector_size_t i = 0; i < numValues_; i++) {
if (!bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx, false);
rawDecimal[idx] = rawDecimal[i];
rawScale[idx] = rawScale[i];
addOutputRow(rows[i]);
idx++;
}
}
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processFilter(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls) {
returnReaderNulls_ = false;
anyNulls_ = false;
allNull_ = true;

auto rawDecimal = values_->asMutable<DataT>();

vector_size_t idx = 0;
for (vector_size_t i = 0; i < numValues_; i++) {
if (rawNulls && bits::isBitNull(rawNulls, i)) {
if (filter->testNull()) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
anyNulls_ = true;
idx++;
}
} else {
bool tested;
if constexpr (std::is_same_v<DataT, int64_t>) {
tested = filter->testInt64(rawDecimal[i]);
} else {
tested = filter->testInt128(rawDecimal[i]);
}

if (tested) {
if (rawNulls) {
bits::setNull(rawResultNulls_, idx, false);
}
rawDecimal[idx] = rawDecimal[i];
addOutputRow(rows[i]);
allNull_ = false;
idx++;
}
}
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::process(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls) {
// Treat the filter as kAlwaysTrue if any of the following conditions are met:
// 1) No filter found;
// 2) Filter is kIsNotNull but rawNulls == NULL (no elements is null).
auto filterKind =
!filter || (filter->kind() == common::FilterKind::kIsNotNull && !rawNulls)
? common::FilterKind::kAlwaysTrue
: filter->kind();
switch (filterKind) {
case common::FilterKind::kAlwaysTrue:
// Simply add all rows to output.
for (vector_size_t i = 0; i < numValues_; i++) {
addOutputRow(rows[i]);
}
break;
case common::FilterKind::kIsNull:
processNulls(true, rows, rawNulls);
break;
case common::FilterKind::kIsNotNull:
processNulls(false, rows, rawNulls);
break;
case common::FilterKind::kBigintRange:
case common::FilterKind::kBigintValuesUsingHashTable:
case common::FilterKind::kBigintValuesUsingBitmask:
case common::FilterKind::kNegatedBigintRange:
case common::FilterKind::kNegatedBigintValuesUsingHashTable:
case common::FilterKind::kNegatedBigintValuesUsingBitmask:
case common::FilterKind::kBigintMultiRange: {
if constexpr (std::is_same_v<DataT, int64_t>) {
processFilter(filter, rows, rawNulls);
} else {
const auto actualType = CppToType<DataT>::create();
VELOX_NYI(
"Expected type BIGINT, but found file type {}.",
actualType->toString());
}
break;
}
case common::FilterKind::kHugeintValuesUsingHashTable:
case common::FilterKind::kHugeintRange: {
if constexpr (std::is_same_v<DataT, int128_t>) {
processFilter(filter, rows, rawNulls);
} else {
const auto actualType = CppToType<DataT>::create();
VELOX_NYI(
"Expected type HUGEINT, but found file type {}.",
actualType->toString());
}
break;
}
default:
VELOX_NYI("Unsupported filter: {}.", (int)filterKind);
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::read(
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
VELOX_CHECK(!scanSpec_->filter());
VELOX_CHECK(!scanSpec_->valueHook());
prepareRead<int64_t>(offset, rows, incomingNulls);
bool isDense = rows.back() == rows.size() - 1;
if (isDense) {
readHelper<true>(rows);
readHelper<true>(scanSpec_->filter(), rows);
} else {
readHelper<false>(rows);
readHelper<false>(scanSpec_->filter(), rows);
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::getValues(
const RowSet& rows,
VectorPtr* result) {
rawValues_ = values_->asMutable<char>();
getIntValues(rows, requestedType_, result);
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::fillDecimals() {
auto nullsPtr =
resultNulls() ? resultNulls()->template as<uint64_t>() : nullptr;
auto scales = scaleBuffer_->as<int64_t>();
auto values = values_->asMutable<DataT>();

DecimalUtil::fillDecimals<DataT>(
values, nullsPtr, values, scales, numValues_, scale_);

rawValues_ = values_->asMutable<char>();
getIntValues(rows, requestedType_, result);
}

template class SelectiveDecimalColumnReader<int64_t>;
Expand Down
19 changes: 18 additions & 1 deletion velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,24 @@ class SelectiveDecimalColumnReader : public SelectiveColumnReader {

private:
template <bool kDense>
void readHelper(RowSet rows);
void readHelper(common::Filter* filter, RowSet rows);

// Process IsNull and IsNotNull filters.
void processNulls(bool isNull, const RowSet& rows, const uint64_t* rawNulls);

// Process filters on decimal values.
void processFilter(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls);

// Dispatch to the respective filter processing based on the filter type.
void process(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls);

void fillDecimals();

std::unique_ptr<IntDecoder<true>> valueDecoder_;
std::unique_ptr<IntDecoder<true>> scaleDecoder_;
Expand Down
9 changes: 5 additions & 4 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec) {
common::ScanSpec& scanSpec,
memory::MemoryPool& pool) {
auto colName = scanSpec.fieldName();

switch (fileType->type()->kind()) {
Expand All @@ -58,19 +59,19 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(

case TypeKind::ROW:
return std::make_unique<StructColumnReader>(
requestedType, fileType, params, scanSpec);
requestedType, fileType, params, scanSpec, pool);

case TypeKind::VARBINARY:
case TypeKind::VARCHAR:
return std::make_unique<StringColumnReader>(fileType, params, scanSpec);

case TypeKind::ARRAY:
return std::make_unique<ListColumnReader>(
requestedType, fileType, params, scanSpec);
requestedType, fileType, params, scanSpec, pool);

case TypeKind::MAP:
return std::make_unique<MapColumnReader>(
requestedType, fileType, params, scanSpec);
requestedType, fileType, params, scanSpec, pool);

case TypeKind::BOOLEAN:
return std::make_unique<BooleanColumnReader>(
Expand Down
Loading