Skip to content

Commit

Permalink
Backport #54815 to 23.8: Adjusting num_streams by expected work in …
Browse files Browse the repository at this point in the history
…StorageS3 (#54892)
  • Loading branch information
robot-clickhouse-ci-2 authored Sep 23, 2023
1 parent ea3d886 commit a436016
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 5 deletions.
5 changes: 5 additions & 0 deletions src/Storages/S3Queue/S3QueueSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::QueueGlobIterator::next(
return KeyWithInfo();
}

/// Reports how many keys are currently held in `keys_buf`.
size_t StorageS3QueueSource::QueueGlobIterator::estimatedKeysCount()
{
    const size_t buffered_keys = keys_buf.size();
    return buffered_keys;
}

StorageS3QueueSource::StorageS3QueueSource(
const ReadFromFormatInfo & info,
const String & format_,
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/S3Queue/S3QueueSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class StorageS3QueueSource : public ISource, WithContext
Strings
filterProcessingFiles(const S3QueueMode & engine_mode, std::unordered_set<String> & exclude_keys, const String & max_file = "");

size_t estimatedKeysCount() override;

private:
UInt64 max_poll_size;
KeysWithInfo keys_buf;
Expand Down
67 changes: 64 additions & 3 deletions src/Storages/StorageS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ class StorageS3Source::DisclosedGlobIterator::Impl : WithContext
return nextAssumeLocked();
}

/// Number of entries currently stored in `buffer`.
size_t objectsCount()
{
    const size_t buffered_objects = buffer.size();
    return buffered_objects;
}

~Impl()
{
list_objects_pool.wait();
Expand Down Expand Up @@ -224,7 +229,6 @@ class StorageS3Source::DisclosedGlobIterator::Impl : WithContext
void fillInternalBufferAssumeLocked()
{
buffer.clear();

assert(outcome_future.valid());
auto outcome = outcome_future.get();

Expand Down Expand Up @@ -364,6 +368,11 @@ StorageS3Source::KeyWithInfo StorageS3Source::DisclosedGlobIterator::next()
return pimpl->next();
}

/// Forwards to the implementation object and returns its objectsCount().
size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount()
{
    const size_t objects_in_impl = pimpl->objectsCount();
    return objects_in_impl;
}

class StorageS3Source::KeysIterator::Impl : WithContext
{
public:
Expand Down Expand Up @@ -425,6 +434,11 @@ class StorageS3Source::KeysIterator::Impl : WithContext
return {key, info};
}

/// Number of entries stored in `keys`.
size_t objectsCount()
{
    const size_t total_keys = keys.size();
    return total_keys;
}

private:
Strings keys;
std::atomic_size_t index = 0;
Expand Down Expand Up @@ -459,6 +473,44 @@ StorageS3Source::KeyWithInfo StorageS3Source::KeysIterator::next()
return pimpl->next();
}

/// Forwards to the implementation object and returns its objectsCount().
size_t StorageS3Source::KeysIterator::estimatedKeysCount()
{
    const size_t keys_in_impl = pimpl->objectsCount();
    return keys_in_impl;
}

/// Eagerly requests up to `max_threads_count` keys from `callback_` and stores them in `buffer`,
/// so that estimatedKeysCount() can report how many keys were actually obtained.
///
/// `max_threads_count` invocations of the callback are scheduled on a local thread pool and all
/// of them have completed before the constructor returns.
///
/// Empty keys are not buffered. Keeping them would make estimatedKeysCount() always equal to
/// `max_threads_count`, and next() could hand out an empty key while non-empty ones still sit
/// at later positions of the buffer.
/// NOTE(review): this assumes an empty key is how the callback signals "no more tasks" -
/// confirm against the ReadTaskCallback contract.
StorageS3Source::ReadTaskIterator::ReadTaskIterator(
    const DB::ReadTaskCallback & callback_,
    const size_t max_threads_count)
    : callback(callback_)
{
    ThreadPool pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, max_threads_count);
    auto pool_scheduler = threadPoolCallbackRunner<String>(pool, "S3ReadTaskItr");

    std::vector<std::future<String>> keys;
    keys.reserve(max_threads_count);
    /// Capturing `this` is fine here: `callback` is already initialized and every job
    /// finishes in pool.wait() below, before the constructor returns.
    for (size_t i = 0; i < max_threads_count; ++i)
        keys.push_back(pool_scheduler([this] { return callback(); }, Priority{}));

    pool.wait();
    buffer.reserve(max_threads_count);
    for (auto & key_future : keys)
    {
        const String key = key_future.get();
        if (key.empty())
            continue;

        buffer.emplace_back(key, std::nullopt);
    }
}

/// Returns the next key: buffered entries are handed out first (each position is claimed
/// atomically), and once the buffer is exhausted every call asks the callback directly.
StorageS3Source::KeyWithInfo StorageS3Source::ReadTaskIterator::next()
{
    const size_t claimed_slot = index.fetch_add(1, std::memory_order_relaxed);

    if (claimed_slot < buffer.size())
        return buffer[claimed_slot];

    return {callback(), {}};
}

/// Reports how many keys are stored in `buffer`.
size_t StorageS3Source::ReadTaskIterator::estimatedKeysCount()
{
    const size_t buffered_keys = buffer.size();
    return buffered_keys;
}

StorageS3Source::StorageS3Source(
const ReadFromFormatInfo & info,
const String & format_,
Expand Down Expand Up @@ -965,7 +1017,7 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
{
if (distributed_processing)
{
return std::make_shared<StorageS3Source::ReadTaskIterator>(local_context->getReadTaskCallback());
return std::make_shared<StorageS3Source::ReadTaskIterator>(local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads);
}
else if (configuration.withGlobs())
{
Expand Down Expand Up @@ -1017,13 +1069,22 @@ Pipe StorageS3::read(
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());

size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount();
if (estimated_keys_count > 1)
num_streams = std::min(num_streams, estimated_keys_count);
else
/// Disclosed glob iterator can underestimate the amount of keys in some cases. We will keep one stream for this particular case.
num_streams = 1;

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;

const size_t max_threads = local_context->getSettingsRef().max_threads;
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / std::max(num_streams, 1ul));
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Reading in {} streams, {} threads per stream", num_streams, max_parsing_threads);

pipes.reserve(num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageS3Source>(
Expand Down
16 changes: 14 additions & 2 deletions src/Storages/StorageS3.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ class StorageS3Source : public ISource, WithContext
virtual ~IIterator() = default;
virtual KeyWithInfo next() = 0;

/// Estimates how many streams we need to process all files.
/// If keys count >= max_threads_count, the returned number may not represent the actual number of the keys.
/// Intended to be called before any next() calls; may underestimate otherwise.
/// FIXME: May underestimate if the glob has a strong filter, so that there are few matches among the first 1000 ListObjects results.
virtual size_t estimatedKeysCount() = 0;

KeyWithInfo operator ()() { return next(); }
};

Expand All @@ -77,6 +83,7 @@ class StorageS3Source : public ISource, WithContext
std::function<void(FileProgress)> progress_callback_ = {});

KeyWithInfo next() override;
size_t estimatedKeysCount() override;

private:
class Impl;
Expand All @@ -100,6 +107,7 @@ class StorageS3Source : public ISource, WithContext
std::function<void(FileProgress)> progress_callback_ = {});

KeyWithInfo next() override;
size_t estimatedKeysCount() override;

private:
class Impl;
Expand All @@ -110,11 +118,15 @@ class StorageS3Source : public ISource, WithContext
/// Iterator whose keys are produced by a ReadTaskCallback.
/// NOTE(review): this block comes from a rendered diff, so the constructor and next()
/// each appear twice (the inline form and the out-of-line declaration) - presumably the
/// inline forms are the removed lines; confirm against the actual header.
class ReadTaskIterator : public IIterator
{
public:
explicit ReadTaskIterator(const ReadTaskCallback & callback_) : callback(callback_) {}
/// Takes the callback that yields keys and a thread-count bound (`max_threads_count`).
explicit ReadTaskIterator(const ReadTaskCallback & callback_, const size_t max_threads_count);

KeyWithInfo next() override { return {callback(), {}}; }
/// Returns the next key.
KeyWithInfo next() override;
/// Implements IIterator::estimatedKeysCount().
size_t estimatedKeysCount() override;

private:
/// Keys held by the iterator itself.
KeysWithInfo buffer;
/// Atomic position counter.
std::atomic_size_t index = 0;

/// Yields one key per invocation.
ReadTaskCallback callback;
};

Expand Down

0 comments on commit a436016

Please sign in to comment.