Skip to content

Commit

Permalink
make PrestoSerializer compression compatible with Presto and add stats
Browse files Browse the repository at this point in the history
Presto uses the uncompressed size as part of the checksum, not the
compressed size. Presto also does not use the compression result
unless it is under 0.8 x the source size. Presto does not require the
compressed bit to be set if a compressing exchange codec is in place.

Adds counters to serializers for tracking compression efficacy.

Adds a setting to OutputBufferManager to decide if compression should
be attempted for exchange.

For compatibility with Presto, uses uncompressed output if compression yields less than 20% savings. Beyond what Presto does, skips attempting compression for a number of forthcoming batches if compression was not effective.

Reports the compression input bytes, compressed bytes and bytes for
which compression was not attempted in runtimeStats.
  • Loading branch information
Orri Erling committed Mar 13, 2024
1 parent 17de646 commit a9b6559
Show file tree
Hide file tree
Showing 10 changed files with 339 additions and 60 deletions.
2 changes: 1 addition & 1 deletion velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ RowVectorPtr Exchange::getOutput() {

while (!inputStream.atEnd()) {
getSerde()->deserialize(
&inputStream, pool(), outputType_, &result_, resultOffset);
&inputStream, pool(), outputType_, &result_, resultOffset, &options_);
resultOffset = result_->size();
}
}
Expand Down
8 changes: 7 additions & 1 deletion velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <random>
#include "velox/exec/ExchangeClient.h"
#include "velox/exec/Operator.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/serializers/PrestoSerializer.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -50,7 +52,10 @@ class Exchange : public SourceOperator {
preferredOutputBatchBytes_{
driverCtx->queryConfig().preferredOutputBatchBytes()},
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {}
exchangeClient_{std::move(exchangeClient)} {
options_.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
}

~Exchange() override {
close();
Expand Down Expand Up @@ -105,6 +110,7 @@ class Exchange : public SourceOperator {
std::vector<std::unique_ptr<SerializedPage>> currentPages_;
bool atEnd_{false};
std::default_random_engine rng_{std::random_device{}()};
serializer::presto::PrestoVectorSerde::Options options_;
};

} // namespace facebook::velox::exec
14 changes: 12 additions & 2 deletions velox/exec/OutputBufferManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
// static
// Installs the singleton OutputBufferManager, constructed from 'options'.
// The check fails if an instance has already been set.
void OutputBufferManager::initialize(const Options& options) {
  // Held for the whole body so the null check and the assignment below are
  // not interleaved with another holder of initMutex_.
  std::lock_guard<std::mutex> initGuard(initMutex_);
  VELOX_CHECK(
      instance_ == nullptr, "May initialize OutputBufferManager only once");
  instance_ = std::make_shared<OutputBufferManager>(options);
}

// static
// Returns a weak reference to the singleton OutputBufferManager. If no
// instance has been installed through initialize(), one is created here with
// default-constructed Options.
std::weak_ptr<OutputBufferManager> OutputBufferManager::getInstance() {
  // The former function-local static instance is gone: returning it made the
  // instance_ path below unreachable, so an instance installed by
  // initialize() was never handed out.
  std::lock_guard<std::mutex> l(initMutex_);
  if (!instance_) {
    instance_ = std::make_shared<OutputBufferManager>(Options());
  }
  return instance_;
}

std::shared_ptr<OutputBuffer> OutputBufferManager::getBuffer(
Expand Down
29 changes: 29 additions & 0 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ namespace facebook::velox::exec {

class OutputBufferManager {
public:
/// Settings for shuffle. Initialized once and applied to both
/// PartitionedOutput and Exchange. Intended for compression, protocol version
/// and other choices on which the two sides of a shuffle should agree.
struct Options {
  /// Compression kind for shuffle; no compression unless overridden.
  common::CompressionKind compressionKind =
      common::CompressionKind::CompressionKind_NONE;
};

OutputBufferManager(Options options)
: compressionKind_(options.compressionKind) {}

void initializeTask(
std::shared_ptr<Task> task,
core::PartitionedOutputNode::Kind kind,
Expand Down Expand Up @@ -84,6 +96,10 @@ class OutputBufferManager {

void removeTask(const std::string& taskId);

/// Initializes singleton with 'options'. May be called once before
/// getInstance().
static void initialize(const Options& options);

static std::weak_ptr<OutputBufferManager> getInstance();

uint64_t numBuffers() const;
Expand Down Expand Up @@ -117,17 +133,30 @@ class OutputBufferManager {
// Returns NULL if task not found.
std::shared_ptr<OutputBuffer> getBufferIfExists(const std::string& taskId);

/// Test-only: replaces the compression kind reported by compressionKind().
/// NOTE(review): the write is not synchronized with concurrent readers;
/// presumably tests call this while no tasks are running - confirm.
void testingSetCompression(common::CompressionKind kind) {
  compressionKind_ = kind;
}

/// Returns the compression kind this manager was constructed with, or the
/// value last set through testingSetCompression().
common::CompressionKind compressionKind() const {
  return compressionKind_;
}

private:
// Retrieves the set of buffers for a query.
// Throws an exception if buffer doesn't exist.
std::shared_ptr<OutputBuffer> getBuffer(const std::string& taskId);

// Deliberately not const: testingSetCompression() assigns it, and writing to
// a const-declared member through const_cast is undefined behavior. In the
// code visible here it is otherwise only set by the constructor.
common::CompressionKind compressionKind_;

folly::Synchronized<
std::unordered_map<std::string, std::shared_ptr<OutputBuffer>>,
std::mutex>
buffers_;

std::function<std::unique_ptr<OutputStreamListener>()> listenerFactory_{
nullptr};

// Singleton instance handed out by the static accessors of this class.
inline static std::shared_ptr<OutputBufferManager> instance_;
// Mutex locked in OutputBufferManager.cpp around accesses to 'instance_'.
inline static std::mutex initMutex_;
};
} // namespace facebook::velox::exec
20 changes: 18 additions & 2 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {

namespace detail {
BlockingReason Destination::advance(
uint64_t maxBytes,
Expand Down Expand Up @@ -55,7 +54,11 @@ BlockingReason Destination::advance(
if (!current_) {
current_ = std::make_unique<VectorStreamGroup>(pool_);
auto rowType = asRowType(output->type());
current_->createStreamTree(rowType, rowsInCurrent_);
serializer::presto::PrestoVectorSerde::PrestoOptions options;
options.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
options.minCompressionRatio = PartitionedOutput::minCompressionRatio();
current_->createStreamTree(rowType, rowsInCurrent_, &options);
}
current_->append(
output, folly::Range(&rows_[firstRow], rowIdx_ - firstRow), scratch);
Expand Down Expand Up @@ -107,6 +110,18 @@ BlockingReason Destination::flush(
return blocked ? BlockingReason::kWaitForConsumer
: BlockingReason::kNotBlocked;
}

// Copies the runtime stats reported by the current serializer stream group,
// if there is one, into the runtime stats of 'op'. Must only be called once
// this destination has been marked finished.
void Destination::updateStats(Operator* op) {
  VELOX_CHECK(finished_);
  if (!current_) {
    // No stream group to report from.
    return;
  }
  const auto countersByName = current_->runtimeStats();
  auto opStats = op->stats().wlock();
  for (auto& [name, counter] : countersByName) {
    opStats->addRuntimeStat(name, counter);
  }
}

} // namespace detail

PartitionedOutput::PartitionedOutput(
Expand Down Expand Up @@ -366,6 +381,7 @@ RowVectorPtr PartitionedOutput::getOutput() {
}
destination->flush(*bufferManager, bufferReleaseFn_, nullptr);
destination->setFinished();
destination->updateStats(this);
}

bufferManager->noMoreData(operatorCtx_->task()->taskId());
Expand Down
15 changes: 15 additions & 0 deletions velox/exec/PartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class Destination {
return bytesInCurrent_;
}

/// Adds stats from 'this' to runtime stats of 'op'.
void updateStats(Operator* op);

private:
// Sets the next target size for flushing. This is called at the
// start of each batch of output for the destination. The effect is
Expand Down Expand Up @@ -182,6 +185,14 @@ class PartitionedOutput : public Operator {
destinations_.clear();
}

/// Test-only: overwrites the class-wide minimum compression ratio. The value
/// is static, so it stays in effect until it is set again.
static void testingSetMinCompressionRatio(float ratio) {
minCompressionRatio_ = ratio;
}

/// Returns the class-wide minimum compression ratio.
static float minCompressionRatio() {
return minCompressionRatio_;
}

private:
void initializeInput(RowVectorPtr input);

Expand All @@ -194,6 +205,10 @@ class PartitionedOutput : public Operator {
/// Collect all rows with null keys into nullRows_.
void collectNullRows();

// If compression in serde is enabled, this is the minimum compression that
// must be achieved before starting to skip compression. Used for testing.
inline static float minCompressionRatio_ = 0.8;

const std::vector<column_index_t> keyChannels_;
const int numDestinations_;
const bool replicateNullsAndAny_;
Expand Down
57 changes: 57 additions & 0 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/exec/Exchange.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/exec/PartitionedOutput.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/RoundRobinPartitionFunction.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
Expand Down Expand Up @@ -2000,5 +2001,61 @@ TEST_F(MultiFragmentTest, mergeSmallBatchesInExchange) {
test(100'000, 1);
}

// Runs a producer (values -> partitioned output) and a consumer (exchange ->
// sum) with LZ4 compression enabled, then checks the compression-related
// runtime stats reported for the producer's partitioned output node.
TEST_F(MultiFragmentTest, compression) {
  // The minimum compression ratio is a static of PartitionedOutput, so
  // remember it and restore it on exit. Otherwise the last value set below
  // would leak into later tests in this process.
  const float originalMinCompressionRatio =
      PartitionedOutput::minCompressionRatio();
  bufferManager_->testingSetCompression(
      common::CompressionKind::CompressionKind_LZ4);
  auto guard = folly::makeGuard([&]() {
    bufferManager_->testingSetCompression(
        common::CompressionKind::CompressionKind_NONE);
    PartitionedOutput::testingSetMinCompressionRatio(
        originalMinCompressionRatio);
  });

  constexpr int32_t kNumRepeats = 1'000'000;
  const auto data = makeRowVector({makeFlatVector<int64_t>({1, 2, 3})});

  const auto producerPlan = test::PlanBuilder()
                                .values({data}, false, kNumRepeats)
                                .partitionedOutput({}, 1)
                                .planNode();
  const auto producerTaskId = "local://t1";

  const auto plan = test::PlanBuilder()
                        .exchange(asRowType(data->type()))
                        .singleAggregation({}, {"sum(c0)"})
                        .planNode();

  // (1 + 2 + 3) * kNumRepeats.
  const auto expected =
      makeRowVector({makeFlatVector<int64_t>(std::vector<int64_t>{6000000})});

  // Runs producer and consumer with the given minimum compression ratio and
  // checks whether the producer reported skipping compression.
  const auto test = [&](float minCompressionRatio, bool expectSkipCompression) {
    PartitionedOutput::testingSetMinCompressionRatio(minCompressionRatio);
    auto producerTask = makeTask(producerTaskId, producerPlan);
    producerTask->start(1);

    auto consumerTask = test::AssertQueryBuilder(plan)
                            .split(remoteSplit(producerTaskId))
                            .destination(0)
                            .assertResults(expected);

    auto consumerTaskStats = exec::toPlanStats(consumerTask->taskStats());
    const auto& consumerPlanStats = consumerTaskStats.at("0");
    ASSERT_EQ(data->size() * kNumRepeats, consumerPlanStats.outputRows);

    auto producerTaskStats = exec::toPlanStats(producerTask->taskStats());
    const auto& producerStats = producerTaskStats.at("1");
    // The data is extremely compressible, 1, 2, 3 repeated 1000000 times.
    if (!expectSkipCompression) {
      EXPECT_LT(
          producerStats.customStats.at("compressedBytes").sum,
          producerStats.customStats.at("compressionInputBytes").sum);
      EXPECT_EQ(0, producerStats.customStats.at("compressionSkippedBytes").sum);
    } else {
      EXPECT_LT(0, producerStats.customStats.at("compressionSkippedBytes").sum);
    }
  };

  test(0.7, false);
  test(0.0000001, true);
}

} // namespace
} // namespace facebook::velox::exec
Loading

0 comments on commit a9b6559

Please sign in to comment.