diff --git a/cpp/src/arrow/acero/hash_join_node_test.cc b/cpp/src/arrow/acero/hash_join_node_test.cc
index 63969d9a3ed4b..9c3dbc176ff4f 100644
--- a/cpp/src/arrow/acero/hash_join_node_test.cc
+++ b/cpp/src/arrow/acero/hash_join_node_test.cc
@@ -2036,6 +2036,29 @@ TEST(HashJoin, ResidualFilter) {
                  [3, 4, "alpha", 4, 16, "alpha"]])")});
 }
 
+TEST(HashJoin, FilterEmptyRows) {
+  // Regression test for GH-41121.
+  BatchesWithSchema input_left;
+  input_left.batches = {
+      ExecBatchFromJSON({int32(), utf8(), int32()}, R"([[2, "Jarry", 28]])")};
+  input_left.schema =
+      schema({field("id", int32()), field("name", utf8()), field("age", int32())});
+
+  BatchesWithSchema input_right;
+  input_right.batches = {ExecBatchFromJSON(
+      {int32(), int32(), utf8()},
+      R"([[2, 10, "Jack"], [3, 12, "Mark"], [4, 15, "Tom"], [1, 10, "Jack"]])")};
+  input_right.schema =
+      schema({field("id", int32()), field("stu_id", int32()), field("subject", utf8())});
+
+  const ResidualFilterCaseRunner runner{std::move(input_left), std::move(input_right)};
+
+  Expression filter = greater(field_ref("age"), literal(25));
+
+  runner.Run(JoinType::LEFT_ANTI, {"id"}, {"stu_id"}, std::move(filter),
+             {ExecBatchFromJSON({int32(), utf8(), int32()}, R"([[2, "Jarry", 28]])")});
+}
+
 TEST(HashJoin, TrivialResidualFilter) {
   Expression always_true =
       equal(call("add", {field_ref("l1"), field_ref("r1")}), literal(2));  // 1 + 1 == 2
diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc
index 61c8bfe95414e..542e943c4a82b 100644
--- a/cpp/src/arrow/acero/swiss_join.cc
+++ b/cpp/src/arrow/acero/swiss_join.cc
@@ -2167,6 +2167,11 @@ Status JoinResidualFilter::FilterOneBatch(const ExecBatch& keypayload_batch,
   ARROW_DCHECK(!output_payload_ids || payload_ids_maybe_null);
 
   *num_passing_rows = 0;
+
+  if (num_batch_rows == 0) {
+    return Status::OK();
+  }
+
   ARROW_ASSIGN_OR_RAISE(Datum mask,
                         EvalFilter(keypayload_batch, num_batch_rows, batch_row_ids,
                                    key_ids_maybe_null, payload_ids_maybe_null));
diff --git a/cpp/src/arrow/array/array_list_test.cc b/cpp/src/arrow/array/array_list_test.cc
index b08fa99168616..18afcc90d71f8 100644
--- a/cpp/src/arrow/array/array_list_test.cc
+++ b/cpp/src/arrow/array/array_list_test.cc
@@ -735,7 +735,7 @@ class TestListArray : public ::testing::Test {
                      ArrayFromJSON(type, "[[1, 2], [3], [4], null, [5], [], [6]]"));
     auto sliced_list_array =
         std::dynamic_pointer_cast(list_array->Slice(3, 4));
-    ASSERT_OK_AND_ASSIGN(auto flattened, list_array->Flatten());
+    ASSERT_OK_AND_ASSIGN(auto flattened, sliced_list_array->Flatten());
     ASSERT_OK(flattened->ValidateFull());
     // Note the difference between values() and Flatten().
EXPECT_TRUE(flattened->Equals(ArrayFromJSON(int32(), "[5, 6]"))); @@ -763,6 +763,52 @@ class TestListArray : public ::testing::Test { << flattened->ToString(); } + void TestFlattenRecursively() { + auto inner_type = std::make_shared(int32()); + auto type = std::make_shared(inner_type); + + // List types with two nested level: list> + auto nested_list_array = std::dynamic_pointer_cast(ArrayFromJSON(type, R"([ + [[0, 1, 2], null, [3, null]], + [null], + [[2, 9], [4], [], [6, 5]] + ])")); + ASSERT_OK_AND_ASSIGN(auto flattened, nested_list_array->FlattenRecursively()); + ASSERT_OK(flattened->ValidateFull()); + ASSERT_EQ(10, flattened->length()); + ASSERT_TRUE( + flattened->Equals(ArrayFromJSON(int32(), "[0, 1, 2, 3, null, 2, 9, 4, 6, 5]"))); + + // Empty nested list should flatten until non-list type is reached + nested_list_array = + std::dynamic_pointer_cast(ArrayFromJSON(type, R"([null])")); + ASSERT_OK_AND_ASSIGN(flattened, nested_list_array->FlattenRecursively()); + ASSERT_TRUE(flattened->type()->Equals(int32())); + + // List types with three nested level: list>> + type = std::make_shared(std::make_shared(fixed_size_list(int32(), 2))); + nested_list_array = std::dynamic_pointer_cast(ArrayFromJSON(type, R"([ + [ + [[null, 0]], + [[3, 7], null] + ], + [ + [[4, null], [5, 8]], + [[8, null]], + null + ], + [ + null + ] + ])")); + ASSERT_OK_AND_ASSIGN(flattened, nested_list_array->FlattenRecursively()); + ASSERT_OK(flattened->ValidateFull()); + ASSERT_EQ(10, flattened->length()); + ASSERT_EQ(3, flattened->null_count()); + ASSERT_TRUE(flattened->Equals( + ArrayFromJSON(int32(), "[null, 0, 3, 7, 4, null, 5, 8, 8, null]"))); + } + Status ValidateOffsetsAndSizes(int64_t length, std::vector offsets, std::vector sizes, std::shared_ptr values, int64_t offset = 0) { @@ -925,10 +971,12 @@ TYPED_TEST(TestListArray, BuilderPreserveFieldName) { TYPED_TEST(TestListArray, FlattenSimple) { this->TestFlattenSimple(); } TYPED_TEST(TestListArray, FlattenNulls) { this->TestFlattenNulls(); } TYPED_TEST(TestListArray, FlattenAllEmpty) { this->TestFlattenAllEmpty(); } +TYPED_TEST(TestListArray, FlattenSliced) { this->TestFlattenSliced(); } TYPED_TEST(TestListArray, FlattenZeroLength) { this->TestFlattenZeroLength(); } TYPED_TEST(TestListArray, TestFlattenNonEmptyBackingNulls) { this->TestFlattenNonEmptyBackingNulls(); } +TYPED_TEST(TestListArray, FlattenRecursively) { this->TestFlattenRecursively(); } TYPED_TEST(TestListArray, ValidateDimensions) { this->TestValidateDimensions(); } @@ -1714,4 +1762,23 @@ TEST_F(TestFixedSizeListArray, Flatten) { } } +TEST_F(TestFixedSizeListArray, FlattenRecursively) { + // Nested fixed-size list-array: fixed_size_list(fixed_size_list(int32, 2), 2) + auto inner_type = fixed_size_list(value_type_, 2); + type_ = fixed_size_list(inner_type, 2); + + auto values = std::dynamic_pointer_cast(ArrayFromJSON(type_, R"([ + [[0, 1], [null, 3]], + [[7, null], [2, 5]], + [null, null] + ])")); + ASSERT_OK(values->ValidateFull()); + ASSERT_OK_AND_ASSIGN(auto flattened, values->FlattenRecursively()); + ASSERT_OK(flattened->ValidateFull()); + ASSERT_EQ(8, flattened->length()); + ASSERT_EQ(2, flattened->null_count()); + AssertArraysEqual(*flattened, + *ArrayFromJSON(value_type_, "[0, 1, null, 3, 7, null, 2, 5]")); +} + } // namespace arrow diff --git a/cpp/src/arrow/array/array_nested.cc b/cpp/src/arrow/array/array_nested.cc index 958c2e25380b0..24e0dfb7081ac 100644 --- a/cpp/src/arrow/array/array_nested.cc +++ b/cpp/src/arrow/array/array_nested.cc @@ -42,6 +42,7 @@ #include 
"arrow/util/checked_cast.h" #include "arrow/util/list_util.h" #include "arrow/util/logging.h" +#include "arrow/util/unreachable.h" namespace arrow { @@ -469,6 +470,49 @@ inline void SetListData(VarLengthListLikeArray* self, self->values_ = MakeArray(self->data_->child_data[0]); } +Result> FlattenLogicalListRecursively(const Array& in_array, + MemoryPool* memory_pool) { + std::shared_ptr array = in_array.Slice(0, in_array.length()); + for (auto kind = array->type_id(); is_list(kind) || is_list_view(kind); + kind = array->type_id()) { + switch (kind) { + case Type::LIST: { + ARROW_ASSIGN_OR_RAISE( + array, (checked_cast(array.get())->Flatten(memory_pool))); + break; + } + case Type::LARGE_LIST: { + ARROW_ASSIGN_OR_RAISE( + array, + (checked_cast(array.get())->Flatten(memory_pool))); + break; + } + case Type::LIST_VIEW: { + ARROW_ASSIGN_OR_RAISE( + array, + (checked_cast(array.get())->Flatten(memory_pool))); + break; + } + case Type::LARGE_LIST_VIEW: { + ARROW_ASSIGN_OR_RAISE( + array, + (checked_cast(array.get())->Flatten(memory_pool))); + break; + } + case Type::FIXED_SIZE_LIST: { + ARROW_ASSIGN_OR_RAISE( + array, + (checked_cast(array.get())->Flatten(memory_pool))); + break; + } + default: + Unreachable("unexpected non-list type"); + break; + } + } + return array; +} + } // namespace internal // ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/array/array_nested.h b/cpp/src/arrow/array/array_nested.h index 768a630e0af54..5744f5fcadf05 100644 --- a/cpp/src/arrow/array/array_nested.h +++ b/cpp/src/arrow/array/array_nested.h @@ -58,6 +58,20 @@ void SetListData(VarLengthListLikeArray* self, const std::shared_ptr& data, Type::type expected_type_id = TYPE::type_id); +/// \brief A version of Flatten that keeps recursively flattening until an array of +/// non-list values is reached. +/// +/// Array types considered to be lists by this function: +/// - list +/// - large_list +/// - list_view +/// - large_list_view +/// - fixed_size_list +/// +/// \see ListArray::Flatten +ARROW_EXPORT Result> FlattenLogicalListRecursively( + const Array& in_array, MemoryPool* memory_pool); + } // namespace internal /// Base class for variable-sized list and list-view arrays, regardless of offset size. @@ -103,6 +117,15 @@ class VarLengthListLikeArray : public Array { return values_->Slice(value_offset(i), value_length(i)); } + /// \brief Flatten all level recursively until reach a non-list type, and return + /// a non-list type Array. + /// + /// \see internal::FlattenLogicalListRecursively + Result> FlattenRecursively( + MemoryPool* memory_pool = default_memory_pool()) const { + return internal::FlattenLogicalListRecursively(*this, memory_pool); + } + protected: friend void internal::SetListData(VarLengthListLikeArray* self, const std::shared_ptr& data, @@ -595,6 +618,15 @@ class ARROW_EXPORT FixedSizeListArray : public Array { Result> Flatten( MemoryPool* memory_pool = default_memory_pool()) const; + /// \brief Flatten all level recursively until reach a non-list type, and return + /// a non-list type Array. 
+ /// + /// \see internal::FlattenLogicalListRecursively + Result> FlattenRecursively( + MemoryPool* memory_pool = default_memory_pool()) const { + return internal::FlattenLogicalListRecursively(*this, memory_pool); + } + /// \brief Construct FixedSizeListArray from child value array and value_length /// /// \param[in] values Array containing list values diff --git a/cpp/src/arrow/array/array_primitive.cc b/cpp/src/arrow/array/array_primitive.cc index 7c4a14d93400f..da3810aa392c9 100644 --- a/cpp/src/arrow/array/array_primitive.cc +++ b/cpp/src/arrow/array/array_primitive.cc @@ -56,7 +56,7 @@ int64_t BooleanArray::false_count() const { } int64_t BooleanArray::true_count() const { - if (data_->null_count.load() != 0) { + if (data_->MayHaveNulls()) { DCHECK(data_->buffers[0]); return internal::CountAndSetBits(data_->buffers[0]->data(), data_->offset, data_->buffers[1]->data(), data_->offset, diff --git a/cpp/src/arrow/array/array_test.cc b/cpp/src/arrow/array/array_test.cc index 21ac1a09f56e7..60efdb47683f4 100644 --- a/cpp/src/arrow/array/array_test.cc +++ b/cpp/src/arrow/array/array_test.cc @@ -1307,6 +1307,13 @@ TEST(TestBooleanArray, TrueCountFalseCount) { CheckArray(checked_cast(*arr)); CheckArray(checked_cast(*arr->Slice(5))); CheckArray(checked_cast(*arr->Slice(0, 0))); + + // GH-41016 true_count() with array without validity buffer with null_count of -1 + auto arr_unknown_null_count = ArrayFromJSON(boolean(), "[true, false, true]"); + arr_unknown_null_count->data()->null_count = kUnknownNullCount; + ASSERT_EQ(arr_unknown_null_count->data()->null_count.load(), -1); + ASSERT_EQ(arr_unknown_null_count->null_bitmap(), nullptr); + ASSERT_EQ(checked_pointer_cast(arr_unknown_null_count)->true_count(), 2); } TEST(TestPrimitiveAdHoc, TestType) { diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc b/cpp/src/arrow/flight/integration_tests/test_integration.cc index 21f57efd122c3..5fa561cd7af28 100644 --- a/cpp/src/arrow/flight/integration_tests/test_integration.cc +++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc @@ -752,7 +752,8 @@ class ExpirationTimeRenewFlightEndpointScenario : public Scenario { /// both "lol_invalid", which will result in errors attempting to set either. 
class SessionOptionsServer : public sql::FlightSqlServerBase { static inline const std::string invalid_option_name = "lol_invalid"; - static inline const SessionOptionValue invalid_option_value = "lol_invalid"; + static inline const SessionOptionValue invalid_option_value = + std::string("lol_invalid"); const std::string session_middleware_key; // These will never be threaded so using a plain map and no lock @@ -852,7 +853,7 @@ class SessionOptionsScenario : public Scenario { {{"foolong", 123L}, {"bardouble", 456.0}, {"lol_invalid", "this won't get set"}, - {"key_with_invalid_value", "lol_invalid"}, + {"key_with_invalid_value", std::string("lol_invalid")}, {"big_ol_string_list", std::vector{"a", "b", "sea", "dee", " ", " ", "geee", "(づ。◕‿‿◕。)づ"}}}}; ARROW_ASSIGN_OR_RAISE(auto res1, client.SetSessionOptions({}, req1)); @@ -878,16 +879,16 @@ class SessionOptionsScenario : public Scenario { } // Update ARROW_ASSIGN_OR_RAISE( - auto res3, - client.SetSessionOptions( - {}, SetSessionOptionsRequest{ - {{"foolong", std::monostate{}}, - {"big_ol_string_list", "a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ"}}})); + auto res3, client.SetSessionOptions( + {}, SetSessionOptionsRequest{ + {{"foolong", std::monostate{}}, + {"big_ol_string_list", + std::string("a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ")}}})); ARROW_ASSIGN_OR_RAISE(auto res4, client.GetSessionOptions({}, {})); if (res4.session_options != std::map{ {"bardouble", 456.0}, - {"big_ol_string_list", "a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ"}}) { + {"big_ol_string_list", std::string("a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ")}}) { return Status::Invalid("res4 incorrect: " + res4.ToString()); } diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 5faa4d095eb1e..6a6fbf40f9628 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -269,7 +269,7 @@ class CompressedInputStream::Impl { // Read compressed data if necessary Status EnsureCompressedData() { - int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0; + int64_t compressed_avail = compressed_buffer_available(); if (compressed_avail == 0) { // Ensure compressed_ buffer is allocated with kChunkSize. if (!supports_zero_copy_from_raw_) { @@ -297,10 +297,14 @@ class CompressedInputStream::Impl { return Status::OK(); } - // Decompress some data from the compressed_ buffer. - // Call this function only if the decompressed_ buffer is empty. + // Decompress some data from the compressed_ buffer into decompressor_. + // Call this function only if the decompressed_ buffer is fully consumed. Status DecompressData() { + // compressed_buffer_available() could be 0 here because there might + // still be some decompressed data left to emit even though the compressed + // data was entirely consumed (especially if the expansion factor is large) DCHECK_NE(compressed_->data(), nullptr); + DCHECK_EQ(0, decompressed_buffer_available()); int64_t decompress_size = kDecompressSize; @@ -352,8 +356,10 @@ class CompressedInputStream::Impl { } // Try to feed more data into the decompressed_ buffer. - Status RefillDecompressed(bool* has_data) { - // First try to read data from the decompressor + // Returns whether there is more data to read. + Result RefillDecompressed() { + // First try to read data from the decompressor, unless we haven't read any + // compressed data yet. if (compressed_ && compressed_->size() != 0) { if (decompressor_->IsFinished()) { // We just went over the end of a previous compressed stream. 
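The RefillDecompressed() change in the hunks above follows the usual Arrow convention of returning Result<bool> instead of filling a bool out-parameter, with ARROW_ASSIGN_OR_RAISE both propagating errors and binding the value at the call site. A minimal sketch of that pattern, with hypothetical names rather than the exact code from this patch:

    // Minimal sketch of the Status-with-out-parameter -> Result<T> conversion
    // assumed above; `Refill` and `ReadStep` are illustrative names only.
    #include "arrow/result.h"
    #include "arrow/status.h"

    namespace {

    // Before: Status Refill(bool* has_data);  After:
    arrow::Result<bool> Refill() {
      // The real implementation would read and decompress more input;
      // returning false means the stream is exhausted.
      return false;
    }

    arrow::Status ReadStep() {
      // On error the Status propagates immediately; on success `has_data`
      // is bound in place of the old `bool* has_data` out-parameter.
      ARROW_ASSIGN_OR_RAISE(bool has_data, Refill());
      if (!has_data) {
        return arrow::Status::OK();  // nothing left to read
      }
      // ... consume the refilled data ...
      return arrow::Status::OK();
    }

    }  // namespace
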
@@ -362,21 +368,21 @@ class CompressedInputStream::Impl { } RETURN_NOT_OK(DecompressData()); } - if (!decompressed_ || decompressed_->size() == 0) { - // Got nothing, need to read more compressed data + int64_t decompress_avail = decompressed_buffer_available(); + if (decompress_avail == 0) { + // Got nothing from existing `compressed_` and `decompressor_`, + // need to read more compressed data. RETURN_NOT_OK(EnsureCompressedData()); - if (compressed_pos_ == compressed_->size()) { + if (compressed_buffer_available() == 0) { // No more data to decompress if (!fresh_decompressor_ && !decompressor_->IsFinished()) { return Status::IOError("Truncated compressed stream"); } - *has_data = false; - return Status::OK(); + return false; } RETURN_NOT_OK(DecompressData()); } - *has_data = true; - return Status::OK(); + return true; } Result Read(int64_t nbytes, void* out) { @@ -394,7 +400,7 @@ class CompressedInputStream::Impl { // At this point, no more decompressed data remains, so we need to // decompress more - RETURN_NOT_OK(RefillDecompressed(&decompressor_has_data)); + ARROW_ASSIGN_OR_RAISE(decompressor_has_data, RefillDecompressed()); } total_pos_ += total_read; @@ -405,13 +411,22 @@ class CompressedInputStream::Impl { ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_)); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data())); RETURN_NOT_OK(buf->Resize(bytes_read)); - // Using std::move because the some compiler might has issue below: + // Using std::move because some compiler might has issue below: // https://wg21.cmeerw.net/cwg/issue1579 return std::move(buf); } const std::shared_ptr& raw() const { return raw_; } + private: + int64_t compressed_buffer_available() const { + return compressed_ ? compressed_->size() - compressed_pos_ : 0; + } + + int64_t decompressed_buffer_available() const { + return decompressed_ ? decompressed_->size() - decompressed_pos_ : 0; + } + private: // Read 64 KB compressed data at a time static const int64_t kChunkSize = 64 * 1024; diff --git a/cpp/src/arrow/util/decimal.h b/cpp/src/arrow/util/decimal.h index 345c74d95b101..14c7103d5ac0d 100644 --- a/cpp/src/arrow/util/decimal.h +++ b/cpp/src/arrow/util/decimal.h @@ -80,7 +80,7 @@ class ARROW_EXPORT Decimal128 : public BasicDecimal128 { std::pair result; auto dstatus = BasicDecimal128::Divide(divisor, &result.first, &result.second); ARROW_RETURN_NOT_OK(ToArrowStatus(dstatus)); - return std::move(result); + return result; } /// \brief Convert the Decimal128 value to a base 10 decimal string with the given @@ -118,7 +118,7 @@ class ARROW_EXPORT Decimal128 : public BasicDecimal128 { Decimal128 out; auto dstatus = BasicDecimal128::Rescale(original_scale, new_scale, &out); ARROW_RETURN_NOT_OK(ToArrowStatus(dstatus)); - return std::move(out); + return out; } /// \brief Convert to a signed integer @@ -218,7 +218,7 @@ class ARROW_EXPORT Decimal256 : public BasicDecimal256 { Decimal256 out; auto dstatus = BasicDecimal256::Rescale(original_scale, new_scale, &out); ARROW_RETURN_NOT_OK(ToArrowStatus(dstatus)); - return std::move(out); + return out; } /// Divide this number by right and return the result. @@ -235,7 +235,7 @@ class ARROW_EXPORT Decimal256 : public BasicDecimal256 { std::pair result; auto dstatus = BasicDecimal256::Divide(divisor, &result.first, &result.second); ARROW_RETURN_NOT_OK(ToArrowStatus(dstatus)); - return std::move(result); + return result; } /// \brief Convert from a big-endian byte representation. 
The length must be diff --git a/csharp/src/Apache.Arrow.Flight.AspNetCore/Apache.Arrow.Flight.AspNetCore.csproj b/csharp/src/Apache.Arrow.Flight.AspNetCore/Apache.Arrow.Flight.AspNetCore.csproj index 3fbd772db5ec6..55497203a12be 100644 --- a/csharp/src/Apache.Arrow.Flight.AspNetCore/Apache.Arrow.Flight.AspNetCore.csproj +++ b/csharp/src/Apache.Arrow.Flight.AspNetCore/Apache.Arrow.Flight.AspNetCore.csproj @@ -5,7 +5,7 @@ - + diff --git a/csharp/src/Apache.Arrow/Arrays/ArrayDataConcatenator.cs b/csharp/src/Apache.Arrow/Arrays/ArrayDataConcatenator.cs index 84658a5fab812..347d0d76bac64 100644 --- a/csharp/src/Apache.Arrow/Arrays/ArrayDataConcatenator.cs +++ b/csharp/src/Apache.Arrow/Arrays/ArrayDataConcatenator.cs @@ -367,7 +367,7 @@ private ArrowBuffer ConcatenateUnionTypeBuffer() foreach (ArrayData arrayData in _arrayDataList) { - builder.Append(arrayData.Buffers[0]); + builder.Append(arrayData.Buffers[0].Span.Slice(arrayData.Offset, arrayData.Length)); } return builder.Build(_allocator); @@ -376,18 +376,26 @@ private ArrowBuffer ConcatenateUnionTypeBuffer() private ArrowBuffer ConcatenateUnionOffsetBuffer() { var builder = new ArrowBuffer.Builder(_totalLength); - int baseOffset = 0; + var typeCount = _arrayDataList.Count > 0 ? _arrayDataList[0].Children.Length : 0; + var baseOffsets = new int[typeCount]; foreach (ArrayData arrayData in _arrayDataList) { - ReadOnlySpan span = arrayData.Buffers[1].Span.CastTo(); - foreach (int offset in span) + ReadOnlySpan typeSpan = arrayData.Buffers[0].Span.Slice(arrayData.Offset, arrayData.Length); + ReadOnlySpan offsetSpan = arrayData.Buffers[1].Span.CastTo().Slice(arrayData.Offset, arrayData.Length); + for (int i = 0; i < arrayData.Length; ++i) { - builder.Append(baseOffset + offset); + var typeId = typeSpan[i]; + builder.Append(checked(baseOffsets[typeId] + offsetSpan[i])); } - // The next offset must start from the current last offset. 
- baseOffset += span[arrayData.Length]; + for (int i = 0; i < typeCount; ++i) + { + checked + { + baseOffsets[i] += arrayData.Children[i].Length; + } + } } return builder.Build(_allocator); diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs index 6127c5a662dfe..1b83735925556 100644 --- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs +++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs @@ -163,9 +163,18 @@ public void Visit(BooleanArray array) public void Visit(ListArray array) { _buffers.Add(CreateBitmapBuffer(array.NullBitmapBuffer, array.Offset, array.Length)); - _buffers.Add(CreateSlicedBuffer(array.ValueOffsetsBuffer, array.Offset, array.Length + 1)); + _buffers.Add(CreateBuffer(GetZeroBasedValueOffsets(array.ValueOffsetsBuffer, array.Offset, array.Length))); - VisitArray(array.Values); + int valuesOffset = array.ValueOffsets[0]; + int valuesLength = array.ValueOffsets[array.Length] - valuesOffset; + + var values = array.Values; + if (valuesOffset > 0 || valuesLength < values.Length) + { + values = ArrowArrayFactory.Slice(values, valuesOffset, valuesLength); + } + + VisitArray(values); } public void Visit(ListViewArray array) @@ -195,8 +204,12 @@ public void Visit(FixedSizeListArray array) public void Visit(BinaryArray array) { _buffers.Add(CreateBitmapBuffer(array.NullBitmapBuffer, array.Offset, array.Length)); - _buffers.Add(CreateSlicedBuffer(array.ValueOffsetsBuffer, array.Offset, array.Length + 1)); - _buffers.Add(CreateBuffer(array.ValueBuffer)); + _buffers.Add(CreateBuffer(GetZeroBasedValueOffsets(array.ValueOffsetsBuffer, array.Offset, array.Length))); + + int valuesOffset = array.ValueOffsets[0]; + int valuesLength = array.ValueOffsets[array.Length] - valuesOffset; + + _buffers.Add(CreateSlicedBuffer(array.ValueBuffer, valuesOffset, valuesLength)); } public void Visit(BinaryViewArray array) @@ -263,6 +276,39 @@ public void Visit(NullArray array) // There are no buffers for a NullArray } + private ArrowBuffer GetZeroBasedValueOffsets(ArrowBuffer valueOffsetsBuffer, int arrayOffset, int arrayLength) + { + var requiredBytes = CalculatePaddedBufferLength(sizeof(int) * (arrayLength + 1)); + + if (arrayOffset != 0) + { + // Array has been sliced, so we need to shift and adjust the offsets + var originalOffsets = valueOffsetsBuffer.Span.CastTo().Slice(arrayOffset, arrayLength + 1); + var firstOffset = arrayLength > 0 ? 
originalOffsets[0] : 0; + + var newValueOffsetsBuffer = _allocator.Allocate(requiredBytes); + var newValueOffsets = newValueOffsetsBuffer.Memory.Span.CastTo(); + + for (int i = 0; i < arrayLength + 1; ++i) + { + newValueOffsets[i] = originalOffsets[i] - firstOffset; + } + + return new ArrowBuffer(newValueOffsetsBuffer); + } + else if (valueOffsetsBuffer.Length > requiredBytes) + { + // Array may have been sliced but the offset is zero, + // so we can truncate the existing offsets + return new ArrowBuffer(valueOffsetsBuffer.Memory.Slice(0, requiredBytes)); + } + else + { + // Use the full buffer + return valueOffsetsBuffer; + } + } + private Buffer CreateBitmapBuffer(ArrowBuffer buffer, int offset, int length) { if (buffer.IsEmpty) diff --git a/csharp/test/Apache.Arrow.Compression.Tests/Apache.Arrow.Compression.Tests.csproj b/csharp/test/Apache.Arrow.Compression.Tests/Apache.Arrow.Compression.Tests.csproj index d4d124668e081..5cc0d303e881e 100644 --- a/csharp/test/Apache.Arrow.Compression.Tests/Apache.Arrow.Compression.Tests.csproj +++ b/csharp/test/Apache.Arrow.Compression.Tests/Apache.Arrow.Compression.Tests.csproj @@ -8,8 +8,8 @@ - - + + diff --git a/csharp/test/Apache.Arrow.Flight.Sql.Tests/Apache.Arrow.Flight.Sql.Tests.csproj b/csharp/test/Apache.Arrow.Flight.Sql.Tests/Apache.Arrow.Flight.Sql.Tests.csproj index ad6efbd7b45e7..5b7c10f35bed0 100644 --- a/csharp/test/Apache.Arrow.Flight.Sql.Tests/Apache.Arrow.Flight.Sql.Tests.csproj +++ b/csharp/test/Apache.Arrow.Flight.Sql.Tests/Apache.Arrow.Flight.Sql.Tests.csproj @@ -7,8 +7,8 @@ - - + + diff --git a/csharp/test/Apache.Arrow.Flight.Tests/Apache.Arrow.Flight.Tests.csproj b/csharp/test/Apache.Arrow.Flight.Tests/Apache.Arrow.Flight.Tests.csproj index 6f1b4e180e4fc..050d0f452cc4e 100644 --- a/csharp/test/Apache.Arrow.Flight.Tests/Apache.Arrow.Flight.Tests.csproj +++ b/csharp/test/Apache.Arrow.Flight.Tests/Apache.Arrow.Flight.Tests.csproj @@ -7,8 +7,8 @@ - - + + diff --git a/csharp/test/Apache.Arrow.Tests/Apache.Arrow.Tests.csproj b/csharp/test/Apache.Arrow.Tests/Apache.Arrow.Tests.csproj index 3febfc92b97c8..92f6e2d662f38 100644 --- a/csharp/test/Apache.Arrow.Tests/Apache.Arrow.Tests.csproj +++ b/csharp/test/Apache.Arrow.Tests/Apache.Arrow.Tests.csproj @@ -15,8 +15,8 @@ - - + + all runtime; build; native; contentfiles; analyzers diff --git a/csharp/test/Apache.Arrow.Tests/ArrowArrayConcatenatorTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowArrayConcatenatorTests.cs index 700de58adb8c1..a1f6b1b8d80a0 100644 --- a/csharp/test/Apache.Arrow.Tests/ArrowArrayConcatenatorTests.cs +++ b/csharp/test/Apache.Arrow.Tests/ArrowArrayConcatenatorTests.cs @@ -29,12 +29,6 @@ public void TestStandardCases() { foreach ((List testTargetArrayList, IArrowArray expectedArray) in GenerateTestData()) { - if (expectedArray is UnionArray) - { - // Union array concatenation is incorrect. See https://github.com/apache/arrow/issues/41198 - continue; - } - IArrowArray actualArray = ArrowArrayConcatenator.Concatenate(testTargetArrayList); ArrowReaderVerifier.CompareArrays(expectedArray, actualArray); } @@ -604,10 +598,11 @@ public void Visit(UnionType type) for (int j = 0; j < dataList.Count; j++) { - byte index = (byte)Math.Max(j % 3, 1); + byte index = (byte)Math.Min(j % 3, 1); int? intValue = (index == 1) ? dataList[j] : null; string stringValue = (index == 1) ? 
null : dataList[j]?.ToString(); typeBuilder.Append(index); + typeResultBuilder.Append(index); if (isDense) { diff --git a/csharp/test/Apache.Arrow.Tests/ArrowFileWriterTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowFileWriterTests.cs index faf650973d64c..baea4d61e5b66 100644 --- a/csharp/test/Apache.Arrow.Tests/ArrowFileWriterTests.cs +++ b/csharp/test/Apache.Arrow.Tests/ArrowFileWriterTests.cs @@ -113,6 +113,7 @@ public async Task WritesFooterAlignedMultipleOf8Async() [InlineData(0, 45)] [InlineData(3, 45)] [InlineData(16, 45)] + [InlineData(10, 0)] public async Task WriteSlicedArrays(int sliceOffset, int sliceLength) { var originalBatch = TestData.CreateSampleRecordBatch(length: 100); diff --git a/csharp/test/Apache.Arrow.Tests/ArrowReaderVerifier.cs b/csharp/test/Apache.Arrow.Tests/ArrowReaderVerifier.cs index 07c8aa3f56b3b..95972193219ac 100644 --- a/csharp/test/Apache.Arrow.Tests/ArrowReaderVerifier.cs +++ b/csharp/test/Apache.Arrow.Tests/ArrowReaderVerifier.cs @@ -160,10 +160,14 @@ public void Visit(StructArray array) Assert.Equal(expectedArray.Length, array.Length); Assert.Equal(expectedArray.NullCount, array.NullCount); - Assert.Equal(0, array.Offset); Assert.Equal(expectedArray.Data.Children.Length, array.Data.Children.Length); Assert.Equal(expectedArray.Fields.Count, array.Fields.Count); + if (_strictCompare) + { + Assert.Equal(expectedArray.Offset, array.Offset); + } + for (int i = 0; i < array.Fields.Count; i++) { array.Fields[i].Accept(new ArrayComparer(expectedArray.Fields[i], _strictCompare)); @@ -178,12 +182,12 @@ public void Visit(UnionArray array) Assert.Equal(expectedArray.Mode, array.Mode); Assert.Equal(expectedArray.Length, array.Length); Assert.Equal(expectedArray.NullCount, array.NullCount); - Assert.Equal(0, array.Offset); Assert.Equal(expectedArray.Data.Children.Length, array.Data.Children.Length); Assert.Equal(expectedArray.Fields.Count, array.Fields.Count); if (_strictCompare) { + Assert.Equal(expectedArray.Offset, array.Offset); Assert.True(expectedArray.TypeBuffer.Span.SequenceEqual(array.TypeBuffer.Span)); } else @@ -252,12 +256,12 @@ private void CompareBinaryArrays(BinaryArray actualArray) Assert.Equal(expectedArray.Length, actualArray.Length); Assert.Equal(expectedArray.NullCount, actualArray.NullCount); - Assert.Equal(0, actualArray.Offset); CompareValidityBuffer(expectedArray.NullCount, _expectedArray.Length, expectedArray.NullBitmapBuffer, expectedArray.Offset, actualArray.NullBitmapBuffer); if (_strictCompare) { + Assert.Equal(expectedArray.Offset, actualArray.Offset); Assert.True(expectedArray.ValueOffsetsBuffer.Span.SequenceEqual(actualArray.ValueOffsetsBuffer.Span)); Assert.True(expectedArray.Values.Slice(0, expectedArray.Length).SequenceEqual(actualArray.Values.Slice(0, actualArray.Length))); } @@ -284,7 +288,11 @@ private void CompareVariadicArrays(BinaryViewArray actualArray) Assert.Equal(expectedArray.Length, actualArray.Length); Assert.Equal(expectedArray.NullCount, actualArray.NullCount); - Assert.Equal(0, actualArray.Offset); + + if (_strictCompare) + { + Assert.Equal(expectedArray.Offset, actualArray.Offset); + } CompareValidityBuffer(expectedArray.NullCount, _expectedArray.Length, expectedArray.NullBitmapBuffer, expectedArray.Offset, actualArray.NullBitmapBuffer); @@ -309,12 +317,12 @@ private void CompareArrays(FixedSizeBinaryArray actualArray) Assert.Equal(expectedArray.Length, actualArray.Length); Assert.Equal(expectedArray.NullCount, actualArray.NullCount); - Assert.Equal(0, actualArray.Offset); 
CompareValidityBuffer(expectedArray.NullCount, _expectedArray.Length, expectedArray.NullBitmapBuffer, expectedArray.Offset, actualArray.NullBitmapBuffer); if (_strictCompare) { + Assert.Equal(expectedArray.Offset, actualArray.Offset); Assert.True(expectedArray.ValueBuffer.Span.Slice(0, expectedArray.Length).SequenceEqual(actualArray.ValueBuffer.Span.Slice(0, actualArray.Length))); } else @@ -338,12 +346,12 @@ private void CompareArrays(PrimitiveArray actualArray) Assert.Equal(expectedArray.Length, actualArray.Length); Assert.Equal(expectedArray.NullCount, actualArray.NullCount); - Assert.Equal(0, actualArray.Offset); CompareValidityBuffer(expectedArray.NullCount, _expectedArray.Length, expectedArray.NullBitmapBuffer, expectedArray.Offset, actualArray.NullBitmapBuffer); if (_strictCompare) { + Assert.Equal(expectedArray.Offset, actualArray.Offset); Assert.True(expectedArray.Values.Slice(0, expectedArray.Length).SequenceEqual(actualArray.Values.Slice(0, actualArray.Length))); } else @@ -370,12 +378,12 @@ private void CompareArrays(BooleanArray actualArray) Assert.Equal(expectedArray.Length, actualArray.Length); Assert.Equal(expectedArray.NullCount, actualArray.NullCount); - Assert.Equal(0, actualArray.Offset); CompareValidityBuffer(expectedArray.NullCount, _expectedArray.Length, expectedArray.NullBitmapBuffer, expectedArray.Offset, actualArray.NullBitmapBuffer); if (_strictCompare) { + Assert.Equal(expectedArray.Offset, actualArray.Offset); int booleanByteCount = BitUtility.ByteCount(expectedArray.Length); Assert.True(expectedArray.Values.Slice(0, booleanByteCount).SequenceEqual(actualArray.Values.Slice(0, booleanByteCount))); } @@ -397,22 +405,31 @@ private void CompareArrays(ListArray actualArray) Assert.Equal(expectedArray.Length, actualArray.Length); Assert.Equal(expectedArray.NullCount, actualArray.NullCount); - Assert.Equal(0, actualArray.Offset); CompareValidityBuffer(expectedArray.NullCount, _expectedArray.Length, expectedArray.NullBitmapBuffer, expectedArray.Offset, actualArray.NullBitmapBuffer); if (_strictCompare) { + Assert.Equal(expectedArray.Offset, actualArray.Offset); Assert.True(expectedArray.ValueOffsetsBuffer.Span.SequenceEqual(actualArray.ValueOffsetsBuffer.Span)); + actualArray.Values.Accept(new ArrayComparer(expectedArray.Values, _strictCompare)); } else { - int offsetsStart = (expectedArray.Offset) * sizeof(int); - int offsetsLength = (expectedArray.Length + 1) * sizeof(int); - Assert.True(expectedArray.ValueOffsetsBuffer.Span.Slice(offsetsStart, offsetsLength).SequenceEqual(actualArray.ValueOffsetsBuffer.Span.Slice(0, offsetsLength))); + for (int i = 0; i < actualArray.Length; ++i) + { + if (expectedArray.IsNull(i)) + { + Assert.True(actualArray.IsNull(i)); + } + else + { + var expectedList = expectedArray.GetSlicedValues(i); + var actualList = actualArray.GetSlicedValues(i); + actualList.Accept(new ArrayComparer(expectedList, _strictCompare)); + } + } } - - actualArray.Values.Accept(new ArrayComparer(expectedArray.Values, _strictCompare)); } private void CompareArrays(ListViewArray actualArray) @@ -424,12 +441,12 @@ private void CompareArrays(ListViewArray actualArray) Assert.Equal(expectedArray.Length, actualArray.Length); Assert.Equal(expectedArray.NullCount, actualArray.NullCount); - Assert.Equal(0, actualArray.Offset); CompareValidityBuffer(expectedArray.NullCount, _expectedArray.Length, expectedArray.NullBitmapBuffer, expectedArray.Offset, actualArray.NullBitmapBuffer); if (_strictCompare) { + Assert.Equal(expectedArray.Offset, actualArray.Offset); 
Assert.True(expectedArray.ValueOffsetsBuffer.Span.SequenceEqual(actualArray.ValueOffsetsBuffer.Span)); Assert.True(expectedArray.SizesBuffer.Span.SequenceEqual(actualArray.SizesBuffer.Span)); } @@ -453,7 +470,10 @@ private void CompareArrays(FixedSizeListArray actualArray) Assert.Equal(expectedArray.Length, actualArray.Length); Assert.Equal(expectedArray.NullCount, actualArray.NullCount); - Assert.Equal(0, actualArray.Offset); + if (_strictCompare) + { + Assert.Equal(expectedArray.Offset, actualArray.Offset); + } CompareValidityBuffer(expectedArray.NullCount, _expectedArray.Length, expectedArray.NullBitmapBuffer, expectedArray.Offset, actualArray.NullBitmapBuffer); diff --git a/csharp/test/Apache.Arrow.Tests/TestData.cs b/csharp/test/Apache.Arrow.Tests/TestData.cs index 29ddef2864862..3ea42ee0fbcb7 100644 --- a/csharp/test/Apache.Arrow.Tests/TestData.cs +++ b/csharp/test/Apache.Arrow.Tests/TestData.cs @@ -294,7 +294,18 @@ public void Visit(StringType type) for (var i = 0; i < Length; i++) { - builder.Append(str); + switch (i % 3) + { + case 0: + builder.AppendNull(); + break; + case 1: + builder.Append(str); + break; + case 2: + builder.Append(str + str); + break; + } } Array = builder.Build(); @@ -328,15 +339,21 @@ public void Visit(ListType type) { var builder = new ListArray.Builder(type.ValueField).Reserve(Length); - var valueBuilder = (Int64Array.Builder)builder.ValueBuilder.Reserve(Length + 1); + var valueBuilder = (Int64Array.Builder)builder.ValueBuilder.Reserve(Length * 3 / 2); for (var i = 0; i < Length; i++) { - builder.Append(); - valueBuilder.Append(i); + if (i % 10 == 2) + { + builder.AppendNull(); + } + else + { + builder.Append(); + var listLength = i % 4; + valueBuilder.AppendRange(Enumerable.Range(i, listLength).Select(x => (long)x)); + } } - //Add a value to check if Values.Length can exceed ListArray.Length - valueBuilder.Append(0); Array = builder.Build(); } @@ -352,8 +369,12 @@ public void Visit(ListViewType type) builder.Append(); valueBuilder.Append(i); } - //Add a value to check if Values.Length can exceed ListArray.Length - valueBuilder.Append(0); + + if (Length > 0) + { + // Add a value to check if Values.Length can exceed ListArray.Length + valueBuilder.Append(0); + } Array = builder.Build(); } @@ -562,9 +583,13 @@ public void Visit(MapType type) keyBuilder.Append(i.ToString()); valueBuilder.Append(i); } - //Add a value to check if Values.Length can exceed MapArray.Length - keyBuilder.Append("0"); - valueBuilder.Append(0); + + if (Length > 0) + { + // Add a value to check if Values.Length can exceed MapArray.Length + keyBuilder.Append("0"); + valueBuilder.Append(0); + } Array = builder.Build(); } diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh index cf49751e6e2a9..f18b18aaa997c 100755 --- a/dev/release/verify-release-candidate.sh +++ b/dev/release/verify-release-candidate.sh @@ -831,9 +831,7 @@ test_glib() { show_header "Build and test C GLib libraries" # Build and test C GLib - # We can unpin gobject-introspection after - # https://github.com/conda-forge/glib-feedstock/pull/174 is merged. - maybe_setup_conda glib gobject-introspection=1.78.1 meson ninja ruby + maybe_setup_conda glib gobject-introspection meson ninja ruby maybe_setup_virtualenv meson # Install bundler if doesn't exist diff --git a/js/.eslintrc.cjs b/js/.eslintrc.cjs index 8a36516eec1c0..1792a33abae28 100644 --- a/js/.eslintrc.cjs +++ b/js/.eslintrc.cjs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+/** @type {import('eslint').Linter.Config} */ module.exports = { env: { browser: true, @@ -25,7 +26,7 @@ module.exports = { parserOptions: { project: ["tsconfig.json", "tsconfig/tsconfig.bin.cjs.json"], sourceType: "module", - ecmaVersion: 2020, + ecmaVersion: "latest", }, plugins: ["@typescript-eslint", "jest", "unicorn"], extends: [ @@ -92,16 +93,13 @@ module.exports = { "unicorn/empty-brace-spaces": "off", "unicorn/no-zero-fractions": "off", "unicorn/prevent-abbreviations": "off", - "unicorn/prefer-module": "off", "unicorn/numeric-separators-style": "off", "unicorn/prefer-spread": "off", "unicorn/filename-case": "off", "unicorn/prefer-export-from": "off", "unicorn/prefer-switch": "off", - "unicorn/prefer-node-protocol": "off", "unicorn/text-encoding-identifier-case": "off", "unicorn/prefer-top-level-await": "off", - "unicorn/consistent-destructuring": "off", "unicorn/no-array-reduce": "off", "unicorn/no-await-expression-member": "off", diff --git a/js/bin/file-to-stream.ts b/js/bin/file-to-stream.ts index 9dad4951f96e1..b7341faf2c0e0 100755 --- a/js/bin/file-to-stream.ts +++ b/js/bin/file-to-stream.ts @@ -17,9 +17,9 @@ // specific language governing permissions and limitations // under the License. -import * as fs from 'fs'; -import * as Path from 'path'; -import { finished as eos } from 'stream/promises'; +import * as fs from 'node:fs'; +import * as Path from 'node:path'; +import { finished as eos } from 'node:stream/promises'; import { RecordBatchReader, RecordBatchStreamWriter } from '../index.ts'; (async () => { diff --git a/js/bin/integration.ts b/js/bin/integration.ts index f9aad3422ae72..f73388cc85cf0 100755 --- a/js/bin/integration.ts +++ b/js/bin/integration.ts @@ -17,8 +17,8 @@ // specific language governing permissions and limitations // under the License. 
-import * as fs from 'fs'; -import * as Path from 'path'; +import * as fs from 'node:fs'; +import * as Path from 'node:path'; import { glob } from 'glob'; import { zip } from 'ix/iterable/zip.js'; import commandLineArgs from 'command-line-args'; @@ -41,8 +41,8 @@ const argv = commandLineArgs(cliOpts(), { partial: true }); const exists = async (p: string) => { try { return !!(await fs.promises.stat(p)); - } catch (e) { return false; } -} + } catch { return false; } +}; (async () => { @@ -52,17 +52,17 @@ const exists = async (p: string) => { let jsonPaths = [...(argv.json || [])]; let arrowPaths = [...(argv.arrow || [])]; - if (mode === 'VALIDATE' && !jsonPaths.length) { + if (mode === 'VALIDATE' && jsonPaths.length === 0) { [jsonPaths, arrowPaths] = await loadLocalJSONAndArrowPathsForDebugging(jsonPaths, arrowPaths); } - if (!jsonPaths.length) { return print_usage(); } + if (jsonPaths.length === 0) { return print_usage(); } let threw = false; switch (mode) { case 'VALIDATE': - for (let [jsonPath, arrowPath] of zip(jsonPaths, arrowPaths)) { + for (const [jsonPath, arrowPath] of zip(jsonPaths, arrowPaths)) { try { await validate(jsonPath, arrowPath); } catch (e: any) { @@ -232,7 +232,7 @@ function compareVectors(actual: Vector, expected: Vector) { (() => { let i = -1; - for (let [x1, x2] of zip(actual, expected)) { + for (const [x1, x2] of zip(actual, expected)) { ++i; if (!createElementComparator(x2)(x1)) { throw new Error(`${i}: ${x1} !== ${x2}`); @@ -245,14 +245,14 @@ async function loadLocalJSONAndArrowPathsForDebugging(jsonPaths: string[], arrow const sourceJSONPaths = await glob(Path.resolve(__dirname, `../test/data/json/`, `*.json`)); - if (!arrowPaths.length) { + if (arrowPaths.length === 0) { await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'cpp', 'file'); await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'java', 'file'); await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'cpp', 'stream'); await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'java', 'stream'); } - for (let [jsonPath, arrowPath] of zip(jsonPaths, arrowPaths)) { + for (const [jsonPath, arrowPath] of zip(jsonPaths, arrowPaths)) { console.log(`jsonPath: ${jsonPath}`); console.log(`arrowPath: ${arrowPath}`); } diff --git a/js/bin/json-to-arrow.ts b/js/bin/json-to-arrow.ts index 49726706a1be6..168db00a54f26 100755 --- a/js/bin/json-to-arrow.ts +++ b/js/bin/json-to-arrow.ts @@ -17,10 +17,10 @@ // specific language governing permissions and limitations // under the License. -import * as fs from 'fs'; -import * as Path from 'path'; +import * as fs from 'node:fs'; +import * as Path from 'node:path'; import commandLineArgs from 'command-line-args'; -import { finished as eos } from 'stream/promises'; +import { finished as eos } from 'node:stream/promises'; // @ts-ignore import { parse as bignumJSONParse } from 'json-bignum'; import { RecordBatchReader, RecordBatchFileWriter, RecordBatchStreamWriter } from '../index.ts'; @@ -31,7 +31,7 @@ const arrowPaths = [...(argv.arrow || [])]; (async () => { - if (!jsonPaths.length || !arrowPaths.length || (jsonPaths.length !== arrowPaths.length)) { + if (jsonPaths.length === 0 || arrowPaths.length === 0 || (jsonPaths.length !== arrowPaths.length)) { return print_usage(); } @@ -51,7 +51,7 @@ const arrowPaths = [...(argv.arrow || [])]; await eos(jsonToArrow); })); - return undefined; + return; })() .then((x) => x ?? 
0, (e) => { e && process.stderr.write(`${e}`); diff --git a/js/bin/print-buffer-alignment.ts b/js/bin/print-buffer-alignment.ts index 07563af5a87e8..dabe3c5f9ab48 100755 --- a/js/bin/print-buffer-alignment.ts +++ b/js/bin/print-buffer-alignment.ts @@ -17,8 +17,8 @@ // specific language governing permissions and limitations // under the License. -import * as fs from 'fs'; -import * as Path from 'path'; +import * as fs from 'node:fs'; +import * as Path from 'node:path'; import { VectorLoader } from '../src/visitor/vectorloader.ts'; import { RecordBatch, AsyncMessageReader, makeData, Struct, Schema, Field } from '../index.ts'; diff --git a/js/bin/stream-to-file.ts b/js/bin/stream-to-file.ts index 6e09ead2fde19..c1317abc975db 100755 --- a/js/bin/stream-to-file.ts +++ b/js/bin/stream-to-file.ts @@ -17,9 +17,9 @@ // specific language governing permissions and limitations // under the License. -import * as fs from 'fs'; -import * as path from 'path'; -import { finished as eos } from 'stream/promises'; +import * as fs from 'node:fs'; +import * as path from 'node:path'; +import { finished as eos } from 'node:stream/promises'; import { RecordBatchReader, RecordBatchFileWriter } from '../index.ts'; (async () => { diff --git a/js/gulp/arrow-task.js b/js/gulp/arrow-task.js index f8a18fe122a93..855cb71283748 100644 --- a/js/gulp/arrow-task.js +++ b/js/gulp/arrow-task.js @@ -18,9 +18,9 @@ import { mainExport, targetDir, observableFromStreams } from './util.js'; import gulp from 'gulp'; -import path from 'path'; +import path from 'node:path'; import { mkdirp } from 'mkdirp'; -import * as fs from 'fs/promises'; +import * as fs from 'node:fs/promises'; import gulpRename from 'gulp-rename'; import gulpReplace from 'gulp-replace'; import { memoizeTask } from './memoize-task.js'; diff --git a/js/gulp/bundle-task.js b/js/gulp/bundle-task.js index 41a0895b082f3..b0aab6c468eac 100644 --- a/js/gulp/bundle-task.js +++ b/js/gulp/bundle-task.js @@ -23,9 +23,9 @@ import source from 'vinyl-source-stream'; import buffer from 'vinyl-buffer'; import { observableFromStreams } from './util.js'; import { forkJoin as ObservableForkJoin } from 'rxjs'; -import { resolve, join } from 'path'; -import { readdirSync } from 'fs'; -import { execSync } from 'child_process'; +import { resolve, join } from 'node:path'; +import { readdirSync } from 'node:fs'; +import { execSync } from 'node:child_process'; import gulpEsbuild from 'gulp-esbuild'; import esbuildAlias from 'esbuild-plugin-alias'; @@ -38,8 +38,8 @@ import { BundleAnalyzerPlugin } from 'webpack-bundle-analyzer'; import webpack from 'webpack-stream'; import named from 'vinyl-named'; -import { fileURLToPath } from 'url'; -import { dirname } from 'path'; +import { fileURLToPath } from 'node:url'; +import { dirname } from 'node:path'; const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); diff --git a/js/gulp/closure-task.js b/js/gulp/closure-task.js index 7c0ae1c1b6cd5..c620784fd1a84 100644 --- a/js/gulp/closure-task.js +++ b/js/gulp/closure-task.js @@ -17,9 +17,9 @@ import { targetDir, mainExport, esmRequire, gCCLanguageNames, publicModulePaths, observableFromStreams, shouldRunInChildProcess, spawnGulpCommandInChildProcess } from "./util.js"; -import fs from 'fs'; +import fs from 'node:fs'; import gulp from 'gulp'; -import path from 'path'; +import path from 'node:path'; import { mkdirp } from 'mkdirp'; import sourcemaps from 'gulp-sourcemaps'; import { memoizeTask } from './memoize-task.js'; diff --git a/js/gulp/test-task.js 
b/js/gulp/test-task.js index 5d190be22d5a0..e2263049b3d76 100644 --- a/js/gulp/test-task.js +++ b/js/gulp/test-task.js @@ -16,14 +16,14 @@ // under the License. import { deleteAsync as del } from 'del'; -import path from 'path'; +import path from 'node:path'; import { mkdirp } from 'mkdirp'; import { argv } from './argv.js'; -import { promisify } from 'util'; +import { promisify } from 'node:util'; import { glob } from 'glob'; -import child_process from 'child_process'; +import child_process from 'node:child_process'; import { memoizeTask } from './memoize-task.js'; -import fs from 'fs'; +import fs from 'node:fs'; const readFile = promisify(fs.readFile); import asyncDoneSync from 'async-done'; const asyncDone = promisify(asyncDoneSync); @@ -31,7 +31,7 @@ const exec = promisify(child_process.exec); import xml2js from 'xml2js'; const parseXML = promisify(xml2js.parseString); import { targetAndModuleCombinations, npmPkgName } from './util.js'; -import { createRequire } from 'module'; +import { createRequire } from 'node:module'; const require = createRequire(import.meta.url); diff --git a/js/gulp/typescript-task.js b/js/gulp/typescript-task.js index 31769e3b1b236..b5a4c3232dc76 100644 --- a/js/gulp/typescript-task.js +++ b/js/gulp/typescript-task.js @@ -18,10 +18,10 @@ import { targetDir, tsconfigName, observableFromStreams, shouldRunInChildProcess, spawnGulpCommandInChildProcess } from './util.js'; import gulp from 'gulp'; -import path from 'path'; +import path from 'node:path'; import tsc from 'typescript'; import ts from 'gulp-typescript'; -import * as fs from 'fs/promises'; +import * as fs from 'node:fs/promises'; import sourcemaps from 'gulp-sourcemaps'; import { memoizeTask } from './memoize-task.js'; import { ReplaySubject, forkJoin as ObservableForkJoin, defer as ObservableDefer } from 'rxjs'; diff --git a/js/gulp/util.js b/js/gulp/util.js index d86011008ad3e..2ce756f4acafa 100644 --- a/js/gulp/util.js +++ b/js/gulp/util.js @@ -15,18 +15,18 @@ // specific language governing permissions and limitations // under the License. 
-import fs from 'fs'; -import path from 'path'; -import child_process from 'child_process'; -import stream from 'stream'; -import util from 'util'; +import fs from 'node:fs'; +import path from 'node:path'; +import child_process from 'node:child_process'; +import stream from 'node:stream'; +import util from 'node:util'; import asyncDoneSync from 'async-done'; const pump = stream.pipeline; import { targets, modules } from './argv.js'; import { ReplaySubject, empty as ObservableEmpty, throwError as ObservableThrow, fromEvent as ObservableFromEvent } from 'rxjs'; import { share, flatMap, takeUntil, defaultIfEmpty, mergeWith } from 'rxjs/operators'; const asyncDone = util.promisify(asyncDoneSync); -import { createRequire } from 'module'; +import { createRequire } from 'node:module'; import esmRequire from './esm-require.cjs' const require = createRequire(import.meta.url); diff --git a/js/src/bin/arrow2csv.ts b/js/src/bin/arrow2csv.ts index 4115f30099f03..569e419faabe4 100755 --- a/js/src/bin/arrow2csv.ts +++ b/js/src/bin/arrow2csv.ts @@ -19,8 +19,8 @@ /* eslint-disable unicorn/no-array-for-each */ -import * as fs from 'fs'; -import * as stream from 'stream'; +import * as fs from 'node:fs'; +import * as stream from 'node:stream'; import { Schema, RecordBatch, RecordBatchReader, AsyncByteQueue, util } from '../Arrow.js'; import * as commandLineUsage from 'command-line-usage'; diff --git a/js/src/builder.ts b/js/src/builder.ts index 1880db3818ca5..5ae43a8836746 100644 --- a/js/src/builder.ts +++ b/js/src/builder.ts @@ -27,6 +27,11 @@ import { import { createIsValidFunction } from './builder/valid.js'; import { BufferBuilder, BitmapBufferBuilder, DataBufferBuilder, OffsetsBufferBuilder } from './builder/buffer.js'; +import type { BuilderDuplexOptions } from './io/node/builder.js'; +import type { BuilderTransform, BuilderTransformOptions } from './io/whatwg/builder.js'; + +import type { Duplex } from 'node:stream'; + /** * A set of options required to create a `Builder` instance for a given `DataType`. * @see {@link Builder} @@ -98,12 +103,12 @@ export abstract class Builder { /** @nocollapse */ // @ts-ignore - public static throughNode(options: import('./io/node/builder').BuilderDuplexOptions): import('stream').Duplex { + public static throughNode(options: BuilderDuplexOptions): Duplex { throw new Error(`"throughNode" not available in this environment`); } /** @nocollapse */ // @ts-ignore - public static throughDOM(options: import('./io/whatwg/builder').BuilderTransformOptions): import('./io/whatwg/builder').BuilderTransform { + public static throughDOM(options: BuilderTransformOptions): BuilderTransform { throw new Error(`"throughDOM" not available in this environment`); } diff --git a/js/src/fb/.eslintrc.cjs b/js/src/fb/.eslintrc.cjs index eb0fc1c7cdccb..b7fc1c0290903 100644 --- a/js/src/fb/.eslintrc.cjs +++ b/js/src/fb/.eslintrc.cjs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+/** @type {import('eslint').Linter.Config} */ module.exports = { rules: { "@typescript-eslint/type-annotation-spacing": "off", diff --git a/js/src/io/adapters.ts b/js/src/io/adapters.ts index 05020314aa6b9..3118cba049dc5 100644 --- a/js/src/io/adapters.ts +++ b/js/src/io/adapters.ts @@ -25,6 +25,8 @@ import { import { ReadableDOMStreamOptions } from './interfaces.js'; +import type { ReadableOptions, Readable } from 'node:stream'; + type Uint8ArrayGenerator = Generator; type AsyncUint8ArrayGenerator = AsyncGenerator; @@ -47,7 +49,7 @@ export default { throw new Error(`"toDOMStream" not available in this environment`); }, // @ts-ignore - toNodeStream(source: Iterable | AsyncIterable, options?: import('stream').ReadableOptions): import('stream').Readable { + toNodeStream(source: Iterable | AsyncIterable, options?: ReadableOptions): Readable { throw new Error(`"toNodeStream" not available in this environment`); }, }; @@ -71,7 +73,7 @@ function* fromIterable(source: Iterable | T): } // Yield so the caller can inject the read command before creating the source Iterator - ({ cmd, size } = (yield (() => null)()) || {cmd: 'read', size: 0}); + ({ cmd, size } = (yield (() => null)()) || { cmd: 'read', size: 0 }); // initialize the iterator const it = toUint8ArrayIterator(source)[Symbol.iterator](); @@ -117,7 +119,7 @@ async function* fromAsyncIterable(source: AsyncI } // Yield so the caller can inject the read command before creating the source AsyncIterator - ({ cmd, size } = (yield (() => null)()) || {cmd: 'read', size: 0}); + ({ cmd, size } = (yield (() => null)()) || { cmd: 'read', size: 0 }); // initialize the iterator const it = toUint8ArrayAsyncIterator(source)[Symbol.asyncIterator](); @@ -167,7 +169,7 @@ async function* fromDOMStream(source: ReadableSt } // Yield so the caller can inject the read command before we establish the ReadableStream lock - ({ cmd, size } = (yield (() => null)()) || {cmd: 'read', size: 0}); + ({ cmd, size } = (yield (() => null)()) || { cmd: 'read', size: 0 }); // initialize the reader and lock the stream const it = new AdaptiveByteReader(source); @@ -273,7 +275,7 @@ async function* fromNodeStream(stream: NodeJS.ReadableStream): AsyncUint8ArrayGe // Yield so the caller can inject the read command before we // add the listener for the source stream's 'readable' event. 
- ({ cmd, size } = (yield (() => null)()) || {cmd: 'read', size: 0}); + ({ cmd, size } = (yield (() => null)()) || { cmd: 'read', size: 0 }); // ignore stdin if it's a TTY if ((stream as any)['isTTY']) { diff --git a/js/src/io/interfaces.ts b/js/src/io/interfaces.ts index d69841bcbbb91..6775c3240b64c 100644 --- a/js/src/io/interfaces.ts +++ b/js/src/io/interfaces.ts @@ -17,11 +17,12 @@ import streamAdapters from './adapters.js'; +export type { FileHandle } from 'node:fs/promises'; +import type { ReadableOptions, Readable as StreamReadable } from 'node:stream'; + /** @ignore */ export const ITERATOR_DONE: any = Object.freeze({ done: true, value: void (0) }); -/** @ignore */ -export type FileHandle = import('fs').promises.FileHandle; /** @ignore */ export type ArrowJSONLike = { schema: any; batches?: any[]; dictionaries?: any[] }; /** @ignore */ @@ -60,14 +61,14 @@ export interface Writable { export interface ReadableWritable extends Readable, Writable { [Symbol.asyncIterator](): AsyncIterableIterator; toDOMStream(options?: ReadableDOMStreamOptions): ReadableStream; - toNodeStream(options?: import('stream').ReadableOptions): import('stream').Readable; + toNodeStream(options?: ReadableOptions): StreamReadable; } /** @ignore */ export abstract class ReadableInterop { public abstract toDOMStream(options?: ReadableDOMStreamOptions): ReadableStream; - public abstract toNodeStream(options?: import('stream').ReadableOptions): import('stream').Readable; + public abstract toNodeStream(options?: ReadableOptions): StreamReadable; public tee(): [ReadableStream, ReadableStream] { return this._getDOMStream().tee(); @@ -85,7 +86,7 @@ export abstract class ReadableInterop { return this._DOMStream || (this._DOMStream = this.toDOMStream()); } - protected _nodeStream?: import('stream').Readable; + protected _nodeStream?: StreamReadable; private _getNodeStream() { return this._nodeStream || (this._nodeStream = this.toNodeStream()); } @@ -144,7 +145,7 @@ export class AsyncQueue extends R : (this._values as any) as Iterable, options); } - public toNodeStream(options?: import('stream').ReadableOptions) { + public toNodeStream(options?: ReadableOptions) { return streamAdapters.toNodeStream( (this._closedPromiseResolve || this._error) ? (this as AsyncIterable) diff --git a/js/src/io/node/builder.ts b/js/src/io/node/builder.ts index be289f447f5da..1d02febac7ba9 100644 --- a/js/src/io/node/builder.ts +++ b/js/src/io/node/builder.ts @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -import { Duplex } from 'stream'; +import { Duplex } from 'node:stream'; import { DataType } from '../../type.js'; import { Builder, BuilderOptions } from '../../builder.js'; import { makeBuilder } from '../../factories.js'; diff --git a/js/src/io/node/iterable.ts b/js/src/io/node/iterable.ts index 6698e7fa92915..67b2143ea2453 100644 --- a/js/src/io/node/iterable.ts +++ b/js/src/io/node/iterable.ts @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. 
-import { Readable } from 'stream'; +import { Readable, ReadableOptions as ReadableOptions_ } from 'node:stream'; import { isIterable, isAsyncIterable } from '../../util/compat.js'; /** @ignore */ -type ReadableOptions = import('stream').ReadableOptions; +type ReadableOptions = ReadableOptions_; /** @ignore */ type SourceIterator = Generator; /** @ignore */ diff --git a/js/src/io/node/reader.ts b/js/src/io/node/reader.ts index e8bbf736aa651..77e1fc26e2f1c 100644 --- a/js/src/io/node/reader.ts +++ b/js/src/io/node/reader.ts @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -import { Duplex, DuplexOptions } from 'stream'; +import { Duplex, DuplexOptions } from 'node:stream'; import { AsyncByteQueue } from '../../io/stream.js'; import { RecordBatchReader } from '../../ipc/reader.js'; import { RecordBatch } from '../../recordbatch.js'; diff --git a/js/src/io/node/writer.ts b/js/src/io/node/writer.ts index 5725ef7a5d726..fb426a968118d 100644 --- a/js/src/io/node/writer.ts +++ b/js/src/io/node/writer.ts @@ -15,7 +15,8 @@ // specific language governing permissions and limitations // under the License. -import { Duplex, DuplexOptions } from 'stream'; +import { Duplex } from 'node:stream'; +import type { DuplexOptions } from 'node:stream'; import { AsyncByteStream } from '../../io/stream.js'; import { RecordBatchWriter } from '../../ipc/writer.js'; import { TypeMap } from '../../type.js'; diff --git a/js/src/ipc/reader.ts b/js/src/ipc/reader.ts index e4dac0606aa47..547739dacce3a 100644 --- a/js/src/ipc/reader.ts +++ b/js/src/ipc/reader.ts @@ -47,6 +47,8 @@ import { isReadableDOMStream, isReadableNodeStream } from '../util/compat.js'; +import type { DuplexOptions, Duplex } from 'node:stream'; + /** @ignore */ export type FromArg0 = ArrowJSONLike; /** @ignore */ export type FromArg1 = PromiseLike; /** @ignore */ export type FromArg2 = Iterable | ArrayBufferViewInput; @@ -129,7 +131,7 @@ export class RecordBatchReader extends ReadableInterop< /** @nocollapse */ // @ts-ignore - public static throughNode(options?: import('stream').DuplexOptions & { autoDestroy: boolean }): import('stream').Duplex { + public static throughNode(options?: DuplexOptions & { autoDestroy: boolean }): Duplex { throw new Error(`"throughNode" not available in this environment`); } /** @nocollapse */ diff --git a/js/src/ipc/writer.ts b/js/src/ipc/writer.ts index 565b0825bd9be..8d924ab64c8f9 100644 --- a/js/src/ipc/writer.ts +++ b/js/src/ipc/writer.ts @@ -35,6 +35,8 @@ import { RecordBatch, _InternalEmptyPlaceholderRecordBatch } from '../recordbatc import { Writable, ReadableInterop, ReadableDOMStreamOptions } from '../io/interfaces.js'; import { isPromise, isAsyncIterable, isWritableDOMStream, isWritableNodeStream, isIterable, isObject } from '../util/compat.js'; +import type { DuplexOptions, Duplex, ReadableOptions } from 'node:stream'; + export interface RecordBatchStreamWriterOptions { /** * @@ -53,7 +55,7 @@ export class RecordBatchWriter extends ReadableInterop< /** @nocollapse */ // @ts-ignore - public static throughNode(options?: import('stream').DuplexOptions & { autoDestroy: boolean }): import('stream').Duplex { + public static throughNode(options?: DuplexOptions & { autoDestroy: boolean }): Duplex { throw new Error(`"throughNode" not available in this environment`); } /** @nocollapse */ @@ -111,7 +113,7 @@ export class RecordBatchWriter extends ReadableInterop< public get closed() { return this._sink.closed; } public [Symbol.asyncIterator]() { return 
this._sink[Symbol.asyncIterator](); } public toDOMStream(options?: ReadableDOMStreamOptions) { return this._sink.toDOMStream(options); } - public toNodeStream(options?: import('stream').ReadableOptions) { return this._sink.toNodeStream(options); } + public toNodeStream(options?: ReadableOptions) { return this._sink.toNodeStream(options); } public close() { return this.reset()._sink.close(); diff --git a/js/src/util/compat.ts b/js/src/util/compat.ts index 0948e8bea2f1d..73af3087eae65 100644 --- a/js/src/util/compat.ts +++ b/js/src/util/compat.ts @@ -17,12 +17,14 @@ import { ReadableInterop, ArrowJSONLike } from '../io/interfaces.js'; -/* eslint-disable unicorn/throw-new-error */ +import type { ByteBuffer } from 'flatbuffers'; +import type { ReadStream } from 'node:fs'; +import type { FileHandle as FileHandle_ } from 'node:fs/promises'; /** @ignore */ -type FSReadStream = import('fs').ReadStream; +type FSReadStream = ReadStream; /** @ignore */ -type FileHandle = import('fs').promises.FileHandle; +type FileHandle = FileHandle_; /** @ignore */ export interface Subscription { @@ -145,7 +147,7 @@ export const isReadableNodeStream = (x: any): x is NodeJS.ReadableStream => { }; /** @ignore */ -export const isFlatbuffersByteBuffer = (x: any): x is import('flatbuffers').ByteBuffer => { +export const isFlatbuffersByteBuffer = (x: any): x is ByteBuffer => { return isObject(x) && isFunction(x['clear']) && isFunction(x['bytes']) && diff --git a/js/test/.eslintrc.cjs b/js/test/.eslintrc.cjs index bb388f463af1b..41197c6edb8bf 100644 --- a/js/test/.eslintrc.cjs +++ b/js/test/.eslintrc.cjs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +/** @type {import('eslint').Linter.Config} */ module.exports = { rules: { "@typescript-eslint/no-require-imports": "off", diff --git a/js/test/unit/ipc/helpers.ts b/js/test/unit/ipc/helpers.ts index 2a228aa7abf18..02f45b57428a3 100644 --- a/js/test/unit/ipc/helpers.ts +++ b/js/test/unit/ipc/helpers.ts @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. 
-import * as fs from 'fs'; +import * as fs from 'node:fs'; import { fs as memfs } from 'memfs'; -import { PassThrough, Readable } from 'stream'; +import { PassThrough, Readable } from 'node:stream'; import { RecordBatchFileWriter, diff --git a/js/test/unit/ipc/reader/streams-node-tests.ts b/js/test/unit/ipc/reader/streams-node-tests.ts index 2e3f08c4e7837..bde685cb952a2 100644 --- a/js/test/unit/ipc/reader/streams-node-tests.ts +++ b/js/test/unit/ipc/reader/streams-node-tests.ts @@ -101,7 +101,7 @@ import { it('readAll() should pipe to separate NodeJS WritableStreams', async () => { const { default: MultiStream } = await import('multistream'); - const { PassThrough } = await import('stream'); + const { PassThrough } = await import('node:stream'); expect.hasAssertions(); diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 45fd29ad3b3f3..60fc09ea861b6 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1516,11 +1516,28 @@ cdef class Array(_PandasConvertible): def _to_pandas(self, options, types_mapper=None, **kwargs): return _array_like_to_pandas(self, options, types_mapper=types_mapper) - def __array__(self, dtype=None): + def __array__(self, dtype=None, copy=None): + if copy is False: + try: + values = self.to_numpy(zero_copy_only=True) + except ArrowInvalid: + raise ValueError( + "Unable to avoid a copy while creating a numpy array as requested.\n" + "If using `np.array(obj, copy=False)` replace it with " + "`np.asarray(obj)` to allow a copy when needed" + ) + # values is already a numpy array at this point, but calling np.array(..) + # again to handle the `dtype` keyword with a no-copy guarantee + return np.array(values, dtype=dtype, copy=False) + values = self.to_numpy(zero_copy_only=False) + if copy is True and is_numeric(self.type.id) and self.null_count == 0: + # to_numpy did not yet make a copy (is_numeric = integer/floats, no decimal) + return np.array(values, dtype=dtype, copy=True) + if dtype is None: return values - return values.astype(dtype) + return np.asarray(values, dtype=dtype) def to_numpy(self, zero_copy_only=True, writable=False): """ diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index a35919579541a..6dae45ab80b1c 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -173,6 +173,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: c_string ToString() c_bool is_primitive(Type type) + c_bool is_numeric(Type type) cdef cppclass CArrayData" arrow::ArrayData": shared_ptr[CDataType] type diff --git a/python/pyarrow/pandas-shim.pxi b/python/pyarrow/pandas-shim.pxi index 0409e133ada5d..74f0d981b52f4 100644 --- a/python/pyarrow/pandas-shim.pxi +++ b/python/pyarrow/pandas-shim.pxi @@ -38,7 +38,7 @@ cdef class _PandasAPIShim(object): object _array_like_types, _is_extension_array_dtype, _lock bint has_sparse bint _pd024 - bint _is_v1, _is_ge_v21 + bint _is_v1, _is_ge_v21, _is_ge_v3 def __init__(self): self._lock = Lock() @@ -79,6 +79,7 @@ cdef class _PandasAPIShim(object): self._is_v1 = self._loose_version < Version('2.0.0') self._is_ge_v21 = self._loose_version >= Version('2.1.0') + self._is_ge_v3 = self._loose_version >= Version('3.0.0.dev0') self._compat_module = pdcompat self._data_frame = pd.DataFrame @@ -169,6 +170,10 @@ cdef class _PandasAPIShim(object): self._check_import() return self._is_ge_v21 + def is_ge_v3(self): + self._check_import() + return self._is_ge_v3 + @property def categorical_type(self): self._check_import() diff --git 
a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py index 5bd0dfcf6b94a..00fa19604e5c3 100644 --- a/python/pyarrow/pandas_compat.py +++ b/python/pyarrow/pandas_compat.py @@ -676,7 +676,7 @@ def get_datetimetz_type(values, dtype, type_): # Converting pyarrow.Table efficiently to pandas.DataFrame -def _reconstruct_block(item, columns=None, extension_columns=None): +def _reconstruct_block(item, columns=None, extension_columns=None, return_block=True): """ Construct a pandas Block from the `item` dictionary coming from pyarrow's serialization or returned by arrow::python::ConvertTableToPandas. @@ -709,22 +709,23 @@ def _reconstruct_block(item, columns=None, extension_columns=None): block_arr = item.get('block', None) placement = item['placement'] if 'dictionary' in item: - cat = _pandas_api.categorical_type.from_codes( + arr = _pandas_api.categorical_type.from_codes( block_arr, categories=item['dictionary'], ordered=item['ordered']) - block = _int.make_block(cat, placement=placement) elif 'timezone' in item: unit, _ = np.datetime_data(block_arr.dtype) dtype = make_datetimetz(unit, item['timezone']) if _pandas_api.is_ge_v21(): - pd_arr = _pandas_api.pd.array( + arr = _pandas_api.pd.array( block_arr.view("int64"), dtype=dtype, copy=False ) - block = _int.make_block(pd_arr, placement=placement) else: - block = _int.make_block(block_arr, placement=placement, - klass=_int.DatetimeTZBlock, - dtype=dtype) + arr = block_arr + if return_block: + block = _int.make_block(block_arr, placement=placement, + klass=_int.DatetimeTZBlock, + dtype=dtype) + return block elif 'py_array' in item: # create ExtensionBlock arr = item['py_array'] @@ -734,12 +735,14 @@ def _reconstruct_block(item, columns=None, extension_columns=None): if not hasattr(pandas_dtype, '__from_arrow__'): raise ValueError("This column does not support to be converted " "to a pandas ExtensionArray") - pd_ext_arr = pandas_dtype.__from_arrow__(arr) - block = _int.make_block(pd_ext_arr, placement=placement) + arr = pandas_dtype.__from_arrow__(arr) else: - block = _int.make_block(block_arr, placement=placement) + arr = block_arr - return block + if return_block: + return _int.make_block(arr, placement=placement) + else: + return arr, placement def make_datetimetz(unit, tz): @@ -752,9 +755,6 @@ def make_datetimetz(unit, tz): def table_to_dataframe( options, table, categories=None, ignore_metadata=False, types_mapper=None ): - from pandas.core.internals import BlockManager - from pandas import DataFrame - all_columns = [] column_indexes = [] pandas_metadata = table.schema.pandas_metadata @@ -774,15 +774,35 @@ def table_to_dataframe( _check_data_column_metadata_consistency(all_columns) columns = _deserialize_column_index(table, all_columns, column_indexes) - blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes) - axes = [columns, index] - mgr = BlockManager(blocks, axes) - if _pandas_api.is_ge_v21(): - df = DataFrame._from_mgr(mgr, mgr.axes) + column_names = table.column_names + result = pa.lib.table_to_blocks(options, table, categories, + list(ext_columns_dtypes.keys())) + if _pandas_api.is_ge_v3(): + from pandas.api.internals import create_dataframe_from_blocks + + blocks = [ + _reconstruct_block( + item, column_names, ext_columns_dtypes, return_block=False) + for item in result + ] + df = create_dataframe_from_blocks(blocks, index=index, columns=columns) + return df else: - df = DataFrame(mgr) - return df + from pandas.core.internals import BlockManager + from pandas import DataFrame + + blocks = [ + 
            _reconstruct_block(item, column_names, ext_columns_dtypes)
+            for item in result
+        ]
+        axes = [columns, index]
+        mgr = BlockManager(blocks, axes)
+        if _pandas_api.is_ge_v21():
+            df = DataFrame._from_mgr(mgr, mgr.axes)
+        else:
+            df = DataFrame(mgr)
+        return df


 # Set of the string repr of all numpy dtypes that can be stored in a pandas
@@ -1099,17 +1119,6 @@ def _reconstruct_columns_from_metadata(columns, column_indexes):
     return pd.Index(new_levels[0], dtype=new_levels[0].dtype, name=columns.name)


-def _table_to_blocks(options, block_table, categories, extension_columns):
-    # Part of table_to_blockmanager
-
-    # Convert an arrow table to Block from the internal pandas API
-    columns = block_table.column_names
-    result = pa.lib.table_to_blocks(options, block_table, categories,
-                                    list(extension_columns.keys()))
-    return [_reconstruct_block(item, columns, extension_columns)
-            for item in result]
-
-
 def _add_any_metadata(table, pandas_metadata):
     modified_columns = {}
     modified_fields = {}
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 0ba8b4debd8f4..379bb82ea6ede 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -525,11 +525,19 @@ cdef class ChunkedArray(_PandasConvertible):
         return values

-    def __array__(self, dtype=None):
+    def __array__(self, dtype=None, copy=None):
+        if copy is False:
+            raise ValueError(
+                "Unable to avoid a copy while creating a numpy array as requested "
+                "(converting a pyarrow.ChunkedArray always results in a copy).\n"
+                "If using `np.array(obj, copy=False)` replace it with "
+                "`np.asarray(obj)` to allow a copy when needed"
+            )
+        # 'copy' can further be ignored because to_numpy() already returns a copy
         values = self.to_numpy()
         if dtype is None:
             return values
-        return values.astype(dtype)
+        return values.astype(dtype, copy=False)

     def cast(self, object target_type=None, safe=None, options=None):
         """
@@ -1562,7 +1570,16 @@ cdef class _Tabular(_PandasConvertible):
         raise TypeError(f"Do not call {self.__class__.__name__}'s constructor directly, use "
                         f"one of the `{self.__class__.__name__}.from_*` functions instead.")

-    def __array__(self, dtype=None):
+    def __array__(self, dtype=None, copy=None):
+        if copy is False:
+            raise ValueError(
+                "Unable to avoid a copy while creating a numpy array as requested "
+                f"(converting a pyarrow.{self.__class__.__name__} always results "
+                "in a copy).\n"
+                "If using `np.array(obj, copy=False)` replace it with "
+                "`np.asarray(obj)` to allow a copy when needed"
+            )
+        # 'copy' can further be ignored because stacking will result in a copy
         column_arrays = [
             np.asarray(self.column(i), dtype=dtype) for i in range(self.num_columns)
         ]
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py
index 8bcb28c0d41b9..156d58326b961 100644
--- a/python/pyarrow/tests/test_array.py
+++ b/python/pyarrow/tests/test_array.py
@@ -31,6 +31,7 @@ import pyarrow as pa
 import pyarrow.tests.strategies as past
+from pyarrow.vendored.version import Version


 def test_total_bytes_allocated():
@@ -3302,6 +3303,52 @@ def test_array_from_large_pyints():
         pa.array([int(2 ** 63)])
+
+def test_numpy_array_protocol():
+    # test the __array__ method on pyarrow.Array
+    arr = pa.array([1, 2, 3])
+    result = np.asarray(arr)
+    expected = np.array([1, 2, 3], dtype="int64")
+    np.testing.assert_array_equal(result, expected)
+
+    # this should not raise a deprecation warning with numpy 2.0+
+    result = np.array(arr, copy=False)
+    np.testing.assert_array_equal(result, expected)
+
+    result = np.array(arr, dtype="int64", copy=False)
+    np.testing.assert_array_equal(result, expected)
+
+    # no zero-copy is possible
+    arr = pa.array([1, 2, None])
+    expected = np.array([1, 2, np.nan], dtype="float64")
+    result = np.asarray(arr)
+    np.testing.assert_array_equal(result, expected)
+
+    if Version(np.__version__) < Version("2.0"):
+        # copy keyword is not strict and not passed down to __array__
+        result = np.array(arr, copy=False)
+        np.testing.assert_array_equal(result, expected)
+
+        result = np.array(arr, dtype="float64", copy=False)
+        np.testing.assert_array_equal(result, expected)
+    else:
+        # starting with numpy 2.0, the copy=False keyword is assumed to be strict
+        with pytest.raises(ValueError, match="Unable to avoid a copy"):
+            np.array(arr, copy=False)
+
+        arr = pa.array([1, 2, 3])
+        with pytest.raises(ValueError):
+            np.array(arr, dtype="float64", copy=False)
+
+    # copy=True -> not yet passed by numpy, so we have to call this directly to test
+    arr = pa.array([1, 2, 3])
+    result = arr.__array__(copy=True)
+    assert result.flags.writeable
+
+    arr = pa.array([1, 2, 3])
+    result = arr.__array__(dtype=np.dtype("float64"), copy=True)
+    assert result.dtype == "float64"
+
+
 def test_array_protocol():
     class MyArray:
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 7a140d4132c50..a58010d083e92 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -24,6 +24,7 @@ import pytest
 import pyarrow as pa
 import pyarrow.compute as pc
+from pyarrow.vendored.version import Version


 def test_chunked_array_basics():
@@ -3275,6 +3276,21 @@ def test_numpy_asarray(constructor):
     assert result.dtype == "int32"
+@pytest.mark.parametrize("constructor", [pa.table, pa.record_batch])
+def test_numpy_array_protocol(constructor):
+    table = constructor([[1, 2, 3], [4.0, 5.0, 6.0]], names=["a", "b"])
+    expected = np.array([[1, 4], [2, 5], [3, 6]], dtype="float64")
+
+    if Version(np.__version__) < Version("2.0"):
+        # copy keyword is not strict and not passed down to __array__
+        result = np.array(table, copy=False)
+        np.testing.assert_array_equal(result, expected)
+    else:
+        # starting with numpy 2.0, the copy=False keyword is assumed to be strict
+        with pytest.raises(ValueError, match="Unable to avoid a copy"):
+            np.array(table, copy=False)
+
+
 @pytest.mark.acero
 def test_invalid_non_join_column():
     NUM_ITEMS = 30
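The two `test_numpy_array_protocol` tests above pin down the NumPy 2.0 semantics that the `__array__` changes in `array.pxi` and `table.pxi` implement: `copy=False` must raise rather than silently copy, `copy=True` must always return an independent writable array, and `copy=None` copies only when needed. A minimal sketch of that contract, using a hypothetical `Wrapper` class rather than pyarrow's actual Cython implementation:

```python
import numpy as np


class Wrapper:
    """Hypothetical container used only to illustrate the NumPy 2.0 protocol."""

    def __init__(self, values):
        self._values = np.asarray(values)

    def __array__(self, dtype=None, copy=None):
        if copy is False and dtype is not None and np.dtype(dtype) != self._values.dtype:
            # A dtype conversion forces a copy, so copy=False cannot be honoured;
            # NumPy >= 2.0 expects an exception in that case.
            raise ValueError("Unable to avoid a copy while creating a numpy array")
        if copy:
            # copy=True must always hand back a fresh, writable array.
            return np.array(self._values, dtype=dtype, copy=True)
        # copy=None lets NumPy copy only if needed; a satisfiable copy=False
        # returns the existing buffer as-is.
        return np.asarray(self._values, dtype=dtype)


w = Wrapper([1, 2, 3])
print(np.asarray(w))                           # reuses the stored buffer, no copy
print(w.__array__(copy=True).flags.writeable)  # True: always an independent copy
# On NumPy >= 2.0, np.array(w, dtype="float64", copy=False) raises ValueError,
# exactly as the strict-copy branches of the tests above expect.
```

pyarrow's `Array.__array__` follows the same pattern but detects whether zero-copy is possible via `to_numpy(zero_copy_only=True)`, while `ChunkedArray` and the tabular classes always reject `copy=False` because their conversion inherently materialises a new array.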