-
Notifications
You must be signed in to change notification settings - Fork 145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix[MQB]: use CSL to update state on QueueAssignmentAdvisory #584
base: main
Are you sure you want to change the base?
Conversation
852e459
to
8cdba6e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments
"(Committed advisory).*queueAssignmentAdvisory", timeout | ||
) | ||
assert not member.outputs_regex( | ||
"'QueueUnAssignmentAdvisory' will be applied to", timeout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks correct for two reasons:
- Should be
QueueAssignmentAdvisory
instead ofQueueUnAssignmentAdvisory
- This log line seems to be during
blazingmq/src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Lines 1000 to 1003 in 7b30704
// Apply 'queueAssignmentAdvisory' to CSL BALL_LOG_INFO << clusterData->identity().description() << ": 'QueueAssignmentAdvisory' will be applied to " << " cluster state ledger: " << queueAdvisory;
<< ": Queue assigned: " << queueAdvisory; | ||
|
||
// Broadcast 'queueAssignmentAdvisory' to all followers | ||
clusterData->messageTransmitter().broadcastMessage(controlMsg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now, QueueAssignmentAdvisory
should never be sent out. We should remove
blazingmq/src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Lines 3079 to 3087 in 7b30704
case MsgChoice::SELECTION_ID_QUEUE_ASSIGNMENT_ADVISORY: { | |
dispatcher()->execute( | |
bdlf::BindUtil::bind( | |
&ClusterOrchestrator::processQueueAssignmentAdvisory, | |
&d_clusterOrchestrator, | |
message, | |
source), | |
this); | |
} break; // BREAK |
void ClusterOrchestrator::processQueueAssignmentAdvisory( |
blazingmq/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h
Lines 336 to 339 in 7b30704
virtual void | |
processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message, | |
mqbnet::ClusterNode* source, | |
bool delayed = false) = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still, preserve enough code such that mqbblp::ClusterStateManager::processBufferedQueueAdvisories()
and mqbblp::ClusterStateManager::processLeaderAdvisory()
do not break.
Signed-off-by: Emelia Lei <wlei29@bloomberg.net>
…visory Signed-off-by: Emelia Lei <wlei29@bloomberg.net>
Signed-off-by: Emelia Lei <wlei29@bloomberg.net>
* fix: always update CSL on QueueUpdateAdvisory Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> * Updating IT Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --------- Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
636cc18
to
022f710
Compare
Signed-off-by: Emelia Lei <wlei29@bloomberg.net>
BALL_LOG_INFO << cluster->description() | ||
<< ": Queue assigned: " << queueAdvisory; | ||
|
||
// Broadcast 'queueAssignmentAdvisory' to all followers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about compatibility between broker versions, we should still broadcast this advisory for now, such as old broker can still receive and process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the new leader broadcast through applyRecordInternal
, if the followers are old version, it seems that they can still receive this e_CLUSTER_TYPE
event and process correctly. Could you given an example of where there could be a mismatch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I meant is: Old version followers return on commit callback of e_CLUSTER_TYPE
events, so they rely on having this additionally broadcasted 'queueAssignmentAdvisory' to process correctly. Thus, new version leader should keep broadcasting this additional time for now,
@@ -3076,15 +3076,6 @@ void Cluster::processClusterControlMessage( | |||
source), | |||
this); | |||
} break; // BREAK | |||
case MsgChoice::SELECTION_ID_QUEUE_ASSIGNMENT_ADVISORY: { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about compatibility between different broker versions, we still keep this case as an no-op, in case an old broker sends us this:
case MsgChoice::SELECTION_ID_QUEUE_ASSIGNMENT_ADVISORY: {
// NO-OP
} break; // BREAK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, and upon some thinkings, maybe we should keep this branch and not remove anything related to processQueueAssignmentAdvisory
for now. Consider this: when the old leader calls assignQueue
, it will broadcast SELECTION_ID_QUEUE_ASSIGNMENT_ADVISORY
. If we delete this case for new followers, they won't do anything upon receiving the advisory which is incorrect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Old leader will also write QAA to CSL; old followers will return immediately, but new followers should process QAA as part of CSL commit callback. It should be safe to remove this branch.
Similar to PR #581, this PR invokes CSL commit callback for QueueAssignmentAdvisories and the queue assignment part of LeaderAdvisory, effectively making CSL the source of truth.