diff --git a/velox/vector/BaseVector.cpp b/velox/vector/BaseVector.cpp index 0aa8819a2811..95fdb51526d0 100644 --- a/velox/vector/BaseVector.cpp +++ b/velox/vector/BaseVector.cpp @@ -334,6 +334,28 @@ VectorPtr BaseVector::createInternal( } } +// static +void BaseVector::setNulls( + uint64_t* rawNulls, + const folly::Range& ranges, + bool isNull) { + const auto nullBits = isNull ? bits::kNull : bits::kNotNull; + applyToEachRange(ranges, [&](auto targetIndex, auto sourceIndex, auto count) { + bits::fillBits(rawNulls, targetIndex, targetIndex + count, nullBits); + }); +} + +// static +void BaseVector::copyNulls( + uint64_t* targetRawNulls, + const uint64_t* sourceRawNulls, + const folly::Range& ranges) { + applyToEachRange(ranges, [&](auto targetIndex, auto sourceIndex, auto count) { + bits::copyBits( + sourceRawNulls, sourceIndex, targetRawNulls, targetIndex, count); + }); +} + void BaseVector::addNulls(const uint64_t* bits, const SelectivityVector& rows) { VELOX_CHECK(isNullsWritable()); VELOX_CHECK(length_ >= rows.end()); diff --git a/velox/vector/BaseVector.h b/velox/vector/BaseVector.h index d8a076c61f01..db47e646505f 100644 --- a/velox/vector/BaseVector.h +++ b/velox/vector/BaseVector.h @@ -371,6 +371,25 @@ class BaseVector { nulls_->asMutable(), idx, bits::kNull ? value : !value); } + struct CopyRange { + vector_size_t sourceIndex; + vector_size_t targetIndex; + vector_size_t count; + }; + + /// Sets null flags for each row in 'ranges' to 'isNull'. + static void setNulls( + uint64_t* rawNulls, + const folly::Range& ranges, + bool isNull); + + /// Copies null flags for each row in 'ranges' from 'sourceRawNulls' to + /// 'targetRawNulls'. + static void copyNulls( + uint64_t* targetRawNulls, + const uint64_t* sourceRawNulls, + const folly::Range& ranges); + static int32_t countNulls(const BufferPtr& nulls, vector_size_t begin, vector_size_t end) { return nulls ? bits::countNulls(nulls->as(), begin, end) : 0; @@ -437,12 +456,6 @@ class BaseVector { copyRanges(source, folly::Range(&range, 1)); } - struct CopyRange { - vector_size_t sourceIndex; - vector_size_t targetIndex; - vector_size_t count; - }; - /// Converts SelectivityVetor into a list of CopyRanges having sourceIndex == /// targetIndex. Aims to produce as few ranges as possible. If all rows are /// selected, returns a single range. @@ -906,6 +919,32 @@ class BaseVector { bool memoDisabled_{false}; }; +/// Loops over rows in 'ranges' and invokes 'func' for each row. +/// @param TFunc A void function taking two arguments: targetIndex and +/// sourceIndex. +template +void applyToEachRow( + const folly::Range& ranges, + const TFunc& func) { + for (const auto& range : ranges) { + for (auto i = 0; i < range.count; ++i) { + func(range.targetIndex + i, range.sourceIndex + i); + } + } +} + +/// Loops over 'ranges' and invokes 'func' for each range. +/// @param TFunc A void function taking 3 arguments: targetIndex, sourceIndex +/// and count. +template +void applyToEachRange( + const folly::Range& ranges, + const TFunc& func) { + for (const auto& range : ranges) { + func(range.targetIndex, range.sourceIndex, range.count); + } +} + template <> uint64_t BaseVector::byteSize(vector_size_t count); diff --git a/velox/vector/ComplexVector.cpp b/velox/vector/ComplexVector.cpp index 71b641685e6a..5e5138cc81e2 100644 --- a/velox/vector/ComplexVector.cpp +++ b/velox/vector/ComplexVector.cpp @@ -231,17 +231,19 @@ void RowVector::copy( [&](auto row) { rawMappedIndices[row] = indices[toSourceRow[row]]; }); } - auto baseSource = decodedSource.base()->as(); - for (auto i = 0; i < childrenSize_; ++i) { - if (baseSource->childAt(i)) { - BaseVector::ensureWritable( - rows, type()->asRow().childAt(i), pool(), children_[i]); - children_[i]->copy( - baseSource->childAt(i)->loadedVector(), - nonNullRows, - rawMappedIndices ? rawMappedIndices : indices); - } else { - children_[i].reset(); + if (source->typeKind() != TypeKind::UNKNOWN) { + auto baseSource = decodedSource.base()->as(); + for (auto i = 0; i < childrenSize_; ++i) { + if (baseSource->childAt(i)) { + BaseVector::ensureWritable( + rows, type()->asRow().childAt(i), pool(), children_[i]); + children_[i]->copy( + baseSource->childAt(i)->loadedVector(), + nonNullRows, + rawMappedIndices ? rawMappedIndices : indices); + } else { + children_[i].reset(); + } } } } @@ -268,12 +270,18 @@ void RowVector::copyRanges( return; } + if (source->typeKind() == TypeKind::UNKNOWN) { + BaseVector::setNulls(mutableRawNulls(), ranges, true); + return; + } + auto minTargetIndex = std::numeric_limits::max(); auto maxTargetIndex = std::numeric_limits::min(); - for (auto& r : ranges) { - minTargetIndex = std::min(minTargetIndex, r.targetIndex); - maxTargetIndex = std::max(maxTargetIndex, r.targetIndex + r.count); - } + applyToEachRange(ranges, [&](auto targetIndex, auto sourceIndex, auto count) { + minTargetIndex = std::min(minTargetIndex, targetIndex); + maxTargetIndex = std::max(maxTargetIndex, targetIndex + count); + }); + SelectivityVector rows(maxTargetIndex); rows.setValidRange(0, minTargetIndex, false); rows.updateBounds(); @@ -285,11 +293,7 @@ void RowVector::copyRanges( DecodedVector decoded(*source); if (decoded.isIdentityMapping() && !decoded.mayHaveNulls()) { if (rawNulls_) { - auto* rawNulls = mutableRawNulls(); - for (auto& r : ranges) { - bits::fillBits( - rawNulls, r.targetIndex, r.targetIndex + r.count, bits::kNotNull); - } + setNulls(mutableRawNulls(), ranges, false); } auto* rowSource = source->loadedVector()->as(); for (int i = 0; i < children_.size(); ++i) { @@ -298,27 +302,26 @@ void RowVector::copyRanges( } else { std::vector baseRanges; baseRanges.reserve(ranges.size()); - for (auto& r : ranges) { - for (vector_size_t i = 0; i < r.count; ++i) { - bool isNull = decoded.isNullAt(r.sourceIndex + i); - setNull(r.targetIndex + i, isNull); - if (isNull) { - continue; - } - auto baseIndex = decoded.index(r.sourceIndex + i); - if (!baseRanges.empty() && - baseRanges.back().sourceIndex + 1 == baseIndex && - baseRanges.back().targetIndex + 1 == r.targetIndex + i) { - ++baseRanges.back().count; - } else { - baseRanges.push_back({ - .sourceIndex = baseIndex, - .targetIndex = r.targetIndex + i, - .count = 1, - }); - } + applyToEachRow(ranges, [&](auto targetIndex, auto sourceIndex) { + bool isNull = decoded.isNullAt(sourceIndex); + setNull(targetIndex, isNull); + if (isNull) { + return; } - } + auto baseIndex = decoded.index(sourceIndex); + if (!baseRanges.empty() && + baseRanges.back().sourceIndex + 1 == baseIndex && + baseRanges.back().targetIndex + 1 == targetIndex) { + ++baseRanges.back().count; + } else { + baseRanges.push_back({ + .sourceIndex = baseIndex, + .targetIndex = targetIndex, + .count = 1, + }); + } + }); + auto* rowSource = decoded.base()->as(); for (int i = 0; i < children_.size(); ++i) { children_[i]->copyRanges( @@ -430,11 +433,9 @@ void ArrayVectorBase::copyRangesImpl( // A null constant does not have a value vector, so wrappedVector // returns the constant. VELOX_CHECK(sourceValue->isNullAt(0)); - for (auto& r : ranges) { - for (auto i = 0; i < r.count; ++i) { - setNull(r.targetIndex + i, true); - } - } + applyToEachRow(ranges, [&](auto targetIndex, auto /*sourceIndex*/) { + setNull(targetIndex, true); + }); return; } VELOX_CHECK_EQ(sourceValue->encoding(), encoding()); @@ -490,46 +491,43 @@ void ArrayVectorBase::copyRangesImpl( } else { std::vector outRanges; vector_size_t totalCount = 0; - for (auto& range : ranges) { - if (range.count == 0) { - continue; - } - VELOX_DCHECK(BaseVector::length_ >= range.targetIndex + range.count); - totalCount += range.count; - } - outRanges.reserve(totalCount); - for (auto& range : ranges) { - for (vector_size_t i = 0; i < range.count; ++i) { - if (source->isNullAt(range.sourceIndex + i)) { - setNull(range.targetIndex + i, true); - } else { - if (setNotNulls) { - setNull(range.targetIndex + i, false); + applyToEachRange( + ranges, [&](auto targetIndex, auto sourceIndex, auto count) { + if (count > 0) { + VELOX_DCHECK(BaseVector::length_ >= targetIndex + count); + totalCount += count; } - vector_size_t wrappedIndex = - source->wrappedIndex(range.sourceIndex + i); - vector_size_t copySize = sourceArray->sizeAt(wrappedIndex); - - if (copySize > 0) { - auto copyOffset = sourceArray->offsetAt(wrappedIndex); - - // If we're copying two adjacent ranges, merge them. This only - // works if they're consecutive. - if (!outRanges.empty() && - (outRanges.back().sourceIndex + outRanges.back().count == - copyOffset)) { - outRanges.back().count += copySize; - } else { - outRanges.push_back({copyOffset, childSize, copySize}); - } + }); + outRanges.reserve(totalCount); + applyToEachRow(ranges, [&](auto targetIndex, auto sourceIndex) { + if (source->isNullAt(sourceIndex)) { + setNull(targetIndex, true); + } else { + if (setNotNulls) { + setNull(targetIndex, false); + } + auto wrappedIndex = source->wrappedIndex(sourceIndex); + auto copySize = sourceArray->sizeAt(wrappedIndex); + + if (copySize > 0) { + auto copyOffset = sourceArray->offsetAt(wrappedIndex); + + // If we're copying two adjacent ranges, merge them. This only + // works if they're consecutive. + if (!outRanges.empty() && + (outRanges.back().sourceIndex + outRanges.back().count == + copyOffset)) { + outRanges.back().count += copySize; + } else { + outRanges.push_back({copyOffset, childSize, copySize}); } - - mutableOffsets[range.targetIndex + i] = childSize; - mutableSizes[range.targetIndex + i] = copySize; - childSize += copySize; } + + mutableOffsets[targetIndex] = childSize; + mutableSizes[targetIndex] = copySize; + childSize += copySize; } - } + }); targetValues->get()->resize(childSize); targetValues->get()->copyRanges(sourceValues, outRanges); diff --git a/velox/vector/FlatVector-inl.h b/velox/vector/FlatVector-inl.h index 05fe4cb9f22a..2b16c1085320 100644 --- a/velox/vector/FlatVector-inl.h +++ b/velox/vector/FlatVector-inl.h @@ -218,25 +218,43 @@ void FlatVector::copyValuesAndNulls( } template -void FlatVector::copyValuesAndNulls( +void FlatVector::copyRanges( const BaseVector* source, - vector_size_t targetIndex, - vector_size_t sourceIndex, - vector_size_t count) { - if (count == 0) { - return; + const folly::Range& ranges) { + if constexpr (std::is_same_v) { + auto leaf = + source->wrappedVector()->asUnchecked>(); + if (BaseVector::pool_ != leaf->pool()) { + applyToEachRow(ranges, [&](auto targetIndex, auto sourceIndex) { + if (source->isNullAt(sourceIndex)) { + this->setNull(targetIndex, true); + } else { + this->set( + targetIndex, leaf->valueAt(source->wrappedIndex(sourceIndex))); + } + }); + return; + } + + // We copy referencing the storage of 'source'. + acquireSharedStringBuffers(source); } + source = source->loadedVector(); VELOX_CHECK( BaseVector::compatibleKind(BaseVector::typeKind(), source->typeKind())); - VELOX_CHECK_GE(source->size(), sourceIndex + count); - VELOX_CHECK_GE(BaseVector::length_, targetIndex + count); - const uint64_t* sourceNulls = source->rawNulls(); + + const uint64_t* sourceRawNulls = source->rawNulls(); uint64_t* rawNulls = const_cast(BaseVector::rawNulls_); if (source->mayHaveNulls()) { rawNulls = BaseVector::mutableRawNulls(); } + if (source->typeKind() == TypeKind::UNKNOWN) { + BaseVector::setNulls(rawNulls, ranges, true); + return; + } + // Allocate values buffer if not allocated yet. This may happen if vector // contains only null values. if (!values_) { @@ -249,55 +267,85 @@ void FlatVector::copyValuesAndNulls( VELOX_CHECK_EQ( BaseVector::countNulls(source->nulls(), 0, source->size()), source->size()); - } else if (source->typeKind() != TypeKind::UNKNOWN) { + } else { auto flat = source->asUnchecked>(); - if (Buffer::is_pod_like_v) { - memcpy( - &rawValues_[targetIndex], - &flat->rawValues()[sourceIndex], - count * sizeof(T)); + if constexpr (std::is_same_v) { + auto rawValues = reinterpret_cast(rawValues_); + auto* sourceValues = + source->asUnchecked>()->rawValues(); + applyToEachRange( + ranges, [&](auto targetIndex, auto sourceIndex, auto count) { + bits::copyBits( + sourceValues, sourceIndex, rawValues, targetIndex, count); + }); } else { - const T* srcValues = flat->rawValues(); - std::copy( - srcValues + sourceIndex, - srcValues + sourceIndex + count, - rawValues_ + targetIndex); + const T* sourceValues = flat->rawValues(); + applyToEachRange( + ranges, [&](auto targetIndex, auto sourceIndex, auto count) { + if (Buffer::is_pod_like_v) { + memcpy( + &rawValues_[targetIndex], + &sourceValues[sourceIndex], + count * sizeof(T)); + } else { + std::copy( + sourceValues + sourceIndex, + sourceValues + sourceIndex + count, + rawValues_ + targetIndex); + } + }); } } + if (rawNulls) { - if (sourceNulls) { - bits::copyBits(sourceNulls, sourceIndex, rawNulls, targetIndex, count); + if (sourceRawNulls) { + BaseVector::copyNulls(rawNulls, sourceRawNulls, ranges); } else { - bits::fillBits( - rawNulls, targetIndex, targetIndex + count, bits::kNotNull); + BaseVector::setNulls(rawNulls, ranges, false); } } } else if (source->isConstantEncoding()) { if (source->isNullAt(0)) { - bits::fillBits(rawNulls, targetIndex, targetIndex + count, bits::kNull); + BaseVector::setNulls(rawNulls, ranges, true); return; } auto constant = source->asUnchecked>(); T value = constant->valueAt(0); - for (auto row = targetIndex; row < targetIndex + count; ++row) { - rawValues_[row] = value; + if constexpr (std::is_same_v) { + auto rawValues = reinterpret_cast(rawValues_); + applyToEachRange( + ranges, [&](auto targetIndex, auto /*sourceIndex*/, auto count) { + bits::fillBits(rawValues, targetIndex, targetIndex + count, value); + }); + } else { + applyToEachRow(ranges, [&](auto targetIndex, auto /*sourceIndex*/) { + rawValues_[targetIndex] = value; + }); } if (rawNulls) { - bits::fillBits( - rawNulls, targetIndex, targetIndex + count, bits::kNotNull); + BaseVector::setNulls(rawNulls, ranges, false); } } else { - auto sourceVector = source->asUnchecked>(); - for (int32_t i = 0; i < count; ++i) { - if (!source->isNullAt(sourceIndex + i)) { - rawValues_[targetIndex + i] = sourceVector->valueAt(sourceIndex + i); + auto* sourceVector = source->asUnchecked>(); + uint64_t* rawBoolValues = nullptr; + if constexpr (std::is_same_v) { + rawBoolValues = reinterpret_cast(rawValues_); + } + applyToEachRow(ranges, [&](auto targetIndex, auto sourceIndex) { + if (!source->isNullAt(sourceIndex)) { + auto sourceValue = sourceVector->valueAt(sourceIndex); + if constexpr (std::is_same_v) { + bits::setBit(rawBoolValues, targetIndex, sourceValue); + } else { + rawValues_[targetIndex] = sourceValue; + } if (rawNulls) { - bits::clearNull(rawNulls, targetIndex + i); + bits::clearNull(rawNulls, targetIndex); } } else { - bits::setNull(rawNulls, targetIndex + i); + bits::setNull(rawNulls, targetIndex); } - } + }); } } diff --git a/velox/vector/FlatVector.cpp b/velox/vector/FlatVector.cpp index cbc846270a18..b29fabf04151 100644 --- a/velox/vector/FlatVector.cpp +++ b/velox/vector/FlatVector.cpp @@ -130,75 +130,6 @@ void FlatVector::copyValuesAndNulls( } } -template <> -void FlatVector::copyValuesAndNulls( - const BaseVector* source, - vector_size_t targetIndex, - vector_size_t sourceIndex, - vector_size_t count) { - if (count == 0) { - return; - } - source = source->loadedVector(); - VELOX_CHECK( - BaseVector::compatibleKind(BaseVector::typeKind(), source->typeKind())); - VELOX_CHECK(source->size() >= sourceIndex + count); - VELOX_CHECK(BaseVector::length_ >= targetIndex + count); - - const uint64_t* sourceNulls = source->rawNulls(); - auto rawValues = reinterpret_cast(rawValues_); - uint64_t* rawNulls = const_cast(BaseVector::rawNulls_); - if (source->mayHaveNulls()) { - rawNulls = BaseVector::mutableRawNulls(); - } - if (source->isFlatEncoding()) { - if (source->typeKind() != TypeKind::UNKNOWN) { - auto* sourceValues = - source->asUnchecked>()->rawValues(); - bits::copyBits(sourceValues, sourceIndex, rawValues, targetIndex, count); - } - if (rawNulls) { - if (sourceNulls) { - bits::copyBits(sourceNulls, sourceIndex, rawNulls, targetIndex, count); - } else { - bits::fillBits( - rawNulls, targetIndex, targetIndex + count, bits::kNotNull); - } - } - } else if (source->isConstantEncoding()) { - auto constant = source->asUnchecked>(); - if (constant->isNullAt(0)) { - bits::fillBits(rawNulls, targetIndex, targetIndex + count, bits::kNull); - return; - } - bool value = constant->valueAt(0); - bits::fillBits(rawValues, targetIndex, targetIndex + count, value); - if (rawNulls) { - bits::fillBits( - rawNulls, targetIndex, targetIndex + count, bits::kNotNull); - } - } else { - auto sourceVector = source->typeKind() != TypeKind::UNKNOWN - ? source->asUnchecked>() - : nullptr; - for (int32_t i = 0; i < count; ++i) { - if (!source->isNullAt(sourceIndex + i)) { - if (sourceVector) { - bits::setBit( - rawValues, - targetIndex + i, - sourceVector->valueAt(sourceIndex + i)); - } - if (rawNulls) { - bits::clearNull(rawNulls, targetIndex + i); - } - } else { - bits::setNull(rawNulls, targetIndex + i); - } - } - } -} - template <> Buffer* FlatVector::getBufferWithSpace(vector_size_t size) { VELOX_DCHECK_GE(stringBuffers_.size(), stringBufferSet_.size()); @@ -461,43 +392,6 @@ void FlatVector::copy( } } -template <> -void FlatVector::copy( - const BaseVector* source, - vector_size_t targetIndex, - vector_size_t sourceIndex, - vector_size_t count) { - if (count == 0) { - return; - } - BaseVector::copy(source, targetIndex, sourceIndex, count); -} - -template <> -void FlatVector::copyRanges( - const BaseVector* source, - const folly::Range& ranges) { - auto leaf = source->wrappedVector()->asUnchecked>(); - if (pool_ == leaf->pool()) { - // We copy referencing the storage of 'source'. - for (auto& r : ranges) { - copyValuesAndNulls(source, r.targetIndex, r.sourceIndex, r.count); - } - acquireSharedStringBuffers(source); - } else { - for (auto& r : ranges) { - for (auto i = 0; i < r.count; ++i) { - if (source->isNullAt(r.sourceIndex + i)) { - setNull(r.targetIndex + i, true); - } else { - set(r.targetIndex + i, - leaf->valueAt(source->wrappedIndex(r.sourceIndex + i))); - } - } - } - } -} - // For strings, we also verify if they point to valid memory locations inside // the string buffers. template <> diff --git a/velox/vector/FlatVector.h b/velox/vector/FlatVector.h index 8df10806f4f6..ae535a9cbb2d 100644 --- a/velox/vector/FlatVector.h +++ b/velox/vector/FlatVector.h @@ -230,16 +230,13 @@ class FlatVector final : public SimpleVector { if (count == 0) { return; } - copyValuesAndNulls(source, targetIndex, sourceIndex, count); + BaseVector::CopyRange range{sourceIndex, targetIndex, count}; + copyRanges(source, folly::Range(&range, 1)); } void copyRanges( const BaseVector* source, - const folly::Range& ranges) override { - for (auto& range : ranges) { - copy(source, range.targetIndex, range.sourceIndex, range.count); - } - } + const folly::Range& ranges) override; void resize(vector_size_t newSize, bool setNotNull = true) override; @@ -450,12 +447,6 @@ class FlatVector final : public SimpleVector { const SelectivityVector& rows, const vector_size_t* toSourceRow); - void copyValuesAndNulls( - const BaseVector* source, - vector_size_t targetIndex, - vector_size_t sourceIndex, - vector_size_t count); - // Ensures that the values buffer has space for 'newSize' elements and is // mutable. Sets elements between the old and new sizes to 'initialValue' if // the new size > old size. @@ -509,18 +500,6 @@ void FlatVector::copy( const SelectivityVector& rows, const vector_size_t* toSourceRow); -template <> -void FlatVector::copy( - const BaseVector* source, - vector_size_t targetIndex, - vector_size_t sourceIndex, - vector_size_t count); - -template <> -void FlatVector::copyRanges( - const BaseVector* source, - const folly::Range& ranges); - template <> void FlatVector::validate( const VectorValidateOptions& options) const; @@ -531,13 +510,6 @@ void FlatVector::copyValuesAndNulls( const SelectivityVector& rows, const vector_size_t* toSourceRow); -template <> -void FlatVector::copyValuesAndNulls( - const BaseVector* source, - vector_size_t targetIndex, - vector_size_t sourceIndex, - vector_size_t count); - template <> Buffer* FlatVector::getBufferWithSpace(vector_size_t size);