Skip to content

Commit

Permalink
use catalog error
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Nov 1, 2022
1 parent 9a4495f commit ea98e26
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 58 deletions.
10 changes: 4 additions & 6 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,11 @@ impl Binder {
{
resolve_source_relation(source_catalog)
} else {
return Err(RwError::from(CatalogError::NotFound(
return Err(CatalogError::NotFound(
"table or source",
table_name.to_string(),
)));
)
.into());
}
}
None => (|| {
Expand Down Expand Up @@ -178,10 +179,7 @@ impl Binder {
}
}

Err(RwError::from(CatalogError::NotFound(
"table or source",
table_name.to_string(),
)))
Err(CatalogError::NotFound("table or source", table_name.to_string()).into())
})()?,
}
};
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ pub fn row_id_column_desc(column_id: ColumnId) -> ColumnDesc {
}
}

pub type CatalogResult<T> = std::result::Result<T, CatalogError>;

#[derive(Error, Debug)]
pub enum CatalogError {
#[error("{0} not found: {1}")]
Expand Down
104 changes: 61 additions & 43 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@ use std::collections::HashMap;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{CatalogVersion, IndexId, TableId, PG_CATALOG_SCHEMA_NAME};
use risingwave_common::error::Result;
use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
use risingwave_pb::catalog::{
Database as ProstDatabase, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink,
Source as ProstSource, Table as ProstTable,
};

use super::source_catalog::SourceCatalog;
use super::{CatalogError, SinkId, SourceId};
use super::{CatalogError, CatalogResult, SinkId, SourceId};
use crate::catalog::database_catalog::DatabaseCatalog;
use crate::catalog::schema_catalog::SchemaCatalog;
use crate::catalog::sink_catalog::SinkCatalog;
Expand Down Expand Up @@ -195,64 +193,71 @@ impl Catalog {
.drop_index(index_id);
}

pub fn get_database_by_name(&self, db_name: &str) -> Result<&DatabaseCatalog> {
pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
self.database_by_name
.get(db_name)
.ok_or_else(|| CatalogError::NotFound("database", db_name.to_string()).into())
.ok_or_else(|| CatalogError::NotFound("database", db_name.to_string()))
}

pub fn get_database_by_id(&self, db_id: &DatabaseId) -> Result<&DatabaseCatalog> {
pub fn get_database_by_id(&self, db_id: &DatabaseId) -> CatalogResult<&DatabaseCatalog> {
let db_name = self
.db_name_by_id
.get(db_id)
.ok_or_else(|| CatalogError::NotFound("db_id", db_id.to_string()))?;
self.database_by_name
.get(db_name)
.ok_or_else(|| CatalogError::NotFound("database", db_name.to_string()).into())
.ok_or_else(|| CatalogError::NotFound("database", db_name.to_string()))
}

pub fn get_all_schema_names(&self, db_name: &str) -> Result<Vec<String>> {
pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
}

pub fn get_all_schema_info(&self, db_name: &str) -> Result<Vec<ProstSchema>> {
pub fn get_all_schema_info(&self, db_name: &str) -> CatalogResult<Vec<ProstSchema>> {
Ok(self.get_database_by_name(db_name)?.get_all_schema_info())
}

pub fn iter_schemas(&self, db_name: &str) -> Result<impl Iterator<Item = &SchemaCatalog>> {
pub fn iter_schemas(
&self,
db_name: &str,
) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
Ok(self.get_database_by_name(db_name)?.iter_schemas())
}

pub fn get_all_database_names(&self) -> Vec<String> {
self.database_by_name.keys().cloned().collect_vec()
}

pub fn get_schema_by_name(&self, db_name: &str, schema_name: &str) -> Result<&SchemaCatalog> {
pub fn get_schema_by_name(
&self,
db_name: &str,
schema_name: &str,
) -> CatalogResult<&SchemaCatalog> {
self.get_database_by_name(db_name)?
.get_schema_by_name(schema_name)
.ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_string()).into())
.ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_string()))
}

pub fn get_table_name_by_id(&self, table_id: TableId) -> Result<String> {
pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
self.get_table_by_id(&table_id).map(|table| table.name)
}

pub fn get_schema_by_id(
&self,
db_id: &DatabaseId,
schema_id: &SchemaId,
) -> Result<&SchemaCatalog> {
) -> CatalogResult<&SchemaCatalog> {
self.get_database_by_id(db_id)?
.get_schema_by_id(schema_id)
.ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()).into())
.ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
}

pub fn first_valid_schema(
&self,
db_name: &str,
search_path: &SearchPath,
user_name: &str,
) -> Result<&SchemaCatalog> {
) -> CatalogResult<&SchemaCatalog> {
for path in search_path.real_path() {
let mut schema_name: &str = path;
if schema_name == USER_NAME_WILD_CARD {
Expand All @@ -263,7 +268,10 @@ impl Catalog {
return schema_catalog;
}
}
bail!("no schema has been selected to create in");
Err(CatalogError::NotFound(
"first valid schema",
"no schema has been selected to create in".to_string(),
))
}

#[inline(always)]
Expand All @@ -272,18 +280,18 @@ impl Catalog {
db_name: &str,
schema_name: &str,
table_name: &str,
) -> Result<&Arc<TableCatalog>> {
) -> CatalogResult<&Arc<TableCatalog>> {
self.get_schema_by_name(db_name, schema_name)?
.get_table_by_name(table_name)
.ok_or_else(|| CatalogError::NotFound("table", table_name.to_string()).into())
.ok_or_else(|| CatalogError::NotFound("table", table_name.to_string()))
}

pub fn get_table_by_name<'a>(
&self,
db_name: &str,
schema_path: SchemaPath<'a>,
table_name: &str,
) -> Result<(&Arc<TableCatalog>, &'a str)> {
) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
match schema_path {
SchemaPath::Name(schema_name) => self
.get_table_by_name_with_schema_name(db_name, schema_name, table_name)
Expand All @@ -301,16 +309,16 @@ impl Catalog {
return Ok((table_catalog, schema_name));
}
}
Err(CatalogError::NotFound("table", table_name.to_string()).into())
Err(CatalogError::NotFound("table", table_name.to_string()))
}
}
}

pub fn get_table_by_id(&self, table_id: &TableId) -> Result<TableCatalog> {
pub fn get_table_by_id(&self, table_id: &TableId) -> CatalogResult<TableCatalog> {
self.table_by_id
.get(table_id)
.cloned()
.ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()).into())
.ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
}

#[cfg(test)]
Expand All @@ -324,11 +332,15 @@ impl Catalog {
);
}

pub fn get_sys_table_by_name(&self, db_name: &str, table_name: &str) -> Result<&SystemCatalog> {
pub fn get_sys_table_by_name(
&self,
db_name: &str,
table_name: &str,
) -> CatalogResult<&SystemCatalog> {
self.get_schema_by_name(db_name, PG_CATALOG_SCHEMA_NAME)
.unwrap()
.get_system_table_by_name(table_name)
.ok_or_else(|| CatalogError::NotFound("table", table_name.to_string()).into())
.ok_or_else(|| CatalogError::NotFound("table", table_name.to_string()))
}

#[inline(always)]
Expand All @@ -337,18 +349,18 @@ impl Catalog {
db_name: &str,
schema_name: &str,
source_name: &str,
) -> Result<&Arc<SourceCatalog>> {
) -> CatalogResult<&Arc<SourceCatalog>> {
self.get_schema_by_name(db_name, schema_name)?
.get_source_by_name(source_name)
.ok_or_else(|| CatalogError::NotFound("source", source_name.to_string()).into())
.ok_or_else(|| CatalogError::NotFound("source", source_name.to_string()))
}

pub fn get_source_by_name<'a>(
&self,
db_name: &str,
schema_path: SchemaPath<'a>,
source_name: &str,
) -> Result<(&Arc<SourceCatalog>, &'a str)> {
) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
match schema_path {
SchemaPath::Name(schema_name) => self
.get_source_by_name_with_schema_name(db_name, schema_name, source_name)
Expand All @@ -366,7 +378,7 @@ impl Catalog {
return Ok((source_catalog, schema_name));
}
}
Err(CatalogError::NotFound("source", source_name.to_string()).into())
Err(CatalogError::NotFound("source", source_name.to_string()))
}
}
}
Expand All @@ -377,18 +389,18 @@ impl Catalog {
db_name: &str,
schema_name: &str,
sink_name: &str,
) -> Result<&Arc<SinkCatalog>> {
) -> CatalogResult<&Arc<SinkCatalog>> {
self.get_schema_by_name(db_name, schema_name)?
.get_sink_by_name(sink_name)
.ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_string()).into())
.ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_string()))
}

pub fn get_sink_by_name<'a>(
&self,
db_name: &str,
schema_path: SchemaPath<'a>,
sink_name: &str,
) -> Result<(&Arc<SinkCatalog>, &'a str)> {
) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
match schema_path {
SchemaPath::Name(schema_name) => self
.get_sink_by_name_with_schema_name(db_name, schema_name, sink_name)
Expand All @@ -406,7 +418,7 @@ impl Catalog {
return Ok((sink_catalog, schema_name));
}
}
Err(CatalogError::NotFound("sink", sink_name.to_string()).into())
Err(CatalogError::NotFound("sink", sink_name.to_string()))
}
}
}
Expand All @@ -417,18 +429,18 @@ impl Catalog {
db_name: &str,
schema_name: &str,
index_name: &str,
) -> Result<&Arc<IndexCatalog>> {
) -> CatalogResult<&Arc<IndexCatalog>> {
self.get_schema_by_name(db_name, schema_name)?
.get_index_by_name(index_name)
.ok_or_else(|| CatalogError::NotFound("index", index_name.to_string()).into())
.ok_or_else(|| CatalogError::NotFound("index", index_name.to_string()))
}

pub fn get_index_by_name<'a>(
&self,
db_name: &str,
schema_path: SchemaPath<'a>,
index_name: &str,
) -> Result<(&Arc<IndexCatalog>, &'a str)> {
) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
match schema_path {
SchemaPath::Name(schema_name) => self
.get_index_by_name_with_schema_name(db_name, schema_name, index_name)
Expand All @@ -446,7 +458,7 @@ impl Catalog {
return Ok((index_catalog, schema_name));
}
}
Err(CatalogError::NotFound("index", index_name.to_string()).into())
Err(CatalogError::NotFound("index", index_name.to_string()))
}
}
}
Expand All @@ -457,25 +469,31 @@ impl Catalog {
db_name: &str,
schema_name: &str,
relation_name: &str,
) -> Result<()> {
) -> CatalogResult<()> {
let schema = self.get_schema_by_name(db_name, schema_name)?;

// Resolve source first.
if let Some(source) = schema.get_source_by_name(relation_name) {
// TODO: check if it is a materialized source and improve the err msg
if source.is_table() {
Err(CatalogError::Duplicated("table", relation_name.to_string()).into())
Err(CatalogError::Duplicated("table", relation_name.to_string()))
} else {
Err(CatalogError::Duplicated("source", relation_name.to_string()).into())
Err(CatalogError::Duplicated(
"source",
relation_name.to_string(),
))
}
} else if let Some(table) = schema.get_table_by_name(relation_name) {
if table.is_index {
Err(CatalogError::Duplicated("index", relation_name.to_string()).into())
Err(CatalogError::Duplicated("index", relation_name.to_string()))
} else {
Err(CatalogError::Duplicated("materialized view", relation_name.to_string()).into())
Err(CatalogError::Duplicated(
"materialized view",
relation_name.to_string(),
))
}
} else if schema.get_sink_by_name(relation_name).is_some() {
Err(CatalogError::Duplicated("sink", relation_name.to_string()).into())
Err(CatalogError::Duplicated("sink", relation_name.to_string()))
} else {
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ pub async fn handle_create_index(
format!("relation \"{}\" already exists, skipping", index_name),
));
} else {
return Err(e);
return Err(e.into());
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub async fn handle_drop_database(
format!("database \"{}\" does not exist, skipping", database_name),
))
} else {
Err(err)
Err(err.into())
};
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/drop_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub async fn handle_drop_index(
format!("index \"{}\" does not exist, skipping", index_name),
))
} else {
Err(err)
Err(err.into())
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/drop_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub async fn handle_drop_mv(
),
))
} else {
Err(e)
Err(e.into())
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/drop_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn handle_drop_schema(
format!("schema \"{}\" does not exist, skipping", schema_name),
))
} else {
Err(err)
Err(err.into())
};
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/drop_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub async fn handle_drop_sink(
format!("sink \"{}\" does not exist, skipping", sink_name),
))
} else {
Err(e)
Err(e.into())
}
}
};
Expand Down
Loading

0 comments on commit ea98e26

Please sign in to comment.