sentry: improve reconnect_channel logging (#2054)
battlmonstr authored May 30, 2024
1 parent 831b578 commit ac4d0f7
Showing 4 changed files with 36 additions and 22 deletions.
12 changes: 7 additions & 5 deletions silkworm/infra/grpc/client/call.hpp
@@ -118,22 +118,23 @@ Task<Response> unary_rpc_with_retries(
     Request request,
     agrpc::GrpcContext& grpc_context,
     std::function<Task<void>()>& on_disconnect,
-    grpc::Channel& channel) {
+    grpc::Channel& channel,
+    std::string log_prefix) {
     // loop until a successful return or cancellation
     while (true) {
         try {
             co_return (co_await unary_rpc(rpc, stub, request, grpc_context));
         } catch (const GrpcStatusError& ex) {
             if (is_disconnect_error(ex.status(), channel)) {
-                log::Warning() << "GRPC call failed: " << ex.what();
+                log::Warning(log_prefix) << "GRPC call failed: " << ex.what();
             } else {
                 throw;
             }
         }
 
         co_await on_disconnect();
         if (channel.GetState(false) != GRPC_CHANNEL_READY) {
-            co_await reconnect_channel(channel);
+            co_await reconnect_channel(channel, log_prefix);
         }
     }
 }
@@ -146,6 +147,7 @@ Task<void> streaming_rpc_with_retries(
     agrpc::GrpcContext& grpc_context,
     std::function<Task<void>()>& on_disconnect,
     grpc::Channel& channel,
+    std::string log_prefix,
     std::function<Task<void>(Response)> consumer) {
     // loop until a successful return or cancellation
     while (true) {
@@ -154,15 +156,15 @@
             break;
         } catch (const GrpcStatusError& ex) {
             if (is_disconnect_error(ex.status(), channel)) {
-                log::Warning() << "GRPC streaming call failed: " << ex.what();
+                log::Warning(log_prefix) << "GRPC streaming call failed: " << ex.what();
             } else {
                 throw;
             }
         }
 
         co_await on_disconnect();
         if (channel.GetState(false) != GRPC_CHANNEL_READY) {
-            co_await reconnect_channel(channel);
+            co_await reconnect_channel(channel, log_prefix);
         }
     }
 }
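Both retry helpers in call.hpp now take a log_prefix, which they forward to log::Warning and to reconnect_channel so retry and reconnect messages can be attributed to the calling component. A minimal call-site sketch (the stub_, channel_, grpc_context_ and on_disconnect_ members are assumed to be wired up as in sentry_client.cpp further down):

    proto::PeersReply reply = co_await sw_rpc::unary_rpc_with_retries(
        &Stub::AsyncPeers, stub_, std::move(request),
        grpc_context_, on_disconnect_, *channel_,
        "sentry");  // new trailing argument: log prefix used for retry/reconnect messages
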
14 changes: 11 additions & 3 deletions silkworm/infra/grpc/client/reconnect.cpp
@@ -29,14 +29,22 @@ bool is_disconnect_error(const grpc::Status& status, grpc::Channel& channel) {
            ((code == grpc::StatusCode::DEADLINE_EXCEEDED) && (channel.GetState(false) != GRPC_CHANNEL_READY) && (channel.GetState(false) != GRPC_CHANNEL_SHUTDOWN));
 }
 
-Task<void> reconnect_channel(grpc::Channel& channel) {
+// min_sec, min_sec*2, min_sec*4, ... max_sec, max_sec, ...
+static int64_t backoff_timeout(size_t attempt, int64_t min_sec, int64_t max_sec) {
+    if (attempt >= 20) return max_sec;
+    return std::min(min_sec << attempt, max_sec);
+}
+
+Task<void> reconnect_channel(grpc::Channel& channel, std::string log_prefix) {
     bool is_stopped = false;
 
     std::function<void()> run = [&] {
         bool is_connected = false;
+        size_t attempt = 0;
         while (!is_connected && !is_stopped && (channel.GetState(false) != GRPC_CHANNEL_SHUTDOWN)) {
-            log::Info() << "Reconnecting grpc::Channel...";
-            auto deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5, GPR_TIMESPAN));
+            log::Info(log_prefix) << "Reconnecting grpc::Channel...";
+            auto timeout = backoff_timeout(attempt++, 5, 600);
+            auto deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(timeout, GPR_TIMESPAN));
             is_connected = channel.WaitForConnected(deadline);
         }
     };
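The key behavioral change in reconnect.cpp is the reconnect wait: instead of a fixed 5-second deadline per attempt, each iteration now waits backoff_timeout(attempt++, 5, 600) seconds. The following standalone sketch (not part of the commit) copies backoff_timeout to show the wait sequence it produces:

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    // Copied from reconnect.cpp above: doubles the wait each attempt, capped at
    // max_sec; the attempt >= 20 guard keeps the left shift from growing unbounded.
    static int64_t backoff_timeout(size_t attempt, int64_t min_sec, int64_t max_sec) {
        if (attempt >= 20) return max_sec;
        return std::min(min_sec << attempt, max_sec);
    }

    int main() {
        for (size_t attempt = 0; attempt < 10; ++attempt) {
            std::printf("attempt %zu -> wait %lld s\n", attempt,
                        static_cast<long long>(backoff_timeout(attempt, 5, 600)));
        }
        // Prints 5, 10, 20, 40, 80, 160, 320, 600, 600, 600.
        return 0;
    }
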
4 changes: 3 additions & 1 deletion silkworm/infra/grpc/client/reconnect.hpp
@@ -16,13 +16,15 @@
 
 #pragma once
 
+#include <string>
+
 #include <silkworm/infra/concurrency/task.hpp>
 
 #include <grpcpp/grpcpp.h>
 
 namespace silkworm::rpc {
 
 bool is_disconnect_error(const grpc::Status& status, grpc::Channel& channel);
-Task<void> reconnect_channel(grpc::Channel& channel);
+Task<void> reconnect_channel(grpc::Channel& channel, std::string log_prefix);
 
 }  // namespace silkworm::rpc
28 changes: 15 additions & 13 deletions silkworm/sentry/grpc/client/sentry_client.cpp
@@ -73,27 +73,27 @@ class SentryClientImpl final : public api::Service {
     }
 
     Task<void> reconnect() {
-        co_await sw_rpc::reconnect_channel(*channel_);
+        co_await sw_rpc::reconnect_channel(*channel_, "sentry");
     }
 
     // rpc SetStatus(StatusData) returns (SetStatusReply);
     Task<void> set_status(eth::StatusData status_data) override {
         proto::StatusData request = interfaces::proto_status_data_from_status_data(status_data);
-        co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSetStatus, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSetStatus, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
     }
 
     // rpc HandShake(google.protobuf.Empty) returns (HandShakeReply);
     Task<uint8_t> handshake() override {
         google::protobuf::Empty request;
-        proto::HandShakeReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncHandShake, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        proto::HandShakeReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncHandShake, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
         uint8_t result = interfaces::eth_version_from_protocol(reply.protocol());
         co_return result;
     }
 
     // rpc NodeInfo(google.protobuf.Empty) returns(types.NodeInfoReply);
     Task<NodeInfos> node_infos() override {
         google::protobuf::Empty request;
-        types::NodeInfoReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncNodeInfo, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        types::NodeInfoReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncNodeInfo, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
         auto result = interfaces::node_info_from_proto_node_info(reply);
         co_return NodeInfos{result};
     }
@@ -104,7 +104,7 @@ class SentryClientImpl final : public api::Service {
         request.mutable_data()->CopyFrom(interfaces::outbound_data_from_message(message));
         request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));
 
-        proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
         auto result = interfaces::peer_keys_from_sent_peers_ids(reply);
         co_return result;
     }
@@ -115,15 +115,15 @@
         request.mutable_data()->CopyFrom(interfaces::outbound_data_from_message(message));
         request.set_max_peers(max_peers);
 
-        proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToRandomPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToRandomPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
         auto result = interfaces::peer_keys_from_sent_peers_ids(reply);
         co_return result;
     }
 
     // rpc SendMessageToAll(OutboundMessageData) returns (SentPeers);
     Task<PeerKeys> send_message_to_all(Message message) override {
         proto::OutboundMessageData request = interfaces::outbound_data_from_message(message);
-        proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToAll, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToAll, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
         auto result = interfaces::peer_keys_from_sent_peers_ids(reply);
         co_return result;
     }
@@ -136,7 +136,7 @@
         // request.set_min_block()
         request.set_max_peers(max_peers);
 
-        proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageByMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageByMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
         auto result = interfaces::peer_keys_from_sent_peers_ids(reply);
         co_return result;
     }
@@ -147,7 +147,7 @@
         request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));
         // TODO: set_min_block
         // request.set_min_block()
-        co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
     }
 
     // rpc Messages(MessagesRequest) returns (stream InboundMessage);
@@ -172,21 +172,22 @@
             grpc_context_,
             on_disconnect_,
             *channel_,
+            "sentry",
             std::move(proto_consumer));
     }
 
     // rpc Peers(google.protobuf.Empty) returns (PeersReply);
     Task<PeerInfos> peers() override {
         google::protobuf::Empty request;
-        proto::PeersReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        proto::PeersReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
         auto result = interfaces::peer_infos_from_proto_peers_reply(reply);
         co_return result;
     }
 
     // rpc PeerCount(PeerCountRequest) returns (PeerCountReply);
     Task<size_t> peer_count() override {
         proto::PeerCountRequest request;
-        proto::PeerCountReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerCount, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        proto::PeerCountReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerCount, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
         auto result = static_cast<size_t>(reply.count());
         co_return result;
     }
@@ -195,7 +196,7 @@
     Task<std::optional<PeerInfo>> peer_by_id(EccPublicKey public_key) override {
         proto::PeerByIdRequest request;
         request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));
-        proto::PeerByIdReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        proto::PeerByIdReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
         auto result = interfaces::peer_info_opt_from_proto_peer_reply(reply);
         co_return result;
     }
@@ -205,7 +206,7 @@
         proto::PenalizePeerRequest request;
         request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));
         request.set_penalty(proto::PenaltyKind::Kick);
-        co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPenalizePeer, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
+        co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPenalizePeer, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
     }
 
     // rpc PeerEvents(PeerEventsRequest) returns (stream PeerEvent);
@@ -225,6 +226,7 @@
             grpc_context_,
             on_disconnect_,
             *channel_,
+            "sentry",
             std::move(proto_consumer));
     }
 
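Every call site in sentry_client.cpp passes the same "sentry" prefix, so retry, reconnect, and disconnect log lines emitted by the shared gRPC helpers are attributable to the sentry client rather than appearing unprefixed. A hypothetical sketch (not part of this commit) of how another client built on the same helpers could tag its own channel, using an assumed "execution" prefix:

    Task<void> reconnect() {
        co_await sw_rpc::reconnect_channel(*channel_, "execution");  // assumed component name, for illustration only
    }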
