Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(error): clean-up direct error formatting (part 1) #13763

Merged
merged 23 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ rustflags = [
rustflags = [
"--cfg",
"tokio_unstable",
# uncomment the following two lines to enable `TaskLocalAlloc`

# Register the `rw` tool for referencing the custom lints in attributes like `#[allow(rw::xx)]`.
# This is essentially the same as `#![..]` in each crate root.
Copy link
Member

@xxchan xxchan Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, can we use this to enable all rust features? 🫣

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, really interesting! But it seems painful to also override RUSTDOCFLAGS. 😕

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems not to work for doc tests. I'll manually register it. 🤡

"-Zcrate-attr=feature(register_tool)",
"-Zcrate-attr=register_tool(rw)",

# Uncomment the following two lines to enable `TaskLocalAlloc`.
# "--cfg",
# "enable_task_local_alloc",
]
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/doc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
env:
SCCACHE_GHA_ENABLED: "true"
RUSTC_WRAPPER: "sccache"
RUSTDOCFLAGS: "--cfg docsrs --markdown-css rust.css --markdown-no-toc --index-page /home/runner/work/risingwave/risingwave/docs/rustdoc/index.md -Zunstable-options"
RUSTDOCFLAGS: "--cfg docsrs --markdown-css rust.css --markdown-no-toc --index-page /home/runner/work/risingwave/risingwave/docs/rustdoc/index.md -Zunstable-options -Zcrate-attr=feature(register_tool) -Zcrate-attr=register_tool(rw)"

jobs:
build:
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ci/scripts/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ sccache --show-stats
sccache --zero-stats

echo "--- Build documentation"
RUSTDOCFLAGS="-Dwarnings" cargo doc --document-private-items --no-deps
RUSTDOCFLAGS="-Dwarnings -Zcrate-attr=feature(register_tool) -Zcrate-attr=register_tool(rw)" cargo doc --document-private-items --no-deps

echo "--- Show sccache stats"
sccache --show-stats
Expand Down
7 changes: 6 additions & 1 deletion lints/src/format_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use clippy_utils::macros::{
find_format_arg_expr, find_format_args, is_format_macro, macro_backtrace,
};
use clippy_utils::ty::implements_trait;
use clippy_utils::{is_trait_method, match_function_call};
use clippy_utils::{is_in_cfg_test, is_in_test_function, is_trait_method, match_function_call};
use rustc_ast::FormatArgsPiece;
use rustc_hir::{Expr, ExprKind};
use rustc_lint::{LateContext, LateLintPass};
Expand Down Expand Up @@ -63,6 +63,11 @@ const TRACING_FIELD_DISPLAY: [&str; 3] = ["tracing_core", "field", "display"];

impl<'tcx> LateLintPass<'tcx> for FormatError {
fn check_expr(&mut self, cx: &LateContext<'tcx>, expr: &'tcx Expr<'_>) {
// Ignore if in test code.
if is_in_cfg_test(cx.tcx, expr.hir_id) || is_in_test_function(cx.tcx, expr.hir_id) {
return;
}

// `%err`, `?err` in tracing events and spans.
if let Some(args) = match_function_call(cx, expr, &TRACING_FIELD_DEBUG)
.or_else(|| match_function_call(cx, expr, &TRACING_FIELD_DISPLAY))
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/aggregation/orderby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::ops::Range;

use anyhow::anyhow;
use anyhow::Context;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::row::{OwnedRow, Row, RowExt};
Expand All @@ -25,7 +25,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_expr::aggregate::{
AggStateDyn, AggregateFunction, AggregateState, BoxedAggregateFunction,
};
use risingwave_expr::{ExprError, Result};
use risingwave_expr::Result;

/// `ProjectionOrderBy` is a wrapper of `AggregateFunction` that sorts rows by given columns and
/// then projects columns.
Expand Down Expand Up @@ -77,7 +77,7 @@ impl ProjectionOrderBy {
fn push_row(&self, state: &mut State, row: RowRef<'_>) -> Result<()> {
let key =
memcmp_encoding::encode_row(row.project(&self.order_col_indices), &self.order_types)
.map_err(|e| ExprError::Internal(anyhow!("failed to encode row, error: {}", e)))?;
.context("failed to encode row")?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Anyhow::Context::context to create a new internal error with the given message, while keeping the error source.

let projected_row = row.project(&self.arg_indices).to_owned_row();

state.unordered_values_estimated_heap_size +=
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::iter::repeat;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::Context;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{
Expand Down Expand Up @@ -231,8 +231,8 @@ impl BoxedExecutorBuilder for InsertExecutor {
.map(|IndexAndExpr { index: i, expr: e }| {
Ok((
i as usize,
build_from_prost(&e.ok_or_else(|| anyhow!("expression is None"))?)
.map_err(|e| anyhow!("failed to build expression: {}", e))?,
build_from_prost(&e.context("expression is None")?)
Copy link
Member Author

@BugenZhao BugenZhao Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides Result, context is also applicable to Options.

If you only have the error, use anyhow::anyhow!(source).context("context message").

.context("failed to build expression")?,
))
})
.collect::<Result<Vec<_>>>()?;
Expand Down
17 changes: 10 additions & 7 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ mod update;
mod utils;
mod values;

use anyhow::anyhow;
use anyhow::Context;
use async_recursion::async_recursion;
pub use delete::*;
pub use expand::*;
Expand Down Expand Up @@ -69,6 +69,7 @@ pub use sort_agg::*;
pub use sort_over_window::SortOverWindowExecutor;
pub use source::*;
pub use table_function::*;
use thiserror_ext::AsReport;
pub use top_n::TopNExecutor;
pub use union::*;
pub use update::*;
Expand Down Expand Up @@ -183,12 +184,14 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {

impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
pub async fn build(&self) -> Result<BoxedExecutor> {
self.try_build().await.map_err(|e| {
let err_msg = format!("Failed to build executor: {e}");
let plan_node_body = self.plan_node.get_node_body();
error!("{err_msg}, plan node is: \n {plan_node_body:?}");
anyhow!(err_msg).into()
})
self.try_build()
.await
.inspect_err(|e| {
let plan_node = self.plan_node.get_node_body();
error!(error = %e.as_report(), ?plan_node, "failed to build executor");
})
.context("failed to build executor")
.map_err(Into::into)
}

#[async_recursion]
Expand Down
16 changes: 9 additions & 7 deletions src/batch/src/rpc/service/task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_pb::task_service::{
CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, ExecuteRequest, GetDataResponse,
TaskInfoResponse,
};
use thiserror_ext::AsReport;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

Expand Down Expand Up @@ -93,7 +94,7 @@ impl TaskService for BatchServiceImpl {
state_rx,
))),
Err(e) => {
error!("failed to fire task {}", e);
error!(error = %e.as_report(), "failed to fire task");
Err(e.into())
}
}
Expand Down Expand Up @@ -146,8 +147,9 @@ impl TaskService for BatchServiceImpl {
.await
{
error!(
"failed to build executors and trigger execution of Task {:?}: {}",
task_id, e
error = %e.as_report(),
?task_id,
"failed to build executors and trigger execution"
);
Comment on lines 149 to 153
Copy link
Member Author

@BugenZhao BugenZhao Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To print an error with tracing, specify a field with error = %e.as_report(). Explanations:

  1. Use the Report wrapper for error to visit the source chain when visiting.
  2. Telling tracing to use Display format with %.
  3. Normalize the error field name to error with error = <expr>

return Err(e.into());
}
Expand All @@ -158,12 +160,12 @@ impl TaskService for BatchServiceImpl {
// therefore we would only have one data output.
output_id: 0,
};
let mut output = task.get_task_output(&pb_task_output_id).map_err(|e| {
let mut output = task.get_task_output(&pb_task_output_id).inspect_err(|e| {
error!(
"failed to get task output of Task {:?} in local execution mode",
task_id
error = %e.as_report(),
?task_id,
"failed to get task output in local execution mode",
);
e
})?;
Comment on lines -161 to 169
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer inspect_err if not going to change the error variant. For example, logging it out.

let mut writer = GrpcExchangeWriter::new(tx.clone());
// Always spawn a task and do not block current function.
Expand Down
9 changes: 5 additions & 4 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use risingwave_pb::plan_common::CapturedExecutionContext;
use risingwave_pb::task_service::task_info_response::TaskStatus;
use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse};
use risingwave_pb::PbFieldNotFound;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::task::JoinHandle;
use tokio_metrics::TaskMonitor;
Expand Down Expand Up @@ -644,7 +645,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
ShutdownMsg::Init => {
// There is no message received from shutdown channel, which means it caused
// task failed.
error!("Batch task failed: {:?}", e);
error!(error = %e.as_report(), "Batch task failed");
error = Some(e);
state = TaskStatus::Failed;
break;
Expand All @@ -671,7 +672,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {

let error = error.map(Arc::new);
*self.failure.lock() = error.clone();
let err_str = error.as_ref().map(|e| e.to_string());
let err_str = error.as_ref().map(|e| e.to_report_string());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we really want to eagerly format the error, use to_report_string to replace to_string. This is basically an alias of e.as_report().to_string().

Note that to_string implies Display trait. The behavior of {} {:#} {:?} {:#?} differ, and there're also corresponding to_report_string variants. Check the documentation of Report.

if let Err(e) = sender.close(error).await {
match e {
SenderError => {
Expand All @@ -689,8 +690,8 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {

if let Err(e) = self.change_state_notify(state, state_tx, err_str).await {
warn!(
"The status receiver in FE has closed so the status push is failed {:}",
e
error = %e.as_report(),
"The status receiver in FE has closed so the status push is failed",
);
}

Expand Down
5 changes: 3 additions & 2 deletions src/common/common_service/src/metrics_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use std::sync::OnceLock;
use hyper::{Body, Request, Response};
use prometheus::{Encoder, Registry, TextEncoder};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use thiserror_ext::AsReport;
use tower::make::Shared;
use tower::ServiceBuilder;
use tower_http::add_extension::AddExtensionLayer;
use tracing::{info, warn};
use tracing::{error, info, warn};

pub struct MetricsManager {}

Expand All @@ -46,7 +47,7 @@ impl MetricsManager {
let serve_future =
hyper::Server::bind(&listen_socket_addr).serve(Shared::new(service));
if let Err(err) = serve_future.await {
eprintln!("server error: {}", err);
error!(error = %err.as_report(), "metrics service exited with error");
}
});
listen_addr_clone
Expand Down
9 changes: 5 additions & 4 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::meta::subscribe_response::Info;
use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::MetaClient;
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;
use tonic::{Status, Streaming};

Expand Down Expand Up @@ -175,7 +176,7 @@ where
/// call the `handle_initialization_notification` and `handle_notification` to update node data.
pub async fn start(mut self) -> JoinHandle<()> {
if let Err(err) = self.wait_init_notification().await {
tracing::warn!("Receives meta's notification err {:?}", err);
tracing::warn!(error = %err.as_report(), "Receives meta's notification err");
self.re_subscribe().await;
}

Expand All @@ -190,8 +191,8 @@ where
}
self.observer_states.handle_notification(resp.unwrap());
}
Err(e) => {
tracing::error!("Receives meta's notification err {:?}", e);
Err(err) => {
tracing::warn!(error = %err.as_report(), "Receives meta's notification err");
self.re_subscribe().await;
}
}
Expand All @@ -211,7 +212,7 @@ where
tracing::debug!("re-subscribe success");
self.rx = rx;
if let Err(err) = self.wait_init_notification().await {
tracing::warn!("Receives meta's notification err {:?}", err);
tracing::warn!(error = %err.as_report(), "Receives meta's notification err");
continue;
} else {
break;
Expand Down
19 changes: 9 additions & 10 deletions src/common/src/array/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@ pub fn to_record_batch_with_schema(
if column.data_type() == field.data_type() {
Ok(column)
} else {
cast(&column, field.data_type())
.map_err(|err| ArrayError::FromArrow(err.to_string()))
cast(&column, field.data_type()).map_err(ArrayError::from_arrow)
}
})
.try_collect::<_, _, ArrayError>()?;

let opts = arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
.map_err(|err| ArrayError::ToArrow(err.to_string()))
.map_err(ArrayError::to_arrow)
}

// Implement bi-directional `From` between `DataChunk` and `arrow_array::RecordBatch`.
Expand Down Expand Up @@ -84,7 +83,7 @@ impl TryFrom<&DataChunk> for arrow_array::RecordBatch {
let opts =
arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
.map_err(|err| ArrayError::ToArrow(err.to_string()))
.map_err(ArrayError::to_arrow)
}
}

Expand Down Expand Up @@ -129,7 +128,7 @@ macro_rules! converts_generic {
.unwrap()
.try_into()?,
)),)*
t => Err(ArrayError::FromArrow(format!("unsupported data type: {t:?}"))),
t => Err(ArrayError::from_arrow(format!("unsupported data type: {t:?}"))),
}
}
}
Expand Down Expand Up @@ -252,8 +251,8 @@ impl TryFrom<&DataType> for arrow_schema::DataType {
datatype.as_ref().try_into()?,
true,
)))),
DataType::Serial => Err(ArrayError::ToArrow(
"Serial type is not supported to convert to arrow".to_string(),
DataType::Serial => Err(ArrayError::to_arrow(
"Serial type is not supported to convert to arrow",
)),
}
}
Expand Down Expand Up @@ -485,9 +484,9 @@ impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
.map(|o| {
o.map(|s| {
let s = std::str::from_utf8(s)
.map_err(|_| ArrayError::FromArrow(format!("invalid decimal: {s:?}")))?;
.map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))?;
s.parse()
.map_err(|_| ArrayError::FromArrow(format!("invalid decimal: {s:?}")))
.map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
})
.transpose()
})
Expand Down Expand Up @@ -521,7 +520,7 @@ impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
.map(|o| {
o.map(|s| {
s.parse()
.map_err(|_| ArrayError::FromArrow(format!("invalid json: {s}")))
.map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
})
.transpose()
})
Expand Down
Loading