Skip to content

Commit

Permalink
Add vectorization for druid-histogram extension (apache#10304)
Browse files Browse the repository at this point in the history
* First draft

* Remove redundant code from FixedBucketsHistogramAggregator classes

* Add test cases for new classes

* Fix tests in sql compatible mode

* Typo fix

* Fix comment

* Add spelling

* Vectorize only for supported types

* Rename internal aggregator files

* Fix tests
  • Loading branch information
abhishekagarwal87 authored and JulianJaffePinterest committed Jan 22, 2021
1 parent 808f33d commit a4273dd
Show file tree
Hide file tree
Showing 21 changed files with 1,267 additions and 115 deletions.
3 changes: 2 additions & 1 deletion docs/querying/query-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ requirements:
include "selector", "bound", "in", "like", "regex", "search", "and", "or", and "not".
- All filters in filtered aggregators must offer vectorized row-matchers.
- All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum", "longMin",
"longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", and "filtered".
"longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", "filtered", "approxHistogram",
"approxHistogramFold", and "fixedBucketsHistogram" (with numerical input).
- No virtual columns.
- For GroupBy: All dimension specs must be "default" (no extraction functions or filtered dimension specs).
- For GroupBy: No multi-value dimensions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -103,6 +107,24 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
);
}

/**
 * Creates the vectorized aggregator for this factory. Input values are read through the vector value selector
 * that {@code metricVectorFactory} provides for {@code fieldName}.
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory metricVectorFactory)
{
  return new ApproximateHistogramVectorAggregator(metricVectorFactory.makeValueSelector(fieldName), resolution);
}

/**
 * Reports whether this aggregator may run vectorized against the column named by {@code fieldName}.
 *
 * Only columns whose capabilities are known and whose type is numeric qualify. String columns are excluded even
 * though their values may be parseable to numbers, because there is no vector equivalent of the string value
 * selector.
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
  final ColumnCapabilities columnCapabilities = columnInspector.getColumnCapabilities(fieldName);
  if (columnCapabilities == null) {
    // Unknown column: nothing to vectorize over.
    return false;
  }
  return columnCapabilities.getType().isNumeric();
}

@Override
public Comparator getComparator()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,54 +28,30 @@
public class ApproximateHistogramBufferAggregator implements BufferAggregator
{
private final BaseFloatColumnValueSelector selector;
private final int resolution;
private final ApproximateHistogramBufferAggregatorHelper innerAggregator;

/**
 * Creates a buffer aggregator over the given float selector.
 *
 * @param selector   supplies the float values that {@code aggregate} offers to the buffered histogram
 * @param resolution kept on this instance and also handed to the
 *                   {@link ApproximateHistogramBufferAggregatorHelper} that performs the buffer operations
 */
public ApproximateHistogramBufferAggregator(BaseFloatColumnValueSelector selector, int resolution)
{
this.selector = selector;
this.resolution = resolution;
this.innerAggregator = new ApproximateHistogramBufferAggregatorHelper(resolution);
}

/**
 * Writes an empty histogram into {@code buf} at {@code position}.
 *
 * Initialization is performed once, by the shared {@link ApproximateHistogramBufferAggregatorHelper}, which
 * serializes a fresh histogram of this aggregator's resolution in the dense format. The buffer layout is not
 * hand-written here so that it cannot drift from the format the helper reads back.
 *
 * @param buf      aggregation buffer; its own position is not modified by this method
 * @param position offset in {@code buf} at which the histogram is stored
 */
@Override
public void init(ByteBuffer buf, int position)
{
  innerAggregator.init(buf, position);
}

/**
 * Offers the selector's current float value to the histogram stored in {@code buf} at {@code position}.
 *
 * The value is offered exactly once: the helper deserializes the histogram, offers the value, and writes the
 * histogram back to the same position.
 *
 * @param buf      aggregation buffer holding the serialized histogram
 * @param position offset in {@code buf} at which the histogram is stored
 */
@Override
public void aggregate(ByteBuffer buf, int position)
{
  innerAggregator.aggregate(buf, position, selector.getFloat());
}

/**
 * Returns the histogram stored in {@code buf} at {@code position}.
 *
 * Reading is delegated to the helper so that the same (dense) format used by {@code init} and
 * {@code aggregate} is used to deserialize.
 *
 * @param buf      aggregation buffer holding the serialized histogram
 * @param position offset in {@code buf} at which the histogram is stored
 * @return the deserialized {@link ApproximateHistogram}
 */
@Override
public Object get(ByteBuffer buf, int position)
{
  return innerAggregator.get(buf, position);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.histogram;

import java.nio.ByteBuffer;

/**
 * Byte-buffer operations shared by {@link ApproximateHistogramBufferAggregator} and
 * {@link ApproximateHistogramVectorAggregator}. This class only reads and writes histograms at a buffer
 * position; fetching the input value from a selector is the caller's job.
 */
final class ApproximateHistogramBufferAggregatorHelper
{
  private final int resolution;

  public ApproximateHistogramBufferAggregatorHelper(int resolution)
  {
    this.resolution = resolution;
  }

  /**
   * Stores a fresh histogram of this helper's resolution at {@code position}.
   */
  public void init(final ByteBuffer buf, final int position)
  {
    put(buf, position, new ApproximateHistogram(resolution));
  }

  /**
   * Reads the dense-format histogram stored at {@code position}.
   */
  public ApproximateHistogram get(final ByteBuffer buf, final int position)
  {
    return ApproximateHistogram.fromBytesDense(positionedView(buf, position));
  }

  /**
   * Writes {@code histogram} in dense format at {@code position}.
   */
  public void put(final ByteBuffer buf, final int position, final ApproximateHistogram histogram)
  {
    histogram.toBytesDense(positionedView(buf, position));
  }

  /**
   * Reads the histogram at {@code position}, offers {@code value} to it, and writes it back to the same place.
   */
  public void aggregate(final ByteBuffer buf, final int position, final float value)
  {
    final ApproximateHistogram current = get(buf, position);
    current.offer(value);
    put(buf, position, current);
  }

  /**
   * Returns a duplicate of {@code buf} positioned at {@code position}, so that reads and writes through it do
   * not move the position of the caller's buffer.
   */
  private static ByteBuffer positionedView(final ByteBuffer buf, final int position)
  {
    final ByteBuffer view = buf.duplicate();
    view.position(position);
    return view;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;

import javax.annotation.Nullable;
import java.util.Objects;
Expand Down Expand Up @@ -93,10 +99,33 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
);
}

/**
 * Creates the vectorized folding aggregator. Histograms are read through the vector object selector that
 * {@code metricVectorFactory} provides for {@code fieldName}.
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory metricVectorFactory)
{
  return new ApproximateHistogramFoldingVectorAggregator(
      metricVectorFactory.makeObjectSelector(fieldName),
      resolution,
      lowerLimit,
      upperLimit
  );
}


/**
 * Reports whether this aggregator may run vectorized against the column named by {@code fieldName}: the
 * column's capabilities must be known and its type must be {@link ValueType#COMPLEX}.
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
  final ColumnCapabilities columnCapabilities = columnInspector.getColumnCapabilities(fieldName);
  if (columnCapabilities == null) {
    return false;
  }
  return ValueType.COMPLEX == columnCapabilities.getType();
}

/**
 * Returns the factory used to combine partial results of this aggregator.
 *
 * The combining factory reads from the column named after this aggregator's output ({@code name} is passed as
 * both the output name and the input field) and keeps every other setting unchanged.
 */
@Override
public AggregatorFactory getCombiningFactory()
{
  return new ApproximateHistogramFoldingAggregatorFactory(
      name,
      name,
      resolution,
      numBuckets,
      lowerLimit,
      upperLimit,
      finalizeAsBase64Binary
  );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,7 @@
public class ApproximateHistogramFoldingBufferAggregator implements BufferAggregator
{
private final BaseObjectColumnValueSelector<ApproximateHistogram> selector;
private final int resolution;
private final float upperLimit;
private final float lowerLimit;

private float[] tmpBufferP;
private long[] tmpBufferB;
private final ApproximateHistogramFoldingBufferAggregatorHelper innerAggregator;

public ApproximateHistogramFoldingBufferAggregator(
BaseObjectColumnValueSelector<ApproximateHistogram> selector,
Expand All @@ -43,50 +38,26 @@ public ApproximateHistogramFoldingBufferAggregator(
)
{
this.selector = selector;
this.resolution = resolution;
this.lowerLimit = lowerLimit;
this.upperLimit = upperLimit;

tmpBufferP = new float[resolution];
tmpBufferB = new long[resolution];
this.innerAggregator = new ApproximateHistogramFoldingBufferAggregatorHelper(resolution, lowerLimit, upperLimit);
}

/**
 * Writes the initial histogram into {@code buf} at {@code position}.
 *
 * Initialization is performed once, by the helper that was constructed with this aggregator's resolution and
 * lower/upper limits, rather than being repeated inline before delegating.
 *
 * @param buf      aggregation buffer
 * @param position offset in {@code buf} at which the histogram is stored
 */
@Override
public void init(ByteBuffer buf, int position)
{
  innerAggregator.init(buf, position);
}

/**
 * Folds the selector's current histogram into the one stored in {@code buf} at {@code position}.
 *
 * The incoming histogram is folded exactly once, through the helper. A {@code null} histogram from the selector
 * is skipped and leaves the buffer untouched.
 *
 * @param buf      aggregation buffer holding the serialized histogram
 * @param position offset in {@code buf} at which the histogram is stored
 */
@Override
public void aggregate(ByteBuffer buf, int position)
{
  final ApproximateHistogram hNext = selector.getObject();
  if (hNext == null) {
    // Nothing to fold for this row.
    return;
  }
  innerAggregator.aggregate(buf, position, hNext);
}

/**
 * Returns the histogram stored in {@code buf} at {@code position}, as read by the helper.
 *
 * NOTE(review): the helper's {@code get} is not visible from here; it is presumed to deserialize the dense
 * format written by {@code init} and {@code aggregate} - confirm.
 *
 * @param buf      aggregation buffer holding the serialized histogram
 * @param position offset in {@code buf} at which the histogram is stored
 */
@Override
public Object get(ByteBuffer buf, int position)
{
  return innerAggregator.get(buf, position);
}

@Override
Expand All @@ -106,6 +77,7 @@ public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getDouble()");
}

@Override
public void close()
{
Expand Down
Loading

0 comments on commit a4273dd

Please sign in to comment.