diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 2307b4c21f4..54d58658cb8 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -808,7 +808,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(UInt64, javascript_uda_max_concurrency, 1, "Control the concurrency of JavaScript UDA in a query", 0) \ M(Float, replay_speed, 0., "Control the replay speed..0 < replay_speed < 1, means replay slower.replay_speed == 1, means replay by actual ingest interval.1 < replay_speed < , means replay faster", 0) \ M(UInt64, max_events, 0, "Total events to generate for random stream", 0) \ - M(Int64, eps, -1, "control the random stream eps in query time, defalut value is -1, if it is 0 means no limit.", 0) \ + M(Float, eps, -1., "control the random stream eps in query time, defalut value is -1, if it is 0 means no limit.", 0) \ // End of GLOBAL_SETTINGS #define CONFIGURABLE_GLOBAL_SETTINGS(M) \ diff --git a/src/Storages/Streaming/StorageRandom.cpp b/src/Storages/Streaming/StorageRandom.cpp index 0aa0f5de324..4ce9c7f03f5 100644 --- a/src/Storages/Streaming/StorageRandom.cpp +++ b/src/Storages/Streaming/StorageRandom.cpp @@ -385,26 +385,28 @@ fillColumnWithData(const DataTypePtr type, UInt64 limit, std::tuple(1000 / events_per_second_); + /// every generate_interval we will generate 1 piece of data + normal_interval_events = 1; + slow_eps = true; + } + else + { + /// events_per_second_ > 1, we will generate data evenly within one second + events_per_second = static_cast(events_per_second_); + + /** + * In order to generate events evenly within one second, we have the interval_time parameter. + * The following code is used to calculate the number of data generated per interval + * For example: events_per_second = 2000, interval_time = 80ms. 
+ * 1s = 1000ms, 1000ms / 80ms = 12(int / int, omit decimals), 1000ms % 80ms = 40ms + * 80 * 11 + (80 + 40) = 1000ms + * So we have 12 intervals, 11 normal intervals (80ms) and 1 special interval (80ms + 40ms = 120ms) + * + * Now calculate the number of data generated per interval: + * 2000 / 12 = 166, 2000 % 12 = 8 + * So the number of data generated per normal interval is 166, and the number of data generated in the special interval is 166 + 8 = 174 + * 166 * 11 + 174 = 2000 + * Total number of data generated per second is 2000. + */ + interval_count = 1000 / generate_interval; + last_interval_time = generate_interval + 1000 % generate_interval; + normal_interval_events = events_per_second / interval_count; + last_interval_events = normal_interval_events + events_per_second % interval_count; + } + + boundary_time = MonotonicMilliseconds::now(); for (const auto & elem : block_full) { bool is_reserved_column @@ -468,25 +487,32 @@ class GenerateRandomSource final : public ISource if (!is_streaming) { - auto batch_size = std::min(block_size, total_events - generated_events); + auto batch_size = std::min(max_block_size, total_events - generated_events); generated_events += batch_size; - /// random stream table query will return a block_size of chunk and end query. + /// random stream table query will return a chunk of up to max_block_size events and end the query. return doGenerate(batch_size); } - if (events_per_second != 0) + if (events_per_second != 0 || slow_eps) { - /// hign performance mod operation from clickhouse - int is_special = index - index / interval_count * interval_count; auto now_time = MonotonicMilliseconds::now(); - UInt64 batch_size = 0; if (now_time >= boundary_time) { + if (slow_eps) + { + boundary_time += generate_interval; + batch_size = normal_interval_events; + } + else + { + int is_special = index - index / interval_count * interval_count; + boundary_time += (is_special ? generate_interval : last_interval_time); + batch_size = is_special ? 
normal_interval_events : last_interval_events; + } + /// it's time for next output - boundary_time += (is_special ? generate_interval : last_interval_time); - batch_size = is_special ? normal_interval : last_interval_count; if (total_events) { batch_size = std::min(batch_size, total_events - generated_events); @@ -500,10 +526,10 @@ class GenerateRandomSource final : public ISource } else { - auto batch_size = block_size; + auto batch_size = max_block_size; if (total_events) { - batch_size = std::min(block_size, total_events - generated_events); + batch_size = std::min(max_block_size, total_events - generated_events); generated_events += batch_size; } return doGenerate(batch_size); @@ -547,26 +573,32 @@ class GenerateRandomSource final : public ISource } private: - UInt64 block_size; + UInt64 max_block_size; Block block_full; Block block_to_fill; const ColumnsDescription our_columns; ContextPtr context; + Chunk header_chunk; + + /// next output time Int64 boundary_time; UInt64 events_per_second; - UInt64 normal_interval = 0; - UInt64 last_interval_count = 0; - Chunk header_chunk; - size_t index = 0; - // Set the size of a window for random storages to generate data, measured in milliseconds. - const UInt64 generate_interval = 100; - UInt64 last_interval_time = 0; + /// Set the size of a window for random storages to generate data, measured in milliseconds. 
+ UInt64 generate_interval = 100; + /// (1s = 1000ms) / generate_interval = interval_count size_t interval_count = 0; + /// events_per_second / interval_count = normal_interval_events + UInt64 normal_interval_events = 0; + UInt64 last_interval_events = 0; + UInt64 last_interval_time = 0; + size_t index = 0; UInt64 total_events; UInt64 generated_events = 0; + bool slow_eps = false; + Poco::Logger * log; std::shared_ptr default_actions = nullptr; - // + /// std::tuple data_generate_helper; static Block & prepareBlockToFill(Block & block) @@ -615,7 +647,7 @@ StorageRandom::StorageRandom( const String & comment, std::optional random_seed_, UInt64 shards_, - UInt64 events_per_second_, + Float64 events_per_second_, UInt64 interval_time_) : IStorage(table_id_), shards(shards_), events_per_second(events_per_second_), interval_time(interval_time_) { @@ -721,8 +753,8 @@ Pipe StorageRandom::read( auto events_share = max_events / shards; auto events_remainder = max_events % shards; - /// setting random stream eps in query time, if generate_eps is not defalut value, use generate_eps as eps first. - UInt64 eps = settings.eps < 0 ? events_per_second : static_cast(settings.eps); + /// setting random stream eps in query time: if settings.eps is not negative (the default value is -1), use settings.eps as eps first; otherwise fall back to the stream's events_per_second. + Float64 eps = settings.eps < 0 ? 
events_per_second : settings.eps; if (eps < shards) { if (eps == 0) @@ -736,7 +768,7 @@ Pipe StorageRandom::read( block_header, our_columns, context, - 0, + 0., 1000, query_info.syntax_analyzer_result->streaming, events_share, @@ -773,8 +805,8 @@ Pipe StorageRandom::read( } else { - size_t eps_thread = eps / shards; - size_t remainder = eps % shards; + Float64 eps_thread = static_cast<_Float64>(static_cast(eps) / shards); + Float64 remainder = static_cast<_Float64>(static_cast(eps) % shards); /// number of data generated per second is bigger than the number of thread; for (size_t i = 0; i < shards - 1; i++) { diff --git a/src/Storages/Streaming/StorageRandom.h b/src/Storages/Streaming/StorageRandom.h index b15c39150d8..63e5147ea8e 100644 --- a/src/Storages/Streaming/StorageRandom.h +++ b/src/Storages/Streaming/StorageRandom.h @@ -13,7 +13,7 @@ namespace DB class ASTStorage; #define STORAGE_RANDOM_RELATED_SETTINGS(M) \ - M(UInt64, eps, 1000, "Limit how many rows to be generated per second for each thread. Used by RANDOM STREAM. 0 means no limit", 0) \ + M(Float, eps, 1000., "Limit how many rows to be generated per second for each thread. Used by RANDOM STREAM. 0 means no limit", 0) \ M(UInt64, interval_time, 5, "the data generating interval, unit ms", 0) \ M(UInt64, shards, 1, "Shards number for random stream", 0) @@ -67,7 +67,7 @@ class StorageRandom final : public shared_ptr_helper, public ISto private: UInt64 shards; UInt64 random_seed = 0; - UInt64 events_per_second; + Float64 events_per_second; UInt64 interval_time; protected: @@ -77,7 +77,7 @@ class StorageRandom final : public shared_ptr_helper, public ISto const String & comment, std::optional random_seed, UInt64 shards_, - UInt64 events_per_second_, + Float64 events_per_second_, UInt64 interval_time_); };