Skip to content

Commit

Permalink
Allow YBClient callback(s) to run in a separate threadpool
Browse files Browse the repository at this point in the history
Summary:
We have this feature already for CQL side of things, where the callback from FlushAsync gets scheduled to run in a handler thread instead of running in the reactor thread. But, Redis was missing that feature.

Allow YBClient to have a separate threadpool for running callbacks. This should address the "blocked reactor for xxx issue".

Test Plan:
Jenkins for tests.
Push to cluster and see a reduction in "blocked reactor for xxx sec"

Seems to reduce the blocked reactor for xxx messages. However, they are not completely eliminated.

We only offload the callback portion to the other threads. If for some reason we take longer in a different stage, for example log trace, or waiting for a lock to update flush state, we might still run into this. They are far fewer though.

I0327 18:18:52.537797 17690 connection.cc:339] Connection (0x2305a850) server 127.0.0.1:37384 => 127.0.0.1:6379 recv error: Network error (yb/util/net/socket.cc:575): recvmsg error: Connection reset by peer (error 104)
W0327 18:19:00.353492 17815 outbound_call.cc:342] RPC callback for RPC call yb.tserver.TabletServerService.Read -> { remote: 0.0.0.0:0 idx: 0 } , state=FINISHED_SUCCESS. blocked reactor thread for 50897.8us. Trace: :
0327 18:19:00.120859 (+     0us) outbound_call.cc:184] Outbound Call initiated.
0327 18:19:00.120926 (+    67us) outbound_call.cc:390] Queued.
0327 18:19:00.120935 (+     9us) outbound_call.cc:401] Call Sent.
0327 18:19:00.302372 (+181437us) async_rpc.cc:132] AsyncRpc::Finished Begin
0327 18:19:00.302420 (+    48us) async_rpc.cc:135] AsyncRpc::Finished Done with Done() new_status = OK
0327 18:19:00.302813 (+   393us) async_rpc.cc:137] AsyncRpc::Finished ProcessedResponseFromTserver
0327 18:19:00.303020 (+   207us) async_rpc.cc:139] see line
0327 18:19:00.303024 (+     4us) batcher.cc:171] CheckForFinishedFlush
0327 18:19:00.303033 (+     9us) batcher.cc:181] Updated state
0327 18:19:00.352620 (+ 49587us) batcher.cc:188] Set FlushFinished
0327 18:19:00.352639 (+    19us) batcher.cc:205] submitting flush_callback_
0327 18:19:00.352734 (+    95us) batcher.cc:220] submitted flush_callback_
0327 18:19:00.352738 (+     4us) async_rpc.cc:141] see line
0327 18:19:00.353239 (+   501us) async_rpc.cc:144] AsyncRpc::Finished All done
0327 18:19:00.353246 (+     7us) outbound_call.cc:328] Done calling
0327 18:19:00.353251 (+     5us) outbound_call.cc:334] Done freeing
0327 18:19:00.353270 (+    19us) trace.cc:320] Dumping to list
Related trace:

Also, if we happen to run through the callback in the threadpool before the reactor thread finishes, the reactor thread might again become responsible for cleaning up the data structures. possibly taking long.

W0327 18:20:45.847632 17815 outbound_call.cc:342] RPC callback for RPC call yb.tserver.TabletServerService.Read -> { remote: 0.0.0.0:0 idx: 0 } , state=FINISHED_SUCCESS. blocked reactor thread for 58550us. Trace: :
0327 18:20:45.649150 (+     0us) outbound_call.cc:184] Outbound Call initiated.
0327 18:20:45.649230 (+    80us) outbound_call.cc:390] Queued.
0327 18:20:45.649240 (+    10us) outbound_call.cc:401] Call Sent.
0327 18:20:45.788850 (+139610us) async_rpc.cc:132] AsyncRpc::Finished Begin
0327 18:20:45.788883 (+    33us) async_rpc.cc:135] AsyncRpc::Finished Done with Done() new_status = OK
0327 18:20:45.790643 (+  1760us) async_rpc.cc:137] AsyncRpc::Finished ProcessedResponseFromTserver
0327 18:20:45.790844 (+   201us) async_rpc.cc:139] see line
0327 18:20:45.790847 (+     3us) batcher.cc:171] CheckForFinishedFlush
0327 18:20:45.790856 (+     9us) batcher.cc:181] Updated state
0327 18:20:45.790878 (+    22us) batcher.cc:188] Set FlushFinished
0327 18:20:45.790883 (+     5us) batcher.cc:205] submitting flush_callback_
0327 18:20:45.790964 (+    81us) batcher.cc:220] submitted flush_callback_
0327 18:20:45.790967 (+     3us) async_rpc.cc:141] see line
0327 18:20:45.791506 (+   539us) batcher.cc:214] cb start
0327 18:20:45.791512 (+     6us) redis_service.cc:397] BlockCallback
0327 18:20:45.791521 (+     9us) redis_service.cc:425] Responding to 251 ops
0327 18:20:45.791999 (+   478us) redis_service.cc:429] Responding to last op
0327 18:20:45.792003 (+     4us) redis_service.cc:432] Responded to all ops
0327 18:20:45.792006 (+     3us) redis_service.cc:441] Release
0327 18:20:45.792019 (+    13us) redis_service.cc:443] Release done
0327 18:20:45.792024 (+     5us) redis_service.cc:452] Processsed done
0327 18:20:45.792025 (+     1us) redis_service.cc:401] BlockCallback Done
0327 18:20:45.792027 (+     2us) redis_service.cc:403] BlockCallback block_ reset
0327 18:20:45.792029 (+     2us) batcher.cc:217] cb end
0327 18:20:45.847360 (+ 55331us) async_rpc.cc:144] AsyncRpc::Finished All done
0327 18:20:45.847377 (+    17us) outbound_call.cc:328] Done calling
0327 18:20:45.847383 (+     6us) outbound_call.cc:334] Done freeing
0327 18:20:45.847401 (+    18us) trace.cc:320] Dumping to list
Related trace:

Reviewers: kannan, mikhail, robert, sergei

Reviewed By: sergei

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D4432
  • Loading branch information
amitanandaiyer committed Apr 4, 2018
1 parent c8908ae commit c2555ed
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 26 deletions.
12 changes: 10 additions & 2 deletions src/yb/client/batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void Batcher::Abort(const Status& status) {
if (flush_callback_) {
l.unlock();

flush_callback_(status);
RunCallback(status);
}
}

Expand Down Expand Up @@ -189,7 +189,15 @@ void Batcher::CheckForFinishedFlush() {
s = STATUS(IOError, "Some errors occurred");
}

flush_callback_(s);
RunCallback(s);
}

void Batcher::RunCallback(const Status& status) {
auto runnable = std::make_shared<yb::FunctionRunnable>(
[ cb{std::move(flush_callback_)}, status ]() { cb(status); });
if (!client_->callback_threadpool() || !client_->callback_threadpool()->Submit(runnable).ok()) {
runnable->Run();
}
}

MonoTime Batcher::ComputeDeadlineUnlocked() const {
Expand Down
3 changes: 3 additions & 0 deletions src/yb/client/batcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
RemoteTablet* tablet, InFlightOps::const_iterator begin, InFlightOps::const_iterator end,
const bool allow_local_calls_in_curr_thread);

// Calls/Schedules flush_callback_ and resets it to free resources.
void RunCallback(const Status& s);

// Log an error where an Rpc callback has response count mismatch.
void AddOpCountMismatchError();

Expand Down
5 changes: 4 additions & 1 deletion src/yb/client/client-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@
#include "yb/common/entity_ids.h"
#include "yb/common/index.h"
#include "yb/common/wire_protocol.h"
#include "yb/rpc/rpc_fwd.h"
#include "yb/rpc/rpc.h"
#include "yb/rpc/rpc_fwd.h"
#include "yb/util/atomic.h"
#include "yb/util/locks.h"
#include "yb/util/monotime.h"
#include "yb/util/net/net_util.h"
#include "yb/util/threadpool.h"

namespace yb {

Expand Down Expand Up @@ -296,6 +297,8 @@ class YBClient::Data {
// aid in detecting local tservers.
TabletServerId uuid_;

std::unique_ptr<ThreadPool> cb_threadpool_;

private:
DISALLOW_COPY_AND_ASSIGN(Data);
};
Expand Down
19 changes: 19 additions & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ YBClientBuilder& YBClientBuilder::set_client_name(const std::string& name) {
return *this;
}

YBClientBuilder& YBClientBuilder::set_callback_threadpool_size(size_t size) {
data_->threadpool_size_ = size;
return *this;
}

YBClientBuilder& YBClientBuilder::set_tserver_uuid(const TabletServerId& uuid) {
data_->uuid_ = uuid;
return *this;
Expand Down Expand Up @@ -345,6 +350,13 @@ Status YBClientBuilder::Build(shared_ptr<YBClient>* client) {
"Could not determine local host names");
c->data_->cloud_info_pb_ = data_->cloud_info_pb_;
c->data_->uuid_ = data_->uuid_;
if (data_->threadpool_size_ > 0) {
ThreadPoolBuilder tpb(data_->client_name_ + "_cb");
tpb.set_max_threads(data_->threadpool_size_);
std::unique_ptr<ThreadPool> tp;
RETURN_NOT_OK_PREPEND(tpb.Build(&tp), "Could not create callback threadpool");
c->data_->cb_threadpool_ = std::move(tp);
}

client->swap(c);
return Status::OK();
Expand All @@ -358,6 +370,9 @@ YBClient::~YBClient() {
if (data_->meta_cache_) {
data_->meta_cache_->Shutdown();
}
if (data_->cb_threadpool_) {
data_->cb_threadpool_->Shutdown();
}
delete data_;
}

Expand Down Expand Up @@ -690,6 +705,10 @@ const std::shared_ptr<rpc::Messenger>& YBClient::messenger() const {
return data_->messenger_;
}

ThreadPool *YBClient::callback_threadpool() {
return data_->cb_threadpool_.get();
}

void YBClient::LookupTabletByKey(const YBTable* table,
const std::string& partition_key,
const MonoTime& deadline,
Expand Down
8 changes: 7 additions & 1 deletion src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@
#include "yb/rpc/rpc_fwd.h"

#include "yb/util/monotime.h"
#include "yb/util/net/net_fwd.h"
#include "yb/util/result.h"
#include "yb/util/status.h"
#include "yb/util/status_callback.h"
#include "yb/util/strongly_typed_bool.h"
#include "yb/util/net/net_fwd.h"
#include "yb/util/threadpool.h"

template<class T> class scoped_refptr;

Expand Down Expand Up @@ -203,6 +204,9 @@ class YBClientBuilder {
// Sets client name to be used for naming the client's messenger/reactors.
YBClientBuilder& set_client_name(const std::string& name);

// Sets the size of the threadpool for calling callbacks.
YBClientBuilder& set_callback_threadpool_size(size_t size);

// Sets skip master leader resolution.
// Used in tests, when we do not have real master.
YBClientBuilder& set_skip_master_leader_resolution(bool value);
Expand Down Expand Up @@ -493,6 +497,8 @@ class YBClient : public std::enable_shared_from_this<YBClient> {

YBClient();

ThreadPool* callback_threadpool();

// Owned.
Data* data_;

Expand Down
3 changes: 3 additions & 0 deletions src/yb/client/client_builder-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class YBClientBuilder::Data {
// A descriptive name for the client. Useful for embedded ybclients.
std::string client_name_ = "ybclient";

// The size of the threadpool to use for calling callbacks.
size_t threadpool_size_ = 0;

// Placement information for the client.
CloudInfoPB cloud_info_pb_;

Expand Down
4 changes: 2 additions & 2 deletions src/yb/rpc/inbound_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ void InboundCall::NotifyTransferred(const Status& status, Connection* conn) {
if (status.ok()) {
TRACE_TO(trace_, "Transfer finished");
} else {
LOG(WARNING) << "Connection torn down before " << ToString()
<< " could send its response: " << status.ToString();
YB_LOG_EVERY_N_SECS(WARNING, 10) << "Connection torn down before " << ToString()
<< " could send its response: " << status.ToString();
}
if (call_processed_listener_) {
call_processed_listener_(this);
Expand Down
20 changes: 4 additions & 16 deletions src/yb/util/threadpool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,6 @@ namespace yb {
using strings::Substitute;
using std::unique_ptr;

////////////////////////////////////////////////////////
// FunctionRunnable
////////////////////////////////////////////////////////

class FunctionRunnable : public Runnable {
public:
explicit FunctionRunnable(std::function<void()> func) : func_(std::move(func)) {}

void Run() override {
func_();
}

private:
std::function<void()> func_;
};

////////////////////////////////////////////////////////
// ThreadPoolBuilder
///////////////////////////////////////////////////////
Expand Down Expand Up @@ -437,6 +421,10 @@ Status ThreadPool::SubmitFunc(const std::function<void()>& func) {
return Submit(std::shared_ptr<Runnable>(new FunctionRunnable(func)));
}

Status ThreadPool::SubmitFunc(std::function<void()>&& func) {
return Submit(std::shared_ptr<Runnable>(new FunctionRunnable(std::move(func))));
}

Status ThreadPool::Submit(const std::shared_ptr<Runnable>& r) {
return DoSubmit(std::move(r), tokenless_.get());
}
Expand Down
24 changes: 20 additions & 4 deletions src/yb/util/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,14 @@ class ThreadPool {
void Shutdown();

// Submit a function using the yb Closure system.
CHECKED_STATUS SubmitClosure(const Closure& task) WARN_UNUSED_RESULT;
CHECKED_STATUS SubmitClosure(const Closure& task);

// Submit a function binded using std::bind(&FuncName, args...)
CHECKED_STATUS SubmitFunc(const std::function<void()>& func) WARN_UNUSED_RESULT;
CHECKED_STATUS SubmitFunc(const std::function<void()>& func);
CHECKED_STATUS SubmitFunc(std::function<void()>&& func);

// Submit a Runnable class
CHECKED_STATUS Submit(const std::shared_ptr<Runnable>& task)
WARN_UNUSED_RESULT;
CHECKED_STATUS Submit(const std::shared_ptr<Runnable>& task);

// Wait until all the tasks are completed.
void Wait();
Expand Down Expand Up @@ -431,5 +431,21 @@ class ThreadPoolToken {
DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken);
};

////////////////////////////////////////////////////////
// FunctionRunnable
////////////////////////////////////////////////////////

class FunctionRunnable : public Runnable {
public:
explicit FunctionRunnable(std::function<void()> func) : func_(std::move(func)) {}

void Run() override {
func_();
}

private:
std::function<void()> func_;
};

} // namespace yb
#endif // YB_UTIL_THREADPOOL_H
7 changes: 7 additions & 0 deletions src/yb/yql/redis/redisserver/redis_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ DEFINE_int32(redis_max_command_size, 253_MB,
// Maximum value size is 64MB
DEFINE_int32(redis_max_value_size, 64_MB,
"Maximum size of the value in redis");
DEFINE_int32(redis_callbacks_threadpool_size, 64,
"The maximum size for the threadpool which handles callbacks from the ybclient layer");

DEFINE_bool(redis_safe_batch, true, "Use safe batching with Redis service");

Expand Down Expand Up @@ -377,6 +379,10 @@ class Block : public std::enable_shared_from_this<Block> {
explicit BlockCallback(BlockPtr block) : block_(std::move(block)) {}

void operator()(const Status& status) {
// Block context owns the arena upon which this block is created.
// Done is going to free up block's reference to context. So, unless we ensure that
// the context lives beyond the block_.reset() we might get an error while updating the
// ref-count for the block_ (in the area of arena owned by the context).
auto context = block_->context_;
DCHECK(context != nullptr);
block_->Done(status);
Expand Down Expand Up @@ -862,6 +868,7 @@ Status RedisServiceImpl::Impl::SetUpYBClient() {
client_builder.add_master_server_addr(yb_tier_master_addresses_);
client_builder.set_metric_entity(server_->metric_entity());
client_builder.set_parent_mem_tracker(server_->mem_tracker());
client_builder.set_callback_threadpool_size(FLAGS_redis_callbacks_threadpool_size);
RETURN_NOT_OK(client_builder.Build(&client_));

// Add proxy to call local tserver if available.
Expand Down

0 comments on commit c2555ed

Please sign in to comment.