Skip to content

Commit

Permalink
Vistor
Browse files Browse the repository at this point in the history
  • Loading branch information
R-JunmingChen committed Oct 10, 2023
1 parent d74f17d commit a7d5c60
Showing 1 changed file with 87 additions and 80 deletions.
167 changes: 87 additions & 80 deletions cpp/src/arrow/array/array_dict.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,73 +212,90 @@ Result<std::shared_ptr<ArrayData>> TransposeDictIndices(
return out_data;
}

template <typename IndexArrowType>
Result<std::unique_ptr<Buffer>> CompactTransposeMapImpl(
const std::shared_ptr<ArrayData>& data, MemoryPool* pool,
std::shared_ptr<Array>& out_compact_dictionary) {
int64_t index_length = data->length;
int64_t dict_length = data->dictionary->length;
if (index_length == 0 || dict_length == 0) {
ARROW_ASSIGN_OR_RAISE(out_compact_dictionary,
MakeEmptyArray(data->dictionary->type, pool));
return AllocateBuffer(0, pool);
}

using CType = typename IndexArrowType::c_type;
const CType* indices_data = data->GetValues<CType>(1);
std::vector<bool> dict_used(dict_length, false);
CType dict_len = static_cast<CType>(dict_length);
for (int64_t i = 0; i < index_length; i++) {
if (data->IsNull(i)) {
continue;
struct CompactTransposeMapVistor {
const std::shared_ptr<ArrayData>& data;
arrow::MemoryPool* pool;
std::unique_ptr<Buffer> output_map;
std::shared_ptr<Array> out_compact_dictionary;

template <typename IndexArrowType>
Status CompactTransposeMapImpl() {
int64_t index_length = data->length;
int64_t dict_length = data->dictionary->length;
if (dict_length == 0) {
output_map = nullptr;
out_compact_dictionary = nullptr;
return Status::OK();
} else if (index_length == 0) {
ARROW_ASSIGN_OR_RAISE(out_compact_dictionary,
MakeEmptyArray(data->dictionary->type, pool));
ARROW_ASSIGN_OR_RAISE(output_map, AllocateBuffer(0, pool))
return Status::OK();
}

CType current_index = indices_data[i];
if (current_index < 0 || current_index >= dict_len) {
return Status::IndexError("Index out of bounds while compacting dictionary array: ", current_index, "(dictionary is ", dict_length, " long) at position ", i);
using CType = typename IndexArrowType::c_type;
const CType* indices_data = data->GetValues<CType>(1);
std::vector<bool> dict_used(dict_length, false);
CType dict_len = static_cast<CType>(dict_length);
int64_t dict_used_count = 0;
for (int64_t i = 0; i < index_length; i++) {
if (data->IsNull(i)) {
continue;
}

CType current_index = indices_data[i];
if (current_index < 0 || current_index >= dict_len) {
return Status::IndexError(
"Index out of bounds while compacting dictionary array: ", current_index,
"(dictionary is ", dict_length, " long) at position ", i);
} else if (!dict_used[current_index]) {
dict_used[current_index] = true;
dict_used_count++;

if (dict_used_count == dict_length) {
// The dictionary is already compact, so just return here
output_map = nullptr;
out_compact_dictionary = nullptr;
return Status::OK();
}
}
}

dict_used[current_index] = true;
}
CType max_used_index = 0, used_count = 0;
for (auto [i, used] : Zip(Enumerate<CType>, dict_used)) {
if (used) {
max_used_index = i;
++used_count;
using BuilderType = NumericBuilder<IndexArrowType>;
using arrow::compute::Take;
using arrow::compute::TakeOptions;
BuilderType dict_indices_builder(pool);
ARROW_ASSIGN_OR_RAISE(output_map,
AllocateBuffer(dict_length * sizeof(int32_t), pool));
int32_t* output_map_raw = reinterpret_cast<int32_t*>(output_map->mutable_data());
int32_t current_index = 0;
for (CType i = 0; i < dict_len; i++) {
if (dict_used[i]) {
ARROW_RETURN_NOT_OK(dict_indices_builder.Append(i));
output_map_raw[i] = current_index;
current_index++;
} else {
output_map_raw[i] = -1;
}
}
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> compacted_dict_indices,
dict_indices_builder.Finish());
ARROW_ASSIGN_OR_RAISE(auto compacted_dict_res,
Take(Datum(data->dictionary), compacted_dict_indices,
TakeOptions::NoBoundsCheck()));
out_compact_dictionary = compacted_dict_res.make_array();
return Status::OK();
}
if (max_used_index == used_count - 1) {
// The dictionary is already compact, so just return here
out_compact_dictionary = MakeArray(data);
return nullptr;

template <typename Type>
enable_if_integer<Type, Status> Visit(const Type&) {
return CompactTransposeMapImpl<Type>();
}

using BuilderType = NumericBuilder<IndexArrowType>;
using arrow::compute::Take;
using arrow::compute::TakeOptions;
BuilderType dict_indices_builder(pool);
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> result,
AllocateBuffer(dict_length * sizeof(int32_t), pool));
int32_t* result_raw = reinterpret_cast<int32_t*>(result->mutable_data());
int32_t current_index = 0;
for (CType i = 0; i < dict_len; i++) {
if (dict_used[i]) {
ARROW_RETURN_NOT_OK(dict_indices_builder.Append(i));
result_raw[i] = current_index;
current_index++;
} else {
result_raw[i] = -1;
}
Status Visit(const DataType& type) {
return Status::TypeError("Expected an Index Type of Int or UInt");
}
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> compacted_dict_indices,
dict_indices_builder.Finish());
ARROW_ASSIGN_OR_RAISE(auto compacted_dict_res,
Take(Datum(data->dictionary), compacted_dict_indices,
TakeOptions::NoBoundsCheck()));
out_compact_dictionary = compacted_dict_res.make_array();

return result;
}
};

Result<std::unique_ptr<Buffer>> CompactTransposeMap(
const std::shared_ptr<ArrayData>& data, MemoryPool* pool,
Expand All @@ -288,26 +305,11 @@ Result<std::unique_ptr<Buffer>> CompactTransposeMap(
}

const auto& dict_type = checked_cast<const DictionaryType&>(*data->type);
switch (dict_type.index_type()->id()) {
case Type::UINT8:
return CompactTransposeMapImpl<UInt8Type>(data, pool, out_compact_dictionary);
case Type::INT8:
return CompactTransposeMapImpl<Int8Type>(data, pool, out_compact_dictionary);
case Type::UINT16:
return CompactTransposeMapImpl<UInt16Type>(data, pool, out_compact_dictionary);
case Type::INT16:
return CompactTransposeMapImpl<Int16Type>(data, pool, out_compact_dictionary);
case Type::UINT32:
return CompactTransposeMapImpl<UInt32Type>(data, pool, out_compact_dictionary);
case Type::INT32:
return CompactTransposeMapImpl<Int32Type>(data, pool, out_compact_dictionary);
case Type::UINT64:
return CompactTransposeMapImpl<UInt64Type>(data, pool, out_compact_dictionary);
case Type::INT64:
return CompactTransposeMapImpl<Int64Type>(data, pool, out_compact_dictionary);
default:
util::Unreachable("Expected an Index Type of Int or UInt");
}
CompactTransposeMapVistor vistor{data, pool, nullptr, nullptr};
RETURN_NOT_OK(VisitTypeInline(*dict_type.index_type(), &vistor));

out_compact_dictionary = vistor.out_compact_dictionary;
return std::move(vistor.output_map);
}
} // namespace

Expand All @@ -324,8 +326,13 @@ Result<std::shared_ptr<Array>> DictionaryArray::Compact(MemoryPool* pool) const
std::shared_ptr<Array> compact_dictionary;
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> transpose_map,
CompactTransposeMap(this->data_, pool, compact_dictionary));
return this->Transpose(this->type(), compact_dictionary,
transpose_map->data_as<int32_t>(), pool);

if (transpose_map == nullptr) {
return std::make_shared<DictionaryArray>(this->data_);
} else {
return this->Transpose(this->type(), compact_dictionary,
transpose_map->data_as<int32_t>(), pool);
}
}

// ----------------------------------------------------------------------
Expand Down

0 comments on commit a7d5c60

Please sign in to comment.