Skip to content

Commit

Permalink
fetch max parallelism to check alter parallelism
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Sep 26, 2024
1 parent 5c8173e commit 8cfaf3f
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
9 changes: 9 additions & 0 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,15 @@ impl CatalogController {
Ok(job_states)
}

pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
let inner = self.inner.read().await;
let job = StreamingJob::find_by_id(job_id)
.one(&inner.db)
.await?
.ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
Ok(job.max_parallelism as usize)
}

/// Get all actor ids in the target streaming jobs.
pub async fn get_job_actor_mapping(
&self,
Expand Down
20 changes: 19 additions & 1 deletion src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::pin::pin;
use std::time::Duration;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use futures::future::{select, Either};
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_meta_model_v2::{ObjectId, SourceId};
Expand Down Expand Up @@ -892,6 +892,24 @@ impl MetadataManager {
}
}

pub async fn get_table_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
match self {
MetadataManager::V1(mgr) => {
let fragments = mgr.fragment_manager.get_fragment_read_guard().await;
Ok(fragments
.table_fragments()
.get(&table_id)
.map(|tf| tf.max_parallelism)
.with_context(|| format!("job {table_id} not found"))?)
}
MetadataManager::V2(mgr) => {
mgr.catalog_controller
.get_max_parallelism_by_id(table_id.table_id as _)
.await
}
}
}

pub fn cluster_id(&self) -> &ClusterId {
match self {
MetadataManager::V1(mgr) => mgr.cluster_manager.cluster_id(),
Expand Down
11 changes: 7 additions & 4 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_meta_model_v2::ObjectId;
use risingwave_pb::catalog::{CreateType, Subscription, Table};
use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
Expand Down Expand Up @@ -665,6 +664,8 @@ impl GlobalStreamManager {
) -> MetaResult<()> {
let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

let table_id = TableId::new(table_id);

let worker_nodes = self
.metadata_manager
.list_active_streaming_compute_nodes()
Expand All @@ -683,8 +684,10 @@ impl GlobalStreamManager {
.iter()
.map(|w| w.parallelism as usize)
.sum::<usize>();
// TODO(var-vnode): get correct max parallelism from catalogs.
let max_parallelism = VirtualNode::COUNT_FOR_COMPAT;
let max_parallelism = self
.metadata_manager
.get_table_max_parallelism(table_id)
.await?;

match parallelism {
TableParallelism::Adaptive => {
Expand All @@ -709,7 +712,7 @@ impl GlobalStreamManager {
}
}

let table_parallelism_assignment = HashMap::from([(TableId::new(table_id), parallelism)]);
let table_parallelism_assignment = HashMap::from([(table_id, parallelism)]);

if deferred {
tracing::debug!(
Expand Down

0 comments on commit 8cfaf3f

Please sign in to comment.