Skip to content

Commit

Permalink
Merge branch 'main' into dom/64-bits
Browse files Browse the repository at this point in the history
  • Loading branch information
domoritz committed Apr 16, 2024
2 parents a03a227 + 5abd933 commit e3e6d03
Show file tree
Hide file tree
Showing 56 changed files with 616 additions and 202 deletions.
23 changes: 23 additions & 0 deletions cpp/src/arrow/acero/hash_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2036,6 +2036,29 @@ TEST(HashJoin, ResidualFilter) {
[3, 4, "alpha", 4, 16, "alpha"]])")});
}

TEST(HashJoin, FilterEmptyRows) {
  // Regression test for GH-41121.
  //
  // The only left row has id == 2, while the right-side stu_id values are 10, 12,
  // 15 and 10. The LEFT_ANTI join on id == stu_id is therefore expected to hand
  // back that single left row as-is.
  const char* const kLeftRowsJson = R"([[2, "Jarry", 28]])";
  const char* const kRightRowsJson =
      R"([[2, 10, "Jack"], [3, 12, "Mark"], [4, 15, "Tom"], [1, 10, "Jack"]])";

  BatchesWithSchema left;
  left.schema =
      schema({field("id", int32()), field("name", utf8()), field("age", int32())});
  left.batches = {ExecBatchFromJSON({int32(), utf8(), int32()}, kLeftRowsJson)};

  BatchesWithSchema right;
  right.schema =
      schema({field("id", int32()), field("stu_id", int32()), field("subject", utf8())});
  right.batches = {ExecBatchFromJSON({int32(), int32(), utf8()}, kRightRowsJson)};

  const ResidualFilterCaseRunner runner{std::move(left), std::move(right)};

  // Residual filter applied on top of the key comparison: age > 25.
  runner.Run(JoinType::LEFT_ANTI, {"id"}, {"stu_id"},
             greater(field_ref("age"), literal(25)),
             {ExecBatchFromJSON({int32(), utf8(), int32()}, kLeftRowsJson)});
}

TEST(HashJoin, TrivialResidualFilter) {
Expression always_true =
equal(call("add", {field_ref("l1"), field_ref("r1")}), literal(2)); // 1 + 1 == 2
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/acero/swiss_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2167,6 +2167,11 @@ Status JoinResidualFilter::FilterOneBatch(const ExecBatch& keypayload_batch,
ARROW_DCHECK(!output_payload_ids || payload_ids_maybe_null);

*num_passing_rows = 0;

if (num_batch_rows == 0) {
return Status::OK();
}

ARROW_ASSIGN_OR_RAISE(Datum mask,
EvalFilter(keypayload_batch, num_batch_rows, batch_row_ids,
key_ids_maybe_null, payload_ids_maybe_null));
Expand Down
69 changes: 68 additions & 1 deletion cpp/src/arrow/array/array_list_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ class TestListArray : public ::testing::Test {
ArrayFromJSON(type, "[[1, 2], [3], [4], null, [5], [], [6]]"));
auto sliced_list_array =
std::dynamic_pointer_cast<ArrayType>(list_array->Slice(3, 4));
ASSERT_OK_AND_ASSIGN(auto flattened, list_array->Flatten());
ASSERT_OK_AND_ASSIGN(auto flattened, sliced_list_array->Flatten());
ASSERT_OK(flattened->ValidateFull());
// Note the difference between values() and Flatten().
EXPECT_TRUE(flattened->Equals(ArrayFromJSON(int32(), "[5, 6]")));
Expand Down Expand Up @@ -763,6 +763,52 @@ class TestListArray : public ::testing::Test {
<< flattened->ToString();
}

void TestFlattenRecursively() {
auto inner_type = std::make_shared<T>(int32());
auto type = std::make_shared<T>(inner_type);

// List types with two nested level: list<list<int32>>
auto nested_list_array = std::dynamic_pointer_cast<ArrayType>(ArrayFromJSON(type, R"([
[[0, 1, 2], null, [3, null]],
[null],
[[2, 9], [4], [], [6, 5]]
])"));
ASSERT_OK_AND_ASSIGN(auto flattened, nested_list_array->FlattenRecursively());
ASSERT_OK(flattened->ValidateFull());
ASSERT_EQ(10, flattened->length());
ASSERT_TRUE(
flattened->Equals(ArrayFromJSON(int32(), "[0, 1, 2, 3, null, 2, 9, 4, 6, 5]")));

// Empty nested list should flatten until non-list type is reached
nested_list_array =
std::dynamic_pointer_cast<ArrayType>(ArrayFromJSON(type, R"([null])"));
ASSERT_OK_AND_ASSIGN(flattened, nested_list_array->FlattenRecursively());
ASSERT_TRUE(flattened->type()->Equals(int32()));

// List types with three nested level: list<list<fixed_size_list<int32, 2>>>
type = std::make_shared<T>(std::make_shared<T>(fixed_size_list(int32(), 2)));
nested_list_array = std::dynamic_pointer_cast<ArrayType>(ArrayFromJSON(type, R"([
[
[[null, 0]],
[[3, 7], null]
],
[
[[4, null], [5, 8]],
[[8, null]],
null
],
[
null
]
])"));
ASSERT_OK_AND_ASSIGN(flattened, nested_list_array->FlattenRecursively());
ASSERT_OK(flattened->ValidateFull());
ASSERT_EQ(10, flattened->length());
ASSERT_EQ(3, flattened->null_count());
ASSERT_TRUE(flattened->Equals(
ArrayFromJSON(int32(), "[null, 0, 3, 7, 4, null, 5, 8, 8, null]")));
}

Status ValidateOffsetsAndSizes(int64_t length, std::vector<offset_type> offsets,
std::vector<offset_type> sizes,
std::shared_ptr<Array> values, int64_t offset = 0) {
Expand Down Expand Up @@ -925,10 +971,12 @@ TYPED_TEST(TestListArray, BuilderPreserveFieldName) {
// Typed-test entry points: every test below only invokes the TestListArray fixture
// helper named in its body.
TYPED_TEST(TestListArray, FlattenSimple) { this->TestFlattenSimple(); }
TYPED_TEST(TestListArray, FlattenNulls) { this->TestFlattenNulls(); }
TYPED_TEST(TestListArray, FlattenAllEmpty) { this->TestFlattenAllEmpty(); }
TYPED_TEST(TestListArray, FlattenSliced) { this->TestFlattenSliced(); }
TYPED_TEST(TestListArray, FlattenZeroLength) { this->TestFlattenZeroLength(); }
TYPED_TEST(TestListArray, TestFlattenNonEmptyBackingNulls) {
this->TestFlattenNonEmptyBackingNulls();
}
TYPED_TEST(TestListArray, FlattenRecursively) { this->TestFlattenRecursively(); }

TYPED_TEST(TestListArray, ValidateDimensions) { this->TestValidateDimensions(); }

Expand Down Expand Up @@ -1714,4 +1762,23 @@ TEST_F(TestFixedSizeListArray, Flatten) {
}
}

TEST_F(TestFixedSizeListArray, FlattenRecursively) {
  // Two nested fixed-size levels: fixed_size_list(fixed_size_list(int32, 2), 2).
  type_ = fixed_size_list(fixed_size_list(value_type_, 2), 2);

  auto nested = std::dynamic_pointer_cast<FixedSizeListArray>(ArrayFromJSON(type_, R"([
[[0, 1], [null, 3]],
[[7, null], [2, 5]],
[null, null]
])"));
  ASSERT_OK(nested->ValidateFull());

  ASSERT_OK_AND_ASSIGN(auto flat, nested->FlattenRecursively());
  ASSERT_OK(flat->ValidateFull());
  ASSERT_EQ(8, flat->length());
  ASSERT_EQ(2, flat->null_count());

  // The flattened result is compared against a plain array of the value type.
  auto expected = ArrayFromJSON(value_type_, "[0, 1, null, 3, 7, null, 2, 5]");
  AssertArraysEqual(*flat, *expected);
}

} // namespace arrow
44 changes: 44 additions & 0 deletions cpp/src/arrow/array/array_nested.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/list_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/unreachable.h"

namespace arrow {

Expand Down Expand Up @@ -469,6 +470,49 @@ inline void SetListData(VarLengthListLikeArray<TYPE>* self,
self->values_ = MakeArray(self->data_->child_data[0]);
}

// Repeatedly applies the concrete array class's Flatten() until the current array's
// type id is neither a list nor a list-view, then returns that array.
//
// \param in_array array to start from; it is first re-wrapped as a zero-offset,
//        full-length slice so it can be held in a shared_ptr
// \param memory_pool forwarded to every Flatten() call
// \return the first array in the chain whose type is not list-like, or the error
//         raised by one of the Flatten() calls
Result<std::shared_ptr<Array>> FlattenLogicalListRecursively(const Array& in_array,
                                                             MemoryPool* memory_pool) {
  std::shared_ptr<Array> current = in_array.Slice(0, in_array.length());
  while (true) {
    const auto kind = current->type_id();
    if (!is_list(kind) && !is_list_view(kind)) {
      // Nothing left to peel off.
      return current;
    }
    // Each case needs its own scope: ARROW_ASSIGN_OR_RAISE introduces a temporary.
    switch (kind) {
      case Type::LIST: {
        ARROW_ASSIGN_OR_RAISE(
            current,
            (checked_cast<const ListArray*>(current.get())->Flatten(memory_pool)));
        break;
      }
      case Type::LARGE_LIST: {
        ARROW_ASSIGN_OR_RAISE(
            current,
            (checked_cast<const LargeListArray*>(current.get())->Flatten(memory_pool)));
        break;
      }
      case Type::LIST_VIEW: {
        ARROW_ASSIGN_OR_RAISE(
            current,
            (checked_cast<const ListViewArray*>(current.get())->Flatten(memory_pool)));
        break;
      }
      case Type::LARGE_LIST_VIEW: {
        ARROW_ASSIGN_OR_RAISE(
            current, (checked_cast<const LargeListViewArray*>(current.get())
                          ->Flatten(memory_pool)));
        break;
      }
      case Type::FIXED_SIZE_LIST: {
        ARROW_ASSIGN_OR_RAISE(
            current, (checked_cast<const FixedSizeListArray*>(current.get())
                          ->Flatten(memory_pool)));
        break;
      }
      default:
        Unreachable("unexpected non-list type");
    }
  }
}

} // namespace internal

// ----------------------------------------------------------------------
Expand Down
32 changes: 32 additions & 0 deletions cpp/src/arrow/array/array_nested.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ void SetListData(VarLengthListLikeArray<TYPE>* self,
const std::shared_ptr<ArrayData>& data,
Type::type expected_type_id = TYPE::type_id);

/// \brief A version of Flatten that keeps recursively flattening until an array of
/// non-list values is reached.
///
/// Array types considered to be lists by this function:
/// - list
/// - large_list
/// - list_view
/// - large_list_view
/// - fixed_size_list
///
/// If in_array is not of one of these types, no flattening takes place and a
/// full-length slice of in_array is returned.
///
/// \param[in] in_array the array to flatten
/// \param[in] memory_pool the memory pool passed to each Flatten call
/// \return the flattened array of non-list values, or the error produced by one
/// of the Flatten calls
///
/// \see ListArray::Flatten
ARROW_EXPORT Result<std::shared_ptr<Array>> FlattenLogicalListRecursively(
const Array& in_array, MemoryPool* memory_pool);

} // namespace internal

/// Base class for variable-sized list and list-view arrays, regardless of offset size.
Expand Down Expand Up @@ -103,6 +117,15 @@ class VarLengthListLikeArray : public Array {
return values_->Slice(value_offset(i), value_length(i));
}

/// \brief Flatten all nesting levels recursively until a non-list type is reached,
/// and return the resulting non-list Array.
///
/// This simply forwards this array to internal::FlattenLogicalListRecursively.
///
/// \param[in] memory_pool forwarded to internal::FlattenLogicalListRecursively
/// \return the flattened Array, or an error
///
/// \see internal::FlattenLogicalListRecursively
Result<std::shared_ptr<Array>> FlattenRecursively(
MemoryPool* memory_pool = default_memory_pool()) const {
return internal::FlattenLogicalListRecursively(*this, memory_pool);
}

protected:
friend void internal::SetListData<TYPE>(VarLengthListLikeArray<TYPE>* self,
const std::shared_ptr<ArrayData>& data,
Expand Down Expand Up @@ -595,6 +618,15 @@ class ARROW_EXPORT FixedSizeListArray : public Array {
Result<std::shared_ptr<Array>> Flatten(
MemoryPool* memory_pool = default_memory_pool()) const;

/// \brief Flatten all nesting levels recursively until a non-list type is reached,
/// and return the resulting non-list Array.
///
/// This simply forwards this array to internal::FlattenLogicalListRecursively.
///
/// \param[in] memory_pool forwarded to internal::FlattenLogicalListRecursively
/// \return the flattened Array, or an error
///
/// \see internal::FlattenLogicalListRecursively
Result<std::shared_ptr<Array>> FlattenRecursively(
MemoryPool* memory_pool = default_memory_pool()) const {
return internal::FlattenLogicalListRecursively(*this, memory_pool);
}

/// \brief Construct FixedSizeListArray from child value array and value_length
///
/// \param[in] values Array containing list values
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/array/array_primitive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ int64_t BooleanArray::false_count() const {
}

int64_t BooleanArray::true_count() const {
if (data_->null_count.load() != 0) {
if (data_->MayHaveNulls()) {
DCHECK(data_->buffers[0]);
return internal::CountAndSetBits(data_->buffers[0]->data(), data_->offset,
data_->buffers[1]->data(), data_->offset,
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/array/array_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,13 @@ TEST(TestBooleanArray, TrueCountFalseCount) {
CheckArray(checked_cast<const BooleanArray&>(*arr));
CheckArray(checked_cast<const BooleanArray&>(*arr->Slice(5)));
CheckArray(checked_cast<const BooleanArray&>(*arr->Slice(0, 0)));

// GH-41016 true_count() with array without validity buffer with null_count of -1
auto arr_unknown_null_count = ArrayFromJSON(boolean(), "[true, false, true]");
arr_unknown_null_count->data()->null_count = kUnknownNullCount;
ASSERT_EQ(arr_unknown_null_count->data()->null_count.load(), -1);
ASSERT_EQ(arr_unknown_null_count->null_bitmap(), nullptr);
ASSERT_EQ(checked_pointer_cast<BooleanArray>(arr_unknown_null_count)->true_count(), 2);
}

TEST(TestPrimitiveAdHoc, TestType) {
Expand Down
17 changes: 9 additions & 8 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,8 @@ class ExpirationTimeRenewFlightEndpointScenario : public Scenario {
/// both "lol_invalid", which will result in errors attempting to set either.
class SessionOptionsServer : public sql::FlightSqlServerBase {
static inline const std::string invalid_option_name = "lol_invalid";
static inline const SessionOptionValue invalid_option_value = "lol_invalid";
static inline const SessionOptionValue invalid_option_value =
std::string("lol_invalid");

const std::string session_middleware_key;
// These will never be threaded so using a plain map and no lock
Expand Down Expand Up @@ -852,7 +853,7 @@ class SessionOptionsScenario : public Scenario {
{{"foolong", 123L},
{"bardouble", 456.0},
{"lol_invalid", "this won't get set"},
{"key_with_invalid_value", "lol_invalid"},
{"key_with_invalid_value", std::string("lol_invalid")},
{"big_ol_string_list", std::vector<std::string>{"a", "b", "sea", "dee", " ",
" ", "geee", "(づ。◕‿‿◕。)づ"}}}};
ARROW_ASSIGN_OR_RAISE(auto res1, client.SetSessionOptions({}, req1));
Expand All @@ -878,16 +879,16 @@ class SessionOptionsScenario : public Scenario {
}
// Update
ARROW_ASSIGN_OR_RAISE(
auto res3,
client.SetSessionOptions(
{}, SetSessionOptionsRequest{
{{"foolong", std::monostate{}},
{"big_ol_string_list", "a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ"}}}));
auto res3, client.SetSessionOptions(
{}, SetSessionOptionsRequest{
{{"foolong", std::monostate{}},
{"big_ol_string_list",
std::string("a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ")}}}));
ARROW_ASSIGN_OR_RAISE(auto res4, client.GetSessionOptions({}, {}));
if (res4.session_options !=
std::map<std::string, SessionOptionValue>{
{"bardouble", 456.0},
{"big_ol_string_list", "a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ"}}) {
{"big_ol_string_list", std::string("a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ")}}) {
return Status::Invalid("res4 incorrect: " + res4.ToString());
}

Expand Down
Loading

0 comments on commit e3e6d03

Please sign in to comment.