feat(frontend): show job's max parallelism in system tables (#18672)
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
BugenZhao authored Sep 30, 2024
1 parent 6731c26 commit f7e5068
Showing 10 changed files with 103 additions and 53 deletions.
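
In short, this change adds a max_parallelism column to the job-level system tables and derives a per-fragment upper bound from the vnode count. A minimal sketch of how the new columns can be inspected, based on the tests in this diff (the 256 values assume the default streaming_max_parallelism; treat the query as illustrative, not authoritative):

-- Job-level: max_parallelism is 256 by default, or whatever
-- streaming_max_parallelism was set to when the job was created.
SELECT table_id, status, parallelism, max_parallelism
FROM rw_table_fragments;
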
42 changes: 32 additions & 10 deletions e2e_test/ddl/max_parallelism.slt
@@ -1,8 +1,29 @@
statement ok
create view table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;
create view table_parallelism as select t.name, tf.parallelism, tf.max_parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;

#### BEGIN

# Default max_parallelism should be 256.

statement ok
set streaming_max_parallelism to default;

statement ok
set streaming_parallelism to default;

statement ok
create table t;

query TT
select parallelism, max_parallelism from table_parallelism where name = 't';
----
ADAPTIVE 256

statement ok
drop table t;


# Test customized max_parallelism.

statement ok
set streaming_max_parallelism to 4;
@@ -21,10 +42,10 @@ set streaming_parallelism to 4;
statement ok
create table t;

query T
select parallelism from table_parallelism where name = 't';
query TT
select parallelism, max_parallelism from table_parallelism where name = 't';
----
FIXED(4)
FIXED(4) 4

statement ok
drop table t;
@@ -37,19 +58,19 @@ set streaming_parallelism to default;
statement ok
create table t;

query T
select parallelism from table_parallelism where name = 't';
query TT
select parallelism, max_parallelism from table_parallelism where name = 't';
----
ADAPTIVE
ADAPTIVE 4

# Alter parallelism to a valid value, ok.
statement ok
alter table t set parallelism to 4;

query T
select parallelism from table_parallelism where name = 't';
query TT
select parallelism, max_parallelism from table_parallelism where name = 't';
----
FIXED(4)
FIXED(4) 4

# Alter parallelism to an invalid value, return an error.
statement error specified parallelism 8 should not exceed max parallelism 4
@@ -58,6 +79,7 @@ alter table t set parallelism to 8;
statement ok
drop table t;


#### END

statement ok
2 changes: 2 additions & 0 deletions proto/meta.proto
@@ -243,6 +243,7 @@ message ListTableFragmentStatesResponse {
uint32 table_id = 1;
TableFragments.State state = 2;
TableParallelism parallelism = 3;
uint32 max_parallelism = 4;
}
repeated TableFragmentState states = 1;
}
@@ -258,6 +259,7 @@ message ListFragmentDistributionResponse {
repeated uint32 upstream_fragment_ids = 5;
uint32 fragment_type_mask = 6;
uint32 parallelism = 7;
uint32 vnode_count = 8;
}
repeated FragmentDistribution distributions = 1;
}
@@ -36,7 +36,8 @@ use risingwave_frontend_macro::system_catalog;
f.state_table_ids,
f.upstream_fragment_ids,
f.flags,
f.parallelism
f.parallelism,
f.max_parallelism
FROM all_streaming_jobs job
INNER JOIN rw_fragments f ON job.id = f.table_id
ORDER BY job.id"
@@ -52,4 +53,5 @@ struct RwFragmentParallelism {
upstream_fragment_ids: Vec<i32>,
flags: Vec<String>,
parallelism: i32,
max_parallelism: i32,
}
52 changes: 32 additions & 20 deletions src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs
@@ -14,6 +14,7 @@

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::stream_plan::FragmentTypeFlag;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
@@ -29,6 +30,7 @@ struct RwFragment {
upstream_fragment_ids: Vec<i32>,
flags: Vec<String>,
parallelism: i32,
max_parallelism: i32,
}

fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
@@ -51,26 +53,36 @@ async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragmen

Ok(distributions
.into_iter()
.map(|distribution| RwFragment {
fragment_id: distribution.fragment_id as i32,
table_id: distribution.table_id as i32,
distribution_type: distribution.distribution_type().as_str_name().into(),
state_table_ids: distribution
.state_table_ids
.into_iter()
.map(|id| id as i32)
.collect(),
upstream_fragment_ids: distribution
.upstream_fragment_ids
.into_iter()
.map(|id| id as i32)
.collect(),
flags: extract_fragment_type_flag(distribution.fragment_type_mask)
.into_iter()
.flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_"))
.map(|s| s.into())
.collect(),
parallelism: distribution.parallelism as i32,
.map(|distribution| {
let distribution_type = distribution.distribution_type();
let max_parallelism = match distribution_type {
FragmentDistributionType::Single => 1,
FragmentDistributionType::Hash => distribution.vnode_count as i32,
FragmentDistributionType::Unspecified => unreachable!(),
};

RwFragment {
fragment_id: distribution.fragment_id as i32,
table_id: distribution.table_id as i32,
distribution_type: distribution.distribution_type().as_str_name().into(),
state_table_ids: distribution
.state_table_ids
.into_iter()
.map(|id| id as i32)
.collect(),
upstream_fragment_ids: distribution
.upstream_fragment_ids
.into_iter()
.map(|id| id as i32)
.collect(),
flags: extract_fragment_type_flag(distribution.fragment_type_mask)
.into_iter()
.flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_"))
.map(|s| s.into())
.collect(),
parallelism: distribution.parallelism as i32,
max_parallelism,
}
})
.collect())
}
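
As the mapping above shows, a fragment's max_parallelism is derived rather than stored: singleton fragments report 1, while hash-distributed fragments report their vnode count (256 under the defaults used in the integration test further down). A hedged example query against the fragment catalog touched here:

-- Fragment-level: hash fragments report their vnode count as max_parallelism,
-- singleton fragments report 1.
SELECT fragment_id, table_id, distribution_type, parallelism, max_parallelism
FROM rw_fragments
ORDER BY fragment_id;
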
@@ -31,7 +31,8 @@ use risingwave_frontend_macro::system_catalog;
job.id,
job.name,
job.relation_type,
tf.parallelism
tf.parallelism,
tf.max_parallelism
FROM all_streaming_jobs job
INNER JOIN rw_table_fragments tf ON job.id = tf.table_id
ORDER BY job.id"
@@ -42,4 +43,5 @@ struct RwStreamingParallelism {
name: String,
relation_type: String,
parallelism: String,
max_parallelism: i32,
}
@@ -24,6 +24,7 @@ struct RwTableFragment {
table_id: i32,
status: String,
parallelism: String,
max_parallelism: i32,
}

#[system_catalog(table, "rw_catalog.rw_table_fragments")]
@@ -40,6 +41,7 @@ async fn read_rw_table_fragments_info(
table_id: state.table_id as i32,
status: state.state().as_str_name().into(),
parallelism: parallelism.to_uppercase(),
max_parallelism: state.max_parallelism as i32,
}
})
.collect())
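
Assuming the two catalogs above are exposed as rw_streaming_parallelism and rw_table_fragments (only the latter name is explicit in the #[system_catalog] attribute), a job-level check might look like the sketch below; per the e2e test, max_parallelism should read 4 for a job created under `set streaming_max_parallelism to 4` and 256 otherwise.

-- Per-job parallelism alongside the new upper bound.
SELECT id, name, relation_type, parallelism, max_parallelism
FROM rw_streaming_parallelism
ORDER BY id;
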
7 changes: 6 additions & 1 deletion src/meta/service/src/stream_service.rs
@@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_connector::source::SplitMetaData;
use risingwave_meta::manager::{LocalNotification, MetadataManager};
use risingwave_meta::model;
@@ -272,6 +273,7 @@ impl StreamManagerService for StreamServiceImpl {
table_id: tf.table_id().table_id,
state: tf.state() as i32,
parallelism: Some(tf.assigned_parallelism.into()),
max_parallelism: tf.max_parallelism as _,
},
)
.collect_vec()
@@ -280,7 +282,7 @@ impl StreamManagerService for StreamServiceImpl {
let job_states = mgr.catalog_controller.list_streaming_job_states().await?;
job_states
.into_iter()
.map(|(table_id, state, parallelism)| {
.map(|(table_id, state, parallelism, max_parallelism)| {
let parallelism = match parallelism {
StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
StreamingParallelism::Custom => model::TableParallelism::Custom,
@@ -293,6 +295,7 @@ impl StreamManagerService for StreamServiceImpl {
table_id: table_id as _,
state: PbState::from(state) as _,
parallelism: Some(parallelism.into()),
max_parallelism: max_parallelism as _,
}
})
.collect_vec()
@@ -323,6 +326,7 @@ impl StreamManagerService for StreamServiceImpl {
upstream_fragment_ids: fragment.upstream_fragment_ids.clone(),
fragment_type_mask: fragment.fragment_type_mask,
parallelism: fragment.actors.len() as _,
vnode_count: fragment.vnode_count() as _,
}
})
})
@@ -345,6 +349,7 @@ impl StreamManagerService for StreamServiceImpl {
.into_u32_array(),
fragment_type_mask: fragment_desc.fragment_type_mask as _,
parallelism: fragment_desc.parallelism as _,
vnode_count: fragment_desc.vnode_count as _,
}
})
.collect_vec()
6 changes: 4 additions & 2 deletions src/meta/src/controller/fragment.rs
@@ -677,14 +677,15 @@ impl CatalogController {

pub async fn list_streaming_job_states(
&self,
) -> MetaResult<Vec<(ObjectId, JobStatus, StreamingParallelism)>> {
) -> MetaResult<Vec<(ObjectId, JobStatus, StreamingParallelism, i32)>> {
let inner = self.inner.read().await;
let job_states: Vec<(ObjectId, JobStatus, StreamingParallelism)> = StreamingJob::find()
let job_states = StreamingJob::find()
.select_only()
.columns([
streaming_job::Column::JobId,
streaming_job::Column::JobStatus,
streaming_job::Column::Parallelism,
streaming_job::Column::MaxParallelism,
])
.into_tuple()
.all(&inner.db)
@@ -842,6 +843,7 @@ impl CatalogController {
fragment::Column::DistributionType,
fragment::Column::StateTableIds,
fragment::Column::UpstreamFragmentId,
fragment::Column::VnodeCount,
])
.column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
.join(JoinType::LeftJoin, fragment::Relation::Actor.def())
1 change: 1 addition & 0 deletions src/meta/src/controller/utils.rs
@@ -273,6 +273,7 @@ pub struct FragmentDesc {
pub state_table_ids: I32Array,
pub upstream_fragment_id: I32Array,
pub parallelism: i64,
pub vnode_count: i64,
}

/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
36 changes: 18 additions & 18 deletions src/tests/simulation/tests/integration_tests/scale/shared_source.rs
@@ -112,13 +112,13 @@ async fn test_shared_source() -> Result<()> {
.collect_vec();
validate_splits_aligned(&mut cluster).await?;
expect_test::expect![[r#"
1 1 HASH {2} {} {SOURCE} 6
2 3 HASH {4,3} {3} {MVIEW} 6
3 3 HASH {5} {1} {SOURCE_SCAN} 6"#]]
1 1 HASH {2} {} {SOURCE} 6 256
2 3 HASH {4,3} {3} {MVIEW} 6 256
3 3 HASH {5} {1} {SOURCE_SCAN} 6 256"#]]
.assert_eq(&cluster.run("select * from rw_fragments;").await?);
expect_test::expect![[r#"
1 CREATED ADAPTIVE
3 CREATED ADAPTIVE"#]]
1 CREATED ADAPTIVE 256
3 CREATED ADAPTIVE 256"#]]
.assert_eq(&cluster.run("select * from rw_table_fragments;").await?);

// SourceBackfill cannot be scaled because of NoShuffle.
@@ -137,9 +137,9 @@ async fn test_shared_source() -> Result<()> {
.await
.unwrap();
expect_test::expect![[r#"
1 1 HASH {2} {} {SOURCE} 6
2 3 HASH {4,3} {3} {MVIEW} 5
3 3 HASH {5} {1} {SOURCE_SCAN} 6"#]]
1 1 HASH {2} {} {SOURCE} 6 256
2 3 HASH {4,3} {3} {MVIEW} 5 256
3 3 HASH {5} {1} {SOURCE_SCAN} 6 256"#]]
.assert_eq(&cluster.run("select * from rw_fragments;").await?);

// source is the NoShuffle upstream. It can be scaled, and the downstream SourceBackfill will be scaled together.
@@ -156,13 +156,13 @@ async fn test_shared_source() -> Result<()> {
.unwrap();
validate_splits_aligned(&mut cluster).await?;
expect_test::expect![[r#"
1 1 HASH {2} {} {SOURCE} 3
2 3 HASH {4,3} {3} {MVIEW} 5
3 3 HASH {5} {1} {SOURCE_SCAN} 3"#]]
1 1 HASH {2} {} {SOURCE} 3 256
2 3 HASH {4,3} {3} {MVIEW} 5 256
3 3 HASH {5} {1} {SOURCE_SCAN} 3 256"#]]
.assert_eq(&cluster.run("select * from rw_fragments;").await?);
expect_test::expect![[r#"
1 CREATED CUSTOM
3 CREATED CUSTOM"#]]
1 CREATED CUSTOM 256
3 CREATED CUSTOM 256"#]]
.assert_eq(&cluster.run("select * from rw_table_fragments;").await?);

// resolve_no_shuffle for backfill fragment is OK, which will scale the upstream together.
@@ -180,13 +180,13 @@ async fn test_shared_source() -> Result<()> {
.unwrap();
validate_splits_aligned(&mut cluster).await?;
expect_test::expect![[r#"
1 1 HASH {2} {} {SOURCE} 7
2 3 HASH {4,3} {3} {MVIEW} 5
3 3 HASH {5} {1} {SOURCE_SCAN} 7"#]]
1 1 HASH {2} {} {SOURCE} 7 256
2 3 HASH {4,3} {3} {MVIEW} 5 256
3 3 HASH {5} {1} {SOURCE_SCAN} 7 256"#]]
.assert_eq(&cluster.run("select * from rw_fragments;").await?);
expect_test::expect![[r#"
1 CREATED CUSTOM
3 CREATED CUSTOM"#]]
1 CREATED CUSTOM 256
3 CREATED CUSTOM 256"#]]
.assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
Ok(())
}
