Skip to content

Commit

Permalink
random stream supports slow eps
Browse files Browse the repository at this point in the history
  • Loading branch information
lijianan committed Jan 24, 2024
1 parent 00d0411 commit a266c25
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 55 deletions.
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, javascript_uda_max_concurrency, 1, "Control the concurrency of JavaScript UDA in a query", 0) \
M(Float, replay_speed, 0., "Control the replay speed..0 < replay_speed < 1, means replay slower.replay_speed == 1, means replay by actual ingest interval.1 < replay_speed < <max_limit>, means replay faster", 0) \
M(UInt64, max_events, 0, "Total events to generate for random stream", 0) \
M(Int64, eps, -1, "control the random stream eps in query time, defalut value is -1, if it is 0 means no limit.", 0) \
M(Float, eps, -1., "control the random stream eps in query time, defalut value is -1, if it is 0 means no limit.", 0) \
// End of GLOBAL_SETTINGS

#define CONFIGURABLE_GLOBAL_SETTINGS(M) \
Expand Down
134 changes: 83 additions & 51 deletions src/Storages/Streaming/StorageRandom.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,26 +385,28 @@ fillColumnWithData(const DataTypePtr type, UInt64 limit, std::tuple<Int64, Int32
}
}

constexpr Float64 EPSILON = 1e-5;

class GenerateRandomSource final : public ISource
{
public:
GenerateRandomSource(
UInt64 block_size_,
UInt64 max_block_size_,
UInt64 random_seed_,
Block block_header_,
const ColumnsDescription & our_columns_,
ContextPtr context_,
UInt64 events_per_second_,
Float64 events_per_second_,
UInt64 interval_time_,
bool is_streaming_,
UInt64 total_events_,
size_t shard_num_)
: ISource(Nested::flatten(prepareBlockToFill(block_header_)), true, ProcessorID::GenerateRandomSourceID)
, block_size(block_size_)
, max_block_size(max_block_size_)
, block_full(std::move(block_header_))
, our_columns(our_columns_)
, context(context_)
, events_per_second(events_per_second_)
// , events_per_second(events_per_second_)
, header_chunk(Nested::flatten(block_full.cloneEmpty()).getColumns(), 0)
, generate_interval(interval_time_)
, total_events(total_events_)
Expand All @@ -413,28 +415,45 @@ class GenerateRandomSource final : public ISource
is_streaming = is_streaming_;
data_generate_helper = std::make_tuple(shard_num_, 1, pcg64(random_seed_));
if (total_events == 0 && !is_streaming)
total_events = events_per_second ? events_per_second : block_size;

/**
* In order to generate events evenly within one second, we have the interval_time parameter.
* The following code is used to calculate the number of data generated per interval
* For example: events_per_second = 2000, interval_time = 80ms.
* 1s = 1000ms, 1000ms / 80ms = 12(int / int, omit decimals), 1000ms % 80ms = 40ms
* 80 * 11 + (80 + 40) = 1000ms
* So we have 12 intervals, 11 normal interval(80ms) and 1 special interval(80ms + 40ms = 120ms)
*
* Now calculate the number of data generated per interval:
* 2000 / 12 = 166, 2000 % 12 = 8
* So the number of data generated per normal interval is 166, and the number of data generated in the special interval is 166 + 8 = 174
* 166 * 11 + 174 = 2000
* Total number of data generated per second is 2000.
*/
interval_count = 1000 / generate_interval;
last_interval_time = generate_interval + 1000 % generate_interval;
boundary_time = MonotonicMilliseconds::now() + last_interval_time;
normal_interval = events_per_second / interval_count;
last_interval_count = normal_interval + events_per_second % interval_count;

total_events = events_per_second ? events_per_second : max_block_size;

/// The minimum supported eps is EPSILON (about 1 event per day); if eps is less than EPSILON, we treat it as 0.
if (std::abs(events_per_second_) < EPSILON)
events_per_second = 0;
else if (events_per_second_ < 1)
{
/// For example, with events_per_second_ = 0.5 we generate 1 event every 2 seconds
generate_interval = static_cast<UInt64>(1000 / events_per_second_);
/// every generate_interval we will generate 1 piece of data
normal_interval_events = 1;
slow_eps = true;
}
else
{
/// events_per_second_ >= 1, we will generate data evenly within one second
events_per_second = static_cast<UInt64>(events_per_second_);

/**
* In order to generate events evenly within one second, we have the interval_time parameter.
* The following code is used to calculate the number of data generated per interval
* For example: events_per_second = 2000, interval_time = 80ms.
* 1s = 1000ms, 1000ms / 80ms = 12(int / int, omit decimals), 1000ms % 80ms = 40ms
* 80 * 11 + (80 + 40) = 1000ms
* So we have 12 intervals, 11 normal interval(80ms) and 1 special interval(80ms + 40ms = 120ms)
*
* Now calculate the number of data generated per interval:
* 2000 / 12 = 166, 2000 % 12 = 8
* So the number of data generated per normal interval is 166, and the number of data generated in the special interval is 166 + 8 = 174
* 166 * 11 + 174 = 2000
* Total number of data generated per second is 2000.
*/
interval_count = 1000 / generate_interval;
last_interval_time = generate_interval + 1000 % generate_interval;
normal_interval_events = events_per_second / interval_count;
last_interval_events = normal_interval_events + events_per_second % interval_count;
}

boundary_time = MonotonicMilliseconds::now();
for (const auto & elem : block_full)
{
bool is_reserved_column
Expand Down Expand Up @@ -468,25 +487,32 @@ class GenerateRandomSource final : public ISource

if (!is_streaming)
{
auto batch_size = std::min(block_size, total_events - generated_events);
auto batch_size = std::min(max_block_size, total_events - generated_events);
generated_events += batch_size;

/// random stream table query will return a block_size of chunk and end query.
/// random stream table query will return a max_block_size of chunk and end query.
return doGenerate(batch_size);
}

if (events_per_second != 0)
if (events_per_second != 0 || slow_eps)
{
/// high performance mod operation from clickhouse
int is_special = index - index / interval_count * interval_count;
auto now_time = MonotonicMilliseconds::now();

UInt64 batch_size = 0;
if (now_time >= boundary_time)
{
if (slow_eps)
{
boundary_time += generate_interval;
batch_size = normal_interval_events;
}
else
{
int is_special = index - index / interval_count * interval_count;
boundary_time += (is_special ? generate_interval : last_interval_time);
batch_size = is_special ? normal_interval_events : last_interval_events;
}

/// it's time for next output
boundary_time += (is_special ? generate_interval : last_interval_time);
batch_size = is_special ? normal_interval : last_interval_count;
if (total_events)
{
batch_size = std::min(batch_size, total_events - generated_events);
Expand All @@ -500,10 +526,10 @@ class GenerateRandomSource final : public ISource
}
else
{
auto batch_size = block_size;
auto batch_size = max_block_size;
if (total_events)
{
batch_size = std::min(block_size, total_events - generated_events);
batch_size = std::min(max_block_size, total_events - generated_events);
generated_events += batch_size;
}
return doGenerate(batch_size);
Expand Down Expand Up @@ -547,26 +573,32 @@ class GenerateRandomSource final : public ISource
}

private:
UInt64 block_size;
UInt64 max_block_size;
Block block_full;
Block block_to_fill;
const ColumnsDescription our_columns;
ContextPtr context;
Chunk header_chunk;

/// next output time
Int64 boundary_time;
UInt64 events_per_second;
UInt64 normal_interval = 0;
UInt64 last_interval_count = 0;
Chunk header_chunk;
size_t index = 0;
// Set the size of a window for random storages to generate data, measured in milliseconds.
const UInt64 generate_interval = 100;
UInt64 last_interval_time = 0;
/// Set the size of a window for random storages to generate data, measured in milliseconds.
UInt64 generate_interval = 100;
/// (1s = 1000ms) / generate_interval = interval_count
size_t interval_count = 0;
/// events_per_second / interval_count = normal_interval_events
UInt64 normal_interval_events = 0;
UInt64 last_interval_events = 0;
UInt64 last_interval_time = 0;
size_t index = 0;
UInt64 total_events;
UInt64 generated_events = 0;
bool slow_eps = false;

Poco::Logger * log;
std::shared_ptr<ExpressionActions> default_actions = nullptr;
// <shard_num, sequence_num, rng>
/// <shard_num, sequence_num, rng>
std::tuple<Int64, Int32, pcg64> data_generate_helper;

static Block & prepareBlockToFill(Block & block)
Expand Down Expand Up @@ -615,7 +647,7 @@ StorageRandom::StorageRandom(
const String & comment,
std::optional<UInt64> random_seed_,
UInt64 shards_,
UInt64 events_per_second_,
Float64 events_per_second_,
UInt64 interval_time_)
: IStorage(table_id_), shards(shards_), events_per_second(events_per_second_), interval_time(interval_time_)
{
Expand Down Expand Up @@ -721,8 +753,8 @@ Pipe StorageRandom::read(
auto events_share = max_events / shards;
auto events_remainder = max_events % shards;

/// setting random stream eps in query time, if generate_eps is not default value, use generate_eps as eps first.
UInt64 eps = settings.eps < 0 ? events_per_second : static_cast<UInt64>(settings.eps);
/// The random stream eps can be set at query time: if settings.eps is not the default (negative) value, it takes precedence over the stream's own eps.
Float64 eps = settings.eps < 0 ? events_per_second : settings.eps;
if (eps < shards)
{
if (eps == 0)
Expand All @@ -736,7 +768,7 @@ Pipe StorageRandom::read(
block_header,
our_columns,
context,
0,
0.,
1000,
query_info.syntax_analyzer_result->streaming,
events_share,
Expand Down Expand Up @@ -773,8 +805,8 @@ Pipe StorageRandom::read(
}
else
{
size_t eps_thread = eps / shards;
size_t remainder = eps % shards;
Float64 eps_thread = static_cast<_Float64>(static_cast<UInt64>(eps) / shards);
Float64 remainder = static_cast<_Float64>(static_cast<UInt64>(eps) % shards);
/// number of data generated per second is bigger than the number of thread;
for (size_t i = 0; i < shards - 1; i++)
{
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/Streaming/StorageRandom.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace DB
class ASTStorage;

#define STORAGE_RANDOM_RELATED_SETTINGS(M) \
M(UInt64, eps, 1000, "Limit how many rows to be generated per second for each thread. Used by RANDOM STREAM. 0 means no limit", 0) \
M(Float, eps, 1000., "Limit how many rows to be generated per second for each thread. Used by RANDOM STREAM. 0 means no limit", 0) \
M(UInt64, interval_time, 5, "the data generating interval, unit ms", 0) \
M(UInt64, shards, 1, "Shards number for random stream", 0)

Expand Down Expand Up @@ -67,7 +67,7 @@ class StorageRandom final : public shared_ptr_helper<StorageRandom>, public ISto
private:
UInt64 shards;
UInt64 random_seed = 0;
UInt64 events_per_second;
Float64 events_per_second;
UInt64 interval_time;

protected:
Expand All @@ -77,7 +77,7 @@ class StorageRandom final : public shared_ptr_helper<StorageRandom>, public ISto
const String & comment,
std::optional<UInt64> random_seed,
UInt64 shards_,
UInt64 events_per_second_,
Float64 events_per_second_,
UInt64 interval_time_);
};

Expand Down

0 comments on commit a266c25

Please sign in to comment.