Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sentry: improve reconnect_channel logging #2054

Merged
merged 1 commit into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions silkworm/infra/grpc/client/call.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,23 @@ Task<Response> unary_rpc_with_retries(
Request request,
agrpc::GrpcContext& grpc_context,
std::function<Task<void>()>& on_disconnect,
grpc::Channel& channel) {
grpc::Channel& channel,
std::string log_prefix) {
// loop until a successful return or cancellation
while (true) {
try {
co_return (co_await unary_rpc(rpc, stub, request, grpc_context));
} catch (const GrpcStatusError& ex) {
if (is_disconnect_error(ex.status(), channel)) {
log::Warning() << "GRPC call failed: " << ex.what();
log::Warning(log_prefix) << "GRPC call failed: " << ex.what();
} else {
throw;
}
}

co_await on_disconnect();
if (channel.GetState(false) != GRPC_CHANNEL_READY) {
co_await reconnect_channel(channel);
co_await reconnect_channel(channel, log_prefix);
}
}
}
Expand All @@ -146,6 +147,7 @@ Task<void> streaming_rpc_with_retries(
agrpc::GrpcContext& grpc_context,
std::function<Task<void>()>& on_disconnect,
grpc::Channel& channel,
std::string log_prefix,
std::function<Task<void>(Response)> consumer) {
// loop until a successful return or cancellation
while (true) {
Expand All @@ -154,15 +156,15 @@ Task<void> streaming_rpc_with_retries(
break;
} catch (const GrpcStatusError& ex) {
if (is_disconnect_error(ex.status(), channel)) {
log::Warning() << "GRPC streaming call failed: " << ex.what();
log::Warning(log_prefix) << "GRPC streaming call failed: " << ex.what();
} else {
throw;
}
}

co_await on_disconnect();
if (channel.GetState(false) != GRPC_CHANNEL_READY) {
co_await reconnect_channel(channel);
co_await reconnect_channel(channel, log_prefix);
}
}
}
Expand Down
14 changes: 11 additions & 3 deletions silkworm/infra/grpc/client/reconnect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,22 @@ bool is_disconnect_error(const grpc::Status& status, grpc::Channel& channel) {
((code == grpc::StatusCode::DEADLINE_EXCEEDED) && (channel.GetState(false) != GRPC_CHANNEL_READY) && (channel.GetState(false) != GRPC_CHANNEL_SHUTDOWN));
}

Task<void> reconnect_channel(grpc::Channel& channel) {
// Exponential backoff schedule: min_sec, min_sec*2, min_sec*4, ... capped at max_sec.
static int64_t backoff_timeout(size_t attempt, int64_t min_sec, int64_t max_sec) {
    // Cap the shift amount so that min_sec << attempt can never overflow int64_t.
    if (attempt >= 20) {
        return max_sec;
    }
    const int64_t scaled = min_sec << attempt;
    return (scaled < max_sec) ? scaled : max_sec;
}

Task<void> reconnect_channel(grpc::Channel& channel, std::string log_prefix) {
bool is_stopped = false;

std::function<void()> run = [&] {
bool is_connected = false;
size_t attempt = 0;
while (!is_connected && !is_stopped && (channel.GetState(false) != GRPC_CHANNEL_SHUTDOWN)) {
log::Info() << "Reconnecting grpc::Channel...";
auto deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5, GPR_TIMESPAN));
log::Info(log_prefix) << "Reconnecting grpc::Channel...";
auto timeout = backoff_timeout(attempt++, 5, 600);
auto deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(timeout, GPR_TIMESPAN));
is_connected = channel.WaitForConnected(deadline);
}
};
Expand Down
4 changes: 3 additions & 1 deletion silkworm/infra/grpc/client/reconnect.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

#pragma once

#include <string>

#include <silkworm/infra/concurrency/task.hpp>

#include <grpcpp/grpcpp.h>

namespace silkworm::rpc {

bool is_disconnect_error(const grpc::Status& status, grpc::Channel& channel);
Task<void> reconnect_channel(grpc::Channel& channel);
Task<void> reconnect_channel(grpc::Channel& channel, std::string log_prefix);

} // namespace silkworm::rpc
28 changes: 15 additions & 13 deletions silkworm/sentry/grpc/client/sentry_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,27 @@ class SentryClientImpl final : public api::Service {
}

// Force the shared gRPC channel to reconnect; log lines carry the "sentry" prefix.
Task<void> reconnect() {
    auto& chan = *channel_;
    co_await sw_rpc::reconnect_channel(chan, "sentry");
}

// rpc SetStatus(StatusData) returns (SetStatusReply);
Task<void> set_status(eth::StatusData status_data) override {
    // Convert the domain status into its protobuf form, then issue the unary RPC
    // with automatic retry/reconnect handling and the "sentry" log prefix.
    auto request = interfaces::proto_status_data_from_status_data(status_data);
    co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSetStatus, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
}

// rpc HandShake(google.protobuf.Empty) returns (HandShakeReply);
Task<uint8_t> handshake() override {
    google::protobuf::Empty request;
    const auto reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncHandShake, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
    // The peer reports a protocol id; map it onto the eth protocol version byte.
    co_return interfaces::eth_version_from_protocol(reply.protocol());
}

// rpc NodeInfo(google.protobuf.Empty) returns(types.NodeInfoReply);
Task<NodeInfos> node_infos() override {
    google::protobuf::Empty request;
    types::NodeInfoReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncNodeInfo, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
    // Wrap the single node description into the NodeInfos collection expected by callers.
    co_return NodeInfos{interfaces::node_info_from_proto_node_info(reply)};
}
Expand All @@ -104,7 +104,7 @@ class SentryClientImpl final : public api::Service {
request.mutable_data()->CopyFrom(interfaces::outbound_data_from_message(message));
request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));

proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
auto result = interfaces::peer_keys_from_sent_peers_ids(reply);
co_return result;
}
Expand All @@ -115,15 +115,15 @@ class SentryClientImpl final : public api::Service {
request.mutable_data()->CopyFrom(interfaces::outbound_data_from_message(message));
request.set_max_peers(max_peers);

proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToRandomPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToRandomPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
auto result = interfaces::peer_keys_from_sent_peers_ids(reply);
co_return result;
}

// rpc SendMessageToAll(OutboundMessageData) returns (SentPeers);
Task<PeerKeys> send_message_to_all(Message message) override {
    // Serialize the outbound message and broadcast it via the retrying unary RPC helper.
    auto request = interfaces::outbound_data_from_message(message);
    proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToAll, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
    // Translate the acknowledged peer ids back into public keys.
    co_return interfaces::peer_keys_from_sent_peers_ids(reply);
}
Expand All @@ -136,7 +136,7 @@ class SentryClientImpl final : public api::Service {
// request.set_min_block()
request.set_max_peers(max_peers);

proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageByMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageByMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
auto result = interfaces::peer_keys_from_sent_peers_ids(reply);
co_return result;
}
Expand All @@ -147,7 +147,7 @@ class SentryClientImpl final : public api::Service {
request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));
// TODO: set_min_block
// request.set_min_block()
co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
}

// rpc Messages(MessagesRequest) returns (stream InboundMessage);
Expand All @@ -172,21 +172,22 @@ class SentryClientImpl final : public api::Service {
grpc_context_,
on_disconnect_,
*channel_,
"sentry",
std::move(proto_consumer));
}

// rpc Peers(google.protobuf.Empty) returns (PeersReply);
Task<PeerInfos> peers() override {
    // No request payload: the server returns the full peer list.
    google::protobuf::Empty request;
    proto::PeersReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
    co_return interfaces::peer_infos_from_proto_peers_reply(reply);
}

// rpc PeerCount(PeerCountRequest) returns (PeerCountReply);
Task<size_t> peer_count() override {
    proto::PeerCountRequest request;
    const auto reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerCount, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
    // Proto count is a fixed-width integer; widen/narrow explicitly to size_t.
    co_return static_cast<size_t>(reply.count());
}
Expand All @@ -195,7 +196,7 @@ class SentryClientImpl final : public api::Service {
Task<std::optional<PeerInfo>> peer_by_id(EccPublicKey public_key) override {
    // Build the lookup request from the peer's public key.
    proto::PeerByIdRequest request;
    request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));
    const auto reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
    // Empty reply maps to std::nullopt when the peer is unknown.
    co_return interfaces::peer_info_opt_from_proto_peer_reply(reply);
}
Expand All @@ -205,7 +206,7 @@ class SentryClientImpl final : public api::Service {
proto::PenalizePeerRequest request;
request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));
request.set_penalty(proto::PenaltyKind::Kick);
co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPenalizePeer, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPenalizePeer, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
}

// rpc PeerEvents(PeerEventsRequest) returns (stream PeerEvent);
Expand All @@ -225,6 +226,7 @@ class SentryClientImpl final : public api::Service {
grpc_context_,
on_disconnect_,
*channel_,
"sentry",
std::move(proto_consumer));
}

Expand Down
Loading