From 647b3771b5f48c53bc7009e7eee7557727f3169b Mon Sep 17 00:00:00 2001 From: lizhou1111 Date: Wed, 24 Jan 2024 22:04:06 +0800 Subject: [PATCH] refactor RandomSource --- src/Storages/Streaming/StorageRandom.cpp | 141 ++++++++++++----------- 1 file changed, 76 insertions(+), 65 deletions(-) diff --git a/src/Storages/Streaming/StorageRandom.cpp b/src/Storages/Streaming/StorageRandom.cpp index 4ce9c7f03f5..1d527536bee 100644 --- a/src/Storages/Streaming/StorageRandom.cpp +++ b/src/Storages/Streaming/StorageRandom.cpp @@ -36,6 +36,8 @@ #include #include +#include + namespace DB { @@ -408,25 +410,28 @@ class GenerateRandomSource final : public ISource , context(context_) // , events_per_second(events_per_second_) , header_chunk(Nested::flatten(block_full.cloneEmpty()).getColumns(), 0) - , generate_interval(interval_time_) + , generate_interval_ms(interval_time_) , total_events(total_events_) , log(&Poco::Logger::get("GenerateRandSource")) { is_streaming = is_streaming_; data_generate_helper = std::make_tuple(shard_num_, 1, pcg64(random_seed_)); - if (total_events == 0 && !is_streaming) - total_events = events_per_second ? events_per_second : max_block_size; - + /// the minimum support eps is EPSILON, about 1 piece of data per day, if eps is less than EPSILON, we assume it's 0. - if (std::abs(events_per_second_) < EPSILON) + if (std::abs(events_per_second_) < EPSILON || !is_streaming) + { events_per_second = 0; + batch_size_getter = [this]() { return unrestrcitedMode(); }; + if (!is_streaming) + total_events = total_events == 0 ? max_block_size : total_events; + } else if (events_per_second_ < 1) { /// For example, events_per_second_ = 0.5, we will generate 1 data every 2 seconds - generate_interval = static_cast(1000 / events_per_second_); - /// every generate_interval we will generate 1 piece of data - normal_interval_events = 1; - slow_eps = true; + generate_interval_ms = static_cast(1000 / events_per_second_); + + /// every generate_interval_ms we will generate 1 piece of data + batch_size_getter = [this]() { return slowMode(); }; } else { @@ -447,10 +452,12 @@ class GenerateRandomSource final : public ISource * 166 * 11 + 174 = 2000 * Total number of data generated per second is 2000. */ - interval_count = 1000 / generate_interval; - last_interval_time = generate_interval + 1000 % generate_interval; + interval_count = 1000 / generate_interval_ms; + last_interval_ms = generate_interval_ms + 1000 % generate_interval_ms; normal_interval_events = events_per_second / interval_count; last_interval_events = normal_interval_events + events_per_second % interval_count; + + batch_size_getter = [this]() { return normalMode(); }; } boundary_time = MonotonicMilliseconds::now(); @@ -465,8 +472,7 @@ class GenerateRandomSource final : public ISource block_to_fill.insert(elem); } - auto dag - = evaluateMissingDefaults(block_to_fill, block_full.getNamesAndTypesList(), our_columns, context, true, false, true); + auto dag = evaluateMissingDefaults(block_to_fill, block_full.getNamesAndTypesList(), our_columns, context, true, false, true); if (dag) { default_actions = std::make_shared( @@ -476,64 +482,68 @@ class GenerateRandomSource final : public ISource String getName() const override { return "Random"; } -protected: - Chunk generate() override + void checkTotalEventsAndSet(UInt64 & batch_size) { - if (total_events && generated_events >= total_events) + batch_size = std::min(batch_size, total_events - generated_events); + generated_events += batch_size; + } + + /// there three modes of generating data + /// 1. unrestrcited mode: generate data as fast as you can + /// 2. slow mode: eps < 1 + /// 3. normal mode: eps > 1 + UInt64 unrestrcitedMode() + { + UInt64 batch_size = max_block_size; + if (total_events) + checkTotalEventsAndSet(batch_size); + + return batch_size; + } + + UInt64 slowMode() + { + auto now_time = MonotonicMilliseconds::now(); + UInt64 batch_size = 0; + if (now_time >= boundary_time) { - LOG_INFO(log, "Finish generating total_events={} generated_events={}", total_events, generated_events); - return {}; + boundary_time += generate_interval_ms; + batch_size = 1; + if (total_events) + ++generated_events; } + return batch_size; + } - if (!is_streaming) + UInt64 normalMode() + { + auto now_time = MonotonicMilliseconds::now(); + UInt64 batch_size = 0; + if (now_time > boundary_time) { - auto batch_size = std::min(max_block_size, total_events - generated_events); - generated_events += batch_size; + int is_special = index - index / interval_count * interval_count; + boundary_time += (is_special ? generate_interval_ms : last_interval_ms); + batch_size = is_special ? normal_interval_events : last_interval_events; + index++; - /// random stream table query will return a max_block_size of chunk and end query. - return doGenerate(batch_size); + if (total_events) + checkTotalEventsAndSet(batch_size); } - if (events_per_second != 0 || slow_eps) - { - auto now_time = MonotonicMilliseconds::now(); - UInt64 batch_size = 0; - if (now_time >= boundary_time) - { - if (slow_eps) - { - boundary_time += generate_interval; - batch_size = normal_interval_events; - } - else - { - int is_special = index - index / interval_count * interval_count; - boundary_time += (is_special ? generate_interval : last_interval_time); - batch_size = is_special ? normal_interval_events : last_interval_events; - } - - /// it's time for next output - if (total_events) - { - batch_size = std::min(batch_size, total_events - generated_events); - generated_events += batch_size; - } - index++; - } + return batch_size; + } - /// FIXME: if the batch size > max block size, we have to split it. - return doGenerate(batch_size); - } - else +protected: + Chunk generate() override + { + if (total_events && generated_events >= total_events) { - auto batch_size = max_block_size; - if (total_events) - { - batch_size = std::min(max_block_size, total_events - generated_events); - generated_events += batch_size; - } - return doGenerate(batch_size); + LOG_INFO(log, "Finish generating total_events={} generated_events={}", total_events, generated_events); + return {}; } + + UInt64 batch_size = batch_size_getter(); + return doGenerate(batch_size); } Chunk doGenerate(UInt64 block_size_) @@ -573,6 +583,8 @@ class GenerateRandomSource final : public ISource } private: + std::function batch_size_getter; + UInt64 max_block_size; Block block_full; Block block_to_fill; @@ -584,17 +596,16 @@ class GenerateRandomSource final : public ISource Int64 boundary_time; UInt64 events_per_second; /// Set the size of a window for random storages to generate data, measured in milliseconds. - UInt64 generate_interval = 100; - /// (1s = 1000ms) / generate_interval = interval_count + UInt64 generate_interval_ms = 100; + /// (1s = 1000ms) / generate_interval_ms = interval_count size_t interval_count = 0; /// events_per_second / interval_count = normal_interval_events UInt64 normal_interval_events = 0; UInt64 last_interval_events = 0; - UInt64 last_interval_time = 0; + UInt64 last_interval_ms = 0; size_t index = 0; UInt64 total_events; UInt64 generated_events = 0; - bool slow_eps = false; Poco::Logger * log; std::shared_ptr default_actions = nullptr; @@ -841,7 +852,7 @@ Pipe StorageRandom::read( NamesAndTypesList StorageRandom::getVirtuals() const { - return NamesAndTypesList { + return NamesAndTypesList{ {ProtonConsts::RESERVED_EVENT_SEQUENCE_ID, std::make_shared()}, {ProtonConsts::RESERVED_SHARD, std::make_shared()}, };