Skip to content

Commit

Permalink
Optimize map_agg function for inputs with many duplicate keys (facebo…
Browse files Browse the repository at this point in the history
…okincubator#5468)

Summary:
The original implementation of the map_agg function accumulated all keys in memory
before checking for duplicates. When the input has many duplicate keys, this leads
to high memory and CPU usage. The new implementation doesn't store duplicate
keys.

Fixes facebookincubator#5462

Pull Request resolved: facebookincubator#5468

Reviewed By: pedroerp

Differential Revision: D47123782

Pulled By: mbasmanova

fbshipit-source-id: 0d4817b96f4348481f470336ca9b89f6802a8001
  • Loading branch information
mbasmanova authored and facebook-github-bot committed Jul 4, 2023
1 parent 61bf505 commit 0005a75
Show file tree
Hide file tree
Showing 8 changed files with 619 additions and 252 deletions.
1 change: 0 additions & 1 deletion velox/functions/prestosql/aggregates/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ add_library(
ChecksumAggregate.cpp
HistogramAggregate.cpp
MapAggAggregate.cpp
MapAggregateBase.cpp
MapUnionAggregate.cpp
MapUnionSumAggregate.cpp
MinMaxAggregates.cpp
Expand Down
271 changes: 271 additions & 0 deletions velox/functions/prestosql/aggregates/MapAccumulator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <folly/container/F14Map.h>
#include "velox/common/memory/HashStringAllocator.h"
#include "velox/functions/prestosql/aggregates/AddressableNonNullValueList.h"
#include "velox/functions/prestosql/aggregates/Strings.h"
#include "velox/functions/prestosql/aggregates/ValueList.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DecodedVector.h"
#include "velox/vector/FlatVector.h"

namespace facebook::velox::aggregate::prestosql {

namespace detail {
/// Maintains a key-value map. Keys must be non-null.
///
/// Unique keys are stored in the 'keys' hash map; the corresponding values are
/// appended to 'values' in insertion order. Each entry in 'keys' maps the key
/// to the position of its value in 'values'. A key that is already present is
/// dropped together with its value, i.e. the first value seen for a key wins.
///
/// @tparam T Type of the key as stored in the hash map.
/// @tparam Hash Hash function for keys.
/// @tparam EqualTo Equality comparison for keys.
template <
    typename T,
    typename Hash = std::hash<T>,
    typename EqualTo = std::equal_to<T>>
struct MapAccumulator {
  // Value is the index of the corresponding entry in 'values'.
  folly::F14FastMap<
      T,
      int32_t,
      Hash,
      EqualTo,
      AlignedStlAllocator<std::pair<const T, int32_t>, 16>>
      keys;
  ValueList values;

  /// Creates an empty accumulator that uses default-constructed Hash and
  /// EqualTo. Memory for 'keys' is obtained from 'allocator'. The type
  /// argument is unused.
  MapAccumulator(const TypePtr& /*type*/, HashStringAllocator* allocator)
      : keys{AlignedStlAllocator<std::pair<const T, int32_t>, 16>(allocator)} {}

  /// Creates an empty accumulator with caller-provided 'hash' and 'equalTo'.
  /// Memory for 'keys' is obtained from 'allocator'.
  MapAccumulator(Hash hash, EqualTo equalTo, HashStringAllocator* allocator)
      : keys{
            0,
            hash,
            equalTo,
            AlignedStlAllocator<std::pair<const T, int32_t>, 16>(allocator)} {}

  /// Adds key-value pair if entry with that key doesn't exist yet.
  ///
  /// @param decodedKeys Decoded vector of keys; the key at 'index' is read
  /// via valueAt<T>.
  /// @param decodedValues Decoded vector of values.
  /// @param index Row to read the key and the value from.
  /// @param allocator Allocator used to store the value.
  void insert(
      const DecodedVector& decodedKeys,
      const DecodedVector& decodedValues,
      vector_size_t index,
      HashStringAllocator& allocator) {
    // The value of a new key is appended to the end of 'values', hence, its
    // position equals the number of keys stored so far.
    const auto valueIndex = static_cast<int32_t>(keys.size());

    // Drop duplicate keys.
    if (keys.insert({decodedKeys.valueAt<T>(index), valueIndex}).second) {
      values.appendValue(decodedValues, index, &allocator);
    }
  }

  /// Calls insert() for each of the 'size' rows starting at 'offset'.
  void insertRange(
      const DecodedVector& decodedKeys,
      const DecodedVector& decodedValues,
      vector_size_t offset,
      vector_size_t size,
      HashStringAllocator& allocator) {
    for (auto i = offset; i < offset + size; ++i) {
      insert(decodedKeys, decodedValues, i, allocator);
    }
  }

  /// Returns number of key-value pairs.
  size_t size() const {
    return keys.size();
  }

  /// Writes all keys into 'mapKeys' and all values into 'mapValues' starting
  /// at position 'offset'. 'mapKeys' is accessed as FlatVector<T>.
  void extract(
      const VectorPtr& mapKeys,
      const VectorPtr& mapValues,
      vector_size_t offset) {
    const auto mapSize = keys.size();

    // Align keys and values as the order of keys in 'keys' may not match the
    // order of values in 'values'. Maps position of the value in 'values' to
    // the position of its key in the output, relative to 'offset'.
    folly::F14FastMap<int32_t, int32_t> indices;
    indices.reserve(mapSize);

    auto flatKeys = mapKeys->asFlatVector<T>();

    vector_size_t index = offset;
    for (const auto& entry : keys) {
      flatKeys->set(index, entry.first);
      indices[entry.second] = index - offset;
      ++index;
    }

    extractValues(mapValues, offset, mapSize, indices);
  }

  /// Reads 'mapSize' values from 'values' in insertion order and writes the
  /// i-th value into 'mapValues' at position offset + indices.at(i).
  void extractValues(
      const VectorPtr& mapValues,
      vector_size_t offset,
      int32_t mapSize,
      const folly::F14FastMap<int32_t, int32_t>& indices) {
    ValueListReader valuesReader(values);
    for (auto index = 0; index < mapSize; ++index) {
      valuesReader.next(*mapValues, offset + indices.at(index));
    }
  }

  /// Releases memory held by 'values'.
  void free(HashStringAllocator& allocator) {
    values.free(&allocator);
  }
};

/// Maintains a map with string keys.
///
/// Inline strings are stored directly in the hash map. Non-inline strings are
/// copied into 'strings' first and the hash map stores the StringView returned
/// by that copy, so that stored keys do not reference the input vectors.
struct StringViewMapAccumulator {
  /// A set of unique StringViews pointing to storage managed by 'strings'.
  MapAccumulator<StringView> base;

  /// Stores unique non-null non-inline strings.
  Strings strings;

  StringViewMapAccumulator(const TypePtr& type, HashStringAllocator* allocator)
      : base{type, allocator} {}

  /// Adds key-value pair if entry with that key doesn't exist yet.
  void insert(
      const DecodedVector& decodedKeys,
      const DecodedVector& decodedValues,
      vector_size_t index,
      HashStringAllocator& allocator) {
    auto key = decodedKeys.valueAt<StringView>(index);
    if (!key.isInline()) {
      // Check for a duplicate before copying to avoid storing strings that
      // are going to be dropped.
      if (base.keys.contains(key)) {
        return;
      }
      key = strings.append(key, allocator);
    }

    const auto valueIndex = static_cast<int32_t>(base.keys.size());
    if (base.keys.insert({key, valueIndex}).second) {
      base.values.appendValue(decodedValues, index, &allocator);
    }
  }

  /// Calls insert() for each of the 'size' rows starting at 'offset'.
  void insertRange(
      const DecodedVector& decodedKeys,
      const DecodedVector& decodedValues,
      vector_size_t offset,
      vector_size_t size,
      HashStringAllocator& allocator) {
    // Must go through insert() of this struct rather than base.insertRange():
    // the latter stores the incoming StringView as is, without copying
    // non-inline strings into 'strings'.
    for (auto i = offset; i < offset + size; ++i) {
      insert(decodedKeys, decodedValues, i, allocator);
    }
  }

  /// Returns number of key-value pairs.
  size_t size() const {
    return base.size();
  }

  /// Writes all keys into 'mapKeys' and all values into 'mapValues' starting
  /// at position 'offset'.
  void extract(
      const VectorPtr& mapKeys,
      const VectorPtr& mapValues,
      vector_size_t offset) {
    base.extract(mapKeys, mapValues, offset);
  }

  /// Releases memory held by 'strings' and by the values stored in 'base'.
  void free(HashStringAllocator& allocator) {
    strings.free(allocator);
    base.free(allocator);
  }
};

/// Maintains a map with keys of type array, map or struct.
///
/// Keys are serialized into 'serializedKeys'. The hash map stores positions of
/// the serialized keys and uses AddressableNonNullValueList::Hash and
/// AddressableNonNullValueList::EqualTo to hash and compare them.
struct ComplexTypeMapAccumulator {
  /// A set of pointers to values stored in AddressableNonNullValueList.
  MapAccumulator<
      HashStringAllocator::Position,
      AddressableNonNullValueList::Hash,
      AddressableNonNullValueList::EqualTo>
      base;

  /// Stores unique non-null keys.
  AddressableNonNullValueList serializedKeys;

  ComplexTypeMapAccumulator(const TypePtr& type, HashStringAllocator* allocator)
      : base{
            AddressableNonNullValueList::Hash{},
            AddressableNonNullValueList::EqualTo{type},
            allocator} {}

  /// Adds key-value pair if entry with that key doesn't exist yet.
  void insert(
      const DecodedVector& decodedKeys,
      const DecodedVector& decodedValues,
      vector_size_t index,
      HashStringAllocator& allocator) {
    // The key is serialized first as the hash map compares serialized keys.
    auto position = serializedKeys.append(decodedKeys, index, &allocator);

    const auto valueIndex = static_cast<int32_t>(base.keys.size());
    if (!base.keys.insert({position, valueIndex}).second) {
      // Duplicate key. Undo the append above.
      serializedKeys.removeLast(position);
      return;
    }

    base.values.appendValue(decodedValues, index, &allocator);
  }

  /// Calls insert() for each of the 'size' rows starting at 'offset'.
  void insertRange(
      const DecodedVector& decodedKeys,
      const DecodedVector& decodedValues,
      vector_size_t offset,
      vector_size_t size,
      HashStringAllocator& allocator) {
    // Must go through insert() of this struct rather than base.insertRange():
    // the latter reads keys via valueAt<HashStringAllocator::Position>
    // instead of serializing them into 'serializedKeys'.
    for (auto i = offset; i < offset + size; ++i) {
      insert(decodedKeys, decodedValues, i, allocator);
    }
  }

  /// Returns number of key-value pairs.
  size_t size() const {
    return base.size();
  }

  /// Writes all keys into 'mapKeys' and all values into 'mapValues' starting
  /// at position 'offset'.
  void extract(
      const VectorPtr& mapKeys,
      const VectorPtr& mapValues,
      vector_size_t offset) {
    const auto mapSize = base.keys.size();

    // Maps position of the value in 'base.values' to the position of its key
    // in the output, relative to 'offset'.
    folly::F14FastMap<int32_t, int32_t> indices;
    indices.reserve(mapSize);

    vector_size_t index = offset;
    for (const auto& value : base.keys) {
      AddressableNonNullValueList::read(value.first, *mapKeys, index);
      indices[value.second] = index - offset;
      ++index;
    }

    base.extractValues(mapValues, offset, mapSize, indices);
  }

  /// Releases memory held by 'serializedKeys' and by the values stored in
  /// 'base'.
  void free(HashStringAllocator& allocator) {
    serializedKeys.free(allocator);
    base.free(allocator);
  }
};

/// Selects the accumulator implementation for a given key type. By default,
/// the generic MapAccumulator keyed on K is used.
template <typename K>
struct MapAccumulatorTypeTraits {
  using AccumulatorType = MapAccumulator<K>;
};

/// StringView keys use StringViewMapAccumulator.
template <>
struct MapAccumulatorTypeTraits<StringView> {
  using AccumulatorType = StringViewMapAccumulator;
};

/// ComplexType keys use ComplexTypeMapAccumulator.
template <>
struct MapAccumulatorTypeTraits<ComplexType> {
  using AccumulatorType = ComplexTypeMapAccumulator;
};

} // namespace detail

/// Accumulator type for map keys of type K, as selected by
/// detail::MapAccumulatorTypeTraits.
template <typename K>
using MapAccumulator =
    typename detail::MapAccumulatorTypeTraits<K>::AccumulatorType;

} // namespace facebook::velox::aggregate::prestosql
44 changes: 23 additions & 21 deletions velox/functions/prestosql/aggregates/MapAggAggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,30 @@ namespace facebook::velox::aggregate::prestosql {
namespace {
// See documentation at
// https://prestodb.io/docs/current/functions/aggregate.html
class MapAggAggregate : public aggregate::MapAggregateBase {
template <typename K>
class MapAggAggregate : public MapAggregateBase<K> {
public:
explicit MapAggAggregate(TypePtr resultType) : MapAggregateBase(resultType) {}
explicit MapAggAggregate(TypePtr resultType)
: MapAggregateBase<K>(std::move(resultType)) {}

using Base = MapAggregateBase<K>;

void addRawInput(
char** groups,
const SelectivityVector& rows,
const std::vector<VectorPtr>& args,
bool /*mayPushdown*/) override {
decodedKeys_.decode(*args[0], rows);
decodedValues_.decode(*args[1], rows);
Base::decodedKeys_.decode(*args[0], rows);
Base::decodedValues_.decode(*args[1], rows);

rows.applyToSelected([&](vector_size_t row) {
// Skip null keys
if (!decodedKeys_.isNullAt(row)) {
if (!Base::decodedKeys_.isNullAt(row)) {
auto group = groups[row];
clearNull(group);
auto accumulator = value<MapAccumulator>(group);
auto tracker = trackRowSize(group);
accumulator->keys.appendValue(decodedKeys_, row, allocator_);
accumulator->values.appendValue(decodedValues_, row, allocator_);
Base::clearNull(group);
auto tracker = Base::trackRowSize(group);
Base::accumulator(group)->insert(
Base::decodedKeys_, Base::decodedValues_, row, *Base::allocator_);
}
});
}
Expand All @@ -50,19 +53,17 @@ class MapAggAggregate : public aggregate::MapAggregateBase {
const SelectivityVector& rows,
const std::vector<VectorPtr>& args,
bool /* mayPushdown */) override {
auto accumulator = value<MapAccumulator>(group);
auto& keys = accumulator->keys;
auto& values = accumulator->values;
auto singleAccumulator = Base::accumulator(group);

decodedKeys_.decode(*args[0], rows);
decodedValues_.decode(*args[1], rows);
auto tracker = trackRowSize(group);
Base::decodedKeys_.decode(*args[0], rows);
Base::decodedValues_.decode(*args[1], rows);
auto tracker = Base::trackRowSize(group);
rows.applyToSelected([&](vector_size_t row) {
// Skip null keys
if (!decodedKeys_.isNullAt(row)) {
clearNull(group);
keys.appendValue(decodedKeys_, row, allocator_);
values.appendValue(decodedValues_, row, allocator_);
if (!Base::decodedKeys_.isNullAt(row)) {
Base::clearNull(group);
singleAccumulator->insert(
Base::decodedKeys_, Base::decodedValues_, row, *Base::allocator_);
}
});
}
Expand Down Expand Up @@ -92,7 +93,8 @@ exec::AggregateRegistrationResult registerMapAgg(const std::string& name) {
rawInput ? 2 : 1,
"{} ({}): unexpected number of arguments",
name);
return std::make_unique<MapAggAggregate>(resultType);

return createMapAggregate<MapAggAggregate>(resultType);
});
}

Expand Down
Loading

0 comments on commit 0005a75

Please sign in to comment.