Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(error): provide service name on the client side of gRPC #16254

Merged
merged 3 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 0 additions & 7 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,6 @@ impl From<ValueEncodingError> for BatchError {
}
}

impl From<tonic::Status> for BatchError {
fn from(status: tonic::Status) -> Self {
// Always wrap the status into a `RpcError`.
Self::from(RpcError::from(status))
}
}

impl<'a> From<&'a BatchError> for Status {
fn from(err: &'a BatchError) -> Self {
err.to_status(tonic::Code::Internal, "batch")
Expand Down
3 changes: 2 additions & 1 deletion src/batch/src/execution/grpc_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_expr::expr_context::capture_expr_context;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::{self, Plan};
use risingwave_pb::batch_plan::TaskOutputId;
use risingwave_pb::task_service::{ExecuteRequest, GetDataResponse};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::ComputeClient;
use tonic::Streaming;

Expand Down Expand Up @@ -81,7 +82,7 @@ impl ExchangeSource for GrpcExchangeSource {
}
Some(r) => r,
};
let task_data = res?;
let task_data = res.map_err(RpcError::from_batch_status)?;
let data = DataChunk::from_protobuf(task_data.get_record_batch()?)?.compact();
trace!(
"Receiver taskOutput = {:?}, data = {:?}",
Expand Down
4 changes: 2 additions & 2 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ pub enum ObserverError {
}

impl From<tonic::Status> for ObserverError {
fn from(value: tonic::Status) -> Self {
Self::Rpc(value.into())
fn from(status: tonic::Status) -> Self {
Self::Rpc(RpcError::from_meta_status(status))
}
}

Expand Down
59 changes: 51 additions & 8 deletions src/error/src/tonic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,24 @@ where
/// A wrapper of [`tonic::Status`] that provides better error message and extracts
/// the source chain from the `details` field.
#[derive(Debug)]
pub struct TonicStatusWrapper(tonic::Status);
pub struct TonicStatusWrapper {
inner: tonic::Status,

/// Optional service name from the client side.
///
/// # Explanation
///
/// [`tonic::Status`] is used for both client and server side. When the error is created on
/// the server side, we encourage developers to provide the service name with
/// [`ToTonicStatus::to_status`], so that the info can be included in the HTTP response and
/// then extracted by the client side (in [`TonicStatusWrapper::new`]).
///
/// However, if there's something wrong with the server side and the error is directly
/// created on the client side, the approach above is not applicable. In this case, the
/// caller should set a "client side" service name to provide better error message. This is
/// achieved by [`TonicStatusWrapperExt::with_client_side_service_name`].
client_side_service_name: Option<ServiceName>,
}

impl TonicStatusWrapper {
/// Create a new [`TonicStatusWrapper`] from the given [`tonic::Status`] and extract
Expand All @@ -115,17 +132,21 @@ impl TonicStatusWrapper {
}
}
}
Self(status)

Self {
inner: status,
client_side_service_name: None,
}
}

/// Returns the reference to the inner [`tonic::Status`].
pub fn inner(&self) -> &tonic::Status {
&self.0
&self.inner
}

/// Consumes `self` and returns the inner [`tonic::Status`].
pub fn into_inner(self) -> tonic::Status {
self.0
self.inner
}
}

Expand All @@ -138,28 +159,50 @@ impl From<tonic::Status> for TonicStatusWrapper {
impl std::fmt::Display for TonicStatusWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "gRPC request")?;

if let Some(service_name) = self
.source()
.and_then(|s| s.downcast_ref::<ServerError>())
.and_then(|s| s.service_name.as_ref())
// if no service name from the server side, use the client side one
.or(self.client_side_service_name.as_ref())
{
write!(f, " to {} service", service_name)?;
}
write!(f, " failed: {}: ", self.0.code())?;
write!(f, " failed: {}: ", self.inner.code())?;

#[expect(rw::format_error)] // intentionally format the source itself
if let Some(source) = self.source() {
// Prefer the source chain from the `details` field.
write!(f, "{}", source)
} else {
write!(f, "{}", self.0.message())
write!(f, "{}", self.inner.message())
}
}
}

#[easy_ext::ext(TonicStatusWrapperExt)]
impl<T> T
where
T: Into<TonicStatusWrapper>,
{
/// Set the client side service name to provide better error message.
///
/// See the documentation on the field `client_side_service_name` for more details.
pub fn with_client_side_service_name(
self,
service_name: impl Into<ServiceName>,
) -> TonicStatusWrapper {
let mut this = self.into();
this.client_side_service_name = Some(service_name.into());
this
}
}

impl std::error::Error for TonicStatusWrapper {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
// Delegate to `self.0` as if we're transparent.
self.0.source()
// Delegate to `self.inner` as if we're transparent.
self.inner.source()
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use risingwave_pb::batch_plan::{
use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode};
use risingwave_pb::plan_common::ExprContext;
use risingwave_pb::task_service::{CancelTaskRequest, TaskInfoResponse};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::ComputeClientPoolRef;
use rw_futures_util::select_all;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -525,7 +526,7 @@ impl StageRunner {
|_| StageState::Failed,
QueryMessage::Stage(Failed {
id: self.stage.id,
reason: SchedulerError::from(e),
reason: RpcError::from_batch_status(e).into(),
}),
)
.await;
Expand Down
11 changes: 0 additions & 11 deletions src/frontend/src/scheduler/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use risingwave_common::session_config::QueryMode;
use risingwave_connector::error::ConnectorError;
use risingwave_rpc_client::error::RpcError;
use thiserror::Error;
use tonic::{Code, Status};

use crate::error::{ErrorCode, RwError};
use crate::scheduler::plan_fragmenter::QueryId;
Expand Down Expand Up @@ -69,16 +68,6 @@ pub enum SchedulerError {
),
}

/// Only if the code is Internal, change it to Execution Error. Otherwise convert to Rpc Error.
impl From<tonic::Status> for SchedulerError {
fn from(s: Status) -> Self {
match s.code() {
Code::Internal => Self::TaskExecutionError(s.message().to_string()),
_ => Self::RpcError(s.into()),
}
}
}

impl From<SchedulerError> for RwError {
fn from(s: SchedulerError) -> Self {
ErrorCode::SchedulerError(Box::new(s)).into()
Expand Down
1 change: 1 addition & 0 deletions src/rpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ hyper = "0.14" # required by tonic
itertools = { workspace = true }
lru = { workspace = true }
moka = { version = "0.12", features = ["future"] }
paste = "1"
rand = { workspace = true }
risingwave_common = { workspace = true }
risingwave_error = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions src/rpc_client/src/compactor_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tokio::sync::RwLock;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tonic::transport::{Channel, Endpoint};

use crate::error::Result;
use crate::error::{Result, RpcError};
use crate::retry_rpc;
const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
Expand Down Expand Up @@ -59,7 +59,8 @@ impl CompactorClient {
.monitor_client
.to_owned()
.stack_trace(StackTraceRequest::default())
.await?
.await
.map_err(RpcError::from_compactor_status)?
.into_inner())
}
}
Expand Down
43 changes: 30 additions & 13 deletions src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::{Channel, Endpoint};
use tonic::Streaming;

use crate::error::Result;
use crate::error::{Result, RpcError};
use crate::{RpcClient, RpcClientPool};

#[derive(Clone)]
Expand Down Expand Up @@ -98,7 +98,8 @@ impl ComputeClient {
.get_data(GetDataRequest {
task_output_id: Some(output_id),
})
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand Down Expand Up @@ -149,7 +150,8 @@ impl ComputeClient {
up_actor_id,
down_actor_id
)
})?
})
.map_err(RpcError::from_compute_status)?
.into_inner();

Ok((response_stream, permits_tx))
Expand All @@ -172,20 +174,28 @@ impl ComputeClient {
tracing_context: TracingContext::from_current_span().to_protobuf(),
expr_context: Some(expr_context),
})
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

pub async fn execute(&self, req: ExecuteRequest) -> Result<Streaming<GetDataResponse>> {
Ok(self.task_client.to_owned().execute(req).await?.into_inner())
Ok(self
.task_client
.to_owned()
.execute(req)
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

pub async fn cancel(&self, req: CancelTaskRequest) -> Result<CancelTaskResponse> {
Ok(self
.task_client
.to_owned()
.cancel_task(req)
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -194,7 +204,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.stack_trace(StackTraceRequest::default())
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -203,7 +214,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.get_back_pressure(GetBackPressureRequest::default())
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -212,7 +224,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.profiling(ProfilingRequest { sleep_s })
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -221,7 +234,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.heap_profiling(HeapProfilingRequest { dir })
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -230,7 +244,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.list_heap_profiling(ListHeapProfilingRequest {})
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -239,7 +254,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.analyze_heap(AnalyzeHeapRequest { path })
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -248,7 +264,8 @@ impl ComputeClient {
.config_client
.to_owned()
.show_config(ShowConfigRequest {})
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}
}
Expand Down
Loading
Loading