-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
make PrestoSerializer compression compatible with Presto and add stats #8851
Conversation
✅ Deploy Preview for meta-velox canceled.
|
@oerling has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
c657c51
to
7b68fe4
Compare
velox/common/memory/StreamArena.h
Outdated
@@ -62,6 +62,17 @@ class StreamArena { | |||
return pool_; | |||
} | |||
|
|||
/// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment?
velox/common/memory/ByteStream.h
Outdated
@@ -247,6 +247,11 @@ class ByteOutputStream { | |||
} | |||
|
|||
void startWrite(int32_t initialSize) { | |||
ranges_.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is related to this change? Thanks!
velox/exec/Exchange.h
Outdated
@@ -105,6 +108,7 @@ class Exchange : public SourceOperator { | |||
std::vector<std::unique_ptr<SerializedPage>> currentPages_; | |||
bool atEnd_{false}; | |||
std::default_random_engine rng_{std::random_device{}()}; | |||
const common::CompressionKind compressionKind_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move const member first?
velox/exec/Exchange.cpp
Outdated
@@ -122,8 +123,11 @@ RowVectorPtr Exchange::getOutput() { | |||
auto inputStream = page->prepareStreamForDeserialize(); | |||
|
|||
while (!inputStream.atEnd()) { | |||
serializer::presto::PrestoVectorSerde::PrestoOptions options; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this a member of Exchange object? Thanks!
velox/exec/OutputBufferManager.h
Outdated
@@ -117,6 +117,14 @@ class OutputBufferManager { | |||
// Returns NULL if task not found. | |||
std::shared_ptr<OutputBuffer> getBufferIfExists(const std::string& taskId); | |||
|
|||
void enableCompression(common::CompressionKind kind) { | |||
compressionKind_ = kind; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add check that kind is not None?
@@ -3493,6 +3500,7 @@ void flushCompressed( | |||
} | |||
|
|||
writeInt32(output, numRows); | |||
auto maskOffset = output->tellp(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const auto maskOffset =
const int32_t compressedSize = compressed->length(); | ||
writeInt32(output, uncompressedSize); | ||
if (compressedSize > uncompressedSize * 0.8) { | ||
writeInt32(output, uncompressedSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the compressed and uncompressed path share many code except the actual buffer write.
@@ -77,18 +77,24 @@ int64_t computeChecksum( | |||
ByteInputStream* source, | |||
int codecMarker, | |||
int numRows, | |||
int uncompressedSize) { | |||
int uncompressedSize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we use the same type for compressed and uncompressed size? Thanks!
} | ||
|
||
void flushCompressed( | ||
std::pair<int64_t, int64_t> flushCompressed( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
struct FlushSizes {
uint64_t compressedSize;
uint64_t uncompressedSize;
}
makeFlatVector<int32_t>(3'000, [](auto row) { return 1 + row % 3; }), | ||
}); | ||
|
||
auto test = [&](uint64_t maxBytes, int32_t expectedBatches) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is maxBytes used?
@oerling has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@oerling thanks for the update. It looks pretty good now!
velox/exec/Exchange.h
Outdated
@@ -105,6 +110,7 @@ class Exchange : public SourceOperator { | |||
std::vector<std::unique_ptr<SerializedPage>> currentPages_; | |||
bool atEnd_{false}; | |||
std::default_random_engine rng_{std::random_device{}()}; | |||
serializer::presto::PrestoVectorSerde::Options options_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NYC: maybe we shall consider to have a constructor for serializer::presto::PrestoVectorSerde::Options and many places we initialize the Options with a compression kind.
if (!instance_) { | ||
instance_ = std::make_shared<OutputBufferManager>(compressionKind); | ||
} | ||
return instance_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to add a setInstance() API so we can make Prestissimo to use that one?
return uncompressedSize; | ||
} | ||
namespace { | ||
struct FlushSizes { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you comment the structure say if uncompressedSize and compressedSize are the same, then there is no compression.
} | ||
}; | ||
|
||
test(0.7, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a false case by setting its to 1.0? Thanks!
@oerling has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
facebookincubator#8851) Summary: Presto uses the uncompressed size as part of checksum, not the compressed size. Presto also does not use the compression result unless it is under 0,8 x the source size. Presto does not require the compressed bit to be set if a compressing exchange codec is in place. Adds counters to serializers for tracking compression efficacity. Adds a setting to OutputBufferManager to decide if compression should be attempted for exchange. Compatibly with Presto, uses uncompressed output if compression yields less then 20% savings. In addition to Presto, skips attempting compression for a number of forthcoming batches if compression was not effective. Reports the compression input bytes, compressed bytes and bytes for which compression was not attempted in runtimeStats. Reviewed By: xiaoxmeng Differential Revision: D54178975 Pulled By: oerling
This pull request was exported from Phabricator. Differential Revision: D54178975 |
facebookincubator#8851) Summary: Presto uses the uncompressed size as part of checksum, not the compressed size. Presto also does not use the compression result unless it is under 0,8 x the source size. Presto does not require the compressed bit to be set if a compressing exchange codec is in place. Adds counters to serializers for tracking compression efficacity. Adds a setting to OutputBufferManager to decide if compression should be attempted for exchange. Compatibly with Presto, uses uncompressed output if compression yields less then 20% savings. In addition to Presto, skips attempting compression for a number of forthcoming batches if compression was not effective. Reports the compression input bytes, compressed bytes and bytes for which compression was not attempted in runtimeStats. Pull Request resolved: facebookincubator#8851 Reviewed By: xiaoxmeng Differential Revision: D54178975 Pulled By: oerling fbshipit-source-id: 389d7c0d1f9590468c91a405c5c5e5834d29c0da
Presto uses the uncompressed size as part of checksum, not the compressed size. Presto also does not use the compression result unless it is under 0,8 x the source size. Presto does not require the compressed bit to be set if a compressing exchange codec is in place.
Adds counters to serializers for tracking compression efficacity.
Adds a setting to OutputBufferManager to decide if compression should be attempted for exchange.
Compatibly with Presto, uses uncompressed output if compression yields less then 20% savings. In addition to Presto, skips attempting compression for a number of forthcoming batches if compression was not effective.
Reports the compression input bytes, compressed bytes and bytes for which compression was not attempted in runtimeStats.