diff --git a/silkworm/infra/grpc/client/call.hpp b/silkworm/infra/grpc/client/call.hpp index 3930b6c383..ab7fefcd6e 100644 --- a/silkworm/infra/grpc/client/call.hpp +++ b/silkworm/infra/grpc/client/call.hpp @@ -118,14 +118,15 @@ Task unary_rpc_with_retries( Request request, agrpc::GrpcContext& grpc_context, std::function()>& on_disconnect, - grpc::Channel& channel) { + grpc::Channel& channel, + std::string log_prefix) { // loop until a successful return or cancellation while (true) { try { co_return (co_await unary_rpc(rpc, stub, request, grpc_context)); } catch (const GrpcStatusError& ex) { if (is_disconnect_error(ex.status(), channel)) { - log::Warning() << "GRPC call failed: " << ex.what(); + log::Warning(log_prefix) << "GRPC call failed: " << ex.what(); } else { throw; } @@ -133,7 +134,7 @@ Task unary_rpc_with_retries( co_await on_disconnect(); if (channel.GetState(false) != GRPC_CHANNEL_READY) { - co_await reconnect_channel(channel); + co_await reconnect_channel(channel, log_prefix); } } } @@ -146,6 +147,7 @@ Task streaming_rpc_with_retries( agrpc::GrpcContext& grpc_context, std::function()>& on_disconnect, grpc::Channel& channel, + std::string log_prefix, std::function(Response)> consumer) { // loop until a successful return or cancellation while (true) { @@ -154,7 +156,7 @@ Task streaming_rpc_with_retries( break; } catch (const GrpcStatusError& ex) { if (is_disconnect_error(ex.status(), channel)) { - log::Warning() << "GRPC streaming call failed: " << ex.what(); + log::Warning(log_prefix) << "GRPC streaming call failed: " << ex.what(); } else { throw; } @@ -162,7 +164,7 @@ Task streaming_rpc_with_retries( co_await on_disconnect(); if (channel.GetState(false) != GRPC_CHANNEL_READY) { - co_await reconnect_channel(channel); + co_await reconnect_channel(channel, log_prefix); } } } diff --git a/silkworm/infra/grpc/client/reconnect.cpp b/silkworm/infra/grpc/client/reconnect.cpp index e9e05fd5d8..cd393f0aa0 100644 --- a/silkworm/infra/grpc/client/reconnect.cpp +++ b/silkworm/infra/grpc/client/reconnect.cpp @@ -29,14 +29,22 @@ bool is_disconnect_error(const grpc::Status& status, grpc::Channel& channel) { ((code == grpc::StatusCode::DEADLINE_EXCEEDED) && (channel.GetState(false) != GRPC_CHANNEL_READY) && (channel.GetState(false) != GRPC_CHANNEL_SHUTDOWN)); } -Task reconnect_channel(grpc::Channel& channel) { +// min_sec, min_sec*2, min_sec*4, ... max_sec, max_sec, ... +static int64_t backoff_timeout(size_t attempt, int64_t min_sec, int64_t max_sec) { + if (attempt >= 20) return max_sec; + return std::min(min_sec << attempt, max_sec); +} + +Task reconnect_channel(grpc::Channel& channel, std::string log_prefix) { bool is_stopped = false; std::function run = [&] { bool is_connected = false; + size_t attempt = 0; while (!is_connected && !is_stopped && (channel.GetState(false) != GRPC_CHANNEL_SHUTDOWN)) { - log::Info() << "Reconnecting grpc::Channel..."; - auto deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5, GPR_TIMESPAN)); + log::Info(log_prefix) << "Reconnecting grpc::Channel..."; + auto timeout = backoff_timeout(attempt++, 5, 600); + auto deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(timeout, GPR_TIMESPAN)); is_connected = channel.WaitForConnected(deadline); } }; diff --git a/silkworm/infra/grpc/client/reconnect.hpp b/silkworm/infra/grpc/client/reconnect.hpp index a8176aadbb..590aaec0db 100644 --- a/silkworm/infra/grpc/client/reconnect.hpp +++ b/silkworm/infra/grpc/client/reconnect.hpp @@ -16,6 +16,8 @@ #pragma once +#include + #include #include @@ -23,6 +25,6 @@ namespace silkworm::rpc { bool is_disconnect_error(const grpc::Status& status, grpc::Channel& channel); -Task reconnect_channel(grpc::Channel& channel); +Task reconnect_channel(grpc::Channel& channel, std::string log_prefix); } // namespace silkworm::rpc diff --git a/silkworm/sentry/grpc/client/sentry_client.cpp b/silkworm/sentry/grpc/client/sentry_client.cpp index e495f96e40..cad018f6d5 100644 --- a/silkworm/sentry/grpc/client/sentry_client.cpp +++ b/silkworm/sentry/grpc/client/sentry_client.cpp @@ -73,19 +73,19 @@ class SentryClientImpl final : public api::Service { } Task reconnect() { - co_await sw_rpc::reconnect_channel(*channel_); + co_await sw_rpc::reconnect_channel(*channel_, "sentry"); } // rpc SetStatus(StatusData) returns (SetStatusReply); Task set_status(eth::StatusData status_data) override { proto::StatusData request = interfaces::proto_status_data_from_status_data(status_data); - co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSetStatus, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSetStatus, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); } // rpc HandShake(google.protobuf.Empty) returns (HandShakeReply); Task handshake() override { google::protobuf::Empty request; - proto::HandShakeReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncHandShake, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + proto::HandShakeReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncHandShake, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); uint8_t result = interfaces::eth_version_from_protocol(reply.protocol()); co_return result; } @@ -93,7 +93,7 @@ class SentryClientImpl final : public api::Service { // rpc NodeInfo(google.protobuf.Empty) returns(types.NodeInfoReply); Task node_infos() override { google::protobuf::Empty request; - types::NodeInfoReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncNodeInfo, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + types::NodeInfoReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncNodeInfo, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); auto result = interfaces::node_info_from_proto_node_info(reply); co_return NodeInfos{result}; } @@ -104,7 +104,7 @@ class SentryClientImpl final : public api::Service { request.mutable_data()->CopyFrom(interfaces::outbound_data_from_message(message)); request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key)); - proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); auto result = interfaces::peer_keys_from_sent_peers_ids(reply); co_return result; } @@ -115,7 +115,7 @@ class SentryClientImpl final : public api::Service { request.mutable_data()->CopyFrom(interfaces::outbound_data_from_message(message)); request.set_max_peers(max_peers); - proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToRandomPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToRandomPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); auto result = interfaces::peer_keys_from_sent_peers_ids(reply); co_return result; } @@ -123,7 +123,7 @@ class SentryClientImpl final : public api::Service { // rpc SendMessageToAll(OutboundMessageData) returns (SentPeers); Task send_message_to_all(Message message) override { proto::OutboundMessageData request = interfaces::outbound_data_from_message(message); - proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToAll, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToAll, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); auto result = interfaces::peer_keys_from_sent_peers_ids(reply); co_return result; } @@ -136,7 +136,7 @@ class SentryClientImpl final : public api::Service { // request.set_min_block() request.set_max_peers(max_peers); - proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageByMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageByMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); auto result = interfaces::peer_keys_from_sent_peers_ids(reply); co_return result; } @@ -147,7 +147,7 @@ class SentryClientImpl final : public api::Service { request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key)); // TODO: set_min_block // request.set_min_block() - co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); } // rpc Messages(MessagesRequest) returns (stream InboundMessage); @@ -172,13 +172,14 @@ class SentryClientImpl final : public api::Service { grpc_context_, on_disconnect_, *channel_, + "sentry", std::move(proto_consumer)); } // rpc Peers(google.protobuf.Empty) returns (PeersReply); Task peers() override { google::protobuf::Empty request; - proto::PeersReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + proto::PeersReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); auto result = interfaces::peer_infos_from_proto_peers_reply(reply); co_return result; } @@ -186,7 +187,7 @@ class SentryClientImpl final : public api::Service { // rpc PeerCount(PeerCountRequest) returns (PeerCountReply); Task peer_count() override { proto::PeerCountRequest request; - proto::PeerCountReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerCount, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + proto::PeerCountReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerCount, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); auto result = static_cast(reply.count()); co_return result; } @@ -195,7 +196,7 @@ class SentryClientImpl final : public api::Service { Task> peer_by_id(EccPublicKey public_key) override { proto::PeerByIdRequest request; request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key)); - proto::PeerByIdReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + proto::PeerByIdReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); auto result = interfaces::peer_info_opt_from_proto_peer_reply(reply); co_return result; } @@ -205,7 +206,7 @@ class SentryClientImpl final : public api::Service { proto::PenalizePeerRequest request; request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key)); request.set_penalty(proto::PenaltyKind::Kick); - co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPenalizePeer, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_); + co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPenalizePeer, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry"); } // rpc PeerEvents(PeerEventsRequest) returns (stream PeerEvent); @@ -225,6 +226,7 @@ class SentryClientImpl final : public api::Service { grpc_context_, on_disconnect_, *channel_, + "sentry", std::move(proto_consumer)); }