Skip to content

Commit

Permalink
fix: check split assignment before pushing mutation (#18134)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <tabvision@bupt.icu>
  • Loading branch information
tabVersion authored Aug 21, 2024
1 parent 5d35cf3 commit 723833a
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 5 deletions.
23 changes: 19 additions & 4 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ use super::trace::TracedEpoch;
use crate::barrier::{GlobalBarrierManagerContext, InflightSubscriptionInfo};
use crate::manager::{DdlType, InflightFragmentInfo, MetadataManager, StreamingJob, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
use crate::stream::{
build_actor_connector_splits, validate_assignment, SplitAssignment, ThrottleConfig,
};
use crate::MetaResult;

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
Expand Down Expand Up @@ -520,9 +522,14 @@ impl CommandContext {
}

Command::SourceSplitAssignment(change) => {
let mut checked_assignment = change.clone();
checked_assignment
.iter_mut()
.for_each(|(_, assignment)| validate_assignment(assignment));

let mut diff = HashMap::new();

for actor_splits in change.values() {
for actor_splits in checked_assignment.values() {
diff.extend(actor_splits.clone());
}

Expand Down Expand Up @@ -570,7 +577,12 @@ impl CommandContext {
})
.collect();
let added_actors = table_fragments.actor_ids();
let actor_splits = split_assignment

let mut checked_split_assignment = split_assignment.clone();
checked_split_assignment
.iter_mut()
.for_each(|(_, assignment)| validate_assignment(assignment));
let actor_splits = checked_split_assignment
.values()
.flat_map(build_actor_connector_splits)
.collect();
Expand Down Expand Up @@ -776,7 +788,10 @@ impl CommandContext {
let mut actor_splits = HashMap::new();

for reschedule in reschedules.values() {
for (actor_id, splits) in &reschedule.actor_splits {
let mut checked_assignment = reschedule.actor_splits.clone();
validate_assignment(&mut checked_assignment);

for (actor_id, splits) in &checked_assignment {
actor_splits.insert(
*actor_id as ActorId,
ConnectorSplits {
Expand Down
74 changes: 73 additions & 1 deletion src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,32 @@ where
)
}

pub fn validate_assignment(assignment: &mut HashMap<ActorId, Vec<SplitImpl>>) {
// check if one split is assign to multiple actors
let mut split_to_actor = HashMap::new();
for (actor_id, splits) in &mut *assignment {
let _ = splits.iter().map(|split| {
split_to_actor
.entry(split.id())
.or_insert_with(Vec::new)
.push(*actor_id)
});
}

for (split_id, actor_ids) in &mut split_to_actor {
if actor_ids.len() > 1 {
tracing::warn!(split_id = ?split_id, actor_ids = ?actor_ids, "split is assigned to multiple actors");
}
// keep the first actor and remove the rest from the assignment
for actor_id in actor_ids.iter().skip(1) {
assignment
.get_mut(actor_id)
.unwrap()
.retain(|split| split.id() != *split_id);
}
}
}

fn align_backfill_splits(
backfill_actors: impl IntoIterator<Item = (ActorId, Vec<ActorId>)>,
upstream_assignment: &HashMap<ActorId, Vec<SplitImpl>>,
Expand Down Expand Up @@ -1143,11 +1169,14 @@ mod tests {

use risingwave_common::types::JsonbVal;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::{SplitId, SplitMetaData};
use risingwave_connector::source::test_source::TestSourceSplit;
use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
use serde::{Deserialize, Serialize};

use super::validate_assignment;
use crate::model::{ActorId, FragmentId};
use crate::stream::source_manager::{reassign_splits, SplitDiffOptions};
use crate::stream::SplitAssignment;

#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
struct TestSplit {
Expand Down Expand Up @@ -1268,6 +1297,49 @@ mod tests {
assert!(!diff.is_empty())
}

#[test]
fn test_validate_assignment() {
let mut fragment_assignment: SplitAssignment;
let test_assignment: HashMap<ActorId, Vec<SplitImpl>> = maplit::hashmap! {
0 => vec![SplitImpl::Test(
TestSourceSplit {id: "1".into(), properties: Default::default(), offset: Default::default()}
), SplitImpl::Test(
TestSourceSplit {id: "2".into(), properties: Default::default(), offset: Default::default()}
)],
1 => vec![SplitImpl::Test(
TestSourceSplit {id: "3".into(), properties: Default::default(), offset: Default::default()}
)],
2 => vec![SplitImpl::Test(
TestSourceSplit {id: "1".into(), properties: Default::default(), offset: Default::default()}
)],
};
fragment_assignment = maplit::hashmap! {
1 => test_assignment,
};

fragment_assignment.iter_mut().for_each(|(_, assignment)| {
validate_assignment(assignment);
});

{
let mut split_to_actor = HashMap::new();
for actor_to_splits in fragment_assignment.values() {
for (actor_id, splits) in actor_to_splits {
let _ = splits.iter().map(|split| {
split_to_actor
.entry(split.id())
.or_insert_with(Vec::new)
.push(*actor_id)
});
}
}

for actor_ids in split_to_actor.values() {
assert_eq!(actor_ids.len(), 1);
}
}
}

#[test]
fn test_reassign_splits() {
let actor_splits = HashMap::new();
Expand Down

0 comments on commit 723833a

Please sign in to comment.