Skip to content

Commit

Permalink
fix(meta): persist correct vnode count for catalogs
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Sep 29, 2024
1 parent 7a83d1a commit eb7d2cf
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,15 +399,19 @@ impl CatalogController {
for fragment in fragments {
let fragment_id = fragment.fragment_id;
let state_table_ids = fragment.state_table_ids.inner_ref().clone();
let vnode_count = fragment.vnode_count;

let fragment = fragment.into_active_model();
Fragment::insert(fragment).exec(&txn).await?;

// Update fragment id for all state tables.
// Fields including `fragment_id` and `vnode_count` were placeholder values before.
// After table fragments are created, update them for all internal tables.
if !for_replace {
for state_table_id in state_table_ids {
table::ActiveModel {
table_id: Set(state_table_id as _),
fragment_id: Set(Some(fragment_id)),
vnode_count: Set(vnode_count),
..Default::default()
}
.update(&txn)
Expand Down Expand Up @@ -1005,22 +1009,25 @@ impl CatalogController {
table.incoming_sinks = Set(incoming_sinks.into());
let table = table.update(txn).await?;

// Update state table fragment id.
let fragment_table_ids: Vec<(FragmentId, I32Array)> = Fragment::find()
// Fields including `fragment_id` and `vnode_count` were placeholder values before.
// After table fragments are created, update them for all internal tables.
let fragment_info: Vec<(FragmentId, I32Array, i32)> = Fragment::find()
.select_only()
.columns([
fragment::Column::FragmentId,
fragment::Column::StateTableIds,
fragment::Column::VnodeCount,
])
.filter(fragment::Column::JobId.eq(dummy_id))
.into_tuple()
.all(txn)
.await?;
for (fragment_id, state_table_ids) in fragment_table_ids {
for (fragment_id, state_table_ids, vnode_count) in fragment_info {
for state_table_id in state_table_ids.into_inner() {
table::ActiveModel {
table_id: Set(state_table_id as _),
fragment_id: Set(Some(fragment_id)),
vnode_count: Set(vnode_count),
..Default::default()
}
.update(txn)
Expand Down

0 comments on commit eb7d2cf

Please sign in to comment.