Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 committed Feb 9, 2024
1 parent 460dfe3 commit 4a4dda0
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 47 deletions.
14 changes: 8 additions & 6 deletions cpp/src/arrow/acero/swiss_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ int RowArrayAccessor::NumRowsToSkip(const RowTableImpl& rows, int column_id, int

// Find the length of the requested varying length field in that row
//
uint32_t field_offset_within_row, field_length;
uint32_t field_offset_within_row;
int32_t field_length;
if (varbinary_column_id == 0) {
rows.metadata().first_varbinary_offset_and_length(
row_ptr, &field_offset_within_row, &field_length);
Expand Down Expand Up @@ -123,7 +124,8 @@ void RowArrayAccessor::Visit(const RowTableImpl& rows, int column_id, int num_ro
int varbinary_column_id = VarbinaryColumnId(rows.metadata(), column_id);
const uint8_t* row_ptr_base = rows.data(2);
const int32_t* row_offsets = rows.offsets();
uint32_t field_offset_within_row, field_length;
uint32_t field_offset_within_row;
int32_t field_length;

if (varbinary_column_id == 0) {
// Case 1: This is the first varbinary column
Expand Down Expand Up @@ -493,11 +495,11 @@ Status RowArrayMerge::PrepareForMerge(RowArray* target,
num_rows = 0;
num_bytes = 0;
for (size_t i = 0; i < sources.size(); ++i) {
target->rows_.mutable_offsets()[num_rows] = static_cast<uint32_t>(num_bytes);
target->rows_.mutable_offsets()[num_rows] = static_cast<int32_t>(num_bytes);
num_rows += sources[i]->rows_.length();
num_bytes += sources[i]->rows_.offsets()[sources[i]->rows_.length()];
}
target->rows_.mutable_offsets()[num_rows] = static_cast<uint32_t>(num_bytes);
target->rows_.mutable_offsets()[num_rows] = static_cast<int32_t>(num_bytes);
}

return Status::OK();
Expand Down Expand Up @@ -573,7 +575,7 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&
if (!source_rows_permutation) {
int64_t target_row_offset = first_target_row_offset;
for (int64_t i = 0; i < num_source_rows; ++i) {
target_offsets[first_target_row_id + i] = static_cast<uint32_t>(target_row_offset);
target_offsets[first_target_row_id + i] = static_cast<int32_t>(target_row_offset);
target_row_offset += source_offsets[i + 1] - source_offsets[i];
}
// We purposefully skip outputting of N+1 offset, to allow concurrent
Expand Down Expand Up @@ -604,7 +606,7 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&
*target_row_ptr++ = *source_row_ptr++;
}

target_offsets[first_target_row_id + i] = static_cast<uint32_t>(target_row_offset);
target_offsets[first_target_row_id + i] = static_cast<int32_t>(target_row_offset);
target_row_offset += length;
}
}
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/arrow/compute/row/compare_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void KeyCompare::CompareBinaryColumnToRowHelper(
for (uint32_t i = first_row_to_compare; i < num_rows_to_compare; ++i) {
uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
uint32_t irow_right = left_to_right_map[irow_left];
uint32_t offset_right = offsets_right[irow_right] + offset_within_row;
int32_t offset_right = offsets_right[irow_right] + offset_within_row;
match_bytevector[i] = compare_fn(rows_left, rows_right, irow_left, offset_right);
}
}
Expand Down Expand Up @@ -247,10 +247,10 @@ void KeyCompare::CompareVarBinaryColumnToRowHelper(
for (uint32_t i = first_row_to_compare; i < num_rows_to_compare; ++i) {
uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
uint32_t irow_right = left_to_right_map[irow_left];
uint32_t begin_left = offsets_left[irow_left];
uint32_t length_left = offsets_left[irow_left + 1] - begin_left;
uint32_t begin_right = offsets_right[irow_right];
uint32_t length_right;
int32_t begin_left = offsets_left[irow_left];
int32_t length_left = offsets_left[irow_left + 1] - begin_left;
int32_t begin_right = offsets_right[irow_right];
int32_t length_right;
uint32_t offset_within_row;
if (!is_first_varbinary_col) {
rows.metadata().nth_varbinary_offset_and_length(
Expand All @@ -260,7 +260,7 @@ void KeyCompare::CompareVarBinaryColumnToRowHelper(
rows_right + begin_right, &offset_within_row, &length_right);
}
begin_right += offset_within_row;
uint32_t length = std::min(length_left, length_right);
int32_t length = std::min(length_left, length_right);
const uint64_t* key_left_ptr =
reinterpret_cast<const uint64_t*>(rows_left + begin_left);
util::CheckAlignment<uint64_t>(rows_right + begin_right);
Expand Down
15 changes: 7 additions & 8 deletions cpp/src/arrow/compute/row/compare_internal_avx2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
irow_right =
_mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
}
__m256i offset_right =
_mm256_i32gather_epi32((const int*)offsets_right, irow_right, 4);
__m256i offset_right = _mm256_i32gather_epi32(offsets_right, irow_right, 4);
offset_right = _mm256_add_epi32(offset_right, _mm256_set1_epi32(offset_within_row));

reinterpret_cast<uint64_t*>(match_bytevector)[i] =
Expand Down Expand Up @@ -511,10 +510,10 @@ void KeyCompare::CompareVarBinaryColumnToRowImp_avx2(
for (uint32_t i = 0; i < num_rows_to_compare; ++i) {
uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
uint32_t irow_right = left_to_right_map[irow_left];
uint32_t begin_left = offsets_left[irow_left];
uint32_t length_left = offsets_left[irow_left + 1] - begin_left;
uint32_t begin_right = offsets_right[irow_right];
uint32_t length_right;
int32_t begin_left = offsets_left[irow_left];
int32_t length_left = offsets_left[irow_left + 1] - begin_left;
int32_t begin_right = offsets_right[irow_right];
int32_t length_right;
uint32_t offset_within_row;
if (!is_first_varbinary_col) {
rows.metadata().nth_varbinary_offset_and_length(
Expand All @@ -526,15 +525,15 @@ void KeyCompare::CompareVarBinaryColumnToRowImp_avx2(
begin_right += offset_within_row;

__m256i result_or = _mm256_setzero_si256();
uint32_t length = std::min(length_left, length_right);
int32_t length = std::min(length_left, length_right);
if (length > 0) {
const __m256i* key_left_ptr =
reinterpret_cast<const __m256i*>(rows_left + begin_left);
const __m256i* key_right_ptr =
reinterpret_cast<const __m256i*>(rows_right + begin_right);
int32_t j;
// length is greater than zero
for (j = 0; j < (static_cast<int32_t>(length) + 31) / 32 - 1; ++j) {
for (j = 0; j < (length + 31) / 32 - 1; ++j) {
__m256i key_left = _mm256_loadu_si256(key_left_ptr + j);
__m256i key_right = _mm256_loadu_si256(key_right_ptr + j);
result_or = _mm256_or_si256(result_or, _mm256_xor_si256(key_left, key_right));
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/compute/row/encode_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ void EncoderVarBinary::EncodeSelected(uint32_t ivarbinary, RowTableImpl* rows,
for (uint32_t i = 0; i < num_selected; ++i) {
uint8_t* row = row_base + row_offsets[i];
uint32_t row_offset;
uint32_t length;
int32_t length;
rows->metadata().first_varbinary_offset_and_length(row, &row_offset, &length);
uint32_t irow = selection[i];
memcpy(row + row_offset, col_base + col_offsets[irow], length);
Expand All @@ -858,7 +858,7 @@ void EncoderVarBinary::EncodeSelected(uint32_t ivarbinary, RowTableImpl* rows,
for (uint32_t i = 0; i < num_selected; ++i) {
uint8_t* row = row_base + row_offsets[i];
uint32_t row_offset;
uint32_t length;
int32_t length;
rows->metadata().nth_varbinary_offset_and_length(row, ivarbinary, &row_offset,
&length);
uint32_t irow = selection[i];
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/arrow/compute/row/encode_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class EncoderVarBinary {

private:
template <bool first_varbinary_col, class COPY_FN>
static inline void DecodeHelper(uint32_t start_row, uint32_t num_rows,
static inline void DecodeHelper(uint32_t start_row, int32_t num_rows,
uint32_t varbinary_col_id,
const RowTableImpl* rows_const,
RowTableImpl* rows_mutable_maybe_null,
Expand All @@ -270,16 +270,16 @@ class EncoderVarBinary {
const int32_t* row_offsets_for_batch = rows_const->offsets() + start_row;
const int32_t* col_offsets = col_const->offsets();

uint32_t col_offset_next = col_offsets[0];
for (uint32_t i = 0; i < num_rows; ++i) {
uint32_t col_offset = col_offset_next;
int32_t col_offset_next = col_offsets[0];
for (int32_t i = 0; i < num_rows; ++i) {
int32_t col_offset = col_offset_next;
col_offset_next = col_offsets[i + 1];

uint32_t row_offset = row_offsets_for_batch[i];
int32_t row_offset = row_offsets_for_batch[i];
const uint8_t* row = rows_const->data(2) + row_offset;

uint32_t offset_within_row;
uint32_t length;
int32_t length;
if (first_varbinary_col) {
rows_const->metadata().first_varbinary_offset_and_length(row, &offset_within_row,
&length);
Expand Down
30 changes: 15 additions & 15 deletions cpp/src/arrow/compute/row/row_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ Status RowTableImpl::Init(MemoryPool* pool, const RowTableMetadata& metadata) {
auto offsets, AllocateResizableBuffer(size_offsets(kInitialRowsCapacity), pool_));
offsets_ = std::move(offsets);
memset(offsets_->mutable_data(), 0, size_offsets(kInitialRowsCapacity));
reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
reinterpret_cast<int32_t*>(offsets_->mutable_data())[0] = 0;

ARROW_ASSIGN_OR_RAISE(
auto rows,
Expand Down Expand Up @@ -224,7 +224,7 @@ void RowTableImpl::Clean() {
has_any_nulls_ = false;

if (!metadata_.is_fixed_length) {
reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
reinterpret_cast<int32_t*>(offsets_->mutable_data())[0] = 0;
}
}

Expand Down Expand Up @@ -314,21 +314,21 @@ Status RowTableImpl::ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes)
}

Status RowTableImpl::AppendSelectionFrom(const RowTableImpl& from,
uint32_t num_rows_to_append,
int32_t num_rows_to_append,
const uint16_t* source_row_ids) {
DCHECK(metadata_.is_compatible(from.metadata()));

RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));

if (!metadata_.is_fixed_length) {
// Varying-length rows
auto from_offsets = reinterpret_cast<const uint32_t*>(from.offsets_->data());
auto to_offsets = reinterpret_cast<uint32_t*>(offsets_->mutable_data());
uint32_t total_length = to_offsets[num_rows_];
uint32_t total_length_to_append = 0;
for (uint32_t i = 0; i < num_rows_to_append; ++i) {
auto from_offsets = reinterpret_cast<const int32_t*>(from.offsets_->data());
auto to_offsets = reinterpret_cast<int32_t*>(offsets_->mutable_data());
int32_t total_length = to_offsets[num_rows_];
int32_t total_length_to_append = 0;
for (int32_t i = 0; i < num_rows_to_append; ++i) {
uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
int32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
total_length_to_append += length;
to_offsets[num_rows_ + i + 1] = total_length + total_length_to_append;
}
Expand All @@ -337,9 +337,9 @@ Status RowTableImpl::AppendSelectionFrom(const RowTableImpl& from,

const uint8_t* src = from.rows_->data();
uint8_t* dst = rows_->mutable_data() + total_length;
for (uint32_t i = 0; i < num_rows_to_append; ++i) {
for (int32_t i = 0; i < num_rows_to_append; ++i) {
uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
int32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
auto src64 = reinterpret_cast<const uint64_t*>(src + from_offsets[row_id]);
auto dst64 = reinterpret_cast<uint64_t*>(dst);
for (uint32_t j = 0; j < bit_util::CeilDiv(length, 8); ++j) {
Expand All @@ -351,7 +351,7 @@ Status RowTableImpl::AppendSelectionFrom(const RowTableImpl& from,
// Fixed-length rows
const uint8_t* src = from.rows_->data();
uint8_t* dst = rows_->mutable_data() + num_rows_ * metadata_.fixed_length;
for (uint32_t i = 0; i < num_rows_to_append; ++i) {
for (int32_t i = 0; i < num_rows_to_append; ++i) {
uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
uint32_t length = metadata_.fixed_length;
auto src64 = reinterpret_cast<const uint64_t*>(src + length * row_id);
Expand All @@ -368,7 +368,7 @@ Status RowTableImpl::AppendSelectionFrom(const RowTableImpl& from,
uint64_t dst_byte_offset = num_rows_ * byte_length;
const uint8_t* src_base = from.null_masks_->data();
uint8_t* dst_base = null_masks_->mutable_data();
for (uint32_t i = 0; i < num_rows_to_append; ++i) {
for (int32_t i = 0; i < num_rows_to_append; ++i) {
uint32_t row_id = source_row_ids ? source_row_ids[i] : i;
int64_t src_byte_offset = row_id * byte_length;
const uint8_t* src = src_base + src_byte_offset;
Expand All @@ -384,8 +384,8 @@ Status RowTableImpl::AppendSelectionFrom(const RowTableImpl& from,
return Status::OK();
}

Status RowTableImpl::AppendEmpty(uint32_t num_rows_to_append,
uint32_t num_extra_bytes_to_append) {
Status RowTableImpl::AppendEmpty(int32_t num_rows_to_append,
int32_t num_extra_bytes_to_append) {
RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(num_extra_bytes_to_append));
num_rows_ += num_rows_to_append;
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/compute/row/row_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ struct ARROW_EXPORT RowTableMetadata {

/// Returns the offset within the row and length of the first varbinary field.
inline void first_varbinary_offset_and_length(const uint8_t* row, uint32_t* offset,
uint32_t* length) const {
int32_t* length) const {
ARROW_DCHECK(!is_fixed_length);
*offset = fixed_length;
*length = varbinary_end_array(row)[0] - fixed_length;
Expand All @@ -122,7 +122,7 @@ struct ARROW_EXPORT RowTableMetadata {
/// fields.
inline void nth_varbinary_offset_and_length(const uint8_t* row, int varbinary_id,
uint32_t* out_offset,
uint32_t* out_length) const {
int32_t* out_length) const {
ARROW_DCHECK(!is_fixed_length);
ARROW_DCHECK(varbinary_id > 0);
const int32_t* varbinary_end = varbinary_end_array(row);
Expand Down Expand Up @@ -175,12 +175,12 @@ class ARROW_EXPORT RowTableImpl {
/// \param num_extra_bytes_to_append For tables storing variable-length data this
/// should be an estimate of how many bytes the variable-length data will
/// occupy. This is ignored if there are no variable-length columns.
Status AppendEmpty(uint32_t num_rows_to_append, uint32_t num_extra_bytes_to_append);
Status AppendEmpty(int32_t num_rows_to_append, int32_t num_extra_bytes_to_append);
/// \brief Append rows from a source table
/// \param from The table to append from
/// \param num_rows_to_append The number of rows to append
/// \param source_row_ids Indices (into `from`) of the desired rows
Status AppendSelectionFrom(const RowTableImpl& from, uint32_t num_rows_to_append,
Status AppendSelectionFrom(const RowTableImpl& from, int32_t num_rows_to_append,
const uint16_t* source_row_ids);
/// \brief Metadata describing the data stored in this table
const RowTableMetadata& metadata() const { return metadata_; }
Expand Down

0 comments on commit 4a4dda0

Please sign in to comment.