Made hash work for dictionary arrays.
jorgecarleitao committed May 30, 2021
1 parent faf8ab9 commit 1b48f38
Showing 4 changed files with 57 additions and 22 deletions.
datafusion/Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -46,7 +46,7 @@ simd = ["arrow2/simd"]
[dependencies]
ahash = "0.7"
hashbrown = "0.11"
-arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "d2be5b4dd0176672ab34460d8147562590d33567" }
+arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "a86ec804e0e40359041cbcba9cb3556f7729570d" }
sqlparser = "0.9.0"
paste = "^1.0"
num_cpus = "1.13.0"
datafusion/src/datasource/memory.rs (19 changes: 18 additions & 1 deletion)
@@ -87,13 +87,30 @@ fn calculate_statistics(
}
}

fn field_is_consistent(lhs: &Field, rhs: &Field) -> bool {
lhs.name() == rhs.name()
&& lhs.data_type() == rhs.data_type()
&& (lhs.is_nullable() || lhs.is_nullable() == rhs.is_nullable())
}

fn schema_is_consistent(lhs: &Schema, rhs: &Schema) -> bool {
if lhs.fields().len() != rhs.fields().len() {
return false;
}

lhs.fields()
.iter()
.zip(rhs.fields().iter())
.all(|(lhs, rhs)| field_is_consistent(lhs, rhs))
}

impl MemTable {
/// Create a new in-memory table from the provided schema and record batches
pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
if partitions
.iter()
.flatten()
-.all(|batches| schema.as_ref() == batches.schema().as_ref())
+.all(|batch| schema_is_consistent(schema.as_ref(), batch.schema()))
{
let statistics = calculate_statistics(&schema, &partitions);
debug!("MemTable statistics: {:?}", statistics);
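A note on the semantics of the new check: a nullable field in the table schema now accepts record batches whose corresponding field is declared either nullable or non-nullable, while a non-nullable table field still requires an exact match. A minimal self-contained sketch of that predicate, using a simplified stand-in struct rather than arrow2's real Field type:

// Simplified stand-in for arrow2's Field, for illustration only.
#[derive(PartialEq)]
enum DataType {
    Int32,
}

struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

fn field_is_consistent(lhs: &Field, rhs: &Field) -> bool {
    lhs.name == rhs.name
        && lhs.data_type == rhs.data_type
        // a nullable field accepts both nullable and non-nullable counterparts
        && (lhs.nullable || lhs.nullable == rhs.nullable)
}

fn main() {
    let table = Field { name: "a".into(), data_type: DataType::Int32, nullable: true };
    let batch = Field { name: "a".into(), data_type: DataType::Int32, nullable: false };
    assert!(field_is_consistent(&table, &batch)); // nullable table field, non-nullable data: accepted
    assert!(!field_is_consistent(&batch, &table)); // non-nullable table field, nullable data: rejected
}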
datafusion/src/physical_plan/csv.rs (13 changes: 7 additions & 6 deletions)
@@ -407,18 +407,19 @@ fn producer_task<R: Read>(
.from_reader(reader);

let mut current_read = 0;
+let mut rows = vec![read::ByteRecord::default(); batch_size];
while current_read < limit {
let batch_size = batch_size.min(limit - current_read);
-let rows = read::read_rows(&mut reader, 0, batch_size)?;
-if rows.is_empty() {
-break;
-}
-current_read += rows.len();
+let rows_read = read::read_rows(&mut reader, 0, &mut rows[..batch_size])?;
+current_read += rows_read;

-let batch = deserialize(&rows, projection, schema.clone());
+let batch = deserialize(&rows[..rows_read], projection, schema.clone());
response_tx
.blocking_send(batch)
.map_err(|x| DataFusionError::Execution(format!("{}", x)))?;
+if rows_read < batch_size {
+break;
+}
}
Ok(())
}
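The reworked loop is a buffer-reuse pattern: a Vec of batch_size records is allocated once, the reader fills a slice of it, and a short read (fewer rows than requested) ends the loop. A rough standalone sketch of the same control flow over plain text lines, using only std and a hypothetical read_rows helper in place of arrow2's CSV reader:

use std::io::BufRead;

// Hypothetical stand-in for read::read_rows: fill up to `buf.len()` slots
// and return how many rows were actually read.
fn read_rows<R: BufRead>(reader: &mut R, buf: &mut [String]) -> std::io::Result<usize> {
    let mut n = 0;
    while n < buf.len() {
        buf[n].clear();
        if reader.read_line(&mut buf[n])? == 0 {
            break; // end of input
        }
        n += 1;
    }
    Ok(n)
}

fn main() -> std::io::Result<()> {
    let mut reader = std::io::Cursor::new("a\nb\nc\nd\ne\n");

    let batch_size = 2;
    let limit = 5;
    // allocated once, reused for every batch
    let mut rows = vec![String::new(); batch_size];
    let mut current_read = 0;

    while current_read < limit {
        let batch_size = batch_size.min(limit - current_read);
        let rows_read = read_rows(&mut reader, &mut rows[..batch_size])?;
        current_read += rows_read;
        println!("batch of {}: {:?}", rows_read, &rows[..rows_read]);
        if rows_read < batch_size {
            break; // short read: input exhausted
        }
    }
    Ok(())
}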
datafusion/src/physical_plan/hash_aggregate.rs (45 changes: 31 additions & 14 deletions)
@@ -325,6 +325,36 @@ pin_project! {
}
}

fn hash_(group_values: &[ArrayRef]) -> Result<MutableBuffer<u64>> {
// compute the hashes
// todo: we should be able to use `MutableBuffer<u64>` to compute the hash and ^ them without
// allocating all the hashes before ^ them
let hashes = group_values
.iter()
.map(|x| {
let a = match x.data_type() {
DataType::Dictionary(_, d) => {
// todo: think about how to perform this more efficiently
// * first hash, then unpack
// * do not unpack at all, and instead figure out a way to leverage dictionary-encoded.
let unpacked = arrow2::compute::cast::cast(x.as_ref(), d)?;
arrow2::compute::hash::hash(unpacked.as_ref())
}
_ => arrow2::compute::hash::hash(x.as_ref()),
};
Ok(a?)
})
.collect::<Result<Vec<_>>>()?;
let hash = MutableBuffer::<u64>::from(hashes[0].values());

Ok(hashes.iter().skip(1).fold(hash, |mut acc, x| {
acc.iter_mut()
.zip(x.values().iter())
.for_each(|(hash, other)| *hash ^= other);
acc
}))
}

fn group_aggregate_batch(
mode: &AggregateMode,
group_expr: &[Arc<dyn PhysicalExpr>],
@@ -354,20 +384,7 @@ fn group_aggregate_batch(
// Make sure we can create the accumulators or otherwise return an error
create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;

-// compute the hashes
-// todo: we should be able to use `MutableBuffer<u64>` to compute the hash and ^ them without
-// allocating all the hashes before ^ them
-let hashes = group_values
-.iter()
-.map(|x| Ok(arrow2::compute::hash::hash(x.as_ref())?))
-.collect::<Result<Vec<_>>>()?;
-let hash = MutableBuffer::<u64>::from(hashes[0].values());
-let hash = hashes.iter().skip(1).fold(hash, |mut acc, x| {
-acc.iter_mut()
-.zip(x.values().iter())
-.for_each(|(hash, other)| *hash ^= other);
-acc
-});
+let hash = hash_(&group_values)?;

let mut batch_keys = vec![];
hash.iter().enumerate().for_each(|(row, key)| {
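For intuition, the new hash_ path handles dictionary arrays by first unpacking them to their value type (the cast above), hashing each logical row, and then folding the per-column hash vectors together with XOR. A rough standalone sketch of that idea with plain Vec types and std hashing, rather than arrow2's hash kernel and MutableBuffer:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn hash_one<T: Hash>(value: &T) -> u64 {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

// A dictionary-encoded column: row i logically holds values[keys[i]].
struct DictColumn {
    keys: Vec<usize>,
    values: Vec<String>,
}

impl DictColumn {
    // "Unpack" to the logical values and hash each row,
    // analogous to casting the dictionary array before hashing it.
    fn hash_rows(&self) -> Vec<u64> {
        self.keys.iter().map(|&k| hash_one(&self.values[k])).collect()
    }
}

// XOR-combine per-column row hashes into a single hash per row,
// mirroring the fold over the per-column hashes in hash_.
fn combine(columns: &[Vec<u64>]) -> Vec<u64> {
    let mut acc = columns[0].clone();
    for col in &columns[1..] {
        for (h, other) in acc.iter_mut().zip(col.iter()) {
            *h ^= *other;
        }
    }
    acc
}

fn main() {
    let city = DictColumn {
        keys: vec![0, 1, 0, 1],
        values: vec!["Lisbon".to_string(), "Porto".to_string()],
    };
    let year: Vec<u64> = [2020u32, 2020, 2021, 2021].iter().map(|y| hash_one(y)).collect();

    let per_row = combine(&[city.hash_rows(), year]);
    assert_eq!(per_row.len(), 4);
    println!("{:?}", per_row);
}

The todo comments in the patch point at two possible refinements of this approach: hashing first and unpacking the hashes by key, or avoiding the unpacking entirely by working on the dictionary encoding directly.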
