Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sentry: add exception handling for main tasks #2032

Merged
merged 3 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion silkworm/sentry/discovery/discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,16 @@ Discovery::~Discovery() {
}

Task<void> Discovery::run() {
return p_impl_->run();
try {
return p_impl_->run();
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "Discovery::run unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "Discovery::run unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<std::vector<Discovery::PeerCandidate>> Discovery::request_peer_candidates(
Expand Down
27 changes: 26 additions & 1 deletion silkworm/sentry/rlpx/peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,27 @@ Task<void> Peer::handle() {
}
log::Error("sentry") << "Peer::handle system_error: " << ex.what();
throw;
} catch (const std::nested_exception& ne) {
try {
ne.rethrow_nested();
} catch (const DisconnectedError&) {
log::Debug("sentry") << "Peer::handle nested disconnection error";
auto reason = disconnect_reason_.get().value_or(DisconnectReason::DisconnectRequested);
disconnect_reason_.set({reason});
co_return;
} catch (const boost::system::system_error& ex) {
if (is_fatal_network_error(ex)) {
log::Debug("sentry") << "Peer::handle nested network error: " << ex.what();
auto reason = disconnect_reason_.get().value_or(DisconnectReason::NetworkError);
disconnect_reason_.set({reason});
co_return;
} else if (ex.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "Peer::handle nested cancellation";
co_return;
}
log::Error("sentry") << "Peer::handle nested system_error: " << ex.what();
throw;
}
} catch (const std::exception& ex) {
log::Error("sentry") << "Peer::handle exception: " << ex.what();
throw;
Expand Down Expand Up @@ -313,7 +334,11 @@ void Peer::close() {
}

void Peer::post_message(const std::shared_ptr<Peer>& peer, const Message& message) {
peer->send_message_tasks_.spawn(peer->strand_, Peer::send_message(peer, message));
try {
peer->send_message_tasks_.spawn(peer->strand_, Peer::send_message(peer, message));
} catch (const concurrency::TaskGroup::SpawnAfterCloseError&) {
log::Warning("sentry") << "Peer::post_message cannot spawn send_message after close";
}
}

Task<void> Peer::send_message(std::shared_ptr<Peer> peer, Message message) {
Expand Down
12 changes: 11 additions & 1 deletion silkworm/sentry/rlpx/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,17 @@ Task<void> Server::run(
while (acceptor.is_open()) {
auto client_executor = executor_pool.any_executor();
SocketStream stream{client_executor};
co_await acceptor.async_accept(stream.socket(), use_awaitable);
try {
co_await acceptor.async_accept(stream.socket(), use_awaitable);
} catch (const boost::system::system_error& ex) {
if (ex.code() == boost::system::errc::invalid_argument) {
log::Error("sentry") << "Sentry RLPx server got invalid_argument on accept port=" << port_;
continue;
} else {
log::Critical("sentry") << "Sentry RLPx server unexpected end [" + std::string{ex.what()} + "]";
throw;
}
}

auto remote_endpoint = stream.socket().remote_endpoint();
log::Debug("sentry") << "rlpx::Server client connected from " << remote_endpoint;
Expand Down
82 changes: 68 additions & 14 deletions silkworm/sentry/sentry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,24 @@ Task<void> SentryImpl::run_tasks() {
co_await status_manager_.wait_for_status();
log::Info("sentry") << "Sentry received initial status message";

co_await (
run_status_manager() &&
run_server() &&
run_discovery() &&
run_peer_manager() &&
run_message_sender() &&
run_message_receiver() &&
run_peer_manager_api() &&
run_peer_discovery_feedback());
try {
co_await (
run_status_manager() &&
run_server() &&
run_discovery() &&
run_peer_manager() &&
run_message_sender() &&
run_message_receiver() &&
run_peer_manager_api() &&
run_peer_discovery_feedback());
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "Sentry run_tasks unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "Sentry run_tasks unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

std::unique_ptr<rlpx::Protocol> SentryImpl::make_protocol() {
Expand Down Expand Up @@ -234,23 +243,68 @@ Task<void> SentryImpl::run_discovery() {
}

Task<void> SentryImpl::run_peer_manager() {
return peer_manager_.run(rlpx_server_, discovery_, make_protocol(), client_factory());
try {
return peer_manager_.run(rlpx_server_, discovery_, make_protocol(), client_factory());
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "run_peer_manager unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "run_peer_manager unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<void> SentryImpl::run_message_sender() {
return message_sender_.run(peer_manager_);
try {
return message_sender_.run(peer_manager_);
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "run_message_sender unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "run_message_sender unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<void> SentryImpl::run_message_receiver() {
return MessageReceiver::run(message_receiver_, peer_manager_);
try {
return MessageReceiver::run(message_receiver_, peer_manager_);
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "run_message_receiver unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "run_message_receiver unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<void> SentryImpl::run_peer_manager_api() {
return PeerManagerApi::run(peer_manager_api_);
try {
return PeerManagerApi::run(peer_manager_api_);
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "run_peer_manager_api unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "run_peer_manager_api unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<void> SentryImpl::run_peer_discovery_feedback() {
return PeerDiscoveryFeedback::run(peer_discovery_feedback_, peer_manager_, discovery_);
try {
return PeerDiscoveryFeedback::run(peer_discovery_feedback_, peer_manager_, discovery_);
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "run_peer_discovery_feedback unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "run_peer_discovery_feedback unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<void> SentryImpl::run_grpc_server() {
Expand Down
15 changes: 12 additions & 3 deletions silkworm/sentry/status_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,18 @@ Task<void> StatusManager::wait_for_status() {
}

Task<void> StatusManager::run() {
// loop until wait_for_status() throws a cancelled exception
while (true) {
co_await wait_for_status();
try {
// loop until wait_for_status() throws a cancelled exception
while (true) {
co_await wait_for_status();
}
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "StatusManager::run unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "StatusManager::run unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Expand Down
Loading