Skip to content

Commit

Permalink
refactor(error): eliminate most RwError usages in common crate (#…
Browse files Browse the repository at this point in the history
…13588)

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao authored Nov 24, 2023
1 parent f707c5f commit 90ce699
Show file tree
Hide file tree
Showing 45 changed files with 408 additions and 386 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ pub enum BatchError {
BoxedError,
),

#[error("Failed to read from system table: {0}")]
SystemTable(
#[from]
#[backtrace]
BoxedError,
),

// Make the ref-counted type to be a variant for easier code structuring.
#[error(transparent)]
Shared(
Expand Down
6 changes: 5 additions & 1 deletion src/batch/src/executor/sys_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ impl Executor for SysRowSeqScanExecutor {
impl SysRowSeqScanExecutor {
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn do_executor(self: Box<Self>) {
let rows = self.sys_catalog_reader.read_table(&self.table_id).await?;
let rows = self
.sys_catalog_reader
.read_table(&self.table_id)
.await
.map_err(BatchError::SystemTable)?;
let filtered_rows = rows
.iter()
.map(|row| {
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ strum = "0.25"
strum_macros = "0.25"
sysinfo = { version = "0.29", default-features = false }
thiserror = "1"
thiserror-ext = { workspace = true }
tinyvec = { version = "1", features = ["rustc_1_55", "grab_spare_slice"] }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
Expand Down
2 changes: 2 additions & 0 deletions src/common/common_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ prometheus = { version = "0.13" }
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] }
tonic = { workspace = true }
tower = { version = "0.4", features = ["util", "load-shed"] }
Expand Down
1 change: 1 addition & 0 deletions src/common/common_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#![feature(lint_reasons)]
#![feature(impl_trait_in_assoc_type)]
#![feature(error_generic_member_access)]

pub mod metrics_manager;
pub mod observer_manager;
Expand Down
47 changes: 33 additions & 14 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

use std::time::Duration;

use risingwave_common::bail;
use risingwave_common::error::Result;
use risingwave_pb::meta::subscribe_response::Info;
use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
use risingwave_rpc_client::error::RpcError;
Expand Down Expand Up @@ -80,6 +78,26 @@ impl<S: ObserverState> ObserverManager<RpcNotificationClient, S> {
}
}

/// Error type for [`ObserverManager`].
#[derive(thiserror::Error, Debug)]
pub enum ObserverError {
#[error("notification channel closed")]
ChannelClosed,

#[error(transparent)]
Rpc(
#[from]
#[backtrace]
RpcError,
),
}

impl From<tonic::Status> for ObserverError {
fn from(value: tonic::Status) -> Self {
Self::Rpc(value.into())
}
}

impl<T, S> ObserverManager<T, S>
where
T: NotificationClient,
Expand All @@ -97,24 +115,19 @@ where
}
}

async fn wait_init_notification(&mut self) -> Result<()> {
async fn wait_init_notification(&mut self) -> Result<(), ObserverError> {
let mut notification_vec = Vec::new();
let init_notification = loop {
// notification before init notification must be received successfully.
match self.rx.message().await {
Ok(Some(notification)) => {
match self.rx.message().await? {
Some(notification) => {
if !matches!(notification.info.as_ref().unwrap(), &Info::Snapshot(_)) {
notification_vec.push(notification);
} else {
break notification;
}
}
Ok(None) => {
bail!("notification channel from meta is closed");
}
Err(err) => {
bail!("receives meta's notification err: {:?}", err);
}
None => return Err(ObserverError::ChannelClosed),
}
};

Expand Down Expand Up @@ -231,7 +244,10 @@ impl<T: Send + 'static> Channel for Streaming<T> {
#[async_trait::async_trait]
pub trait NotificationClient: Send + Sync + 'static {
type Channel: Channel<Item = SubscribeResponse>;
async fn subscribe(&self, subscribe_type: SubscribeType) -> Result<Self::Channel>;
async fn subscribe(
&self,
subscribe_type: SubscribeType,
) -> Result<Self::Channel, ObserverError>;
}

pub struct RpcNotificationClient {
Expand All @@ -248,10 +264,13 @@ impl RpcNotificationClient {
impl NotificationClient for RpcNotificationClient {
type Channel = Streaming<SubscribeResponse>;

async fn subscribe(&self, subscribe_type: SubscribeType) -> Result<Self::Channel> {
async fn subscribe(
&self,
subscribe_type: SubscribeType,
) -> Result<Self::Channel, ObserverError> {
self.meta_client
.subscribe(subscribe_type)
.await
.map_err(RpcError::into)
.map_err(Into::into)
}
}
2 changes: 2 additions & 0 deletions src/common/heap_profiling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ chrono = { version = "0.4", default-features = false, features = [
] }
parking_lot = "0.12"
risingwave_common = { workspace = true }
thiserror = "1"
thiserror-ext = { workspace = true }
tikv-jemalloc-ctl = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio" }
tracing = "0.1"
Expand Down
46 changes: 27 additions & 19 deletions src/common/heap_profiling/src/jeprof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,25 @@

use std::path::Path;
use std::process::Command;
use std::result::Result;
use std::{env, fs};

use anyhow::anyhow;
use risingwave_common::error::Result;
/// Error type for running `jeprof`.
#[derive(thiserror::Error, Debug, thiserror_ext::ContextInto)]
pub enum JeprofError {
#[error(transparent)]
IoError(#[from] std::io::Error),

pub async fn run(profile_path: String, collapsed_path: String) -> Result<()> {
#[error("jeprof exit with an error (stdout: {stdout}, stderr: {stderr}): {inner}")]
ExitError {
#[source]
inner: std::process::ExitStatusError,
stdout: String,
stderr: String,
},
}

pub async fn run(profile_path: String, collapsed_path: String) -> Result<(), JeprofError> {
let executable_path = env::current_exe()?;

let prof_cmd = move || {
Expand All @@ -29,20 +42,15 @@ pub async fn run(profile_path: String, collapsed_path: String) -> Result<()> {
.arg(Path::new(&profile_path))
.output()
};
match tokio::task::spawn_blocking(prof_cmd).await.unwrap() {
Ok(output) => {
if output.status.success() {
fs::write(Path::new(&collapsed_path), &output.stdout)?;
Ok(())
} else {
Err(anyhow!(
"jeprof exit with an error. stdout: {}, stderr: {}",
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
)
.into())
}
}
Err(e) => Err(e.into()),
}

let output = tokio::task::spawn_blocking(prof_cmd).await.unwrap()?;

output.status.exit_ok().into_exit_error(
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr),
)?;

fs::write(Path::new(&collapsed_path), &output.stdout)?;

Ok(())
}
2 changes: 2 additions & 0 deletions src/common/heap_profiling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(exit_status_error)]

pub const MANUALLY_DUMP_SUFFIX: &str = "manual.heap";
pub const AUTO_DUMP_SUFFIX: &str = "auto.heap";
pub const COLLAPSED_SUFFIX: &str = "collapsed";
Expand Down
30 changes: 16 additions & 14 deletions src/common/proc_macro/src/session_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,11 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {

let check_hook = if let Some(check_hook_name) = check_hook_name {
quote! {
#check_hook_name(&val).map_err(|_e| {
ErrorCode::InvalidConfigValue {
config_entry: #entry_name.to_string(),
config_value: val.to_string(),
#check_hook_name(&val).map_err(|e| {
SessionConfigError::InvalidValue {
entry: #entry_name,
value: val.to_string(),
source: anyhow::anyhow!(e),
}
})?;
}
Expand All @@ -131,11 +132,12 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
&mut self,
val: &str,
reporter: &mut impl ConfigReporter
) -> RwResult<()> {
let val_t: #ty = val.parse().map_err(|_e| {
ErrorCode::InvalidConfigValue {
config_entry: #entry_name.to_string(),
config_value: val.to_string(),
) -> SessionConfigResult<()> {
let val_t = <#ty as ::std::str::FromStr>::from_str(val).map_err(|e| {
SessionConfigError::InvalidValue {
entry: #entry_name,
value: val.to_string(),
source: anyhow::anyhow!(e),
}
})?;

Expand All @@ -148,7 +150,7 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
&mut self,
val: #ty,
reporter: &mut impl ConfigReporter
) -> RwResult<()> {
) -> SessionConfigResult<()> {
#check_hook
#report_hook

Expand Down Expand Up @@ -236,18 +238,18 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
#(#struct_impl_reset)*

/// Set a parameter given it's name and value string.
pub fn set(&mut self, key_name: &str, value: String, reporter: &mut impl ConfigReporter) -> RwResult<()> {
pub fn set(&mut self, key_name: &str, value: String, reporter: &mut impl ConfigReporter) -> SessionConfigResult<()> {
match key_name.to_ascii_lowercase().as_ref() {
#(#set_match_branches)*
_ => Err(ErrorCode::UnrecognizedConfigurationParameter(key_name.to_string()).into()),
_ => Err(SessionConfigError::UnrecognizedEntry(key_name.to_string())),
}
}

/// Get a parameter by it's name.
pub fn get(&self, key_name: &str) -> RwResult<String> {
pub fn get(&self, key_name: &str) -> SessionConfigResult<String> {
match key_name.to_ascii_lowercase().as_ref() {
#(#get_match_branches)*
_ => Err(ErrorCode::UnrecognizedConfigurationParameter(key_name.to_string()).into()),
_ => Err(SessionConfigError::UnrecognizedEntry(key_name.to_string())),
}
}

Expand Down
Loading

0 comments on commit 90ce699

Please sign in to comment.