refactor: minor refactor on create/alter source (#18619)
xxchan authored Sep 20, 2024
1 parent 3c74390 commit 3c94345
Showing 13 changed files with 45 additions and 84 deletions.
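
The refactor folds the previously separate `create_source`/`create_source_with_graph` and `alter_source_column`/`alter_source_with_sr` entry points into single `create_source` and `alter_source` methods. As a quick orientation before the hunks, here is a minimal, self-contained sketch of the resulting call shape; the types are stand-ins, not the real RisingWave definitions.

```rust
// Stand-ins; the real code uses PbSource / StreamFragmentGraph from the
// RisingWave crates and returns Result<()> from async trait methods.
struct PbSource;
struct StreamFragmentGraph;

struct CatalogWriterSketch;

impl CatalogWriterSketch {
    /// One entry point: pass `Some(graph)` only when the source needs a
    /// streaming job (e.g. a shared source), `None` otherwise.
    fn create_source(&self, _source: PbSource, graph: Option<StreamFragmentGraph>) {
        match graph {
            Some(_) => println!("create source with streaming job"),
            None => println!("create source without streaming job"),
        }
    }

    /// Replaces the source in the catalog; in the diff below both the
    /// ADD COLUMN handler and the FORMAT .. ENCODE handler call this.
    fn alter_source(&self, _source: PbSource) {
        println!("replace source in catalog");
    }
}

fn main() {
    let w = CatalogWriterSketch;
    w.create_source(PbSource, None);
    w.create_source(PbSource, Some(StreamFragmentGraph));
    w.alter_source(PbSource);
}
```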
1 change: 1 addition & 0 deletions src/connector/src/source/base.rs
@@ -243,6 +243,7 @@ pub enum SourceFormat {
Plain,
}

/// Refer to [`crate::parser::EncodingProperties`]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
#[default]
34 changes: 9 additions & 25 deletions src/frontend/src/catalog/catalog_service.rs
@@ -95,21 +95,17 @@ pub trait CatalogWriter: Send + Sync {
job_type: TableJobType,
) -> Result<()>;

async fn alter_source_column(&self, source: PbSource) -> Result<()>;

async fn create_index(
&self,
index: PbIndex,
table: PbTable,
graph: StreamFragmentGraph,
) -> Result<()>;

async fn create_source(&self, source: PbSource) -> Result<()>;

async fn create_source_with_graph(
async fn create_source(
&self,
source: PbSource,
graph: StreamFragmentGraph,
graph: Option<StreamFragmentGraph>,
) -> Result<()>;

async fn create_sink(
@@ -199,7 +195,8 @@ pub trait CatalogWriter: Send + Sync {

async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()>;

async fn alter_source_with_sr(&self, source: PbSource) -> Result<()>;
/// Replace the source in the catalog.
async fn alter_source(&self, source: PbSource) -> Result<()>;

async fn alter_parallelism(
&self,
@@ -299,11 +296,6 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn alter_source_column(&self, source: PbSource) -> Result<()> {
let version = self.meta_client.alter_source_column(source).await?;
self.wait_version(version).await
}

async fn replace_table(
&self,
source: Option<PbSource>,
@@ -319,20 +311,12 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn create_source(&self, source: PbSource) -> Result<()> {
let version = self.meta_client.create_source(source).await?;
self.wait_version(version).await
}

async fn create_source_with_graph(
async fn create_source(
&self,
source: PbSource,
graph: StreamFragmentGraph,
graph: Option<StreamFragmentGraph>,
) -> Result<()> {
let version = self
.meta_client
.create_source_with_graph(source, graph)
.await?;
let version = self.meta_client.create_source(source, graph).await?;
self.wait_version(version).await
}

@@ -573,8 +557,8 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn alter_source_with_sr(&self, source: PbSource) -> Result<()> {
let version = self.meta_client.alter_source_with_sr(source).await?;
async fn alter_source(&self, source: PbSource) -> Result<()> {
let version = self.meta_client.alter_source(source).await?;
self.wait_version(version).await
}

8 changes: 4 additions & 4 deletions src/frontend/src/handler/alter_source_column.rs
@@ -82,13 +82,13 @@ pub async fn handle_alter_source_column(
)
.into());
}
SourceEncode::Invalid | SourceEncode::Native => {
SourceEncode::Invalid | SourceEncode::Native | SourceEncode::None => {
return Err(RwError::from(ErrorCode::NotSupported(
format!("alter source with encode {:?}", encode),
"alter source with encode JSON | BYTES | CSV".into(),
"Only source with encode JSON | BYTES | CSV | PARQUET can be altered".into(),
)));
}
_ => {}
SourceEncode::Json | SourceEncode::Csv | SourceEncode::Bytes | SourceEncode::Parquet => {}
}

let columns = &mut catalog.columns;
@@ -117,7 +117,7 @@ pub async fn handle_alter_source_column(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_source_column(catalog.to_prost(schema_id, db_id))
.alter_source(catalog.to_prost(schema_id, db_id))
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
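
The hunk above also replaces the `_ => {}` catch-all with an explicit list of alterable encodes. A small standalone sketch (toy enum, not the real `SourceEncode`) of why exhaustive matching is preferred here:

```rust
#[derive(Debug)]
enum Encode {
    Json,
    Csv,
    Bytes,
    Parquet,
    Native,
    None,
}

fn check_alterable(encode: &Encode) -> Result<(), String> {
    match encode {
        Encode::Native | Encode::None => {
            Err(format!("ALTER SOURCE does not support encode {encode:?}"))
        }
        // Exhaustive instead of `_ => {}`: adding a new encode variant later
        // is a compile error, so the author must decide whether ALTER SOURCE
        // should accept it rather than letting it slip through silently.
        Encode::Json | Encode::Csv | Encode::Bytes | Encode::Parquet => Ok(()),
    }
}

fn main() {
    assert!(check_alterable(&Encode::Json).is_ok());
    assert!(check_alterable(&Encode::Native).is_err());
}
```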
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_source_with_sr.rs
@@ -278,7 +278,7 @@ pub async fn handle_alter_source_with_sr(
pb_source.version += 1;

let catalog_writer = session.catalog_writer()?;
catalog_writer.alter_source_with_sr(pb_source).await?;
catalog_writer.alter_source(pb_source).await?;

Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
}
6 changes: 2 additions & 4 deletions src/frontend/src/handler/create_source.rs
@@ -1720,12 +1720,10 @@ pub async fn handle_create_source(
let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
build_graph(stream_plan)?
};
catalog_writer
.create_source_with_graph(source, graph)
.await?;
catalog_writer.create_source(source, Some(graph)).await?;
} else {
// For other sources we don't create a streaming job
catalog_writer.create_source(source).await?;
catalog_writer.create_source(source, None).await?;
}

Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
15 changes: 3 additions & 12 deletions src/frontend/src/test_utils.rs
@@ -318,14 +318,10 @@ impl CatalogWriter for MockCatalogWriter {
Ok(())
}

async fn create_source(&self, source: PbSource) -> Result<()> {
self.create_source_inner(source).map(|_| ())
}

async fn create_source_with_graph(
async fn create_source(
&self,
source: PbSource,
_graph: StreamFragmentGraph,
_graph: Option<StreamFragmentGraph>,
) -> Result<()> {
self.create_source_inner(source).map(|_| ())
}
@@ -569,12 +565,7 @@ impl CatalogWriter for MockCatalogWriter {
Ok(())
}

async fn alter_source_column(&self, source: PbSource) -> Result<()> {
self.catalog.write().update_source(&source);
Ok(())
}

async fn alter_source_with_sr(&self, source: PbSource) -> Result<()> {
async fn alter_source(&self, source: PbSource) -> Result<()> {
self.catalog.write().update_source(&source);
Ok(())
}
3 changes: 2 additions & 1 deletion src/meta/service/src/ddl_service.rs
@@ -240,7 +240,7 @@ impl DdlService for DdlServiceImpl {
None => {
let version = self
.ddl_controller
.run_command(DdlCommand::CreateSource(source))
.run_command(DdlCommand::CreateSourceWithoutStreamingJob(source))
.await?;
Ok(Response::new(CreateSourceResponse {
status: None,
@@ -695,6 +695,7 @@ impl DdlService for DdlServiceImpl {
}))
}

/// Only support add column for now.
async fn alter_source(
&self,
request: Request<AlterSourceRequest>,
5 changes: 1 addition & 4 deletions src/meta/src/controller/catalog.rs
@@ -2607,10 +2607,7 @@ impl CatalogController {
.collect())
}

pub async fn alter_source_column(
&self,
pb_source: PbSource,
) -> MetaResult<NotificationVersion> {
pub async fn alter_source(&self, pb_source: PbSource) -> MetaResult<NotificationVersion> {
let source_id = pb_source.id as SourceId;
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
2 changes: 1 addition & 1 deletion src/meta/src/manager/catalog/mod.rs
@@ -2632,7 +2632,7 @@ impl CatalogManager {
Ok(version)
}

pub async fn alter_source_column(&self, source: Source) -> MetaResult<NotificationVersion> {
pub async fn alter_source(&self, source: Source) -> MetaResult<NotificationVersion> {
let source_id = source.get_id();
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
23 changes: 15 additions & 8 deletions src/meta/src/rpc/ddl_controller.rs
@@ -131,7 +131,7 @@ pub enum DdlCommand {
DropDatabase(DatabaseId),
CreateSchema(Schema),
DropSchema(SchemaId),
CreateSource(Source),
CreateSourceWithoutStreamingJob(Source),
DropSource(SourceId, DropMode),
CreateFunction(Function),
DropFunction(FunctionId),
@@ -289,7 +289,9 @@ impl DdlController {
DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
DdlCommand::DropSchema(schema_id) => ctrl.drop_schema(schema_id).await,
DdlCommand::CreateSource(source) => ctrl.create_source(source).await,
DdlCommand::CreateSourceWithoutStreamingJob(source) => {
ctrl.create_source_without_streaming_job(source).await
}
DdlCommand::DropSource(source_id, drop_mode) => {
ctrl.drop_source(source_id, drop_mode).await
}
@@ -340,7 +342,7 @@ impl DdlController {
}
DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
DdlCommand::AlterSourceColumn(source) => ctrl.alter_source_column(source).await,
DdlCommand::AlterSourceColumn(source) => ctrl.alter_source(source).await,
DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
DdlCommand::CreateSubscription(subscription) => {
ctrl.create_subscription(subscription).await
@@ -458,7 +460,11 @@ impl DdlController {
}
}

async fn create_source(&self, mut source: Source) -> MetaResult<NotificationVersion> {
/// Shared source is handled in [`Self::create_streaming_job`]
async fn create_source_without_streaming_job(
&self,
mut source: Source,
) -> MetaResult<NotificationVersion> {
match &self.metadata_manager {
MetadataManager::V1(mgr) => {
source.id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
@@ -523,11 +529,12 @@ impl DdlController {
Ok(version)
}

// Maybe we can unify `alter_source_column` and `alter_source_name`.
async fn alter_source_column(&self, source: Source) -> MetaResult<NotificationVersion> {
/// This replaces the source in the catalog.
/// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
async fn alter_source(&self, source: Source) -> MetaResult<NotificationVersion> {
match &self.metadata_manager {
MetadataManager::V1(mgr) => mgr.catalog_manager.alter_source_column(source).await,
MetadataManager::V2(mgr) => mgr.catalog_controller.alter_source_column(source).await,
MetadataManager::V1(mgr) => mgr.catalog_manager.alter_source(source).await,
MetadataManager::V2(mgr) => mgr.catalog_controller.alter_source(source).await,
}
}

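
The rename from `CreateSource` to `CreateSourceWithoutStreamingJob` makes the split explicit: only sources without a streaming job go through this command, while shared sources are created via the streaming-job path. A toy dispatch sketch (stand-in enum and strings, not the real `DdlCommand` or controller):

```rust
enum DdlCommand {
    CreateSourceWithoutStreamingJob(String), // plain connector source
    CreateStreamingJob(String),              // shared source, MV, sink, ...
}

fn run_command(cmd: DdlCommand) -> &'static str {
    match cmd {
        // Only registers the source in the catalog; no actors are scheduled.
        DdlCommand::CreateSourceWithoutStreamingJob(_) => "catalog-only source",
        // Builds a fragment graph and schedules the streaming job.
        DdlCommand::CreateStreamingJob(_) => "source backed by a streaming job",
    }
}

fn main() {
    assert_eq!(
        run_command(DdlCommand::CreateSourceWithoutStreamingJob("kafka_src".into())),
        "catalog-only source"
    );
    assert_eq!(
        run_command(DdlCommand::CreateStreamingJob("shared_kafka_src".into())),
        "source backed by a streaming job"
    );
}
```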
27 changes: 4 additions & 23 deletions src/rpc_client/src/meta_client.rs
@@ -410,24 +410,14 @@ impl MetaClient {
Ok(resp.version)
}

pub async fn create_source(&self, source: PbSource) -> Result<CatalogVersion> {
let request = CreateSourceRequest {
source: Some(source),
fragment_graph: None,
};

let resp = self.inner.create_source(request).await?;
Ok(resp.version)
}

pub async fn create_source_with_graph(
pub async fn create_source(
&self,
source: PbSource,
graph: StreamFragmentGraph,
graph: Option<StreamFragmentGraph>,
) -> Result<CatalogVersion> {
let request = CreateSourceRequest {
source: Some(source),
fragment_graph: Some(graph),
fragment_graph: graph,
};

let resp = self.inner.create_source(request).await?;
@@ -509,15 +499,6 @@ impl MetaClient {
Ok(resp.version)
}

// only adding columns is supported
pub async fn alter_source_column(&self, source: PbSource) -> Result<CatalogVersion> {
let request = AlterSourceRequest {
source: Some(source),
};
let resp = self.inner.alter_source(request).await?;
Ok(resp.version)
}

pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<CatalogVersion> {
let request = AlterOwnerRequest {
object: Some(object),
@@ -540,7 +521,7 @@ impl MetaClient {
Ok(resp.version)
}

pub async fn alter_source_with_sr(&self, source: PbSource) -> Result<CatalogVersion> {
pub async fn alter_source(&self, source: PbSource) -> Result<CatalogVersion> {
let request = AlterSourceRequest {
source: Some(source),
};
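
Merging the two client methods works because `CreateSourceRequest` already models the fragment graph as optional, so the `Option` passed by the caller maps straight onto the request field. A self-contained sketch with stand-in structs (not the generated protobuf types):

```rust
#[derive(Debug, Default)]
struct PbSource;

#[derive(Debug, Default)]
struct StreamFragmentGraph;

#[derive(Debug, Default)]
struct CreateSourceRequest {
    source: Option<PbSource>,
    fragment_graph: Option<StreamFragmentGraph>,
}

fn build_request(source: PbSource, graph: Option<StreamFragmentGraph>) -> CreateSourceRequest {
    CreateSourceRequest {
        source: Some(source),
        // `None` for plain sources, `Some(..)` when a streaming job is needed.
        fragment_graph: graph,
    }
}

fn main() {
    assert!(build_request(PbSource, None).fragment_graph.is_none());
    assert!(build_request(PbSource, Some(StreamFragmentGraph))
        .fragment_graph
        .is_some());
}
```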
1 change: 1 addition & 0 deletions src/sqlparser/src/ast/statement.rs
@@ -219,6 +219,7 @@ impl Encode {
}
}

/// `FORMAT ... ENCODE ... [(a=b, ...)] [KEY ENCODE ...]`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ConnectorSchema {
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_backfill_executor.rs
@@ -580,7 +580,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
// Currently, `CreateMviewProgress` is designed for MV backfill, and rw_ddl_progress calculates
// progress based on the number of consumed rows and an estimated total number of rows from hummock.
// For now, we just rely on the same code path, and for source backfill, the progress will always be 99.99%.
tracing::info!("progress finish");
tracing::debug!("progress finish");
let epoch = barrier.epoch;
self.progress.finish(epoch, 114514);
// yield barrier after reporting progress
