Skip to content

Commit

Permalink
fix(rpc): workaround for tonic hanging on large error message (#18639)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao authored Sep 23, 2024
1 parent a5ec3ea commit d5dbc6a
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 30 deletions.
29 changes: 15 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ normal = ["workspace-hack"]
[dependencies]
auto_impl = "1"
bytes = "1"
cfg-or-panic = "0.2"
clap = { workspace = true }
easy-ext = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand Down
46 changes: 30 additions & 16 deletions src/common/metrics/src/monitor/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::sync::LazyLock;
use std::task::{Context, Poll};
use std::time::Duration;

use cfg_or_panic::cfg_or_panic;
use futures::FutureExt;
use http::Uri;
use hyper_util::client::legacy::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name};
Expand Down Expand Up @@ -591,23 +592,40 @@ impl Service<Name> for MonitoredGaiResolver {
}
}

#[cfg_or_panic(not(madsim))]
fn monitored_http_connector(
connection_type: impl Into<String>,
config: TcpConfig,
) -> MonitoredConnection<HttpConnector<MonitoredGaiResolver>, MonitorNewConnectionImpl> {
let resolver = MonitoredGaiResolver::default();
let mut http = HttpConnector::new_with_resolver(resolver);

http.enforce_http(false);
http.set_nodelay(config.tcp_nodelay);
http.set_keepalive(config.keepalive_duration);

monitor_connector(http, connection_type)
}

/// Attach general configurations to the endpoint.
#[cfg_or_panic(not(madsim))]
fn configure_endpoint(endpoint: Endpoint) -> Endpoint {
// This is to mitigate https://github.com/risingwavelabs/risingwave/issues/18039.
// TODO: remove this after https://github.com/hyperium/hyper/issues/3724 gets resolved.
endpoint.http2_max_header_list_size(16 * 1024 * 1024)
}

#[easy_ext::ext(EndpointExt)]
impl Endpoint {
pub async fn monitored_connect(
self,
mut self,
connection_type: impl Into<String>,
config: TcpConfig,
) -> Result<Channel, tonic::transport::Error> {
#[cfg(not(madsim))]
{
let resolver = MonitoredGaiResolver::default();
let mut http = HttpConnector::new_with_resolver(resolver);

http.enforce_http(false);
http.set_nodelay(config.tcp_nodelay);
http.set_keepalive(config.keepalive_duration);

let connector = monitor_connector(http, connection_type);
self = configure_endpoint(self);
let connector = monitored_http_connector(connection_type, config);
self.connect_with_connector(connector).await
}
#[cfg(madsim)]
Expand All @@ -618,16 +636,12 @@ impl Endpoint {

#[cfg(not(madsim))]
pub fn monitored_connect_lazy(
self,
mut self,
connection_type: impl Into<String>,
config: TcpConfig,
) -> Channel {
let mut http = HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(config.tcp_nodelay);
http.set_keepalive(config.keepalive_duration);

let connector = monitor_connector(http, connection_type);
self = configure_endpoint(self);
let connector = monitored_http_connector(connection_type, config);
self.connect_with_connector_lazy(connector)
}
}
Expand Down

0 comments on commit d5dbc6a

Please sign in to comment.