Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make PrestoSerializer compression compatible with Presto and add stats #8851

Closed
wants to merge 1 commit into from

Conversation

oerling
Copy link
Contributor

@oerling oerling commented Feb 26, 2024

Presto uses the uncompressed size as part of checksum, not the compressed size. Presto also does not use the compression result unless it is under 0,8 x the source size. Presto does not require the compressed bit to be set if a compressing exchange codec is in place.

Adds counters to serializers for tracking compression efficacity.

Adds a setting to OutputBufferManager to decide if compression should be attempted for exchange.

Compatibly with Presto, uses uncompressed output if compression yields less then 20% savings. In addition to Presto, skips attempting compression for a number of forthcoming batches if compression was not effective.

Reports the compression input bytes, compressed bytes and bytes for which compression was not attempted in runtimeStats.

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Feb 26, 2024
Copy link

netlify bot commented Feb 26, 2024

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit 0fc334f
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/65f2a4127b22a60008326ee7

@facebook-github-bot
Copy link
Contributor

@oerling has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@oerling oerling requested a review from Yuhta February 26, 2024 03:19
@oerling oerling force-pushed the ser-comp-pr branch 2 times, most recently from c657c51 to 7b68fe4 Compare March 7, 2024 18:50
@@ -62,6 +62,17 @@ class StreamArena {
return pool_;
}

///
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment?

@@ -247,6 +247,11 @@ class ByteOutputStream {
}

void startWrite(int32_t initialSize) {
ranges_.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is related to this change? Thanks!

@@ -105,6 +108,7 @@ class Exchange : public SourceOperator {
std::vector<std::unique_ptr<SerializedPage>> currentPages_;
bool atEnd_{false};
std::default_random_engine rng_{std::random_device{}()};
const common::CompressionKind compressionKind_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move const member first?

@@ -122,8 +123,11 @@ RowVectorPtr Exchange::getOutput() {
auto inputStream = page->prepareStreamForDeserialize();

while (!inputStream.atEnd()) {
serializer::presto::PrestoVectorSerde::PrestoOptions options;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this a member of Exchange object? Thanks!

@@ -117,6 +117,14 @@ class OutputBufferManager {
// Returns NULL if task not found.
std::shared_ptr<OutputBuffer> getBufferIfExists(const std::string& taskId);

void enableCompression(common::CompressionKind kind) {
compressionKind_ = kind;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add check that kind is not None?

velox/serializers/PrestoSerializer.cpp Outdated Show resolved Hide resolved
velox/serializers/PrestoSerializer.cpp Outdated Show resolved Hide resolved
@@ -3493,6 +3500,7 @@ void flushCompressed(
}

writeInt32(output, numRows);
auto maskOffset = output->tellp();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const auto maskOffset =

velox/serializers/PrestoSerializer.cpp Outdated Show resolved Hide resolved
velox/serializers/PrestoSerializer.cpp Outdated Show resolved Hide resolved
velox/serializers/PrestoSerializer.cpp Outdated Show resolved Hide resolved
const int32_t compressedSize = compressed->length();
writeInt32(output, uncompressedSize);
if (compressedSize > uncompressedSize * 0.8) {
writeInt32(output, uncompressedSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the compressed and uncompressed path share many code except the actual buffer write.

velox/serializers/PrestoSerializer.cpp Outdated Show resolved Hide resolved
@@ -77,18 +77,24 @@ int64_t computeChecksum(
ByteInputStream* source,
int codecMarker,
int numRows,
int uncompressedSize) {
int uncompressedSize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use the same type for compressed and uncompressed size? Thanks!

velox/serializers/PrestoSerializer.cpp Outdated Show resolved Hide resolved
}

void flushCompressed(
std::pair<int64_t, int64_t> flushCompressed(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

struct FlushSizes {
   uint64_t compressedSize;
   uint64_t uncompressedSize;
}

velox/serializers/PrestoSerializer.cpp Outdated Show resolved Hide resolved
velox/serializers/PrestoSerializer.cpp Outdated Show resolved Hide resolved
velox/serializers/PrestoSerializer.cpp Outdated Show resolved Hide resolved
velox/serializers/PrestoSerializer.cpp Show resolved Hide resolved
velox/exec/tests/MultiFragmentTest.cpp Outdated Show resolved Hide resolved
velox/exec/tests/MultiFragmentTest.cpp Outdated Show resolved Hide resolved
makeFlatVector<int32_t>(3'000, [](auto row) { return 1 + row % 3; }),
});

auto test = [&](uint64_t maxBytes, int32_t expectedBatches) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is maxBytes used?

@facebook-github-bot
Copy link
Contributor

@oerling has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oerling thanks for the update. It looks pretty good now!

@@ -105,6 +110,7 @@ class Exchange : public SourceOperator {
std::vector<std::unique_ptr<SerializedPage>> currentPages_;
bool atEnd_{false};
std::default_random_engine rng_{std::random_device{}()};
serializer::presto::PrestoVectorSerde::Options options_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NYC: maybe we shall consider to have a constructor for serializer::presto::PrestoVectorSerde::Options and many places we initialize the Options with a compression kind.

velox/exec/OutputBufferManager.h Outdated Show resolved Hide resolved
velox/exec/OutputBufferManager.h Show resolved Hide resolved
if (!instance_) {
instance_ = std::make_shared<OutputBufferManager>(compressionKind);
}
return instance_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to add a setInstance() API so we can make Prestissimo to use that one?

velox/exec/PartitionedOutput.h Outdated Show resolved Hide resolved
return uncompressedSize;
}
namespace {
struct FlushSizes {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment the structure say if uncompressedSize and compressedSize are the same, then there is no compression.

}
};

test(0.7, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a false case by setting its to 1.0? Thanks!

@facebook-github-bot
Copy link
Contributor

@oerling has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

facebookincubator#8851)

Summary:
Presto uses the uncompressed size as part of checksum, not the compressed size. Presto also does not use the compression result unless it is under 0,8 x the source size. Presto does not require the compressed bit to be set if a compressing exchange codec is in place.

Adds counters to serializers for tracking compression efficacity.

Adds a setting to OutputBufferManager to decide if compression should be attempted for exchange.

Compatibly with Presto, uses uncompressed output if compression yields less then 20% savings. In addition to Presto, skips attempting compression for a number of forthcoming batches if compression was not effective.

Reports the compression input bytes, compressed bytes and bytes for which compression was not attempted in runtimeStats.


Reviewed By: xiaoxmeng

Differential Revision: D54178975

Pulled By: oerling
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D54178975

@facebook-github-bot
Copy link
Contributor

@oerling merged this pull request in 177ff56.

Joe-Abraham pushed a commit to Joe-Abraham/velox that referenced this pull request Jun 7, 2024
facebookincubator#8851)

Summary:
Presto uses the uncompressed size as part of checksum, not the compressed size. Presto also does not use the compression result unless it is under 0,8 x the source size. Presto does not require the compressed bit to be set if a compressing exchange codec is in place.

Adds counters to serializers for tracking compression efficacity.

Adds a setting to OutputBufferManager to decide if compression should be attempted for exchange.

Compatibly with Presto, uses uncompressed output if compression yields less then 20% savings. In addition to Presto, skips attempting compression for a number of forthcoming batches if compression was not effective.

Reports the compression input bytes, compressed bytes and bytes for which compression was not attempted in runtimeStats.

Pull Request resolved: facebookincubator#8851

Reviewed By: xiaoxmeng

Differential Revision: D54178975

Pulled By: oerling

fbshipit-source-id: 389d7c0d1f9590468c91a405c5c5e5834d29c0da
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. fb-exported Merged
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants