diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index d2b5aa1e88b4..59e3585431a6 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -243,6 +243,7 @@ pub enum SourceFormat {
     Plain,
 }
 
+/// Refer to [`crate::parser::EncodingProperties`]
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
 pub enum SourceEncode {
     #[default]
diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index 5f42d1e73e5b..061aeb737aa3 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -95,8 +95,6 @@ pub trait CatalogWriter: Send + Sync {
         job_type: TableJobType,
     ) -> Result<()>;
 
-    async fn alter_source_column(&self, source: PbSource) -> Result<()>;
-
     async fn create_index(
         &self,
         index: PbIndex,
@@ -104,12 +102,10 @@ pub trait CatalogWriter: Send + Sync {
         graph: StreamFragmentGraph,
     ) -> Result<()>;
 
-    async fn create_source(&self, source: PbSource) -> Result<()>;
-
-    async fn create_source_with_graph(
+    async fn create_source(
         &self,
         source: PbSource,
-        graph: StreamFragmentGraph,
+        graph: Option<StreamFragmentGraph>,
     ) -> Result<()>;
 
     async fn create_sink(
@@ -199,7 +195,8 @@ pub trait CatalogWriter: Send + Sync {
 
     async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()>;
 
-    async fn alter_source_with_sr(&self, source: PbSource) -> Result<()>;
+    /// Replace the source in the catalog.
+    async fn alter_source(&self, source: PbSource) -> Result<()>;
 
     async fn alter_parallelism(
         &self,
@@ -299,11 +296,6 @@ impl CatalogWriter for CatalogWriterImpl {
         self.wait_version(version).await
     }
 
-    async fn alter_source_column(&self, source: PbSource) -> Result<()> {
-        let version = self.meta_client.alter_source_column(source).await?;
-        self.wait_version(version).await
-    }
-
     async fn replace_table(
         &self,
         source: Option<PbSource>,
@@ -319,20 +311,12 @@ impl CatalogWriter for CatalogWriterImpl {
         self.wait_version(version).await
     }
 
-    async fn create_source(&self, source: PbSource) -> Result<()> {
-        let version = self.meta_client.create_source(source).await?;
-        self.wait_version(version).await
-    }
-
-    async fn create_source_with_graph(
+    async fn create_source(
         &self,
         source: PbSource,
-        graph: StreamFragmentGraph,
+        graph: Option<StreamFragmentGraph>,
     ) -> Result<()> {
-        let version = self
-            .meta_client
-            .create_source_with_graph(source, graph)
-            .await?;
+        let version = self.meta_client.create_source(source, graph).await?;
         self.wait_version(version).await
     }
 
@@ -573,8 +557,8 @@ impl CatalogWriter for CatalogWriterImpl {
         self.wait_version(version).await
     }
 
-    async fn alter_source_with_sr(&self, source: PbSource) -> Result<()> {
-        let version = self.meta_client.alter_source_with_sr(source).await?;
+    async fn alter_source(&self, source: PbSource) -> Result<()> {
+        let version = self.meta_client.alter_source(source).await?;
         self.wait_version(version).await
     }
 
diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs
index 43985518fd1b..d7b90a03172d 100644
--- a/src/frontend/src/handler/alter_source_column.rs
+++ b/src/frontend/src/handler/alter_source_column.rs
@@ -82,13 +82,13 @@ pub async fn handle_alter_source_column(
             )
             .into());
         }
-        SourceEncode::Invalid | SourceEncode::Native => {
+        SourceEncode::Invalid | SourceEncode::Native | SourceEncode::None => {
             return Err(RwError::from(ErrorCode::NotSupported(
                 format!("alter source with encode {:?}", encode),
-                "alter source with encode JSON | BYTES | CSV".into(),
+                "Only source with encode JSON | BYTES | CSV | PARQUET can be altered".into(),
             )));
         }
-        _ => {}
+        SourceEncode::Json | SourceEncode::Csv | SourceEncode::Bytes | SourceEncode::Parquet => {}
     }
 
     let columns = &mut catalog.columns;
@@ -117,7 +117,7 @@ pub async fn handle_alter_source_column(
 
     let catalog_writer = session.catalog_writer()?;
     catalog_writer
-        .alter_source_column(catalog.to_prost(schema_id, db_id))
+        .alter_source(catalog.to_prost(schema_id, db_id))
         .await?;
 
     Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs
index 01e09b0c4f5f..25de847fca3f 100644
--- a/src/frontend/src/handler/alter_source_with_sr.rs
+++ b/src/frontend/src/handler/alter_source_with_sr.rs
@@ -278,7 +278,7 @@ pub async fn handle_alter_source_with_sr(
     pb_source.version += 1;
 
     let catalog_writer = session.catalog_writer()?;
-    catalog_writer.alter_source_with_sr(pb_source).await?;
+    catalog_writer.alter_source(pb_source).await?;
 
     Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
 }
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 7cbf4fbaf47e..aeaa026dffff 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -1720,12 +1720,10 @@ pub async fn handle_create_source(
             let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
             build_graph(stream_plan)?
         };
-        catalog_writer
-            .create_source_with_graph(source, graph)
-            .await?;
+        catalog_writer.create_source(source, Some(graph)).await?;
     } else {
         // For other sources we don't create a streaming job
-        catalog_writer.create_source(source).await?;
+        catalog_writer.create_source(source, None).await?;
     }
 
     Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index 612388926215..7826890c84d1 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -318,14 +318,10 @@ impl CatalogWriter for MockCatalogWriter {
         Ok(())
     }
 
-    async fn create_source(&self, source: PbSource) -> Result<()> {
-        self.create_source_inner(source).map(|_| ())
-    }
-
-    async fn create_source_with_graph(
+    async fn create_source(
         &self,
         source: PbSource,
-        _graph: StreamFragmentGraph,
+        _graph: Option<StreamFragmentGraph>,
     ) -> Result<()> {
         self.create_source_inner(source).map(|_| ())
     }
@@ -569,12 +565,7 @@ impl CatalogWriter for MockCatalogWriter {
         Ok(())
     }
 
-    async fn alter_source_column(&self, source: PbSource) -> Result<()> {
-        self.catalog.write().update_source(&source);
-        Ok(())
-    }
-
-    async fn alter_source_with_sr(&self, source: PbSource) -> Result<()> {
+    async fn alter_source(&self, source: PbSource) -> Result<()> {
         self.catalog.write().update_source(&source);
         Ok(())
     }
diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs
index 641bc392f722..de9485ab3e12 100644
--- a/src/meta/service/src/ddl_service.rs
+++ b/src/meta/service/src/ddl_service.rs
@@ -240,7 +240,7 @@ impl DdlService for DdlServiceImpl {
             None => {
                let version = self
                     .ddl_controller
-                    .run_command(DdlCommand::CreateSource(source))
+                    .run_command(DdlCommand::CreateSourceWithoutStreamingJob(source))
                     .await?;
                 Ok(Response::new(CreateSourceResponse {
                     status: None,
@@ -695,6 +695,7 @@ impl DdlService for DdlServiceImpl {
         }))
     }
 
+    /// Only support add column for now.
     async fn alter_source(
         &self,
         request: Request<AlterSourceRequest>,
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index 11fe99771a4b..f747100eaed8 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -2607,10 +2607,7 @@ impl CatalogController {
             .collect())
     }
 
-    pub async fn alter_source_column(
-        &self,
-        pb_source: PbSource,
-    ) -> MetaResult<NotificationVersion> {
+    pub async fn alter_source(&self, pb_source: PbSource) -> MetaResult<NotificationVersion> {
         let source_id = pb_source.id as SourceId;
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index d139ec8e037c..773428beb9e6 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -2632,7 +2632,7 @@ impl CatalogManager {
         Ok(version)
     }
 
-    pub async fn alter_source_column(&self, source: Source) -> MetaResult<NotificationVersion> {
+    pub async fn alter_source(&self, source: Source) -> MetaResult<NotificationVersion> {
         let source_id = source.get_id();
         let core = &mut *self.core.lock().await;
         let database_core = &mut core.database;
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 4c1988a37d44..43c8b52a571c 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -131,7 +131,7 @@ pub enum DdlCommand {
     DropDatabase(DatabaseId),
     CreateSchema(Schema),
     DropSchema(SchemaId),
-    CreateSource(Source),
+    CreateSourceWithoutStreamingJob(Source),
     DropSource(SourceId, DropMode),
     CreateFunction(Function),
     DropFunction(FunctionId),
@@ -289,7 +289,9 @@ impl DdlController {
             DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
             DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
             DdlCommand::DropSchema(schema_id) => ctrl.drop_schema(schema_id).await,
-            DdlCommand::CreateSource(source) => ctrl.create_source(source).await,
+            DdlCommand::CreateSourceWithoutStreamingJob(source) => {
+                ctrl.create_source_without_streaming_job(source).await
+            }
             DdlCommand::DropSource(source_id, drop_mode) => {
                 ctrl.drop_source(source_id, drop_mode).await
             }
@@ -340,7 +342,7 @@ impl DdlController {
             }
             DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
             DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
-            DdlCommand::AlterSourceColumn(source) => ctrl.alter_source_column(source).await,
+            DdlCommand::AlterSourceColumn(source) => ctrl.alter_source(source).await,
             DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
             DdlCommand::CreateSubscription(subscription) => {
                 ctrl.create_subscription(subscription).await
@@ -458,7 +460,11 @@ impl DdlController {
         }
     }
 
-    async fn create_source(&self, mut source: Source) -> MetaResult<NotificationVersion> {
+    /// Shared source is handled in [`Self::create_streaming_job`]
+    async fn create_source_without_streaming_job(
+        &self,
+        mut source: Source,
+    ) -> MetaResult<NotificationVersion> {
         match &self.metadata_manager {
             MetadataManager::V1(mgr) => {
                 source.id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
@@ -523,11 +529,12 @@ impl DdlController {
         Ok(version)
     }
 
-    // Maybe we can unify `alter_source_column` and `alter_source_name`.
-    async fn alter_source_column(&self, source: Source) -> MetaResult<NotificationVersion> {
+    /// This replaces the source in the catalog.
+    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
+    async fn alter_source(&self, source: Source) -> MetaResult<NotificationVersion> {
         match &self.metadata_manager {
-            MetadataManager::V1(mgr) => mgr.catalog_manager.alter_source_column(source).await,
-            MetadataManager::V2(mgr) => mgr.catalog_controller.alter_source_column(source).await,
+            MetadataManager::V1(mgr) => mgr.catalog_manager.alter_source(source).await,
+            MetadataManager::V2(mgr) => mgr.catalog_controller.alter_source(source).await,
         }
     }
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index 67ea55269b2b..a226e4d67249 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -410,24 +410,14 @@ impl MetaClient {
         Ok(resp.version)
     }
 
-    pub async fn create_source(&self, source: PbSource) -> Result<CatalogVersion> {
-        let request = CreateSourceRequest {
-            source: Some(source),
-            fragment_graph: None,
-        };
-
-        let resp = self.inner.create_source(request).await?;
-        Ok(resp.version)
-    }
-
-    pub async fn create_source_with_graph(
+    pub async fn create_source(
         &self,
         source: PbSource,
-        graph: StreamFragmentGraph,
+        graph: Option<StreamFragmentGraph>,
     ) -> Result<CatalogVersion> {
         let request = CreateSourceRequest {
             source: Some(source),
-            fragment_graph: Some(graph),
+            fragment_graph: graph,
         };
 
         let resp = self.inner.create_source(request).await?;
         Ok(resp.version)
     }
@@ -509,15 +499,6 @@ impl MetaClient {
         Ok(resp.version)
     }
 
-    // only adding columns is supported
-    pub async fn alter_source_column(&self, source: PbSource) -> Result<CatalogVersion> {
-        let request = AlterSourceRequest {
-            source: Some(source),
-        };
-        let resp = self.inner.alter_source(request).await?;
-        Ok(resp.version)
-    }
-
     pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<CatalogVersion> {
         let request = AlterOwnerRequest {
             object: Some(object),
@@ -540,7 +521,7 @@ impl MetaClient {
         Ok(resp.version)
     }
 
-    pub async fn alter_source_with_sr(&self, source: PbSource) -> Result<CatalogVersion> {
+    pub async fn alter_source(&self, source: PbSource) -> Result<CatalogVersion> {
         let request = AlterSourceRequest {
             source: Some(source),
         };
diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs
index b8e1aa245b4e..7dd7564fc666 100644
--- a/src/sqlparser/src/ast/statement.rs
+++ b/src/sqlparser/src/ast/statement.rs
@@ -219,6 +219,7 @@ impl Encode {
     }
 }
 
+/// `FORMAT ... ENCODE ... [(a=b, ...)] [KEY ENCODE ...]`
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct ConnectorSchema {
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index 3f2cd83aca28..77cdc90e546f 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -580,7 +580,7 @@ impl SourceBackfillExecutorInner {
                     // Currently, `CreateMviewProgress` is designed for MV backfill, and rw_ddl_progress calculates
                     // progress based on the number of consumed rows and an estimated total number of rows from hummock.
                     // For now, we just rely on the same code path, and for source backfill, the progress will always be 99.99%.
-                    tracing::info!("progress finish");
+                    tracing::debug!("progress finish");
                     let epoch = barrier.epoch;
                     self.progress.finish(epoch, 114514);
                     // yield barrier after reporting progress
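
Note on the resulting frontend API shape: the old `create_source` / `create_source_with_graph` pair on `CatalogWriter` (and on `MetaClient`) collapses into a single `create_source(source, graph)` whose graph argument is optional, and the alter entry points collapse into one `alter_source`. The sketch below is a minimal, self-contained Rust model of that calling pattern only; the `Source` and `Graph` placeholder types stand in for `PbSource` and `StreamFragmentGraph`, and the trait here is synchronous and infallible, unlike the real async `CatalogWriter`.

    // Illustrative model only: placeholder types, not the real RisingWave API.
    struct Source {
        name: String,
    }

    struct Graph; // stands in for a stream fragment graph

    trait CatalogWriter {
        // Single entry point: `graph` is Some(_) only when the source needs a streaming job.
        fn create_source(&self, source: Source, graph: Option<Graph>);
    }

    struct MockWriter;

    impl CatalogWriter for MockWriter {
        fn create_source(&self, source: Source, graph: Option<Graph>) {
            match graph {
                Some(_) => println!("create source {} with a streaming job", source.name),
                None => println!("create source {} without a streaming job", source.name),
            }
        }
    }

    fn main() {
        let writer = MockWriter;
        // A source backed by a streaming job:
        writer.create_source(Source { name: "s1".into() }, Some(Graph));
        // A plain source: no streaming job is created.
        writer.create_source(Source { name: "s2".into() }, None);
    }

The effect of the `Option` is visible at the call sites in create_source.rs: the handler passes `Some(graph)` only when a streaming job is needed and `None` otherwise, instead of choosing between two differently named methods.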
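A second pattern worth calling out is in alter_source_column.rs: the wildcard `_ => {}` arm is replaced by an explicit list of the supported encodings. The trade is brevity for a compile-time check, since adding a new `SourceEncode` variant later makes the match non-exhaustive instead of silently treating the new encoding as alterable. A small self-contained sketch of that idea, using a simplified stand-in enum rather than the real `SourceEncode`:

    // Simplified stand-in for the connector's SourceEncode enum, for illustration only.
    #[derive(Debug, Clone, Copy)]
    enum Encode {
        Json,
        Csv,
        Bytes,
        Parquet,
        Native,
        Invalid,
    }

    fn check_alterable(encode: Encode) -> Result<(), String> {
        match encode {
            // Unsupported encodings are rejected explicitly.
            Encode::Invalid | Encode::Native => {
                Err(format!("alter source with encode {:?} is not supported", encode))
            }
            // Listing the supported variants instead of `_ => {}` means a new enum
            // variant forces this match to be revisited at compile time.
            Encode::Json | Encode::Csv | Encode::Bytes | Encode::Parquet => Ok(()),
        }
    }

    fn main() {
        assert!(check_alterable(Encode::Json).is_ok());
        assert!(check_alterable(Encode::Native).is_err());
    }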