diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 4c55f3c6527f..da831687dbd4 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -672,6 +672,15 @@ impl CatalogController { Ok(job_states) } + pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult { + let inner = self.inner.read().await; + let job = StreamingJob::find_by_id(job_id) + .one(&inner.db) + .await? + .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?; + Ok(job.max_parallelism as usize) + } + /// Get all actor ids in the target streaming jobs. pub async fn get_job_actor_mapping( &self, diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 78df862e1d8e..a3d7f6492c6d 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::pin::pin; use std::time::Duration; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use futures::future::{select, Either}; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_meta_model_v2::{ObjectId, SourceId}; @@ -892,6 +892,24 @@ impl MetadataManager { } } + pub async fn get_table_max_parallelism(&self, table_id: TableId) -> MetaResult { + match self { + MetadataManager::V1(mgr) => { + let fragments = mgr.fragment_manager.get_fragment_read_guard().await; + Ok(fragments + .table_fragments() + .get(&table_id) + .map(|tf| tf.max_parallelism) + .with_context(|| format!("job {table_id} not found"))?) + } + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .get_max_parallelism_by_id(table_id.table_id as _) + .await + } + } + } + pub fn cluster_id(&self) -> &ClusterId { match self { MetadataManager::V1(mgr) => mgr.cluster_manager.cluster_id(), diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 0511e0e4de37..2e035cfafd51 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -19,7 +19,6 @@ use futures::future::join_all; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::catalog::TableId; -use risingwave_common::hash::VirtualNode; use risingwave_meta_model_v2::ObjectId; use risingwave_pb::catalog::{CreateType, Subscription, Table}; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; @@ -665,6 +664,8 @@ impl GlobalStreamManager { ) -> MetaResult<()> { let _reschedule_job_lock = self.reschedule_lock_write_guard().await; + let table_id = TableId::new(table_id); + let worker_nodes = self .metadata_manager .list_active_streaming_compute_nodes() @@ -683,8 +684,10 @@ impl GlobalStreamManager { .iter() .map(|w| w.parallelism as usize) .sum::(); - // TODO(var-vnode): get correct max parallelism from catalogs. - let max_parallelism = VirtualNode::COUNT_FOR_COMPAT; + let max_parallelism = self + .metadata_manager + .get_table_max_parallelism(table_id) + .await?; match parallelism { TableParallelism::Adaptive => { @@ -709,7 +712,7 @@ impl GlobalStreamManager { } } - let table_parallelism_assignment = HashMap::from([(TableId::new(table_id), parallelism)]); + let table_parallelism_assignment = HashMap::from([(table_id, parallelism)]); if deferred { tracing::debug!(