
Merge branch 'main' into dom/updates-78234
domoritz committed Apr 16, 2024
2 parents cb6728b + b99b00d commit 290b18c
Showing 21 changed files with 256 additions and 78 deletions.
6 changes: 5 additions & 1 deletion c_glib/example/vala/meson.build
@@ -18,11 +18,15 @@
 # under the License.

 if generate_vapi
+  c_flags = [
+    '-Wunused-but-set-variable',
+  ]
+  c_flags = meson.get_compiler('c').get_supported_arguments(c_flags)
   vala_example_executable_kwargs = {
     'c_args': [
       '-I' + project_build_root,
       '-I' + project_source_root,
-    ],
+    ] + c_flags,
     'dependencies': [
       arrow_glib_vapi,
       dependency('gio-2.0'),

4 changes: 2 additions & 2 deletions c_glib/example/vala/read-file.vala
@@ -119,8 +119,8 @@ void print_array(GArrow.Array array) {

 void print_record_batch(GArrow.RecordBatch record_batch) {
   var n_columns = record_batch.get_n_columns();
-  for (var nth_column = 0; nth_column < n_columns; nth_column++) {
-    stdout.printf("columns[%" + int64.FORMAT + "](%s): ",
+  for (int nth_column = 0; nth_column < n_columns; nth_column++) {
+    stdout.printf("columns[%d](%s): ",
                   nth_column,
                   record_batch.get_column_name(nth_column));
     var array = record_batch.get_column_data(nth_column);

4 changes: 2 additions & 2 deletions c_glib/example/vala/read-stream.vala
@@ -119,8 +119,8 @@ void print_array(GArrow.Array array) {

 void print_record_batch(GArrow.RecordBatch record_batch) {
   var n_columns = record_batch.get_n_columns();
-  for (var nth_column = 0; nth_column < n_columns; nth_column++) {
-    stdout.printf("columns[%" + int64.FORMAT + "](%s): ",
+  for (int nth_column = 0; nth_column < n_columns; nth_column++) {
+    stdout.printf("columns[%d](%s): ",
                   nth_column,
                   record_batch.get_column_name(nth_column));
     var array = record_batch.get_column_data(nth_column);

5 changes: 4 additions & 1 deletion ci/conda_env_sphinx.txt
@@ -29,5 +29,8 @@ sphinx-copybutton
 sphinxcontrib-jquery
 sphinx==6.2
 # Requirement for doctest-cython
-pytest-cython
+# Needs upper pin of 0.3.0, see:
+# https://github.com/lgpage/pytest-cython/issues/67
+# With 0.3.* bug fix release, the pin can be removed
+pytest-cython==0.2.2
 pandas

2 changes: 0 additions & 2 deletions cpp/src/arrow/config.cc
@@ -58,8 +58,6 @@ std::string MakeSimdLevelString(QueryFlagFunction&& query_flag) {
     return "avx";
   } else if (query_flag(CpuInfo::SSE4_2)) {
     return "sse4_2";
-  } else if (query_flag(CpuInfo::ASIMD)) {
-    return "neon";
   } else {
     return "none";
   }

36 changes: 30 additions & 6 deletions cpp/src/arrow/dataset/dataset_writer.cc
@@ -515,7 +515,7 @@ class DatasetWriter::DatasetWriterImpl {
                     std::function<void()> finish_callback, uint64_t max_rows_queued)
       : scheduler_(scheduler),
         write_tasks_(util::MakeThrottledAsyncTaskGroup(
-            scheduler_, 1, /*queue=*/nullptr,
+            scheduler_, /*max_concurrent_cost=*/1, /*queue=*/nullptr,
             [finish_callback = std::move(finish_callback)] {
               finish_callback();
               return Status::OK();
@@ -541,6 +541,23 @@
     }
   }

+  void ResumeIfNeeded() {
+    if (!paused_) {
+      return;
+    }
+    bool needs_resume = false;
+    {
+      std::lock_guard lg(mutex_);
+      if (!write_tasks_ || write_tasks_->QueueSize() == 0) {
+        needs_resume = true;
+      }
+    }
+    if (needs_resume) {
+      paused_ = false;
+      resume_callback_();
+    }
+  }
+
   void WriteRecordBatch(std::shared_ptr<RecordBatch> batch, const std::string& directory,
                         const std::string& prefix) {
     write_tasks_->AddSimpleTask(
@@ -549,11 +566,14 @@
           WriteAndCheckBackpressure(std::move(batch), directory, prefix);
           if (!has_room.is_finished()) {
             // We don't have to worry about sequencing backpressure here since
-            // task_group_ serves as our sequencer. If batches continue to arrive after
-            // we pause they will queue up in task_group_ until we free up and call
-            // Resume
+            // task_group_ serves as our sequencer. If batches continue to arrive
+            // after we pause they will queue up in task_group_ until we free up and
+            // call Resume
             pause_callback_();
-            return has_room.Then([this] { resume_callback_(); });
+            paused_ = true;
+            return has_room.Then([this] { ResumeIfNeeded(); });
+          } else {
+            ResumeIfNeeded();
           }
           return has_room;
         },
@@ -571,6 +591,9 @@
           return Future<>::MakeFinished();
         },
         "DatasetWriter::FinishAll"sv);
+    // Reset write_tasks_ to signal that we are done adding tasks, this will allow
+    // us to invoke the finish callback once the tasks wrap up.
+    std::lock_guard lg(mutex_);
     write_tasks_.reset();
   }

@@ -660,7 +683,7 @@ class DatasetWriter::DatasetWriterImpl {
   }

   util::AsyncTaskScheduler* scheduler_ = nullptr;
-  std::unique_ptr<util::AsyncTaskScheduler> write_tasks_;
+  std::unique_ptr<util::ThrottledAsyncTaskScheduler> write_tasks_;
   Future<> finish_fut_ = Future<>::Make();
   FileSystemDatasetWriteOptions write_options_;
   DatasetWriterState writer_state_;
@@ -670,6 +693,7 @@ class DatasetWriter::DatasetWriterImpl {
   std::unordered_map<std::string, std::shared_ptr<DatasetWriterDirectoryQueue>>
       directory_queues_;
   std::mutex mutex_;
+  bool paused_ = false;
   Status err_;
 };

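Note: ResumeIfNeeded() above only releases backpressure once the throttled queue has drained, instead of resuming unconditionally when a write finds room. A minimal standalone sketch of the idiom (simplified types and names, not Arrow's actual API):

#include <functional>
#include <mutex>
#include <queue>

// Toy writer state mirroring the fields this patch adds: a paused flag, a
// mutex, and a pending-task queue whose size gates the resume.
struct ToyWriter {
  std::mutex mutex;
  std::queue<std::function<void()>> pending;  // stands in for write_tasks_
  bool paused = false;
  std::function<void()> resume_callback;

  // Resume only when nothing is still buffered behind the throttle;
  // invoke the callback outside the lock to avoid re-entrancy issues.
  void ResumeIfNeeded() {
    if (!paused) return;
    bool needs_resume = false;
    {
      std::lock_guard<std::mutex> lg(mutex);
      if (pending.empty()) needs_resume = true;
    }
    if (needs_resume) {
      paused = false;
      resume_callback();
    }
  }
};
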
7 changes: 7 additions & 0 deletions cpp/src/arrow/util/async_util.cc
@@ -118,6 +118,8 @@ class FifoQueue : public ThrottledAsyncTaskScheduler::Queue {

   void Purge() override { tasks_.clear(); }

+  std::size_t Size() const override { return tasks_.size(); }
+
  private:
   std::list<std::unique_ptr<Task>> tasks_;
 };
@@ -332,6 +334,10 @@ class ThrottledAsyncTaskSchedulerImpl

   void Pause() override { throttle_->Pause(); }
   void Resume() override { throttle_->Resume(); }
+  std::size_t QueueSize() override {
+    std::lock_guard lk(mutex_);
+    return queue_->Size();
+  }
   const util::tracing::Span& span() const override { return target_->span(); }

  private:
@@ -499,6 +505,7 @@ class ThrottledAsyncTaskGroup : public ThrottledAsyncTaskScheduler {
       : throttle_(std::move(throttle)), task_group_(std::move(task_group)) {}
   void Pause() override { throttle_->Pause(); }
   void Resume() override { throttle_->Resume(); }
+  std::size_t QueueSize() override { return throttle_->QueueSize(); }
   const util::tracing::Span& span() const override { return task_group_->span(); }
   bool AddTask(std::unique_ptr<Task> task) override {
     return task_group_->AddTask(std::move(task));

3 changes: 3 additions & 0 deletions cpp/src/arrow/util/async_util.h
@@ -226,6 +226,7 @@ class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler {
     virtual bool Empty() = 0;
     /// Purge the queue of all items
     virtual void Purge() = 0;
+    virtual std::size_t Size() const = 0;
   };

   class Throttle {
@@ -277,6 +278,8 @@ class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler {
   /// Allows task to be submitted again. If there is a max_concurrent_cost limit then
   /// it will still apply.
   virtual void Resume() = 0;
+  /// Return the number of tasks queued but not yet submitted
+  virtual std::size_t QueueSize() = 0;

   /// Create a throttled view of a scheduler
   ///

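Note: Queue::Size() above is the new pure-virtual hook backing ThrottledAsyncTaskScheduler::QueueSize(). A standalone sketch of a queue exposing that surface (Task here is a placeholder type, not the real interface):

#include <cstddef>
#include <list>
#include <memory>

struct Task {};  // placeholder for the scheduler's task type

// FIFO sketch: Size() joins Empty() and Purge() so callers such as the
// dataset writer can ask how much work is still waiting on the throttle.
class SketchFifoQueue {
 public:
  bool Empty() const { return tasks_.empty(); }
  void Purge() { tasks_.clear(); }
  std::size_t Size() const { return tasks_.size(); }

  void Push(std::unique_ptr<Task> task) { tasks_.push_back(std::move(task)); }
  std::unique_ptr<Task> Pop() {
    std::unique_ptr<Task> task = std::move(tasks_.front());
    tasks_.pop_front();
    return task;
  }

 private:
  std::list<std::unique_ptr<Task>> tasks_;
};
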
1 change: 1 addition & 0 deletions cpp/src/arrow/util/async_util_test.cc
@@ -680,6 +680,7 @@ class PriorityQueue : public ThrottledAsyncTaskScheduler::Queue {
       queue_.pop();
     }
   }
+  std::size_t Size() const { return queue_.size(); }

  private:
   std::priority_queue<TaskWithPriority*, std::vector<TaskWithPriority*>,

63 changes: 47 additions & 16 deletions cpp/src/parquet/encoding.cc
@@ -3143,27 +3143,58 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
                   typename EncodingTraits<BooleanType>::Accumulator* out) override {
-    if (null_count != 0) {
-      // TODO(ARROW-34660): implement DecodeArrow with null slots.
-      ParquetException::NYI("RleBoolean DecodeArrow with null slots");
+    if (null_count == num_values) {
+      PARQUET_THROW_NOT_OK(out->AppendNulls(null_count));
+      return 0;
     }
     constexpr int kBatchSize = 1024;
     std::array<bool, kBatchSize> values;
-    int sum_decode_count = 0;
-    do {
-      int current_batch = std::min(kBatchSize, num_values);
-      int decoded_count = decoder_->GetBatch(values.data(), current_batch);
-      if (decoded_count == 0) {
-        break;
+    const int num_non_null_values = num_values - null_count;
+    // Remaining non-null boolean values to read from decoder.
+    // We decode from `decoder_` with maximum 1024 size batches.
+    int num_remain_non_null_values = num_non_null_values;
+    int current_index_in_batch = 0;
+    int current_batch_size = 0;
+    auto next_boolean_batch = [&]() {
+      DCHECK_GT(num_remain_non_null_values, 0);
+      DCHECK_EQ(current_index_in_batch, current_batch_size);
+      current_batch_size = std::min(num_remain_non_null_values, kBatchSize);
+      int decoded_count = decoder_->GetBatch(values.data(), current_batch_size);
+      if (ARROW_PREDICT_FALSE(decoded_count != current_batch_size)) {
+        // required values is more than values in decoder.
+        ParquetException::EofException();
       }
-      sum_decode_count += decoded_count;
-      PARQUET_THROW_NOT_OK(out->Reserve(sum_decode_count));
-      for (int i = 0; i < decoded_count; ++i) {
-        PARQUET_THROW_NOT_OK(out->Append(values[i]));
+      num_remain_non_null_values -= current_batch_size;
+      current_index_in_batch = 0;
+    };
+
+    // Reserve all values including nulls first
+    PARQUET_THROW_NOT_OK(out->Reserve(num_values));
+    if (null_count == 0) {
+      // Fast-path for not having nulls.
+      do {
+        next_boolean_batch();
+        PARQUET_THROW_NOT_OK(
+            out->AppendValues(values.begin(), values.begin() + current_batch_size));
+        num_values -= current_batch_size;
+        current_index_in_batch = 0;
+      } while (num_values > 0);
+      return num_non_null_values;
     }
+    auto next_value = [&]() -> bool {
+      if (current_index_in_batch == current_batch_size) {
+        next_boolean_batch();
+        DCHECK_GT(current_batch_size, 0);
+      }
-      num_values -= decoded_count;
-    } while (num_values > 0);
-    return sum_decode_count;
+      DCHECK_LT(current_index_in_batch, current_batch_size);
+      bool value = values[current_index_in_batch];
+      ++current_index_in_batch;
+      return value;
+    };
+    VisitNullBitmapInline(
+        valid_bits, valid_bits_offset, num_values, null_count,
+        [&]() { out->UnsafeAppend(next_value()); }, [&]() { out->UnsafeAppendNull(); });
+    return num_non_null_values;
   }

   int DecodeArrow(

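Note: the rewritten RleBooleanDecoder::DecodeArrow replaces the old NYI for null slots: it reserves room for every slot up front, keeps a batched AppendValues fast path when null_count == 0, and otherwise walks the validity bitmap, pulling the next decoded value for valid slots and appending nulls for the rest. A standalone sketch of that spaced-decode pattern (next_batch stands in for decoder_->GetBatch, the plain loop for VisitNullBitmapInline):

#include <array>
#include <functional>
#include <vector>

// Decode a value stream that stores only non-null entries, expanding it
// against a validity bitmap. Values are pulled in fixed-size batches.
void DecodeSpaced(const std::function<int(bool*, int)>& next_batch,
                  const std::vector<bool>& validity,
                  std::vector<bool>* values, std::vector<bool>* valid_out) {
  constexpr int kBatchSize = 1024;
  std::array<bool, kBatchSize> batch;
  int index_in_batch = 0;
  int batch_size = 0;
  for (bool is_valid : validity) {
    if (!is_valid) {
      values->push_back(false);  // placeholder slot for a null
      valid_out->push_back(false);
      continue;
    }
    if (index_in_batch == batch_size) {  // refill from the inner decoder
      batch_size = next_batch(batch.data(), kBatchSize);
      index_in_batch = 0;
    }
    values->push_back(batch[index_in_batch++]);
    valid_out->push_back(true);
  }
}
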
9 changes: 4 additions & 5 deletions cpp/src/parquet/encoding_benchmark.cc
@@ -1518,11 +1518,10 @@ BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowNonNull)
 (benchmark::State& state) { DecodeArrowNonNullDenseBenchmark(state); }
 BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowNonNull)
     ->Range(MIN_RANGE, MAX_RANGE);
-// TODO(mwish): RleBoolean not implemented DecodeArrow with null slots yet.
-// BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull)
-//(benchmark::State& state) { DecodeArrowWithNullDenseBenchmark(state); }
-// BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull)
-// ->Apply(BooleanWithNullCustomArguments);
+BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull)
+(benchmark::State& state) { DecodeArrowWithNullDenseBenchmark(state); }
+BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull)
+    ->Apply(BooleanWithNullCustomArguments);

 BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanPlain, DecodeArrow)
 (benchmark::State& state) { DecodeArrowDenseBenchmark(state); }

46 changes: 23 additions & 23 deletions cpp/src/parquet/encoding_test.cc
@@ -602,35 +602,37 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) {

 // Check that one can put several Arrow arrays into a given encoder
 // and decode to the right values (see GH-36939)
-TEST(PlainBooleanArrayEncoding, AdHocRoundTrip) {
+TEST(BooleanArrayEncoding, AdHocRoundTrip) {
   std::vector<std::shared_ptr<::arrow::Array>> arrays{
       ::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"),
       ::arrow::ArrayFromJSON(::arrow::boolean(), R"([false, null, true])"),
       ::arrow::ArrayFromJSON(::arrow::boolean(), R"([null, null, null])"),
       ::arrow::ArrayFromJSON(::arrow::boolean(), R"([true, null, false])"),
   };

-  auto encoder = MakeTypedEncoder<BooleanType>(Encoding::PLAIN,
-                                               /*use_dictionary=*/false);
-  for (const auto& array : arrays) {
-    encoder->Put(*array);
-  }
-  auto buffer = encoder->FlushValues();
-  auto decoder = MakeTypedDecoder<BooleanType>(Encoding::PLAIN);
-  EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays));
-  decoder->SetData(static_cast<int>(expected->length()), buffer->data(),
-                   static_cast<int>(buffer->size()));
-
-  ::arrow::BooleanBuilder builder;
-  ASSERT_EQ(static_cast<int>(expected->length() - expected->null_count()),
-            decoder->DecodeArrow(static_cast<int>(expected->length()),
-                                 static_cast<int>(expected->null_count()),
-                                 expected->null_bitmap_data(), 0, &builder));
+  for (auto encoding : {Encoding::PLAIN, Encoding::RLE}) {
+    auto encoder = MakeTypedEncoder<BooleanType>(encoding,
+                                                 /*use_dictionary=*/false);
+    for (const auto& array : arrays) {
+      encoder->Put(*array);
+    }
+    auto buffer = encoder->FlushValues();
+    auto decoder = MakeTypedDecoder<BooleanType>(encoding);
+    EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays));
+    decoder->SetData(static_cast<int>(expected->length()), buffer->data(),
+                     static_cast<int>(buffer->size()));
+
+    ::arrow::BooleanBuilder builder;
+    ASSERT_EQ(static_cast<int>(expected->length() - expected->null_count()),
+              decoder->DecodeArrow(static_cast<int>(expected->length()),
+                                   static_cast<int>(expected->null_count()),
+                                   expected->null_bitmap_data(), 0, &builder));

-  std::shared_ptr<::arrow::Array> result;
-  ASSERT_OK(builder.Finish(&result));
-  ASSERT_EQ(expected->length(), result->length());
-  ::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true);
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(builder.Finish(&result));
+    ASSERT_EQ(expected->length(), result->length());
+    ::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true);
+  }
 }

 template <typename T>
@@ -963,8 +965,6 @@ TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) {
 }

 TYPED_TEST(EncodingAdHocTyped, RleArrowDirectPut) {
-  // TODO: test with nulls once RleBooleanDecoder::DecodeArrow supports them
-  this->null_probability_ = 0;
   for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
     this->Rle(seed);
   }

10 changes: 9 additions & 1 deletion java/vector/src/main/codegen/templates/DenseUnionVector.java
@@ -124,7 +124,7 @@ public class DenseUnionVector extends AbstractContainerVector implements FieldVe
       ArrowType.Struct.INSTANCE, /*dictionary*/ null, /*metadata*/ null);

   public static DenseUnionVector empty(String name, BufferAllocator allocator) {
-    FieldType fieldType = FieldType.nullable(new ArrowType.Union(
+    FieldType fieldType = FieldType.notNullable(new ArrowType.Union(
         UnionMode.Dense, null));
     return new DenseUnionVector(name, allocator, fieldType, null);
   }
@@ -908,6 +908,14 @@ private int getTypeBufferValueCapacity() {
     return (int) typeBuffer.capacity() / TYPE_WIDTH;
   }
+  public void setOffset(int index, int offset) {
+    while (index >= getOffsetBufferValueCapacity()) {
+      reallocOffsetBuffer();
+    }
+    offsetBuffer.setInt((long) index * OFFSET_WIDTH, offset);
+  }
+  private long getOffsetBufferValueCapacity() {
+    return offsetBuffer.capacity() / OFFSET_WIDTH;
   }

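Note: setOffset() above grows the offset buffer until the requested index is addressable before writing, the usual amortized-growth idiom. A C++ sketch of the same pattern (hypothetical names; the Java reallocOffsetBuffer() is assumed to roughly double capacity):

#include <cstdint>
#include <vector>

class SketchOffsetBuffer {
 public:
  void SetOffset(int index, int32_t offset) {
    // Keep doubling until the index fits, then write the offset.
    while (index >= static_cast<int>(offsets_.size())) {
      offsets_.resize(offsets_.empty() ? 8 : offsets_.size() * 2);
    }
    offsets_[index] = offset;
  }

 private:
  std::vector<int32_t> offsets_;
};
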
(The remaining 8 changed files did not load and are not shown.)