Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refine error prompting #4227

Merged
merged 4 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions src/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ use std::io::Error as IoError;
use std::sync::Arc;

use memcomparable::Error as MemComparableError;
use prost::Message;
use risingwave_pb::common::Status;
use risingwave_pb::ProstFieldNotFound;
use thiserror::Error;
use tokio::task::JoinError;
use tonic::metadata::{MetadataMap, MetadataValue};
use tonic::Code;

use crate::array::ArrayError;
Expand Down Expand Up @@ -191,17 +189,8 @@ impl From<RwError> for tonic::Status {
ErrorCode::OK => tonic::Status::ok(err.to_string()),
ErrorCode::ExprError(e) => tonic::Status::invalid_argument(e.to_string()),
ErrorCode::PermissionDenied(e) => tonic::Status::permission_denied(e),
_ => {
let bytes = {
let status = err.to_status();
let mut bytes = Vec::<u8>::with_capacity(status.encoded_len());
status.encode(&mut bytes).expect("Failed to encode status.");
bytes
};
let mut header = MetadataMap::new();
header.insert_bin(RW_ERROR_GRPC_HEADER, MetadataValue::from_bytes(&bytes));
tonic::Status::with_metadata(Code::Internal, err.to_string(), header)
}
Comment on lines -194 to -204
Copy link
Member

@xxchan xxchan Jul 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep it, and decode in impl From<tonic::Status> for RwError, just like the legacy java fe https://github.com/singularity-data/risingwave-legacy/pull/1948/files#diff-5aee0d702428cdd3f280945a3ba0c18533c152d30427dd9b3d4d369b4e50f710R54-R65

impl From<tonic::Status> for RwError {
    fn from(err: tonic::Status) -> Self {
        if let Some(err) = err.metadata().get_bin(RW_ERROR_GRPC_HEADER) &&
            let Ok(err) = err.to_bytes() && 
            let Ok(status) = Status::decode(err) {
			// still need to be improved here
            return ErrorCode::InternalError(status.message).into();
        }
		
		...

To be more specific, we should use Status proto stored in grpc status metadata, instead of grpc error codes. In this way, we can have more organized custom error codes and error messages in the future. cc @liurenjie1024
https://github.com/singularity-data/risingwave/blob/9d8b7d2bd7b9041f3272aed081d7cc99a36a968c/proto/common.proto#L7-L13

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(BTW, I just removed #![expect(dead_code)], so after merging main clippy will fail 🥸

Copy link
Contributor Author

@neverchanje neverchanje Jul 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

impl RwError {
    fn to_status(&self) -> Status {
        Status {
            code: self.inner.get_code() as i32,
            message: self.to_string(), // Simply discard the details and encode itself to string.
        }
    }

Actually, we didn't encode the enum ErrorCode into a protobuf struct that can be decoded as it completely was, instead, the ErrorCode as well as its details, was only encoded as a plain string.

Hence, a ConnectorError, after being transported via grpc, if decoded strictly according to the code, will become:

connector error: connector error: xxxx

According to what we've currently implemented, it will be like internal error: connector error: xxx. So such a decoding phase won't take effect anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the current implementation doesn't make full sense, but the idea might be good in the future. Anyway I think it needs to be carefully re-designed, so I think it's also ok to remove it and KISS for now.

ErrorCode::InternalError(e) => tonic::Status::internal(e),
_ => tonic::Status::internal(err.to_string()),
}
}
}
Expand Down Expand Up @@ -386,7 +375,7 @@ impl From<tonic::Status> for RwError {
ErrorCode::InvalidParameterValue(err.message().to_string()).into()
}
Code::PermissionDenied => ErrorCode::PermissionDenied(err.message().to_string()).into(),
_ => ErrorCode::RpcError(err.into()).into(),
_ => ErrorCode::InternalError(err.message().to_string()).into(),
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,25 @@ macro_rules! impl_split_enumerator {
pub async fn create(properties: ConnectorProperties) -> Result<Self> {
match properties {
$( ConnectorProperties::$variant_name(props) => $split_enumerator_name::new(props).await.map(Self::$variant_name), )*
other => Err(anyhow!("split enumerator type for config {:?} is not supported", other)),
other => Err(anyhow!(
"split enumerator type for config {:?} is not supported",
other
)),
}
}

pub async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
match self {
$( Self::$variant_name(inner) => inner.list_splits().await.map(|ss| ss.into_iter().map(SplitImpl::$variant_name).collect_vec()), )*
$( Self::$variant_name(inner) => inner
.list_splits()
.await
.map(|ss| {
ss.into_iter()
.map(SplitImpl::$variant_name)
.collect_vec()
})
.map_err(|e| ErrorCode::ConnectorError(e.to_string()).into()),
)*
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use prost::Message;
use risingwave_common::error::ErrorCode;
use risingwave_pb::source::ConnectorSplit;
use serde::{Deserialize, Serialize};

Expand Down
8 changes: 7 additions & 1 deletion src/connector/src/source/kafka/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ impl SplitEnumerator for KafkaSplitEnumerator {
}

async fn list_splits(&mut self) -> anyhow::Result<Vec<KafkaSplit>> {
let topic_partitions = self.fetch_topic_partition()?;
let topic_partitions = self.fetch_topic_partition().map_err(|e| {
anyhow!(
"failed to fetch metadata from kafka ({}): {}",
self.broker_address,
e
)
})?;

let mut start_offsets = self
.fetch_start_offset(topic_partitions.as_ref())
Expand Down
11 changes: 7 additions & 4 deletions src/source/src/parser/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@ use serde_json::Value;
macro_rules! ensure_float {
($v:ident, $t:ty) => {
$v.as_f64()
.ok_or_else(|| anyhow!(concat!("expect ", stringify!($t))))?
.ok_or_else(|| anyhow!(concat!("expect ", stringify!($t), ", but found {}"), $v))?
};
}

macro_rules! ensure_int {
($v:ident, $t:ty) => {
$v.as_i64()
.ok_or_else(|| anyhow!(concat!("expect ", stringify!($t))))?
.ok_or_else(|| anyhow!(concat!("expect ", stringify!($t), ", but found {}"), $v))?
};
}

macro_rules! ensure_str {
($v:ident, $t:literal) => {
$v.as_str().ok_or_else(|| anyhow!(concat!("expect ", $t)))?
$v.as_str()
.ok_or_else(|| anyhow!(concat!("expect ", $t, ", but found {}"), $v))?
};
}

Expand Down Expand Up @@ -81,6 +82,8 @@ fn do_parse_json_value(column: &ColumnDesc, v: &Value) -> Result<ScalarImpl> {
pub(crate) fn json_parse_value(column: &ColumnDesc, value: Option<&Value>) -> Result<Datum> {
match value {
None | Some(Value::Null) => Ok(None),
Some(v) => Ok(Some(do_parse_json_value(column, v)?)),
Some(v) => Ok(Some(do_parse_json_value(column, v).map_err(|e| {
anyhow!("failed to parse column '{}' from json: {}", column.name, e)
})?)),
}
}