-
Notifications
You must be signed in to change notification settings - Fork 569
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: check split assignment before pushing mutation #18134
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -588,6 +588,32 @@ where | |
) | ||
} | ||
|
||
pub fn validate_assignment(assignment: &mut HashMap<ActorId, Vec<SplitImpl>>) { | ||
// check if one split is assign to multiple actors | ||
let mut split_to_actor = HashMap::new(); | ||
for (actor_id, splits) in &mut *assignment { | ||
let _ = splits.iter().map(|split| { | ||
split_to_actor | ||
.entry(split.id()) | ||
.or_insert_with(Vec::new) | ||
.push(*actor_id) | ||
}); | ||
} | ||
|
||
for (split_id, actor_ids) in &mut split_to_actor { | ||
if actor_ids.len() > 1 { | ||
tracing::warn!(split_id = ?split_id, actor_ids = ?actor_ids, "split is assigned to multiple actors"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tend to panic here to expose the problem early. Otherwise, it will almost certainly trigger a panic in storage side, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't know the root cause and have no way to fix the persisted data. From the frequency the users meet the problem, letting it panic and telling users to drop all sources can hurt the users' experience. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Oh, the following code will fix the multiple assignment issue and the compactor will no longer panic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I see your point |
||
} | ||
// keep the first actor and remove the rest from the assignment | ||
for actor_id in actor_ids.iter().skip(1) { | ||
assignment | ||
.get_mut(actor_id) | ||
.unwrap() | ||
.retain(|split| split.id() != *split_id); | ||
} | ||
} | ||
} | ||
|
||
fn align_backfill_splits( | ||
backfill_actors: impl IntoIterator<Item = (ActorId, Vec<ActorId>)>, | ||
upstream_assignment: &HashMap<ActorId, Vec<SplitImpl>>, | ||
|
@@ -1143,11 +1169,14 @@ mod tests { | |
|
||
use risingwave_common::types::JsonbVal; | ||
use risingwave_connector::error::ConnectorResult; | ||
use risingwave_connector::source::{SplitId, SplitMetaData}; | ||
use risingwave_connector::source::test_source::TestSourceSplit; | ||
use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData}; | ||
use serde::{Deserialize, Serialize}; | ||
|
||
use super::validate_assignment; | ||
use crate::model::{ActorId, FragmentId}; | ||
use crate::stream::source_manager::{reassign_splits, SplitDiffOptions}; | ||
use crate::stream::SplitAssignment; | ||
|
||
#[derive(Debug, Copy, Clone, Serialize, Deserialize)] | ||
struct TestSplit { | ||
|
@@ -1268,6 +1297,49 @@ mod tests { | |
assert!(!diff.is_empty()) | ||
} | ||
|
||
#[test]
fn test_validate_assignment() {
    // Split "1" is deliberately assigned to both actor 0 and actor 2.
    let test_assignment: HashMap<ActorId, Vec<SplitImpl>> = maplit::hashmap! {
        0 => vec![SplitImpl::Test(
            TestSourceSplit {id: "1".into(), properties: Default::default(), offset: Default::default()}
        ), SplitImpl::Test(
            TestSourceSplit {id: "2".into(), properties: Default::default(), offset: Default::default()}
        )],
        1 => vec![SplitImpl::Test(
            TestSourceSplit {id: "3".into(), properties: Default::default(), offset: Default::default()}
        )],
        2 => vec![SplitImpl::Test(
            TestSourceSplit {id: "1".into(), properties: Default::default(), offset: Default::default()}
        )],
    };
    let mut fragment_assignment: SplitAssignment = maplit::hashmap! {
        1 => test_assignment,
    };

    for assignment in fragment_assignment.values_mut() {
        validate_assignment(assignment);
    }

    // Rebuild the split -> actors index from the validated assignment. This must be an eager
    // loop: a bare `iter().map(..)` is lazy and never runs, which would leave the index empty
    // and make the assertions below pass vacuously.
    let mut split_to_actor: HashMap<_, Vec<ActorId>> = HashMap::new();
    for actor_to_splits in fragment_assignment.values() {
        for (actor_id, splits) in actor_to_splits {
            for split in splits {
                split_to_actor
                    .entry(split.id())
                    .or_default()
                    .push(*actor_id);
            }
        }
    }

    // No split may be lost: "1", "2" and "3" must all still be assigned somewhere.
    assert_eq!(split_to_actor.len(), 3);
    // Every split is owned by exactly one actor after validation.
    for actor_ids in split_to_actor.values() {
        assert_eq!(actor_ids.len(), 1);
    }
}
|
||
#[test] | ||
fn test_reassign_splits() { | ||
let actor_splits = HashMap::new(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As it's just an in-memory validation, I think it's totally okay to keep it here.