Skip to content

Commit

Permalink
make PrestoSerializer compression compatible with Presto and add stats (
Browse files Browse the repository at this point in the history
facebookincubator#8851)

Summary:
Presto uses the uncompressed size as part of checksum, not the compressed size. Presto also does not use the compression result unless it is under 0,8 x the source size. Presto does not require the compressed bit to be set if a compressing exchange codec is in place.

Adds counters to serializers for tracking compression efficacity.

Adds a setting to OutputBufferManager to decide if compression should be attempted for exchange.

Compatibly with Presto, uses uncompressed output if compression yields less then 20% savings. In addition to Presto, skips attempting compression for a number of forthcoming batches if compression was not effective.

Reports the compression input bytes, compressed bytes and bytes for which compression was not attempted in runtimeStats.

Pull Request resolved: facebookincubator#8851

Reviewed By: xiaoxmeng

Differential Revision: D54178975

Pulled By: oerling

fbshipit-source-id: 389d7c0d1f9590468c91a405c5c5e5834d29c0da
  • Loading branch information
Orri Erling authored and facebook-github-bot committed Mar 14, 2024
1 parent c71ed7d commit 177ff56
Show file tree
Hide file tree
Showing 10 changed files with 344 additions and 61 deletions.
2 changes: 1 addition & 1 deletion velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ RowVectorPtr Exchange::getOutput() {

while (!inputStream.atEnd()) {
getSerde()->deserialize(
&inputStream, pool(), outputType_, &result_, resultOffset);
&inputStream, pool(), outputType_, &result_, resultOffset, &options_);
resultOffset = result_->size();
}
}
Expand Down
8 changes: 7 additions & 1 deletion velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <random>
#include "velox/exec/ExchangeClient.h"
#include "velox/exec/Operator.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/serializers/PrestoSerializer.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -50,7 +52,10 @@ class Exchange : public SourceOperator {
preferredOutputBatchBytes_{
driverCtx->queryConfig().preferredOutputBatchBytes()},
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {}
exchangeClient_{std::move(exchangeClient)} {
options_.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
}

~Exchange() override {
close();
Expand Down Expand Up @@ -105,6 +110,7 @@ class Exchange : public SourceOperator {
std::vector<std::unique_ptr<SerializedPage>> currentPages_;
bool atEnd_{false};
std::default_random_engine rng_{std::random_device{}()};
serializer::presto::PrestoVectorSerde::PrestoOptions options_;
};

} // namespace facebook::velox::exec
14 changes: 12 additions & 2 deletions velox/exec/OutputBufferManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
// static
void OutputBufferManager::initialize(const Options& options) {
std::lock_guard<std::mutex> l(initMutex_);
VELOX_CHECK(
instance_ == nullptr, "May initialize OutputBufferManager only once");
instance_ = std::make_shared<OutputBufferManager>(options);
}

// static
std::weak_ptr<OutputBufferManager> OutputBufferManager::getInstance() {
static auto kInstance = std::make_shared<OutputBufferManager>();
return kInstance;
std::lock_guard<std::mutex> l(initMutex_);
if (!instance_) {
instance_ = std::make_shared<OutputBufferManager>(Options());
}
return instance_;
}

std::shared_ptr<OutputBuffer> OutputBufferManager::getBuffer(
Expand Down
29 changes: 29 additions & 0 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ namespace facebook::velox::exec {

class OutputBufferManager {
public:
/// Options for shuffle. This is initialized once and affects both
/// PartitionedOutput and Exchange. This can be used for controlling
/// compression, protocol version and other matters where shuffle sides should
/// agree.
struct Options {
common::CompressionKind compressionKind{
common::CompressionKind::CompressionKind_NONE};
};

OutputBufferManager(Options options)
: compressionKind_(options.compressionKind) {}

void initializeTask(
std::shared_ptr<Task> task,
core::PartitionedOutputNode::Kind kind,
Expand Down Expand Up @@ -84,6 +96,10 @@ class OutputBufferManager {

void removeTask(const std::string& taskId);

/// Initializes singleton with 'options'. May be called once before
/// getInstance().
static void initialize(const Options& options);

static std::weak_ptr<OutputBufferManager> getInstance();

uint64_t numBuffers() const;
Expand Down Expand Up @@ -117,17 +133,30 @@ class OutputBufferManager {
// Returns NULL if task not found.
std::shared_ptr<OutputBuffer> getBufferIfExists(const std::string& taskId);

void testingSetCompression(common::CompressionKind kind) {
*const_cast<common::CompressionKind*>(&compressionKind_) = kind;
}

common::CompressionKind compressionKind() const {
return compressionKind_;
}

private:
// Retrieves the set of buffers for a query.
// Throws an exception if buffer doesn't exist.
std::shared_ptr<OutputBuffer> getBuffer(const std::string& taskId);

const common::CompressionKind compressionKind_;

folly::Synchronized<
std::unordered_map<std::string, std::shared_ptr<OutputBuffer>>,
std::mutex>
buffers_;

std::function<std::unique_ptr<OutputStreamListener>()> listenerFactory_{
nullptr};

inline static std::shared_ptr<OutputBufferManager> instance_;
inline static std::mutex initMutex_;
};
} // namespace facebook::velox::exec
20 changes: 18 additions & 2 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {

namespace detail {
BlockingReason Destination::advance(
uint64_t maxBytes,
Expand Down Expand Up @@ -55,7 +54,11 @@ BlockingReason Destination::advance(
if (!current_) {
current_ = std::make_unique<VectorStreamGroup>(pool_);
auto rowType = asRowType(output->type());
current_->createStreamTree(rowType, rowsInCurrent_);
serializer::presto::PrestoVectorSerde::PrestoOptions options;
options.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
options.minCompressionRatio = PartitionedOutput::minCompressionRatio();
current_->createStreamTree(rowType, rowsInCurrent_, &options);
}
current_->append(
output, folly::Range(&rows_[firstRow], rowIdx_ - firstRow), scratch);
Expand Down Expand Up @@ -107,6 +110,18 @@ BlockingReason Destination::flush(
return blocked ? BlockingReason::kWaitForConsumer
: BlockingReason::kNotBlocked;
}

void Destination::updateStats(Operator* op) {
VELOX_CHECK(finished_);
if (current_) {
const auto serializerStats = current_->runtimeStats();
auto lockedStats = op->stats().wlock();
for (auto& pair : serializerStats) {
lockedStats->addRuntimeStat(pair.first, pair.second);
}
}
}

} // namespace detail

PartitionedOutput::PartitionedOutput(
Expand Down Expand Up @@ -366,6 +381,7 @@ RowVectorPtr PartitionedOutput::getOutput() {
}
destination->flush(*bufferManager, bufferReleaseFn_, nullptr);
destination->setFinished();
destination->updateStats(this);
}

bufferManager->noMoreData(operatorCtx_->task()->taskId());
Expand Down
15 changes: 15 additions & 0 deletions velox/exec/PartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class Destination {
return bytesInCurrent_;
}

/// Adds stats from 'this' to runtime stats of 'op'.
void updateStats(Operator* op);

private:
// Sets the next target size for flushing. This is called at the
// start of each batch of output for the destination. The effect is
Expand Down Expand Up @@ -182,6 +185,14 @@ class PartitionedOutput : public Operator {
destinations_.clear();
}

static void testingSetMinCompressionRatio(float ratio) {
minCompressionRatio_ = ratio;
}

static float minCompressionRatio() {
return minCompressionRatio_;
}

private:
void initializeInput(RowVectorPtr input);

Expand All @@ -194,6 +205,10 @@ class PartitionedOutput : public Operator {
/// Collect all rows with null keys into nullRows_.
void collectNullRows();

// If compression in serde is enabled, this is the minimum compression that
// must be achieved before starting to skip compression. Used for testing.
inline static float minCompressionRatio_ = 0.8;

const std::vector<column_index_t> keyChannels_;
const int numDestinations_;
const bool replicateNullsAndAny_;
Expand Down
57 changes: 57 additions & 0 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/exec/Exchange.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/exec/PartitionedOutput.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/RoundRobinPartitionFunction.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
Expand Down Expand Up @@ -2000,5 +2001,61 @@ TEST_F(MultiFragmentTest, mergeSmallBatchesInExchange) {
test(100'000, 1);
}

TEST_F(MultiFragmentTest, compression) {
bufferManager_->testingSetCompression(
common::CompressionKind::CompressionKind_LZ4);
auto guard = folly::makeGuard([&]() {
bufferManager_->testingSetCompression(
common::CompressionKind::CompressionKind_NONE);
});

constexpr int32_t kNumRepeats = 1'000'000;
const auto data = makeRowVector({makeFlatVector<int64_t>({1, 2, 3})});

const auto producerPlan = test::PlanBuilder()
.values({data}, false, kNumRepeats)
.partitionedOutput({}, 1)
.planNode();
const auto producerTaskId = "local://t1";

const auto plan = test::PlanBuilder()
.exchange(asRowType(data->type()))
.singleAggregation({}, {"sum(c0)"})
.planNode();

const auto expected =
makeRowVector({makeFlatVector<int64_t>(std::vector<int64_t>{6000000})});

const auto test = [&](float minCompressionRatio, bool expectSkipCompression) {
PartitionedOutput::testingSetMinCompressionRatio(minCompressionRatio);
auto producerTask = makeTask(producerTaskId, producerPlan);
producerTask->start(1);

auto consumerTask = test::AssertQueryBuilder(plan)
.split(remoteSplit(producerTaskId))
.destination(0)
.assertResults(expected);

auto consumerTaskStats = exec::toPlanStats(consumerTask->taskStats());
const auto& consumerPlanStats = consumerTaskStats.at("0");
ASSERT_EQ(data->size() * kNumRepeats, consumerPlanStats.outputRows);

auto producerTaskStats = exec::toPlanStats(producerTask->taskStats());
const auto& producerStats = producerTaskStats.at("1");
// The data is extremely compressible, 1, 2, 3 repeated 1000000 times.
if (!expectSkipCompression) {
EXPECT_LT(
producerStats.customStats.at("compressedBytes").sum,
producerStats.customStats.at("compressionInputBytes").sum);
EXPECT_EQ(0, producerStats.customStats.at("compressionSkippedBytes").sum);
} else {
EXPECT_LT(0, producerStats.customStats.at("compressionSkippedBytes").sum);
}
};

test(0.7, false);
test(0.0000001, true);
}

} // namespace
} // namespace facebook::velox::exec
Loading

0 comments on commit 177ff56

Please sign in to comment.