Skip to content

Commit

Permalink
refactor RandomSource
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhou1111 committed Jan 24, 2024
1 parent a266c25 commit 647b377
Showing 1 changed file with 76 additions and 65 deletions.
141 changes: 76 additions & 65 deletions src/Storages/Streaming/StorageRandom.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <Common/logger_useful.h>
#include <Common/randomSeed.h>

#include <functional>

namespace DB
{

Expand Down Expand Up @@ -408,25 +410,28 @@ class GenerateRandomSource final : public ISource
, context(context_)
// , events_per_second(events_per_second_)
, header_chunk(Nested::flatten(block_full.cloneEmpty()).getColumns(), 0)
, generate_interval(interval_time_)
, generate_interval_ms(interval_time_)
, total_events(total_events_)
, log(&Poco::Logger::get("GenerateRandSource"))
{
is_streaming = is_streaming_;
data_generate_helper = std::make_tuple(shard_num_, 1, pcg64(random_seed_));
if (total_events == 0 && !is_streaming)
total_events = events_per_second ? events_per_second : max_block_size;


/// the minimum support eps is EPSILON, about 1 piece of data per day, if eps is less than EPSILON, we assume it's 0.
if (std::abs(events_per_second_) < EPSILON)
if (std::abs(events_per_second_) < EPSILON || !is_streaming)
{
events_per_second = 0;
batch_size_getter = [this]() { return unrestrcitedMode(); };
if (!is_streaming)
total_events = total_events == 0 ? max_block_size : total_events;
}
else if (events_per_second_ < 1)
{
/// For example, events_per_second_ = 0.5, we will generate 1 data every 2 seconds
generate_interval = static_cast<UInt64>(1000 / events_per_second_);
/// every generate_interval we will generate 1 piece of data
normal_interval_events = 1;
slow_eps = true;
generate_interval_ms = static_cast<UInt64>(1000 / events_per_second_);

/// every generate_interval_ms we will generate 1 piece of data
batch_size_getter = [this]() { return slowMode(); };
}
else
{
Expand All @@ -447,10 +452,12 @@ class GenerateRandomSource final : public ISource
* 166 * 11 + 174 = 2000
* Total number of data generated per second is 2000.
*/
interval_count = 1000 / generate_interval;
last_interval_time = generate_interval + 1000 % generate_interval;
interval_count = 1000 / generate_interval_ms;
last_interval_ms = generate_interval_ms + 1000 % generate_interval_ms;
normal_interval_events = events_per_second / interval_count;
last_interval_events = normal_interval_events + events_per_second % interval_count;

batch_size_getter = [this]() { return normalMode(); };
}

boundary_time = MonotonicMilliseconds::now();
Expand All @@ -465,8 +472,7 @@ class GenerateRandomSource final : public ISource
block_to_fill.insert(elem);
}

auto dag
= evaluateMissingDefaults(block_to_fill, block_full.getNamesAndTypesList(), our_columns, context, true, false, true);
auto dag = evaluateMissingDefaults(block_to_fill, block_full.getNamesAndTypesList(), our_columns, context, true, false, true);
if (dag)
{
default_actions = std::make_shared<ExpressionActions>(
Expand All @@ -476,64 +482,68 @@ class GenerateRandomSource final : public ISource

String getName() const override { return "Random"; }

protected:
Chunk generate() override
void checkTotalEventsAndSet(UInt64 & batch_size)
{
if (total_events && generated_events >= total_events)
batch_size = std::min(batch_size, total_events - generated_events);
generated_events += batch_size;
}

/// there three modes of generating data
/// 1. unrestrcited mode: generate data as fast as you can
/// 2. slow mode: eps < 1
/// 3. normal mode: eps > 1
UInt64 unrestrcitedMode()
{
UInt64 batch_size = max_block_size;
if (total_events)
checkTotalEventsAndSet(batch_size);

return batch_size;
}

UInt64 slowMode()
{
auto now_time = MonotonicMilliseconds::now();
UInt64 batch_size = 0;
if (now_time >= boundary_time)
{
LOG_INFO(log, "Finish generating total_events={} generated_events={}", total_events, generated_events);
return {};
boundary_time += generate_interval_ms;
batch_size = 1;
if (total_events)
++generated_events;
}
return batch_size;
}

if (!is_streaming)
UInt64 normalMode()
{
auto now_time = MonotonicMilliseconds::now();
UInt64 batch_size = 0;
if (now_time > boundary_time)
{
auto batch_size = std::min(max_block_size, total_events - generated_events);
generated_events += batch_size;
int is_special = index - index / interval_count * interval_count;
boundary_time += (is_special ? generate_interval_ms : last_interval_ms);
batch_size = is_special ? normal_interval_events : last_interval_events;
index++;

/// random stream table query will return a max_block_size of chunk and end query.
return doGenerate(batch_size);
if (total_events)
checkTotalEventsAndSet(batch_size);
}

if (events_per_second != 0 || slow_eps)
{
auto now_time = MonotonicMilliseconds::now();
UInt64 batch_size = 0;
if (now_time >= boundary_time)
{
if (slow_eps)
{
boundary_time += generate_interval;
batch_size = normal_interval_events;
}
else
{
int is_special = index - index / interval_count * interval_count;
boundary_time += (is_special ? generate_interval : last_interval_time);
batch_size = is_special ? normal_interval_events : last_interval_events;
}

/// it's time for next output
if (total_events)
{
batch_size = std::min(batch_size, total_events - generated_events);
generated_events += batch_size;
}
index++;
}
return batch_size;
}

/// FIXME: if the batch size > max block size, we have to split it.
return doGenerate(batch_size);
}
else
protected:
Chunk generate() override
{
if (total_events && generated_events >= total_events)
{
auto batch_size = max_block_size;
if (total_events)
{
batch_size = std::min(max_block_size, total_events - generated_events);
generated_events += batch_size;
}
return doGenerate(batch_size);
LOG_INFO(log, "Finish generating total_events={} generated_events={}", total_events, generated_events);
return {};
}

UInt64 batch_size = batch_size_getter();
return doGenerate(batch_size);
}

Chunk doGenerate(UInt64 block_size_)
Expand Down Expand Up @@ -573,6 +583,8 @@ class GenerateRandomSource final : public ISource
}

private:
std::function<UInt64()> batch_size_getter;

UInt64 max_block_size;
Block block_full;
Block block_to_fill;
Expand All @@ -584,17 +596,16 @@ class GenerateRandomSource final : public ISource
Int64 boundary_time;
UInt64 events_per_second;
/// Set the size of a window for random storages to generate data, measured in milliseconds.
UInt64 generate_interval = 100;
/// (1s = 1000ms) / generate_interval = interval_count
UInt64 generate_interval_ms = 100;
/// (1s = 1000ms) / generate_interval_ms = interval_count
size_t interval_count = 0;
/// events_per_second / interval_count = normal_interval_events
UInt64 normal_interval_events = 0;
UInt64 last_interval_events = 0;
UInt64 last_interval_time = 0;
UInt64 last_interval_ms = 0;
size_t index = 0;
UInt64 total_events;
UInt64 generated_events = 0;
bool slow_eps = false;

Poco::Logger * log;
std::shared_ptr<ExpressionActions> default_actions = nullptr;
Expand Down Expand Up @@ -841,7 +852,7 @@ Pipe StorageRandom::read(

NamesAndTypesList StorageRandom::getVirtuals() const
{
return NamesAndTypesList {
return NamesAndTypesList{
{ProtonConsts::RESERVED_EVENT_SEQUENCE_ID, std::make_shared<DataTypeInt64>()},
{ProtonConsts::RESERVED_SHARD, std::make_shared<DataTypeInt32>()},
};
Expand Down

0 comments on commit 647b377

Please sign in to comment.