Cut down the wait time for sync schema in restart (#7672) #7674

Closed
25 changes: 25 additions & 0 deletions dbms/src/Interpreters/Settings.h
@@ -363,8 +363,33 @@ struct Settings
M(SettingUInt64, manual_compact_pool_size, 1, "The number of worker threads to handle manual compact requests.") \
M(SettingUInt64, manual_compact_max_concurrency, 10, "Max concurrent tasks. It should be larger than pool size.") \
M(SettingUInt64, manual_compact_more_until_ms, 60000, "Continuously compact more segments until reaching specified elapsed time. If 0 is specified, only one segment will be compacted each round.") \
\
M(SettingUInt64, max_bytes_before_external_join, 0, "max bytes used by join before spill, 0 as the default value, 0 means no limit") \
M(SettingInt64, join_restore_concurrency, 0, "Join restore concurrency. A negative value means restoring the join serially; 0 (the default) means TiFlash chooses the restore concurrency automatically") \
M(SettingUInt64, max_cached_data_bytes_in_spiller, 1024ULL * 1024 * 100, "Max cached data bytes in spiller before spilling, 100MB as the default value, 0 means no limit") \
M(SettingUInt64, max_spilled_rows_per_file, 200000, "Max spilled data rows per spill file, 200000 as the default value, 0 means no limit.") \
M(SettingUInt64, max_spilled_bytes_per_file, 0, "Max spilled data bytes per spill file, 0 as the default value, 0 means no limit.") \
M(SettingBool, enable_planner, true, "Enable planner") \
M(SettingBool, enable_pipeline, false, "Enable pipeline model") \
M(SettingBool, enforce_enable_pipeline, false, "Enforce the enablement of the pipeline model") \
M(SettingUInt64, pipeline_cpu_task_thread_pool_size, 0, "The size of cpu task thread pool. 0 means using number_of_logical_cpu_cores.") \
M(SettingUInt64, pipeline_io_task_thread_pool_size, 0, "The size of io task thread pool. 0 means using number_of_logical_cpu_cores.") \
M(SettingTaskQueueType, pipeline_cpu_task_thread_pool_queue_type, TaskQueueType::DEFAULT, "The task queue of cpu task thread pool") \
M(SettingTaskQueueType, pipeline_io_task_thread_pool_queue_type, TaskQueueType::DEFAULT, "The task queue of io task thread pool") \
M(SettingUInt64, local_tunnel_version, 2, "1: not refined, 2: refined") \
M(SettingBool, force_push_down_all_filters_to_scan, false, "Push down all filters to scan, only used for test") \
M(SettingUInt64, async_recv_version, 1, "1: reactor mode, 2: no additional threads") \
M(SettingUInt64, recv_queue_size, 0, "size of ExchangeReceiver queue, 0 means the size is set to data_source_mpp_task_num * 50") \
M(SettingUInt64, shallow_copy_cross_probe_threshold, 0, "minimum right rows to use shallow copy probe mode for cross join, default is max(1, max_block_size/10)") \
M(SettingInt64, max_buffered_bytes_in_executor, 200LL * 1024 * 1024, "The max buffered bytes in each executor, 0 means unlimited, 200MB as the default value") \
M(SettingUInt64, bg_ddl_sync_schema_interval, 60, "The interval of background DDL sync schema in seconds") \
M(SettingUInt64, ddl_restart_wait_seconds, 180, "The wait time for sync schema in seconds when restart")


// clang-format on
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT};

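Note: the DECLARE macro above is what turns each M(...) entry into a typed member of the Settings struct. A minimal, self-contained sketch of the expansion for the two settings this PR adds (SettingUInt64 is stubbed with uint64_t here purely so the snippet compiles on its own):

#include <cstdint>

// Stand-in for TiFlash's SettingUInt64, just so this sketch is self-contained.
using SettingUInt64 = uint64_t;

// What DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) produces for the two
// settings added in this diff.
struct SettingsSketch
{
    SettingUInt64 bg_ddl_sync_schema_interval{60}; // background DDL sync schema interval, in seconds
    SettingUInt64 ddl_restart_wait_seconds{180}; // max wait for sync schema at restart, in seconds
};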
122 changes: 122 additions & 0 deletions dbms/src/Server/Server.cpp
@@ -800,6 +800,128 @@ class Server::TcpHttpServersHolder
std::vector<std::unique_ptr<Poco::Net::TCPServer>> servers;
};

// By default, init the global thread pool based on hardware_concurrency.
// It is adjusted later by `adjustThreadPoolSize`.
void initThreadPool(Poco::Util::LayeredConfiguration & config)
{
size_t default_num_threads = std::max(4UL, 2UL * std::thread::hardware_concurrency());

// Note: Global Thread Pool must be larger than sub thread pools.
GlobalThreadPool::initialize(
/*max_threads*/ default_num_threads * 20,
/*max_free_threads*/ default_num_threads,
/*queue_size*/ default_num_threads * 8);

auto disaggregated_mode = getDisaggregatedMode(config);
if (disaggregated_mode == DisaggregatedMode::Compute)
{
RNPagePreparerPool::initialize(
/*max_threads*/ default_num_threads,
/*max_free_threads*/ default_num_threads / 2,
/*queue_size*/ default_num_threads * 2);
RNRemoteReadTaskPool::initialize(
/*max_threads*/ default_num_threads,
/*max_free_threads*/ default_num_threads / 2,
/*queue_size*/ default_num_threads * 2);
}

if (disaggregated_mode == DisaggregatedMode::Compute || disaggregated_mode == DisaggregatedMode::Storage)
{
DataStoreS3Pool::initialize(
/*max_threads*/ default_num_threads,
/*max_free_threads*/ default_num_threads / 2,
/*queue_size*/ default_num_threads * 2);
S3FileCachePool::initialize(
/*max_threads*/ default_num_threads,
/*max_free_threads*/ default_num_threads / 2,
/*queue_size*/ default_num_threads * 2);
}
}
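For a concrete feel of the sizing rules above: on a hypothetical 16-core host, default_num_threads is max(4, 32) = 32, so the global pool gets 640 threads and a 256-slot queue while each sub pool gets 32 threads, 16 free threads, and a 64-slot queue. A minimal sketch that reproduces the math (illustrative only, not part of the PR):

#include <algorithm>
#include <cstdio>
#include <thread>

// Mirrors the initThreadPool() sizing so the invariant "the global pool must
// be larger than every sub pool" is easy to check on a given machine.
int main()
{
    const size_t default_num_threads = std::max<size_t>(4, 2 * std::thread::hardware_concurrency());

    // Global thread pool: 20x threads and 8x queue slots.
    std::printf("global: max=%zu free=%zu queue=%zu\n",
                default_num_threads * 20, default_num_threads, default_num_threads * 8);

    // Shape shared by RNPagePreparerPool, RNRemoteReadTaskPool,
    // DataStoreS3Pool and S3FileCachePool.
    std::printf("sub:    max=%zu free=%zu queue=%zu\n",
                default_num_threads, default_num_threads / 2, default_num_threads * 2);
}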

void adjustThreadPoolSize(const Settings & settings, size_t logical_cores)
{
// TODO: make BackgroundPool/BlockableBackgroundPool/DynamicThreadPool spawned from `GlobalThreadPool`
size_t max_io_thread_count = std::ceil(settings.io_thread_count_scale * logical_cores);
// Note: Global Thread Pool must be larger than sub thread pools.
GlobalThreadPool::instance().setMaxThreads(max_io_thread_count * 200);
GlobalThreadPool::instance().setMaxFreeThreads(max_io_thread_count);
GlobalThreadPool::instance().setQueueSize(max_io_thread_count * 400);

if (RNPagePreparerPool::instance)
{
RNPagePreparerPool::instance->setMaxThreads(max_io_thread_count);
RNPagePreparerPool::instance->setMaxFreeThreads(max_io_thread_count / 2);
RNPagePreparerPool::instance->setQueueSize(max_io_thread_count * 2);
}
if (RNRemoteReadTaskPool::instance)
{
RNRemoteReadTaskPool::instance->setMaxThreads(max_io_thread_count);
RNRemoteReadTaskPool::instance->setMaxFreeThreads(max_io_thread_count / 2);
RNRemoteReadTaskPool::instance->setQueueSize(max_io_thread_count * 2);
}
if (DataStoreS3Pool::instance)
{
DataStoreS3Pool::instance->setMaxThreads(max_io_thread_count);
DataStoreS3Pool::instance->setMaxFreeThreads(max_io_thread_count / 2);
DataStoreS3Pool::instance->setQueueSize(max_io_thread_count * 2);
}
if (S3FileCachePool::instance)
{
S3FileCachePool::instance->setMaxThreads(max_io_thread_count);
S3FileCachePool::instance->setMaxFreeThreads(max_io_thread_count / 2);
S3FileCachePool::instance->setQueueSize(max_io_thread_count * 2);
}
}
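After settings are loaded the same invariant holds, now scaled by io_thread_count_scale. A small sketch of the post-adjustment numbers; both the scale and the core count below are assumptions for illustration (the scale's real default lives in Settings.h and is not part of this diff):

#include <cmath>
#include <cstdio>

// Mirrors adjustThreadPoolSize() with assumed inputs.
int main()
{
    const double io_thread_count_scale = 1.0; // assumption, not the documented default
    const size_t logical_cores = 16; // assumption

    const auto max_io_thread_count = static_cast<size_t>(std::ceil(io_thread_count_scale * logical_cores));
    std::printf("global: max=%zu free=%zu queue=%zu\n",
                max_io_thread_count * 200, max_io_thread_count, max_io_thread_count * 400);
    std::printf("sub:    max=%zu free=%zu queue=%zu\n",
                max_io_thread_count, max_io_thread_count / 2, max_io_thread_count * 2);
}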

void syncSchemaWithTiDB(
const TiFlashStorageConfig & storage_config,
BgStorageInitHolder & bg_init_stores,
const std::unique_ptr<Context> & global_context,
const LoggerPtr & log)
{
/// Sync schemas with TiDB, and initialize the schema sync service.
/// In API V2 mode, each keyspace's schema is fetched lazily.
if (storage_config.api_version == 1)
{
Stopwatch watch;
while (watch.elapsedSeconds() < global_context->getSettingsRef().ddl_restart_wait_seconds) // retry until ddl_restart_wait_seconds (default 180s) is exhausted
{
try
{
global_context->getTMTContext().getSchemaSyncerManager()->syncSchemas(*global_context, NullspaceID);
break;
}
catch (Poco::Exception & e)
{
const int wait_seconds = 3;
LOG_ERROR(
log,
"Bootstrap failed because sync schema error: {}\nWe will sleep for {}"
" seconds and try again.",
e.displayText(),
wait_seconds);
::sleep(wait_seconds);
}
}
LOG_DEBUG(log, "Sync schemas done.");
}

// Init the DeltaMergeStore instances if data exists, so that disk usage is
// reported correctly and the node is prepared to serve queries.
bg_init_stores.start(
*global_context,
log,
storage_config.lazily_init_store,
storage_config.s3_config.isS3Enabled());

// Init the schema sync service with TiDB
global_context->initializeSchemaSyncService();
}
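This function carries the PR's core change: the restart-time sync-schema retry is bounded by the new ddl_restart_wait_seconds setting instead of a hard-coded three minutes. A stripped-down sketch of the same retry-until-deadline pattern; trySyncSchemas and every name below are illustrative stand-ins, not TiFlash APIs:

#include <chrono>
#include <cstdio>
#include <stdexcept>
#include <thread>

// Illustrative stand-in for the syncSchemas call: fails twice before
// succeeding so the retry path is exercised.
void trySyncSchemas()
{
    static int attempts = 0;
    if (++attempts < 3)
        throw std::runtime_error("schema sync not ready");
}

int main()
{
    using Clock = std::chrono::steady_clock;
    const std::chrono::seconds restart_wait{180}; // mirrors ddl_restart_wait_seconds' default
    const std::chrono::seconds retry_pause{3}; // mirrors wait_seconds in the diff

    const auto deadline = Clock::now() + restart_wait;
    while (Clock::now() < deadline) // stop retrying once the configured budget is spent
    {
        try
        {
            trySyncSchemas();
            std::puts("Sync schemas done.");
            break;
        }
        catch (const std::exception & e)
        {
            std::printf("sync schema error: %s, retrying in %lld seconds\n",
                        e.what(),
                        static_cast<long long>(retry_pause.count()));
            std::this_thread::sleep_for(retry_pause);
        }
    }
}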

int Server::main(const std::vector<std::string> & /*args*/)
{
setThreadName("TiFlashMain");