diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp index 6f09aaf5a9dd..78f91843a4a2 100644 --- a/velox/exec/Exchange.cpp +++ b/velox/exec/Exchange.cpp @@ -123,7 +123,7 @@ RowVectorPtr Exchange::getOutput() { while (!inputStream.atEnd()) { getSerde()->deserialize( - &inputStream, pool(), outputType_, &result_, resultOffset); + &inputStream, pool(), outputType_, &result_, resultOffset, &options_); resultOffset = result_->size(); } } diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h index aab0d115ec82..dea346773355 100644 --- a/velox/exec/Exchange.h +++ b/velox/exec/Exchange.h @@ -19,6 +19,8 @@ #include #include "velox/exec/ExchangeClient.h" #include "velox/exec/Operator.h" +#include "velox/exec/OutputBufferManager.h" +#include "velox/serializers/PrestoSerializer.h" namespace facebook::velox::exec { @@ -50,7 +52,10 @@ class Exchange : public SourceOperator { preferredOutputBatchBytes_{ driverCtx->queryConfig().preferredOutputBatchBytes()}, processSplits_{operatorCtx_->driverCtx()->driverId == 0}, - exchangeClient_{std::move(exchangeClient)} {} + exchangeClient_{std::move(exchangeClient)} { + options_.compressionKind = + OutputBufferManager::getInstance().lock()->compressionKind(); + } ~Exchange() override { close(); @@ -105,6 +110,7 @@ class Exchange : public SourceOperator { std::vector> currentPages_; bool atEnd_{false}; std::default_random_engine rng_{std::random_device{}()}; + serializer::presto::PrestoVectorSerde::PrestoOptions options_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/OutputBufferManager.cpp b/velox/exec/OutputBufferManager.cpp index 440fb7fb4aba..0425218d175a 100644 --- a/velox/exec/OutputBufferManager.cpp +++ b/velox/exec/OutputBufferManager.cpp @@ -17,11 +17,21 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { +// static +void OutputBufferManager::initialize(const Options& options) { + std::lock_guard l(initMutex_); + VELOX_CHECK( + instance_ == nullptr, "May initialize OutputBufferManager only once"); + instance_ = std::make_shared(options); +} // static std::weak_ptr OutputBufferManager::getInstance() { - static auto kInstance = std::make_shared(); - return kInstance; + std::lock_guard l(initMutex_); + if (!instance_) { + instance_ = std::make_shared(Options()); + } + return instance_; } std::shared_ptr OutputBufferManager::getBuffer( diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h index 410be0bed83e..90df81a9abba 100644 --- a/velox/exec/OutputBufferManager.h +++ b/velox/exec/OutputBufferManager.h @@ -21,6 +21,18 @@ namespace facebook::velox::exec { class OutputBufferManager { public: + /// Options for shuffle. This is initialized once and affects both + /// PartitionedOutput and Exchange. This can be used for controlling + /// compression, protocol version and other matters where shuffle sides should + /// agree. + struct Options { + common::CompressionKind compressionKind{ + common::CompressionKind::CompressionKind_NONE}; + }; + + OutputBufferManager(Options options) + : compressionKind_(options.compressionKind) {} + void initializeTask( std::shared_ptr task, core::PartitionedOutputNode::Kind kind, @@ -84,6 +96,10 @@ class OutputBufferManager { void removeTask(const std::string& taskId); + /// Initializes singleton with 'options'. May be called once before + /// getInstance(). + static void initialize(const Options& options); + static std::weak_ptr getInstance(); uint64_t numBuffers() const; @@ -117,11 +133,21 @@ class OutputBufferManager { // Returns NULL if task not found. std::shared_ptr getBufferIfExists(const std::string& taskId); + void testingSetCompression(common::CompressionKind kind) { + *const_cast(&compressionKind_) = kind; + } + + common::CompressionKind compressionKind() const { + return compressionKind_; + } + private: // Retrieves the set of buffers for a query. // Throws an exception if buffer doesn't exist. std::shared_ptr getBuffer(const std::string& taskId); + const common::CompressionKind compressionKind_; + folly::Synchronized< std::unordered_map>, std::mutex> @@ -129,5 +155,8 @@ class OutputBufferManager { std::function()> listenerFactory_{ nullptr}; + + inline static std::shared_ptr instance_; + inline static std::mutex initMutex_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index 098f06abfc23..25a186780541 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -19,7 +19,6 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { - namespace detail { BlockingReason Destination::advance( uint64_t maxBytes, @@ -55,7 +54,11 @@ BlockingReason Destination::advance( if (!current_) { current_ = std::make_unique(pool_); auto rowType = asRowType(output->type()); - current_->createStreamTree(rowType, rowsInCurrent_); + serializer::presto::PrestoVectorSerde::PrestoOptions options; + options.compressionKind = + OutputBufferManager::getInstance().lock()->compressionKind(); + options.minCompressionRatio = PartitionedOutput::minCompressionRatio(); + current_->createStreamTree(rowType, rowsInCurrent_, &options); } current_->append( output, folly::Range(&rows_[firstRow], rowIdx_ - firstRow), scratch); @@ -107,6 +110,18 @@ BlockingReason Destination::flush( return blocked ? BlockingReason::kWaitForConsumer : BlockingReason::kNotBlocked; } + +void Destination::updateStats(Operator* op) { + VELOX_CHECK(finished_); + if (current_) { + const auto serializerStats = current_->runtimeStats(); + auto lockedStats = op->stats().wlock(); + for (auto& pair : serializerStats) { + lockedStats->addRuntimeStat(pair.first, pair.second); + } + } +} + } // namespace detail PartitionedOutput::PartitionedOutput( @@ -366,6 +381,7 @@ RowVectorPtr PartitionedOutput::getOutput() { } destination->flush(*bufferManager, bufferReleaseFn_, nullptr); destination->setFinished(); + destination->updateStats(this); } bufferManager->noMoreData(operatorCtx_->task()->taskId()); diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h index 511494596745..28ef0877458d 100644 --- a/velox/exec/PartitionedOutput.h +++ b/velox/exec/PartitionedOutput.h @@ -85,6 +85,9 @@ class Destination { return bytesInCurrent_; } + /// Adds stats from 'this' to runtime stats of 'op'. + void updateStats(Operator* op); + private: // Sets the next target size for flushing. This is called at the // start of each batch of output for the destination. The effect is @@ -182,6 +185,14 @@ class PartitionedOutput : public Operator { destinations_.clear(); } + static void testingSetMinCompressionRatio(float ratio) { + minCompressionRatio_ = ratio; + } + + static float minCompressionRatio() { + return minCompressionRatio_; + } + private: void initializeInput(RowVectorPtr input); @@ -194,6 +205,10 @@ class PartitionedOutput : public Operator { /// Collect all rows with null keys into nullRows_. void collectNullRows(); + // If compression in serde is enabled, this is the minimum compression that + // must be achieved before starting to skip compression. Used for testing. + inline static float minCompressionRatio_ = 0.8; + const std::vector keyChannels_; const int numDestinations_; const bool replicateNullsAndAny_; diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index 63c654156b3a..64fded225d0a 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -22,6 +22,7 @@ #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" +#include "velox/exec/PartitionedOutput.h" #include "velox/exec/PlanNodeStats.h" #include "velox/exec/RoundRobinPartitionFunction.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" @@ -2000,5 +2001,61 @@ TEST_F(MultiFragmentTest, mergeSmallBatchesInExchange) { test(100'000, 1); } +TEST_F(MultiFragmentTest, compression) { + bufferManager_->testingSetCompression( + common::CompressionKind::CompressionKind_LZ4); + auto guard = folly::makeGuard([&]() { + bufferManager_->testingSetCompression( + common::CompressionKind::CompressionKind_NONE); + }); + + constexpr int32_t kNumRepeats = 1'000'000; + const auto data = makeRowVector({makeFlatVector({1, 2, 3})}); + + const auto producerPlan = test::PlanBuilder() + .values({data}, false, kNumRepeats) + .partitionedOutput({}, 1) + .planNode(); + const auto producerTaskId = "local://t1"; + + const auto plan = test::PlanBuilder() + .exchange(asRowType(data->type())) + .singleAggregation({}, {"sum(c0)"}) + .planNode(); + + const auto expected = + makeRowVector({makeFlatVector(std::vector{6000000})}); + + const auto test = [&](float minCompressionRatio, bool expectSkipCompression) { + PartitionedOutput::testingSetMinCompressionRatio(minCompressionRatio); + auto producerTask = makeTask(producerTaskId, producerPlan); + producerTask->start(1); + + auto consumerTask = test::AssertQueryBuilder(plan) + .split(remoteSplit(producerTaskId)) + .destination(0) + .assertResults(expected); + + auto consumerTaskStats = exec::toPlanStats(consumerTask->taskStats()); + const auto& consumerPlanStats = consumerTaskStats.at("0"); + ASSERT_EQ(data->size() * kNumRepeats, consumerPlanStats.outputRows); + + auto producerTaskStats = exec::toPlanStats(producerTask->taskStats()); + const auto& producerStats = producerTaskStats.at("1"); + // The data is extremely compressible, 1, 2, 3 repeated 1000000 times. + if (!expectSkipCompression) { + EXPECT_LT( + producerStats.customStats.at("compressedBytes").sum, + producerStats.customStats.at("compressionInputBytes").sum); + EXPECT_EQ(0, producerStats.customStats.at("compressionSkippedBytes").sum); + } else { + EXPECT_LT(0, producerStats.customStats.at("compressionSkippedBytes").sum); + } + }; + + test(0.7, false); + test(0.0000001, true); +} + } // namespace } // namespace facebook::velox::exec diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 7d692ffff6a7..c76073790147 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -77,18 +77,24 @@ int64_t computeChecksum( ByteInputStream* source, int codecMarker, int numRows, - int uncompressedSize) { - auto offset = source->tellp(); + int32_t uncompressedSize, + int32_t compressedSize) { + const auto offset = source->tellp(); + const bool compressed = codecMarker & kCompressedBitMask; + if (compressed) { + VELOX_CHECK_LT(compressedSize, uncompressedSize); + } + const int32_t dataSize = compressed ? compressedSize : uncompressedSize; bits::Crc32 crc32; - if (FOLLY_UNLIKELY(source->remainingSize() < uncompressedSize)) { + if (FOLLY_UNLIKELY(source->remainingSize() < dataSize)) { VELOX_FAIL( "Tried to read {} bytes, larger than what's remained in source {} " "bytes. Source details: {}", - uncompressedSize, + dataSize, source->remainingSize(), source->toString()); } - auto remainingBytes = uncompressedSize; + auto remainingBytes = dataSize; while (remainingBytes > 0) { auto data = source->nextView(remainingBytes); if (FOLLY_UNLIKELY(data.size() == 0)) { @@ -170,7 +176,11 @@ PrestoVectorSerde::PrestoOptions toPrestoOptions( if (options == nullptr) { return PrestoVectorSerde::PrestoOptions(); } - return *(static_cast(options)); + auto prestoOptions = + dynamic_cast(options); + VELOX_CHECK_NOT_NULL( + prestoOptions, "Serde options are not Presto-compatible"); + return *prestoOptions; } FOLLY_ALWAYS_INLINE bool needCompression(const folly::io::Codec& codec) { @@ -3420,7 +3430,7 @@ void estimateSerializedSizeInt( } } -void flushUncompressed( +int64_t flushUncompressed( const std::vector>& streams, int32_t numRows, OutputStream* out, @@ -3462,7 +3472,7 @@ void flushUncompressed( // Fill in uncompressedSizeInBytes & sizeInBytes int32_t size = (int32_t)out->tellp() - offset; - int32_t uncompressedSize = size - kHeaderSize; + const int32_t uncompressedSize = size - kHeaderSize; int64_t crc = 0; if (listener) { crc = computeChecksum(listener, codecMask, numRows, uncompressedSize); @@ -3473,13 +3483,59 @@ void flushUncompressed( writeInt32(out, uncompressedSize); writeInt64(out, crc); out->seekp(offset + size); + return uncompressedSize; +} +namespace { +// Represents sizes of a flush. If the sizes are equal, no compression is +// applied. Otherwise 'compressedSize' must be less than 'uncompressedSize'. +struct FlushSizes { + int64_t uncompressedSize; + int64_t compressedSize; +}; +} // namespace + +void flushSerialization( + int32_t numRows, + int32_t uncompressedSize, + int32_t serializationSize, + char codecMask, + const std::unique_ptr& iobuf, + OutputStream* output, + PrestoOutputStreamListener* listener) { + output->write(&codecMask, 1); + writeInt32(output, uncompressedSize); + writeInt32(output, serializationSize); + auto crcOffset = output->tellp(); + // Write zero checksum + writeInt64(output, 0); + // Number of columns and stream content. Unpause CRC. + if (listener) { + listener->resume(); + } + for (auto range : *iobuf) { + output->write(reinterpret_cast(range.data()), range.size()); + } + // Pause CRC computation + if (listener) { + listener->pause(); + } + const int32_t endSize = output->tellp(); + // Fill in crc + int64_t crc = 0; + if (listener) { + crc = computeChecksum(listener, codecMask, numRows, uncompressedSize); + } + output->seekp(crcOffset); + writeInt64(output, crc); + output->seekp(endSize); } -void flushCompressed( +FlushSizes flushCompressed( const std::vector>& streams, const StreamArena& arena, folly::io::Codec& codec, int32_t numRows, + float minCompressionRatio, OutputStream* output, PrestoOutputStreamListener* listener) { char codecMask = kCompressedBitMask; @@ -3493,7 +3549,6 @@ void flushCompressed( } writeInt32(output, numRows); - output->write(&codecMask, 1); IOBufOutputStream out(*(arena.pool()), nullptr, arena.size()); writeInt32(&out, streams.size()); @@ -3507,40 +3562,37 @@ void flushCompressed( uncompressedSize, codec.maxUncompressedLength(), "UncompressedSize exceeds limit"); - auto compressed = codec.compress(out.getIOBuf().get()); - const int32_t compressedSize = compressed->length(); - writeInt32(output, uncompressedSize); - writeInt32(output, compressedSize); - const int32_t crcOffset = output->tellp(); - writeInt64(output, 0); // Write zero checksum - // Number of columns and stream content. Unpause CRC. - if (listener) { - listener->resume(); - } - output->write( - reinterpret_cast(compressed->writableData()), - compressed->length()); - // Pause CRC computation - if (listener) { - listener->pause(); - } - const int32_t endSize = output->tellp(); - // Fill in crc - int64_t crc = 0; - if (listener) { - crc = computeChecksum(listener, codecMask, numRows, compressedSize); + auto iobuf = out.getIOBuf(); + const auto compressedBuffer = codec.compress(iobuf.get()); + const int32_t compressedSize = compressedBuffer->length(); + if (compressedSize > uncompressedSize * minCompressionRatio) { + flushSerialization( + numRows, + uncompressedSize, + uncompressedSize, + codecMask & ~kCompressedBitMask, + iobuf, + output, + listener); + return {uncompressedSize, uncompressedSize}; } - output->seekp(crcOffset); - writeInt64(output, crc); - output->seekp(endSize); + flushSerialization( + numRows, + uncompressedSize, + compressedSize, + codecMask, + compressedBuffer, + output, + listener); + return {uncompressedSize, compressedSize}; } -// Writes the contents to 'out' in wire format -void flushStreams( +FlushSizes flushStreams( const std::vector>& streams, int32_t numRows, const StreamArena& arena, folly::io::Codec& codec, + float minCompressionRatio, OutputStream* out) { auto listener = dynamic_cast(out->listener()); // Reset CRC computation @@ -3549,9 +3601,11 @@ void flushStreams( } if (!needCompression(codec)) { - flushUncompressed(streams, numRows, out, listener); + const auto size = flushUncompressed(streams, numRows, out, listener); + return {size, size}; } else { - flushCompressed(streams, arena, codec, numRows, out, listener); + return flushCompressed( + streams, arena, codec, numRows, minCompressionRatio, out, listener); } } @@ -3585,7 +3639,8 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { serializeColumn(vector->childAt(i), ranges, streams[i].get(), scratch); } - flushStreams(streams, numRows, arena, *codec_, stream); + flushStreams( + streams, numRows, arena, *codec_, opts_.minCompressionRatio, stream); } private: @@ -3601,7 +3656,8 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { int32_t numRows, StreamArena* streamArena, const SerdeOpts& opts) - : streamArena_(streamArena), + : opts_(opts), + streamArena_(streamArena), codec_(common::compressionKindToCodec(opts.compressionKind)) { const auto types = rowType->children(); const auto numTypes = types.size(); @@ -3657,7 +3713,54 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { // numRows(4) | codec(1) | uncompressedSize(4) | compressedSize(4) | // checksum(8) | data void flush(OutputStream* out) override { - flushStreams(streams_, numRows_, *streamArena_, *codec_, out); + constexpr int32_t kMaxCompressionAttemptsToSkip = 30; + if (!needCompression(*codec_)) { + flushStreams( + streams_, + numRows_, + *streamArena_, + *codec_, + opts_.minCompressionRatio, + out); + } else { + if (numCompressionToSkip_ > 0) { + const auto noCompressionCodec = common::compressionKindToCodec( + common::CompressionKind::CompressionKind_NONE); + auto [size, ignore] = flushStreams( + streams_, numRows_, *streamArena_, *noCompressionCodec, 1, out); + stats_.compressionSkippedBytes += size; + --numCompressionToSkip_; + ++stats_.numCompressionSkipped; + } else { + auto [size, compressedSize] = flushStreams( + streams_, + numRows_, + *streamArena_, + *codec_, + opts_.minCompressionRatio, + out); + stats_.compressionInputBytes += size; + stats_.compressedBytes += compressedSize; + if (compressedSize > size * opts_.minCompressionRatio) { + numCompressionToSkip_ = std::min( + kMaxCompressionAttemptsToSkip, 1 + stats_.numCompressionSkipped); + } + } + } + } + + std::unordered_map runtimeStats() override { + std::unordered_map map; + map.insert( + {{"compressedBytes", + RuntimeCounter(stats_.compressedBytes, RuntimeCounter::Unit::kBytes)}, + {"compressionInputBytes", + RuntimeCounter( + stats_.compressionInputBytes, RuntimeCounter::Unit::kBytes)}, + {"compressionSkippedBytes", + RuntimeCounter( + stats_.compressionSkippedBytes, RuntimeCounter::Unit::kBytes)}}); + return map; } void clear() override { @@ -3668,11 +3771,31 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { } private: + struct CompressionStats { + // Number of times compression was not attempted. + int32_t numCompressionSkipped{0}; + + // uncompressed size for which compression was attempted. + int64_t compressionInputBytes{0}; + + // Compressed bytes. + int64_t compressedBytes{0}; + + // Bytes for which compression was not attempted because of past + // non-performance. + int64_t compressionSkippedBytes{0}; + }; + + const SerdeOpts opts_; StreamArena* const streamArena_; const std::unique_ptr codec_; int32_t numRows_{0}; std::vector> streams_; + + // Count of forthcoming compressions to skip. + int32_t numCompressionToSkip_{0}; + CompressionStats stats_; }; } // namespace @@ -3795,7 +3918,11 @@ void PrestoVectorSerde::deserialize( int64_t actualCheckSum = 0; if (isChecksumBitSet(header.pageCodecMarker)) { actualCheckSum = computeChecksum( - source, header.pageCodecMarker, header.numRows, header.compressedSize); + source, + header.pageCodecMarker, + header.numRows, + header.uncompressedSize, + header.compressedSize); } VELOX_CHECK_EQ( @@ -3818,13 +3945,9 @@ void PrestoVectorSerde::deserialize( } VELOX_CHECK_EQ( - needCompression(*codec), - isCompressedBitSet(header.pageCodecMarker), - "Compression kind {} should align with codec marker.", - common::compressionKindToString( - common::codecTypeToCompressionKind(codec->type()))); + header.checksum, actualCheckSum, "Received corrupted serialized page."); - if (!needCompression(*codec)) { + if (!isCompressedBitSet(header.pageCodecMarker)) { readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions); } else { auto compressBuf = folly::IOBuf::create(header.compressedSize); diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h index 3d3f515a21ef..6bd323568d11 100644 --- a/velox/serializers/PrestoSerializer.h +++ b/velox/serializers/PrestoSerializer.h @@ -51,8 +51,8 @@ class PrestoVectorSerde : public VectorSerde { bool _useLosslessTimestamp, common::CompressionKind _compressionKind, bool _nullsFirst = false) - : useLosslessTimestamp(_useLosslessTimestamp), - compressionKind(_compressionKind), + : VectorSerde::Options(_compressionKind), + useLosslessTimestamp(_useLosslessTimestamp), nullsFirst(_nullsFirst) {} /// Currently presto only supports millisecond precision and the serializer @@ -61,15 +61,17 @@ class PrestoVectorSerde : public VectorSerde { /// currently used for spilling. Is false by default. bool useLosslessTimestamp{false}; - common::CompressionKind compressionKind{ - common::CompressionKind::CompressionKind_NONE}; - /// Serializes nulls of structs before the columns. Used to allow /// single pass reading of in spilling. /// /// TODO: Make Presto also serialize nulls before columns of /// structs. bool nullsFirst{false}; + + /// Minimum achieved compression if compression is enabled. Compressing less + /// than this causes subsequent compression attempts to be skipped. The more + /// times compression misses the target the less frequently it is tried. + float minCompressionRatio{0.8}; }; /// Adds the serialized sizes of the rows of 'vector' in 'ranges[i]' to diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index 3992965c1c1d..3c8069d3652b 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -17,7 +17,9 @@ #include #include "velox/buffer/Buffer.h" +#include "velox/common/base/RuntimeMetrics.h" #include "velox/common/base/Scratch.h" +#include "velox/common/compression/Compression.h" #include "velox/common/memory/ByteStream.h" #include "velox/common/memory/Memory.h" #include "velox/common/memory/MemoryAllocator.h" @@ -91,6 +93,12 @@ class IterativeVectorSerializer { virtual void clear() { VELOX_UNSUPPORTED("clear"); } + + /// Returns serializer-dependent counters, e.g. about compression, data + /// distribution, encoding etc. + virtual std::unordered_map runtimeStats() { + return {}; + } }; /// Serializer that writes a subset of rows from a single RowVector to the @@ -128,7 +136,15 @@ class VectorSerde { // Lets the caller pass options to the Serde. This can be extended to add // custom options by each of its extended classes. struct Options { - virtual ~Options() {} + Options() = default; + + explicit Options(common::CompressionKind _compressionKind) + : compressionKind(_compressionKind) {} + + virtual ~Options() = default; + + common::CompressionKind compressionKind{ + common::CompressionKind::CompressionKind_NONE}; }; /// Adds the serialized size of vector at 'rows[i]' to '*sizes[i]'. @@ -307,6 +323,15 @@ class VectorStreamGroup : public StreamArena { serializer_->clear(); } + /// Returns serializer-dependent counters, e.g. about compression, data + /// distribution, encoding etc. + std::unordered_map runtimeStats() { + if (!serializer_) { + return {}; + } + return serializer_->runtimeStats(); + } + private: std::unique_ptr serializer_; VectorSerde* serde_{nullptr};