-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Source Subscription deletion #136
Source Subscription deletion #136
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 6 out of 8 changed files in this pull request and generated no comments.
Files not reviewed (2)
- query-container/query-host/src/query_worker.rs: Evaluated as low risk
- sources/shared/query-api/src/api.rs: Evaluated as low risk
Comments suppressed due to low confidence (1)
query-container/query-host/src/models.rs:178
- The error message 'Failed to unsubscribe' could be more descriptive. Consider including the query ID or subscription ID in the message.
UnsubscribeFailed(String),
|
||
let resp = match self | ||
.client | ||
.post(format!("http://{}/unsubscription", app_id)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To remain "RESTful", I think this should be a DELETE
verb on the subscription/{queryNodeId}/{queryId}
route.
sources/shared/query-api/src/main.rs
Outdated
state: &Arc<AppState>, | ||
) -> Result<(), axum::http::Response<axum::body::Body>> { | ||
let publisher = &state.publisher; | ||
let unsubscription_event = api::UnsubscriptionEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this just be a ControlEvent
with an op
of d
instead of i
on the SourceSubscription
table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Will modify this
@@ -391,6 +391,31 @@ async fn process_changes( | |||
} else { | |||
// TODO - supprt other ops on SourceSubscriptions | |||
} | |||
} else if change["payload"]["source"]["table"] == "SourceUnsubscription" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we refactor these deep nested if else statements into a match
statement?
sources/shared/query-api/src/main.rs
Outdated
table: "SourceSubscription".to_string(), | ||
}, | ||
before: None, | ||
after: request.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typically for these debezium style payloads, the properties for a "delete" will be in the "before" field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will update this
…/drasi-platform into subscription-deletion * 'subscription-deletion' of https://github.com/ruokun-niu/drasi-platform: Bump rustls from 0.23.15 to 0.23.19 in /sources/shared/change-router (drasi-project#122) Review result reaction (drasi-project#111) Initial Rusk source SDK (drasi-project#128) Review Event Grid Reaction (drasi-project#134) CLI flag for installing Dapr from mcr instead of docker.io (drasi-project#133) Review gremlin reaction (drasi-project#115) Review Debezium reaction (drasi-project#121) VS Code extension config update and gh workflow (drasi-project#123) Bump cross-spawn from 7.0.3 to 7.0.6 in /dev-tools/vscode/drasi (drasi-project#116)
…drasi-platform into eventbridge-reaction * 'eventbridge-reaction' of https://github.com/ruokun-niu/drasi-platform: Update Dockerfile Update Dockerfile Update Dockerfile storage queue reaction build Initial Kubernetes source (drasi-project#137) Source Subscription deletion (drasi-project#136)
Description
Currently, when a continuous query is deleted, the subscription info in the change router is still persisted. This PR focuses on the deletion of subscription when a continuous query is deleted. Specifically, this is achieved by the following steps:
DELETE
command is received in the query worker, the worker will iterate through all of the sources that the query is subscribed to, and then invoke theunsubscription
method in the sources' query-api (this is achieved through dapr service invocation).unsubscription
method is invoked, the query api will publish the following message to the Source's change router:This message has a similar format as the SourceSubscription message
Type of change
Fixes: #issue_number