diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 1546c2e6ffd02..3f34a33911635 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -280,6 +280,7 @@ if(ARROW_COMPUTE) compute/kernels/sum.cc compute/kernels/add.cc compute/kernels/take.cc + compute/kernels/ntake.cc compute/kernels/isin.cc compute/kernels/probe.cc compute/kernels/util_internal.cc diff --git a/cpp/src/arrow/compute/kernels/ntake.cc b/cpp/src/arrow/compute/kernels/ntake.cc new file mode 100644 index 0000000000000..a11fb2995b1fd --- /dev/null +++ b/cpp/src/arrow/compute/kernels/ntake.cc @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// returnGegarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include "arrow/array/concatenate.h" +#include "arrow/compute/kernels/ntake.h" +#include "arrow/compute/kernels/ntake_internal.h" +#include "arrow/util/logging.h" +#include "arrow/visitor_inline.h" + +namespace arrow { +namespace compute { + +template +class NTakeKernelImpl : public NTakeKernel { + public: + explicit NTakeKernelImpl(const std::shared_ptr& value_type) + : NTakeKernel(value_type) {} + + Status Init() { + return Taker>::Make(this->type_, &taker_); + } + + Status NTake(FunctionContext* ctx, const Array& values, const Array& indices_array, + std::shared_ptr* out) override { + RETURN_NOT_OK(taker_->SetContext(ctx)); + RETURN_NOT_OK(taker_->Take(values, ArrayIndexSequence(indices_array))); + return taker_->Finish(out); + } + + std::unique_ptr>> taker_; +}; + +struct UnpackIndices { + template + enable_if_integer Visit(const IndexType&) { + auto out = new NTakeKernelImpl(value_type_); + out_->reset(out); + return out->Init(); + } + + Status Visit(const DataType& other) { + return Status::TypeError("index type not supported: ", other); + } + + std::shared_ptr value_type_; + std::unique_ptr* out_; +}; + +Status NTakeKernel::Make(const std::shared_ptr& value_type, + const std::shared_ptr& index_type, + std::unique_ptr* out) { + UnpackIndices visitor{value_type, out}; + return VisitTypeInline(*index_type, &visitor); +} + +Status NTakeKernel::Call(FunctionContext* ctx, const Datum& values, const Datum& indices, + Datum* out) { + if (!values.is_array() || !indices.is_array()) { + return Status::Invalid("NTakeKernel expects array values and indices"); + } + auto values_array = values.make_array(); + auto indices_array = indices.make_array(); + std::shared_ptr out_array; + RETURN_NOT_OK(NTake(ctx, *values_array, *indices_array, &out_array)); + *out = Datum(out_array); + return Status::OK(); +} + +Status NTake(FunctionContext* ctx, const Array& values, const Array& indices, + const NTakeOptions& options, std::shared_ptr* out) { + Datum out_datum; + RETURN_NOT_OK( + NTake(ctx, Datum(values.data()), Datum(indices.data()), options, &out_datum)); + *out = out_datum.make_array(); + return Status::OK(); +} + +Status NTake(FunctionContext* ctx, const Datum& values, const Datum& indices, + const NTakeOptions& options, Datum* out) { + std::unique_ptr kernel; + RETURN_NOT_OK(NTakeKernel::Make(values.type(), indices.type(), &kernel)); + return kernel->Call(ctx, values, indices, out); +} + +Status NTake(FunctionContext* ctx, const ChunkedArray& values, const Array& indices, + const NTakeOptions& options, std::shared_ptr* out) { + auto num_chunks = values.num_chunks(); + std::vector> new_chunks(1); // Hard-coded 1 for now + std::shared_ptr current_chunk; + + // Case 1: `values` has a single chunk, so just use it + if (num_chunks == 1) { + current_chunk = values.chunk(0); + } else { + // TODO Case 2: See if all `indices` fall in the same chunk and call Array Take on it + // See + // https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151 + // TODO Case 3: If indices are sorted, can slice them and call Array Take + + // Case 4: Else, concatenate chunks and call Array Take + RETURN_NOT_OK(Concatenate(values.chunks(), default_memory_pool(), ¤t_chunk)); + } + // Call Array Take on our single chunk + RETURN_NOT_OK(NTake(ctx, *current_chunk, indices, options, &new_chunks[0])); + *out = std::make_shared(std::move(new_chunks)); + return Status::OK(); +} + +Status NTake(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& indices, + const NTakeOptions& options, std::shared_ptr* out) { + auto num_chunks = indices.num_chunks(); + std::vector> new_chunks(num_chunks); + std::shared_ptr current_chunk; + + for (int i = 0; i < num_chunks; i++) { + // Take with that indices chunk + // Note that as currently implemented, this is inefficient because `values` + // will get concatenated on every iteration of this loop + RETURN_NOT_OK(NTake(ctx, values, *indices.chunk(i), options, ¤t_chunk)); + // Concatenate the result to make a single array for this chunk + RETURN_NOT_OK( + Concatenate(current_chunk->chunks(), default_memory_pool(), &new_chunks[i])); + } + *out = std::make_shared(std::move(new_chunks)); + return Status::OK(); +} + +Status NTake(FunctionContext* ctx, const Array& values, const ChunkedArray& indices, + const NTakeOptions& options, std::shared_ptr* out) { + auto num_chunks = indices.num_chunks(); + std::vector> new_chunks(num_chunks); + + for (int i = 0; i < num_chunks; i++) { + // Take with that indices chunk + RETURN_NOT_OK(NTake(ctx, values, *indices.chunk(i), options, &new_chunks[i])); + } + *out = std::make_shared(std::move(new_chunks)); + return Status::OK(); +} + +Status NTake(FunctionContext* ctx, const RecordBatch& batch, const Array& indices, + const NTakeOptions& options, std::shared_ptr* out) { + auto ncols = batch.num_columns(); + auto nrows = indices.length(); + + std::vector> columns(ncols); + + for (int j = 0; j < ncols; j++) { + RETURN_NOT_OK(NTake(ctx, *batch.column(j), indices, options, &columns[j])); + } + *out = RecordBatch::Make(batch.schema(), nrows, columns); + return Status::OK(); +} + +Status NTake(FunctionContext* ctx, const Table& table, const Array& indices, + const NTakeOptions& options, std::shared_ptr* out) { + auto ncols = table.num_columns(); + std::vector> columns(ncols); + + for (int j = 0; j < ncols; j++) { + RETURN_NOT_OK(NTake(ctx, *table.column(j), indices, options, &columns[j])); + } + *out = Table::Make(table.schema(), columns); + return Status::OK(); +} + +Status NTake(FunctionContext* ctx, const Table& table, const ChunkedArray& indices, + const NTakeOptions& options, std::shared_ptr
* out) { + auto ncols = table.num_columns(); + std::vector> columns(ncols); + + for (int j = 0; j < ncols; j++) { + RETURN_NOT_OK(NTake(ctx, *table.column(j), indices, options, &columns[j])); + } + *out = Table::Make(table.schema(), columns); + return Status::OK(); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/ntake.h b/cpp/src/arrow/compute/kernels/ntake.h new file mode 100644 index 0000000000000..88c65f89b97ed --- /dev/null +++ b/cpp/src/arrow/compute/kernels/ntake.h @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/compute/kernel.h" +#include "arrow/status.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; + +namespace compute { + +class FunctionContext; + +struct ARROW_EXPORT NTakeOptions {}; + +/// \brief Take from an array of values at indices in another array +/// +/// The output array will be of the same type as the input values +/// array, with elements taken from the values array at the given +/// indices. If an index is null then the taken element will be null. +/// +/// For example given values = ["a", "b", "c", null, "e", "f"] and +/// indices = [2, 1, null, 3], the output will be +/// = [values[2], values[1], null, values[3]] +/// = ["c", "b", null, null] +/// +/// \param[in] ctx the FunctionContext +/// \param[in] values array from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting array +ARROW_EXPORT +Status NTake(FunctionContext* ctx, const Array& values, const Array& indices, + const NTakeOptions& options, std::shared_ptr* out); + +/// \brief Take from a chunked array of values at indices in another array +/// +/// The output chunked array will be of the same type as the input values +/// array, with elements taken from the values array at the given +/// indices. If an index is null then the taken element will be null. +/// +/// For example given values = ["a", "b", "c", null, "e", "f"] and +/// indices = [2, 1, null, 3], the output will be +/// = [values[2], values[1], null, values[3]] +/// = ["c", "b", null, null] +/// +/// \param[in] ctx the FunctionContext +/// \param[in] values chunked array from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting chunked array +/// NOTE: Experimental API +ARROW_EXPORT +Status NTake(FunctionContext* ctx, const ChunkedArray& values, const Array& indices, + const NTakeOptions& options, std::shared_ptr* out); + +/// \brief Take from a chunked array of values at indices in a chunked array +/// +/// The output chunked array will be of the same type as the input values +/// array, with elements taken from the values array at the given +/// indices. If an index is null then the taken element will be null. +/// The chunks in the output array will align with the chunks in the indices. +/// +/// For example given values = ["a", "b", "c", null, "e", "f"] and +/// indices = [2, 1, null, 3], the output will be +/// = [values[2], values[1], null, values[3]] +/// = ["c", "b", null, null] +/// +/// \param[in] ctx the FunctionContext +/// \param[in] values chunked array from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting chunked array +/// NOTE: Experimental API +ARROW_EXPORT +Status NTake(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& indices, + const NTakeOptions& options, std::shared_ptr* out); + +/// \brief Take from an array of values at indices in a chunked array +/// +/// The output chunked array will be of the same type as the input values +/// array, with elements taken from the values array at the given +/// indices. If an index is null then the taken element will be null. +/// The chunks in the output array will align with the chunks in the indices. +/// +/// For example given values = ["a", "b", "c", null, "e", "f"] and +/// indices = [2, 1, null, 3], the output will be +/// = [values[2], values[1], null, values[3]] +/// = ["c", "b", null, null] +/// +/// \param[in] ctx the FunctionContext +/// \param[in] values array from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting chunked array +/// NOTE: Experimental API +ARROW_EXPORT +Status NTake(FunctionContext* ctx, const Array& values, const ChunkedArray& indices, + const NTakeOptions& options, std::shared_ptr* out); + +/// \brief Take from a record batch at indices in another array +/// +/// The output batch will have the same schema as the input batch, +/// with rows taken from the columns in the batch at the given +/// indices. If an index is null then the taken element will be null. +/// +/// \param[in] ctx the FunctionContext +/// \param[in] batch record batch from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting record batch +/// NOTE: Experimental API +ARROW_EXPORT +Status NTake(FunctionContext* ctx, const RecordBatch& batch, const Array& indices, + const NTakeOptions& options, std::shared_ptr* out); + +/// \brief Take from a table at indices in an array +/// +/// The output table will have the same schema as the input table, +/// with rows taken from the columns in the table at the given +/// indices. If an index is null then the taken element will be null. +/// +/// \param[in] ctx the FunctionContext +/// \param[in] table table from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting table +/// NOTE: Experimental API +ARROW_EXPORT +Status NTake(FunctionContext* ctx, const Table& table, const Array& indices, + const NTakeOptions& options, std::shared_ptr
* out); + +/// \brief Take from a table at indices in a chunked array +/// +/// The output table will have the same schema as the input table, +/// with rows taken from the values array at the given +/// indices. If an index is null then the taken element will be null. +/// +/// \param[in] ctx the FunctionContext +/// \param[in] table table from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting table +/// NOTE: Experimental API +ARROW_EXPORT +Status NTake(FunctionContext* ctx, const Table& table, const ChunkedArray& indices, + const NTakeOptions& options, std::shared_ptr
* out); + +/// \brief Take from an array of values at indices in another array +/// +/// \param[in] ctx the FunctionContext +/// \param[in] values datum from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting datum +ARROW_EXPORT +Status NTake(FunctionContext* ctx, const Datum& values, const Datum& indices, + const NTakeOptions& options, Datum* out); + +/// \brief BinaryKernel implementing Take operation +class ARROW_EXPORT NTakeKernel : public BinaryKernel { + public: + explicit NTakeKernel(const std::shared_ptr& type, NTakeOptions options = {}) + : type_(type) {} + + /// \brief BinaryKernel interface + /// + /// delegates to subclasses via Take() + Status Call(FunctionContext* ctx, const Datum& values, const Datum& indices, + Datum* out) override; + + /// \brief output type of this kernel (identical to type of values taken) + std::shared_ptr out_type() const override { return type_; } + + /// \brief factory for TakeKernels + /// + /// \param[in] value_type constructed TakeKernel will support taking + /// values of this type + /// \param[in] index_type constructed TakeKernel will support taking + /// with indices of this type + /// \param[out] out created kernel + static Status Make(const std::shared_ptr& value_type, + const std::shared_ptr& index_type, + std::unique_ptr* out); + + /// \brief single-array implementation + virtual Status NTake(FunctionContext* ctx, const Array& values, const Array& indices, + std::shared_ptr* out) = 0; + + protected: + std::shared_ptr type_; +}; +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/ntake_internal.h b/cpp/src/arrow/compute/kernels/ntake_internal.h new file mode 100644 index 0000000000000..35b7f3e472030 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/ntake_internal.h @@ -0,0 +1,769 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "arrow/builder.h" +#include "arrow/compute/context.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" +#include "arrow/visitor_inline.h" + +namespace arrow { +namespace compute { + +using internal::checked_cast; +using internal::checked_pointer_cast; + +template +using enable_if_not_base_binary = + enable_if_t::value, R>; + +// For non-binary builders, use regular value append +template +static enable_if_not_base_binary UnsafeAppend( + Builder* builder, Scalar&& value) { + builder->UnsafeAppend(std::forward(value)); + return Status::OK(); +} + +// For binary builders, need to reserve byte storage first +template +static enable_if_base_binary UnsafeAppend( + Builder* builder, util::string_view value) { + RETURN_NOT_OK(builder->ReserveData(static_cast(value.size()))); + builder->UnsafeAppend(value); + return Status::OK(); +} + +/// \brief visit indices from an IndexSequence while bounds checking +/// +/// \param[in] indices IndexSequence to visit +/// \param[in] values array to bounds check against, if necessary +/// \param[in] vis index visitor, signature must be Status(int64_t index, bool is_valid) +template +Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { + for (int64_t i = 0; i < indices.length(); ++i) { + auto index_valid = indices.Next(); + if (SomeIndicesNull && !index_valid.second) { + RETURN_NOT_OK(vis(0, false)); + continue; + } + + auto index = index_valid.first; + if (!NeverOutOfBounds) { + if (index < 0 || index >= values.length()) { + return Status::IndexError("take index out of bounds"); + } + } else { + DCHECK_GE(index, 0); + DCHECK_LT(index, values.length()); + } + + bool is_valid = !SomeValuesNull || values.IsValid(index); + RETURN_NOT_OK(vis(index, is_valid)); + } + return Status::OK(); +} + +template +Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { + if (indices.never_out_of_bounds()) { + return VisitIndices( + indices, values, std::forward(vis)); + } + return VisitIndices(indices, values, + std::forward(vis)); +} + +template +Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { + if (values.null_count() == 0) { + return VisitIndices(indices, values, + std::forward(vis)); + } + return VisitIndices(indices, values, std::forward(vis)); +} + +template +Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { + if (indices.null_count() == 0) { + return VisitIndices(indices, values, std::forward(vis)); + } + return VisitIndices(indices, values, std::forward(vis)); +} + +// Helper class for gathering values from an array +template +class Taker { + public: + explicit Taker(const std::shared_ptr& type) : type_(type) {} + + virtual ~Taker() = default; + + // initialize this taker including constructing any children, + // must be called once after construction before any other methods are called + virtual Status Init() { return Status::OK(); } + + // reset this Taker and set FunctionContext for taking an array + // must be called each time the FunctionContext may have changed + virtual Status SetContext(FunctionContext* ctx) = 0; + + // gather elements from an array at the provided indices + virtual Status Take(const Array& values, IndexSequence indices) = 0; + + // assemble an array of all gathered values + virtual Status Finish(std::shared_ptr*) = 0; + + // factory; the output Taker will support gathering values of the given type + static Status Make(const std::shared_ptr& type, std::unique_ptr* out); + + static_assert(std::is_literal_type::value, + "Index sequences must be literal type"); + + static_assert(std::is_copy_constructible::value, + "Index sequences must be copy constructible"); + + static_assert(std::is_same().Next()), + std::pair>::value, + "An index sequence must yield pairs of indices:int64_t, validity:bool."); + + static_assert(std::is_same().length()), + int64_t>::value, + "An index sequence must provide its length."); + + static_assert(std::is_same().null_count()), + int64_t>::value, + "An index sequence must provide the number of nulls it will take."); + + static_assert( + std::is_same().never_out_of_bounds()), + bool>::value, + "Index sequences must declare whether bounds checking is necessary"); + + static_assert( + std::is_same().set_never_out_of_bounds()), + void>::value, + "An index sequence must support ignoring bounds checking."); + + protected: + template + Status MakeBuilder(MemoryPool* pool, std::unique_ptr* out) { + std::unique_ptr builder; + RETURN_NOT_OK(arrow::MakeBuilder(pool, type_, &builder)); + out->reset(checked_cast(builder.release())); + return Status::OK(); + } + + std::shared_ptr type_; +}; + +// an IndexSequence which yields indices from a specified range +// or yields null for the length of that range +class RangeIndexSequence { + public: + constexpr bool never_out_of_bounds() const { return true; } + void set_never_out_of_bounds() {} + + constexpr RangeIndexSequence() = default; + + RangeIndexSequence(bool is_valid, int64_t offset, int64_t length) + : is_valid_(is_valid), index_(offset), length_(length) {} + + std::pair Next() { return std::make_pair(index_++, is_valid_); } + + int64_t length() const { return length_; } + + int64_t null_count() const { return is_valid_ ? 0 : length_; } + + private: + bool is_valid_ = true; + int64_t index_ = 0, length_ = -1; +}; + +// an IndexSequence which yields the values of an Array of integers +template +class ArrayIndexSequence { + public: + bool never_out_of_bounds() const { return never_out_of_bounds_; } + void set_never_out_of_bounds() { never_out_of_bounds_ = true; } + + constexpr ArrayIndexSequence() = default; + + explicit ArrayIndexSequence(const Array& indices) + : indices_(&checked_cast&>(indices)) {} + + std::pair Next() { + if (indices_->IsNull(index_)) { + ++index_; + return std::make_pair(-1, false); + } + return std::make_pair(indices_->Value(index_++), true); + } + + int64_t length() const { return indices_->length(); } + + int64_t null_count() const { return indices_->null_count(); } + + private: + const NumericArray* indices_ = nullptr; + int64_t index_ = 0; + bool never_out_of_bounds_ = false; +}; + +// Default implementation: taking from a simple array into a builder requires only that +// the array supports array.GetView() and the corresponding builder supports +// builder.UnsafeAppend(array.GetView()) +template +class TakerImpl : public Taker { + public: + using ArrayType = typename TypeTraits::ArrayType; + using BuilderType = typename TypeTraits::BuilderType; + + using Taker::Taker; + + Status SetContext(FunctionContext* ctx) override { + return this->MakeBuilder(ctx->memory_pool(), &builder_); + } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + RETURN_NOT_OK(builder_->Reserve(indices.length())); + return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { + if (!is_valid) { + //builder_->UnsafeAppendNull(); + return Status::OK(); + } + auto value = checked_cast(values).GetView(index); + return UnsafeAppend(builder_.get(), value); + }); + } + + Status Finish(std::shared_ptr* out) override { return builder_->Finish(out); } + + private: + std::unique_ptr builder_; +}; + +// Gathering from NullArrays is trivial; skip the builder and just +// do bounds checking +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status SetContext(FunctionContext*) override { return Status::OK(); } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + + length_ += indices.length(); + + if (indices.never_out_of_bounds()) { + return Status::OK(); + } + + return VisitIndices(indices, values, [](int64_t, bool) { return Status::OK(); }); + } + + Status Finish(std::shared_ptr* out) override { + out->reset(new NullArray(length_)); + return Status::OK(); + } + + private: + int64_t length_ = 0; +}; + +template +class ListTakerImpl : public Taker { + public: + using offset_type = typename TypeClass::offset_type; + using ArrayType = typename TypeTraits::ArrayType; + + using Taker::Taker; + + Status Init() override { + const auto& list_type = checked_cast(*this->type_); + return Taker::Make(list_type.value_type(), &value_taker_); + } + + Status SetContext(FunctionContext* ctx) override { + auto pool = ctx->memory_pool(); + null_bitmap_builder_.reset(new TypedBufferBuilder(pool)); + offset_builder_.reset(new TypedBufferBuilder(pool)); + RETURN_NOT_OK(offset_builder_->Append(0)); + return value_taker_->SetContext(ctx); + } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + + const auto& list_array = checked_cast(values); + + RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); + RETURN_NOT_OK(offset_builder_->Reserve(indices.length())); + + offset_type offset = offset_builder_->data()[offset_builder_->length() - 1]; + return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { + null_bitmap_builder_->UnsafeAppend(is_valid); + + if (is_valid) { + offset += list_array.value_length(index); + RangeIndexSequence value_indices(true, list_array.value_offset(index), + list_array.value_length(index)); + RETURN_NOT_OK(value_taker_->Take(*list_array.values(), value_indices)); + } + + offset_builder_->UnsafeAppend(offset); + return Status::OK(); + }); + } + + Status Finish(std::shared_ptr* out) override { + auto null_count = null_bitmap_builder_->false_count(); + auto length = null_bitmap_builder_->length(); + + std::shared_ptr offsets, null_bitmap; + RETURN_NOT_OK(null_bitmap_builder_->Finish(&null_bitmap)); + RETURN_NOT_OK(offset_builder_->Finish(&offsets)); + + std::shared_ptr taken_values; + RETURN_NOT_OK(value_taker_->Finish(&taken_values)); + + out->reset(new ArrayType(this->type_, length, offsets, taken_values, null_bitmap, + null_count)); + return Status::OK(); + } + + std::unique_ptr> null_bitmap_builder_; + std::unique_ptr> offset_builder_; + std::unique_ptr> value_taker_; +}; + +template +class TakerImpl : public ListTakerImpl { + using ListTakerImpl::ListTakerImpl; +}; + +template +class TakerImpl + : public ListTakerImpl { + using ListTakerImpl::ListTakerImpl; +}; + +template +class TakerImpl : public ListTakerImpl { + using ListTakerImpl::ListTakerImpl; +}; + +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status Init() override { + const auto& list_type = checked_cast(*this->type_); + return Taker::Make(list_type.value_type(), &value_taker_); + } + + Status SetContext(FunctionContext* ctx) override { + auto pool = ctx->memory_pool(); + null_bitmap_builder_.reset(new TypedBufferBuilder(pool)); + return value_taker_->SetContext(ctx); + } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + + const auto& list_array = checked_cast(values); + auto list_size = list_array.list_type()->list_size(); + + RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); + return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { + null_bitmap_builder_->UnsafeAppend(is_valid); + + // for FixedSizeList, null lists are not empty (they also span a segment of + // list_size in the child data), so we must append to value_taker_ even if !is_valid + RangeIndexSequence value_indices(is_valid, list_array.value_offset(index), + list_size); + return value_taker_->Take(*list_array.values(), value_indices); + }); + } + + Status Finish(std::shared_ptr* out) override { + auto null_count = null_bitmap_builder_->false_count(); + auto length = null_bitmap_builder_->length(); + + std::shared_ptr null_bitmap; + RETURN_NOT_OK(null_bitmap_builder_->Finish(&null_bitmap)); + + std::shared_ptr taken_values; + RETURN_NOT_OK(value_taker_->Finish(&taken_values)); + + out->reset(new FixedSizeListArray(this->type_, length, taken_values, null_bitmap, + null_count)); + return Status::OK(); + } + + protected: + std::unique_ptr> null_bitmap_builder_; + std::unique_ptr> value_taker_; +}; + +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status Init() override { + children_.resize(this->type_->num_children()); + for (int i = 0; i < this->type_->num_children(); ++i) { + RETURN_NOT_OK( + Taker::Make(this->type_->child(i)->type(), &children_[i])); + } + return Status::OK(); + } + + Status SetContext(FunctionContext* ctx) override { + null_bitmap_builder_.reset(new TypedBufferBuilder(ctx->memory_pool())); + for (int i = 0; i < this->type_->num_children(); ++i) { + RETURN_NOT_OK(children_[i]->SetContext(ctx)); + } + return Status::OK(); + } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + + RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); + RETURN_NOT_OK(VisitIndices(indices, values, [&](int64_t, bool is_valid) { + null_bitmap_builder_->UnsafeAppend(is_valid); + return Status::OK(); + })); + + // bounds checking was done while appending to the null bitmap + indices.set_never_out_of_bounds(); + + const auto& struct_array = checked_cast(values); + for (int i = 0; i < this->type_->num_children(); ++i) { + RETURN_NOT_OK(children_[i]->Take(*struct_array.field(i), indices)); + } + return Status::OK(); + } + + Status Finish(std::shared_ptr* out) override { + auto null_count = null_bitmap_builder_->false_count(); + auto length = null_bitmap_builder_->length(); + std::shared_ptr null_bitmap; + RETURN_NOT_OK(null_bitmap_builder_->Finish(&null_bitmap)); + + ArrayVector fields(this->type_->num_children()); + for (int i = 0; i < this->type_->num_children(); ++i) { + RETURN_NOT_OK(children_[i]->Finish(&fields[i])); + } + + out->reset( + new StructArray(this->type_, length, std::move(fields), null_bitmap, null_count)); + return Status::OK(); + } + + protected: + std::unique_ptr> null_bitmap_builder_; + std::vector>> children_; +}; + +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status Init() override { + union_type_ = checked_cast(this->type_.get()); + + if (union_type_->mode() == UnionMode::SPARSE) { + sparse_children_.resize(this->type_->num_children()); + } else { + dense_children_.resize(this->type_->num_children()); + child_length_.resize(union_type_->max_type_code() + 1); + } + + for (int i = 0; i < this->type_->num_children(); ++i) { + if (union_type_->mode() == UnionMode::SPARSE) { + RETURN_NOT_OK(Taker::Make(this->type_->child(i)->type(), + &sparse_children_[i])); + } else { + RETURN_NOT_OK(Taker>::Make( + this->type_->child(i)->type(), &dense_children_[i])); + } + } + + return Status::OK(); + } + + Status SetContext(FunctionContext* ctx) override { + pool_ = ctx->memory_pool(); + null_bitmap_builder_.reset(new TypedBufferBuilder(pool_)); + type_id_builder_.reset(new TypedBufferBuilder(pool_)); + + if (union_type_->mode() == UnionMode::DENSE) { + offset_builder_.reset(new TypedBufferBuilder(pool_)); + std::fill(child_length_.begin(), child_length_.end(), 0); + } + + for (int i = 0; i < this->type_->num_children(); ++i) { + if (union_type_->mode() == UnionMode::SPARSE) { + RETURN_NOT_OK(sparse_children_[i]->SetContext(ctx)); + } else { + RETURN_NOT_OK(dense_children_[i]->SetContext(ctx)); + } + } + + return Status::OK(); + } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + const auto& union_array = checked_cast(values); + auto type_ids = union_array.raw_type_ids(); + + if (union_type_->mode() == UnionMode::SPARSE) { + RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); + RETURN_NOT_OK(type_id_builder_->Reserve(indices.length())); + RETURN_NOT_OK(VisitIndices(indices, values, [&](int64_t index, bool is_valid) { + null_bitmap_builder_->UnsafeAppend(is_valid); + type_id_builder_->UnsafeAppend(type_ids[index]); + return Status::OK(); + })); + + // bounds checking was done while appending to the null bitmap + indices.set_never_out_of_bounds(); + + for (int i = 0; i < this->type_->num_children(); ++i) { + RETURN_NOT_OK(sparse_children_[i]->Take(*union_array.child(i), indices)); + } + } else { + // Gathering from the offsets into child arrays is a bit tricky. + std::vector child_counts(union_type_->max_type_code() + 1); + RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); + RETURN_NOT_OK(type_id_builder_->Reserve(indices.length())); + RETURN_NOT_OK(VisitIndices(indices, values, [&](int64_t index, bool is_valid) { + null_bitmap_builder_->UnsafeAppend(is_valid); + type_id_builder_->UnsafeAppend(type_ids[index]); + child_counts[type_ids[index]] += is_valid; + return Status::OK(); + })); + + // bounds checking was done while appending to the null bitmap + indices.set_never_out_of_bounds(); + + // Allocate temporary storage for the offsets of all valid slots + // NB: Overestimates required space when indices and union_array are + // not null at identical positions. + auto child_offsets_storage_size = + indices.length() - std::max(union_array.null_count(), indices.null_count()); + std::shared_ptr child_offsets_storage; + RETURN_NOT_OK(AllocateBuffer(pool_, child_offsets_storage_size * sizeof(int32_t), + &child_offsets_storage)); + + // Partition offsets by type_id: child_offset_partitions[type_id] will + // point to storage for child_counts[type_id] offsets + std::vector child_offset_partitions(child_counts.size()); + auto child_offsets_storage_data = GetInt32(child_offsets_storage); + for (auto type_id : union_type_->type_codes()) { + child_offset_partitions[type_id] = child_offsets_storage_data; + child_offsets_storage_data += child_counts[type_id]; + } + + // Fill child_offsets_storage with the taken offsets + RETURN_NOT_OK(offset_builder_->Reserve(indices.length())); + RETURN_NOT_OK(VisitIndices(indices, values, [&](int64_t index, bool is_valid) { + auto type_id = type_ids[index]; + if (is_valid) { + offset_builder_->UnsafeAppend(child_length_[type_id]++); + *child_offset_partitions[type_id] = union_array.value_offset(index); + ++child_offset_partitions[type_id]; + } else { + offset_builder_->UnsafeAppend(0); + } + return Status::OK(); + })); + + // Take from each child at those offsets + int64_t taken_offset_begin = 0; + for (int i = 0; i < this->type_->num_children(); ++i) { + auto type_id = union_type_->type_codes()[i]; + auto length = child_counts[type_id]; + Int32Array taken_offsets(length, SliceBuffer(child_offsets_storage, + sizeof(int32_t) * taken_offset_begin, + sizeof(int32_t) * length)); + ArrayIndexSequence child_indices(taken_offsets); + child_indices.set_never_out_of_bounds(); + RETURN_NOT_OK(dense_children_[i]->Take(*union_array.child(i), child_indices)); + taken_offset_begin += length; + } + } + + return Status::OK(); + } + + Status Finish(std::shared_ptr* out) override { + auto null_count = null_bitmap_builder_->false_count(); + auto length = null_bitmap_builder_->length(); + std::shared_ptr null_bitmap, type_ids; + RETURN_NOT_OK(null_bitmap_builder_->Finish(&null_bitmap)); + RETURN_NOT_OK(type_id_builder_->Finish(&type_ids)); + + std::shared_ptr offsets; + if (union_type_->mode() == UnionMode::DENSE) { + RETURN_NOT_OK(offset_builder_->Finish(&offsets)); + } + + ArrayVector fields(this->type_->num_children()); + for (int i = 0; i < this->type_->num_children(); ++i) { + if (union_type_->mode() == UnionMode::SPARSE) { + RETURN_NOT_OK(sparse_children_[i]->Finish(&fields[i])); + } else { + RETURN_NOT_OK(dense_children_[i]->Finish(&fields[i])); + } + } + + out->reset(new UnionArray(this->type_, length, std::move(fields), type_ids, offsets, + null_bitmap, null_count)); + return Status::OK(); + } + + protected: + int32_t* GetInt32(const std::shared_ptr& b) const { + return reinterpret_cast(b->mutable_data()); + } + + const UnionType* union_type_ = nullptr; + MemoryPool* pool_ = nullptr; + std::unique_ptr> null_bitmap_builder_; + std::unique_ptr> type_id_builder_; + std::unique_ptr> offset_builder_; + std::vector>> sparse_children_; + std::vector>>> dense_children_; + std::vector child_length_; +}; + +// taking from a DictionaryArray is accomplished by taking from its indices +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status Init() override { + const auto& dict_type = checked_cast(*this->type_); + return Taker::Make(dict_type.index_type(), &index_taker_); + } + + Status SetContext(FunctionContext* ctx) override { + dictionary_ = nullptr; + return index_taker_->SetContext(ctx); + } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + const auto& dict_array = checked_cast(values); + + if (dictionary_ != nullptr && dictionary_ != dict_array.dictionary()) { + return Status::NotImplemented( + "taking from DictionaryArrays with different dictionaries"); + } else { + dictionary_ = dict_array.dictionary(); + } + return index_taker_->Take(*dict_array.indices(), indices); + } + + Status Finish(std::shared_ptr* out) override { + std::shared_ptr taken_indices; + RETURN_NOT_OK(index_taker_->Finish(&taken_indices)); + out->reset(new DictionaryArray(this->type_, taken_indices, dictionary_)); + return Status::OK(); + } + + protected: + std::shared_ptr dictionary_; + std::unique_ptr> index_taker_; +}; + +// taking from an ExtensionArray is accomplished by taking from its storage +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status Init() override { + const auto& ext_type = checked_cast(*this->type_); + return Taker::Make(ext_type.storage_type(), &storage_taker_); + } + + Status SetContext(FunctionContext* ctx) override { + return storage_taker_->SetContext(ctx); + } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + const auto& ext_array = checked_cast(values); + return storage_taker_->Take(*ext_array.storage(), indices); + } + + Status Finish(std::shared_ptr* out) override { + std::shared_ptr taken_storage; + RETURN_NOT_OK(storage_taker_->Finish(&taken_storage)); + out->reset(new ExtensionArray(this->type_, taken_storage)); + return Status::OK(); + } + + protected: + std::unique_ptr> storage_taker_; +}; + +template +struct TakerMakeImpl { + template + Status Visit(const T&) { + out_->reset(new TakerImpl(type_)); + return (*out_)->Init(); + } + + std::shared_ptr type_; + std::unique_ptr>* out_; +}; + +template +Status Taker::Make(const std::shared_ptr& type, + std::unique_ptr* out) { + TakerMakeImpl visitor{type, out}; + return VisitTypeInline(*type, &visitor); +} + +} // namespace compute +} // namespace arrow