Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make PrestoSerializer compression compatible with Presto and add stats #8851

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ RowVectorPtr Exchange::getOutput() {

while (!inputStream.atEnd()) {
getSerde()->deserialize(
&inputStream, pool(), outputType_, &result_, resultOffset);
&inputStream, pool(), outputType_, &result_, resultOffset, &options_);
resultOffset = result_->size();
}
}
Expand Down
8 changes: 7 additions & 1 deletion velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <random>
#include "velox/exec/ExchangeClient.h"
#include "velox/exec/Operator.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/serializers/PrestoSerializer.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -50,7 +52,10 @@ class Exchange : public SourceOperator {
preferredOutputBatchBytes_{
driverCtx->queryConfig().preferredOutputBatchBytes()},
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {}
exchangeClient_{std::move(exchangeClient)} {
options_.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
}

~Exchange() override {
close();
Expand Down Expand Up @@ -105,6 +110,7 @@ class Exchange : public SourceOperator {
std::vector<std::unique_ptr<SerializedPage>> currentPages_;
bool atEnd_{false};
std::default_random_engine rng_{std::random_device{}()};
serializer::presto::PrestoVectorSerde::PrestoOptions options_;
};

} // namespace facebook::velox::exec
14 changes: 12 additions & 2 deletions velox/exec/OutputBufferManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
// static
void OutputBufferManager::initialize(const Options& options) {
std::lock_guard<std::mutex> l(initMutex_);
VELOX_CHECK(
instance_ == nullptr, "May initialize OutputBufferManager only once");
instance_ = std::make_shared<OutputBufferManager>(options);
}

// static
std::weak_ptr<OutputBufferManager> OutputBufferManager::getInstance() {
static auto kInstance = std::make_shared<OutputBufferManager>();
return kInstance;
std::lock_guard<std::mutex> l(initMutex_);
if (!instance_) {
instance_ = std::make_shared<OutputBufferManager>(Options());
}
return instance_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to add a setInstance() API so we can make Prestissimo to use that one?

}

std::shared_ptr<OutputBuffer> OutputBufferManager::getBuffer(
Expand Down
29 changes: 29 additions & 0 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ namespace facebook::velox::exec {

class OutputBufferManager {
public:
/// Options for shuffle. This is initialized once and affects both
/// PartitionedOutput and Exchange. This can be used for controlling
/// compression, protocol version and other matters where shuffle sides should
/// agree.
struct Options {
common::CompressionKind compressionKind{
common::CompressionKind::CompressionKind_NONE};
};

OutputBufferManager(Options options)
: compressionKind_(options.compressionKind) {}

void initializeTask(
std::shared_ptr<Task> task,
core::PartitionedOutputNode::Kind kind,
Expand Down Expand Up @@ -84,6 +96,10 @@ class OutputBufferManager {

void removeTask(const std::string& taskId);

/// Initializes singleton with 'options'. May be called once before
/// getInstance().
static void initialize(const Options& options);

static std::weak_ptr<OutputBufferManager> getInstance();

uint64_t numBuffers() const;
Expand Down Expand Up @@ -117,17 +133,30 @@ class OutputBufferManager {
// Returns NULL if task not found.
std::shared_ptr<OutputBuffer> getBufferIfExists(const std::string& taskId);

void testingSetCompression(common::CompressionKind kind) {
*const_cast<common::CompressionKind*>(&compressionKind_) = kind;
}

common::CompressionKind compressionKind() const {
return compressionKind_;
}

private:
// Retrieves the set of buffers for a query.
// Throws an exception if buffer doesn't exist.
std::shared_ptr<OutputBuffer> getBuffer(const std::string& taskId);

const common::CompressionKind compressionKind_;
xiaoxmeng marked this conversation as resolved.
Show resolved Hide resolved

folly::Synchronized<
std::unordered_map<std::string, std::shared_ptr<OutputBuffer>>,
std::mutex>
buffers_;

std::function<std::unique_ptr<OutputStreamListener>()> listenerFactory_{
nullptr};

inline static std::shared_ptr<OutputBufferManager> instance_;
inline static std::mutex initMutex_;
};
} // namespace facebook::velox::exec
20 changes: 18 additions & 2 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {

namespace detail {
BlockingReason Destination::advance(
uint64_t maxBytes,
Expand Down Expand Up @@ -55,7 +54,11 @@ BlockingReason Destination::advance(
if (!current_) {
current_ = std::make_unique<VectorStreamGroup>(pool_);
auto rowType = asRowType(output->type());
current_->createStreamTree(rowType, rowsInCurrent_);
serializer::presto::PrestoVectorSerde::PrestoOptions options;
xiaoxmeng marked this conversation as resolved.
Show resolved Hide resolved
options.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
options.minCompressionRatio = PartitionedOutput::minCompressionRatio();
current_->createStreamTree(rowType, rowsInCurrent_, &options);
}
current_->append(
output, folly::Range(&rows_[firstRow], rowIdx_ - firstRow), scratch);
Expand Down Expand Up @@ -107,6 +110,18 @@ BlockingReason Destination::flush(
return blocked ? BlockingReason::kWaitForConsumer
: BlockingReason::kNotBlocked;
}

void Destination::updateStats(Operator* op) {
xiaoxmeng marked this conversation as resolved.
Show resolved Hide resolved
VELOX_CHECK(finished_);
if (current_) {
xiaoxmeng marked this conversation as resolved.
Show resolved Hide resolved
const auto serializerStats = current_->runtimeStats();
auto lockedStats = op->stats().wlock();
for (auto& pair : serializerStats) {
lockedStats->addRuntimeStat(pair.first, pair.second);
}
}
}

} // namespace detail

PartitionedOutput::PartitionedOutput(
Expand Down Expand Up @@ -366,6 +381,7 @@ RowVectorPtr PartitionedOutput::getOutput() {
}
destination->flush(*bufferManager, bufferReleaseFn_, nullptr);
destination->setFinished();
destination->updateStats(this);
}

bufferManager->noMoreData(operatorCtx_->task()->taskId());
Expand Down
15 changes: 15 additions & 0 deletions velox/exec/PartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class Destination {
return bytesInCurrent_;
}

/// Adds stats from 'this' to runtime stats of 'op'.
void updateStats(Operator* op);

private:
// Sets the next target size for flushing. This is called at the
// start of each batch of output for the destination. The effect is
Expand Down Expand Up @@ -182,6 +185,14 @@ class PartitionedOutput : public Operator {
destinations_.clear();
}

static void testingSetMinCompressionRatio(float ratio) {
minCompressionRatio_ = ratio;
}

static float minCompressionRatio() {
return minCompressionRatio_;
}

private:
void initializeInput(RowVectorPtr input);

Expand All @@ -194,6 +205,10 @@ class PartitionedOutput : public Operator {
/// Collect all rows with null keys into nullRows_.
void collectNullRows();

// If compression in serde is enabled, this is the minimum compression that
// must be achieved before starting to skip compression. Used for testing.
inline static float minCompressionRatio_ = 0.8;

const std::vector<column_index_t> keyChannels_;
const int numDestinations_;
const bool replicateNullsAndAny_;
Expand Down
57 changes: 57 additions & 0 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/exec/Exchange.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/exec/PartitionedOutput.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/RoundRobinPartitionFunction.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
Expand Down Expand Up @@ -2000,5 +2001,61 @@ TEST_F(MultiFragmentTest, mergeSmallBatchesInExchange) {
test(100'000, 1);
}

TEST_F(MultiFragmentTest, compression) {
bufferManager_->testingSetCompression(
common::CompressionKind::CompressionKind_LZ4);
auto guard = folly::makeGuard([&]() {
bufferManager_->testingSetCompression(
common::CompressionKind::CompressionKind_NONE);
});

constexpr int32_t kNumRepeats = 1'000'000;
const auto data = makeRowVector({makeFlatVector<int64_t>({1, 2, 3})});

const auto producerPlan = test::PlanBuilder()
.values({data}, false, kNumRepeats)
.partitionedOutput({}, 1)
.planNode();
const auto producerTaskId = "local://t1";

const auto plan = test::PlanBuilder()
.exchange(asRowType(data->type()))
.singleAggregation({}, {"sum(c0)"})
.planNode();

const auto expected =
makeRowVector({makeFlatVector<int64_t>(std::vector<int64_t>{6000000})});

const auto test = [&](float minCompressionRatio, bool expectSkipCompression) {
PartitionedOutput::testingSetMinCompressionRatio(minCompressionRatio);
auto producerTask = makeTask(producerTaskId, producerPlan);
producerTask->start(1);

auto consumerTask = test::AssertQueryBuilder(plan)
.split(remoteSplit(producerTaskId))
.destination(0)
.assertResults(expected);

auto consumerTaskStats = exec::toPlanStats(consumerTask->taskStats());
const auto& consumerPlanStats = consumerTaskStats.at("0");
ASSERT_EQ(data->size() * kNumRepeats, consumerPlanStats.outputRows);

auto producerTaskStats = exec::toPlanStats(producerTask->taskStats());
const auto& producerStats = producerTaskStats.at("1");
// The data is extremely compressible, 1, 2, 3 repeated 1000000 times.
if (!expectSkipCompression) {
EXPECT_LT(
producerStats.customStats.at("compressedBytes").sum,
producerStats.customStats.at("compressionInputBytes").sum);
EXPECT_EQ(0, producerStats.customStats.at("compressionSkippedBytes").sum);
} else {
EXPECT_LT(0, producerStats.customStats.at("compressionSkippedBytes").sum);
}
};

test(0.7, false);
test(0.0000001, true);
}

} // namespace
} // namespace facebook::velox::exec
Loading
Loading