-
Notifications
You must be signed in to change notification settings - Fork 615
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(error): clean-up direct error formatting (part 1) #13763
Changes from 18 commits
739980a
3ceba5a
33794c8
74fbfbd
b3603f9
6c4deb1
dee6e9d
30b8172
6478283
28a6aa1
b22bb2f
57916bf
c4f3d3b
25a7cb9
f5efe3f
5679ace
fcaa579
60bd791
a2d11cb
3e3bcbc
0a3c47b
7721ad8
4a99055
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ | |
|
||
use std::ops::Range; | ||
|
||
use anyhow::anyhow; | ||
use anyhow::Context; | ||
use risingwave_common::array::{Op, RowRef, StreamChunk}; | ||
use risingwave_common::estimate_size::EstimateSize; | ||
use risingwave_common::row::{OwnedRow, Row, RowExt}; | ||
|
@@ -25,7 +25,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; | |
use risingwave_expr::aggregate::{ | ||
AggStateDyn, AggregateFunction, AggregateState, BoxedAggregateFunction, | ||
}; | ||
use risingwave_expr::{ExprError, Result}; | ||
use risingwave_expr::Result; | ||
|
||
/// `ProjectionOrderBy` is a wrapper of `AggregateFunction` that sorts rows by given columns and | ||
/// then projects columns. | ||
|
@@ -77,7 +77,7 @@ impl ProjectionOrderBy { | |
fn push_row(&self, state: &mut State, row: RowRef<'_>) -> Result<()> { | ||
let key = | ||
memcmp_encoding::encode_row(row.project(&self.order_col_indices), &self.order_types) | ||
.map_err(|e| ExprError::Internal(anyhow!("failed to encode row, error: {}", e)))?; | ||
.context("failed to encode row")?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use |
||
let projected_row = row.project(&self.arg_indices).to_owned_row(); | ||
|
||
state.unordered_values_estimated_heap_size += | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,7 @@ | |
use std::iter::repeat; | ||
use std::sync::Arc; | ||
|
||
use anyhow::anyhow; | ||
use anyhow::Context; | ||
use futures_async_stream::try_stream; | ||
use itertools::Itertools; | ||
use risingwave_common::array::{ | ||
|
@@ -231,8 +231,8 @@ impl BoxedExecutorBuilder for InsertExecutor { | |
.map(|IndexAndExpr { index: i, expr: e }| { | ||
Ok(( | ||
i as usize, | ||
build_from_prost(&e.ok_or_else(|| anyhow!("expression is None"))?) | ||
.map_err(|e| anyhow!("failed to build expression: {}", e))?, | ||
build_from_prost(&e.context("expression is None")?) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Besides If you only have the error, use |
||
.context("failed to build expression")?, | ||
)) | ||
}) | ||
.collect::<Result<Vec<_>>>()?; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ use risingwave_pb::task_service::{ | |
CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, ExecuteRequest, GetDataResponse, | ||
TaskInfoResponse, | ||
}; | ||
use thiserror_ext::AsReport; | ||
use tokio_stream::wrappers::ReceiverStream; | ||
use tonic::{Request, Response, Status}; | ||
|
||
|
@@ -93,7 +94,7 @@ impl TaskService for BatchServiceImpl { | |
state_rx, | ||
))), | ||
Err(e) => { | ||
error!("failed to fire task {}", e); | ||
error!(error = %e.as_report(), "failed to fire task"); | ||
Err(e.into()) | ||
} | ||
} | ||
|
@@ -146,8 +147,9 @@ impl TaskService for BatchServiceImpl { | |
.await | ||
{ | ||
error!( | ||
"failed to build executors and trigger execution of Task {:?}: {}", | ||
task_id, e | ||
error = %e.as_report(), | ||
?task_id, | ||
"failed to build executors and trigger execution" | ||
); | ||
Comment on lines
149
to
153
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To print an error with
|
||
return Err(e.into()); | ||
} | ||
|
@@ -158,12 +160,12 @@ impl TaskService for BatchServiceImpl { | |
// therefore we would only have one data output. | ||
output_id: 0, | ||
}; | ||
let mut output = task.get_task_output(&pb_task_output_id).map_err(|e| { | ||
let mut output = task.get_task_output(&pb_task_output_id).inspect_err(|e| { | ||
error!( | ||
"failed to get task output of Task {:?} in local execution mode", | ||
task_id | ||
error = %e.as_report(), | ||
?task_id, | ||
"failed to get task output in local execution mode", | ||
); | ||
e | ||
})?; | ||
Comment on lines
-161
to
169
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer |
||
let mut writer = GrpcExchangeWriter::new(tx.clone()); | ||
// Always spawn a task and do not block current function. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ use risingwave_pb::plan_common::CapturedExecutionContext; | |
use risingwave_pb::task_service::task_info_response::TaskStatus; | ||
use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse}; | ||
use risingwave_pb::PbFieldNotFound; | ||
use thiserror_ext::AsReport; | ||
use tokio::select; | ||
use tokio::task::JoinHandle; | ||
use tokio_metrics::TaskMonitor; | ||
|
@@ -644,7 +645,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> { | |
ShutdownMsg::Init => { | ||
// There is no message received from shutdown channel, which means it caused | ||
// task failed. | ||
error!("Batch task failed: {:?}", e); | ||
error!(error = %e.as_report(), "Batch task failed"); | ||
error = Some(e); | ||
state = TaskStatus::Failed; | ||
break; | ||
|
@@ -671,7 +672,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> { | |
|
||
let error = error.map(Arc::new); | ||
*self.failure.lock() = error.clone(); | ||
let err_str = error.as_ref().map(|e| e.to_string()); | ||
let err_str = error.as_ref().map(|e| e.to_report_string()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we really want to eagerly format the error, use Note that |
||
if let Err(e) = sender.close(error).await { | ||
match e { | ||
SenderError => { | ||
|
@@ -689,8 +690,8 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> { | |
|
||
if let Err(e) = self.change_state_notify(state, state_tx, err_str).await { | ||
warn!( | ||
"The status receiver in FE has closed so the status push is failed {:}", | ||
e | ||
error = %e.as_report(), | ||
"The status receiver in FE has closed so the status push is failed", | ||
); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, can we use this to enable all rust features? 🫣
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, really interesting! But it seems painful to also override
RUSTDOCFLAGS
. 😕There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems not to work for doc tests. I'll manually register it. 🤡