diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index e75ac2c1c755d4..357b604e180604 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -222,8 +222,6 @@ DEFINE_Int32(be_service_threads, "64"); // or 3x the number of cores. This keeps the cores busy without causing excessive // thrashing. DEFINE_Int32(num_threads_per_core, "3"); -// if true, compresses tuple data in Serialize -DEFINE_mBool(compress_rowbatches, "true"); DEFINE_mBool(rowbatch_align_tuple_offset, "false"); // interval between profile reports; in seconds DEFINE_mInt32(status_report_interval, "5"); diff --git a/be/src/common/config.h b/be/src/common/config.h index c911963a87fe17..057e90ecff0535 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -271,8 +271,6 @@ DECLARE_Int32(be_service_threads); // or 3x the number of cores. This keeps the cores busy without causing excessive // thrashing. DECLARE_Int32(num_threads_per_core); -// if true, compresses tuple data in Serialize -DECLARE_mBool(compress_rowbatches); DECLARE_mBool(rowbatch_align_tuple_offset); // interval between profile reports; in seconds DECLARE_mInt32(status_report_interval); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e70be36b774eea..59f6534a2c4160 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -254,7 +254,7 @@ std::string ExchangeSinkLocalState::name_suffix() { return name; } -segment_v2::CompressionTypePB& ExchangeSinkLocalState::compression_type() { +segment_v2::CompressionTypePB ExchangeSinkLocalState::compression_type() const { return _parent->cast()._compression_type; } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index bc91e5dc19d78d..dd8fafe245b323 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -185,7 +185,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { [[nodiscard]] int sender_id() const { return _sender_id; } std::string name_suffix() override; - segment_v2::CompressionTypePB& compression_type(); + segment_v2::CompressionTypePB compression_type() const; std::string debug_string(int indentation_level) const override; std::vector*> channels; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index a4524d846fb376..34c46566d17502 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -392,9 +392,13 @@ class RuntimeState { if (_query_options.__isset.fragment_transmission_compression_codec) { if (_query_options.fragment_transmission_compression_codec == "lz4") { return segment_v2::CompressionTypePB::LZ4; + } else if (_query_options.fragment_transmission_compression_codec == "snappy") { + return segment_v2::CompressionTypePB::SNAPPY; + } else { + return segment_v2::CompressionTypePB::NO_COMPRESSION; } } - return segment_v2::CompressionTypePB::SNAPPY; + return segment_v2::CompressionTypePB::NO_COMPRESSION; } bool skip_storage_engine_merge() const { diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 5ca921a53d4516..37c7f3ef1158a0 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -866,7 +866,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock, *uncompressed_bytes = content_uncompressed_size; // compress - if (config::compress_rowbatches && content_uncompressed_size > 0) { + if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { SCOPED_RAW_TIMER(&_compress_time_ns); pblock->set_compression_type(compression_type); pblock->set_uncompressed_size(content_uncompressed_size); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index d66295c8705ef8..8291e63fdd7fc7 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -149,7 +149,7 @@ class VDataStreamSender : public DataSink { QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; } bool transfer_large_data_by_brpc() { return _transfer_large_data_by_brpc; } RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; } - segment_v2::CompressionTypePB& compression_type() { return _compression_type; } + segment_v2::CompressionTypePB compression_type() const { return _compression_type; } protected: friend class BlockSerializer; diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index 54dd6c1136fe6f..020c4f2e923cd6 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -114,7 +114,6 @@ void fill_block_with_array_string(vectorized::Block& block) { } void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_type) { - config::compress_rowbatches = true; // int { auto vec = vectorized::ColumnVector::create(); @@ -296,7 +295,6 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty } TEST(BlockTest, SerializeAndDeserializeBlock) { - config::compress_rowbatches = true; serialize_and_deserialize_test(segment_v2::CompressionTypePB::SNAPPY); serialize_and_deserialize_test(segment_v2::CompressionTypePB::LZ4); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 7578da745e030f..b0a23c4108bfd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -2707,7 +2707,7 @@ public TQueryOptions toThrift() { tResult.setEnableFunctionPushdown(enableFunctionPushdown); tResult.setEnableCommonExprPushdown(enableCommonExprPushdown); tResult.setCheckOverflowForDecimal(checkOverflowForDecimal); - tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec); + tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec.trim().toLowerCase()); tResult.setEnableLocalExchange(enableLocalExchange); tResult.setSkipStorageEngineMerge(skipStorageEngineMerge);