Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Subscription deletion #136

Merged
merged 12 commits into from
Jan 15, 2025

Conversation

ruokun-niu
Copy link
Contributor

Description

Currently, when a continuous query is deleted, the subscription info in the change router is still persisted. This PR focuses on the deletion of subscription when a continuous query is deleted. Specifically, this is achieved by the following steps:

  • When a DELETE command is received in the query worker, the worker will iterate through all of the sources that the query is subscribed to, and then invoke the unsubscription method in the sources' query-api (this is achieved through dapr service invocation).
  • When the unsubscription method is invoked, the query api will publish the following message to the Source's change router:
{"payload":{"queryId":"<query-id>","queryNodeId":"<query-node-id>","source":{"db":"Drasi","table":"SourceUnsubscription"}}}

This message has a similar format as the SourceSubscription message

  • When the change router receives this message, it will format the state entry key and delete it from the dapr state store

Type of change

  • This pull request fixes a bug in Drasi and has an approved issue (issue link required).

Fixes: #issue_number

key: &str,
metadata: Option<HashMap<String, String>>,
) -> Result<(), Box<dyn std::error::Error>> {
let addr = "https://127.0.0.1".to_string();

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note

Do not leave debug code in production

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 6 out of 8 changed files in this pull request and generated no comments.

Files not reviewed (2)
  • query-container/query-host/src/query_worker.rs: Evaluated as low risk
  • sources/shared/query-api/src/api.rs: Evaluated as low risk
Comments suppressed due to low confidence (1)

query-container/query-host/src/models.rs:178

  • The error message 'Failed to unsubscribe' could be more descriptive. Consider including the query ID or subscription ID in the message.
UnsubscribeFailed(String),

let resp = match self
.client
.post(format!("http://{}/unsubscription", app_id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To remain "RESTful", I think this should be a DELETE verb on the subscription/{queryNodeId}/{queryId} route.

state: &Arc<AppState>,
) -> Result<(), axum::http::Response<axum::body::Body>> {
let publisher = &state.publisher;
let unsubscription_event = api::UnsubscriptionEvent {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this just be a ControlEvent with an op of d instead of i on the SourceSubscription table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Will modify this

@@ -391,6 +391,31 @@ async fn process_changes(
} else {
// TODO - supprt other ops on SourceSubscriptions
}
} else if change["payload"]["source"]["table"] == "SourceUnsubscription" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we refactor these deep nested if else statements into a match statement?

table: "SourceSubscription".to_string(),
},
before: None,
after: request.clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typically for these debezium style payloads, the properties for a "delete" will be in the "before" field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will update this

…/drasi-platform into subscription-deletion

* 'subscription-deletion' of https://github.com/ruokun-niu/drasi-platform:
  Bump rustls from 0.23.15 to 0.23.19 in /sources/shared/change-router (drasi-project#122)
  Review result reaction (drasi-project#111)
  Initial Rusk source SDK (drasi-project#128)
  Review Event Grid Reaction (drasi-project#134)
  CLI flag for installing Dapr from mcr instead of docker.io (drasi-project#133)
  Review gremlin reaction (drasi-project#115)
  Review Debezium reaction (drasi-project#121)
  VS Code extension config update and gh workflow (drasi-project#123)
  Bump cross-spawn from 7.0.3 to 7.0.6 in /dev-tools/vscode/drasi (drasi-project#116)
let resp = match self
.client
.delete(format!(
"http://{}/subscription/{}/{}",

Check warning

Code scanning / devskim

An HTTP-based URL without TLS was detected. Warning

Insecure URL
@ruokun-niu ruokun-niu merged commit 04af86f into drasi-project:main Jan 15, 2025
30 checks passed
ruokun-niu added a commit to ruokun-niu/drasi-platform that referenced this pull request Jan 21, 2025
…drasi-platform into eventbridge-reaction

* 'eventbridge-reaction' of https://github.com/ruokun-niu/drasi-platform:
  Update Dockerfile
  Update Dockerfile
  Update Dockerfile
  storage queue reaction build
  Initial Kubernetes source (drasi-project#137)
  Source Subscription deletion (drasi-project#136)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants