Skip to content

Commit

Permalink
Resource Cleanup on delete (#90)
Browse files Browse the repository at this point in the history
* wip

* wip

* cargo fmt

* linting

* nit

* graceful shutdown
  • Loading branch information
danielgerlag authored Oct 18, 2024
1 parent 3b88274 commit dda3a66
Show file tree
Hide file tree
Showing 35 changed files with 804 additions and 101 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/draft-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ jobs:
- name: Build Management API
uses: docker/build-push-action@ca052bb54ab0790a636c9b5f226502c73d547a25 # v5.4.0
with:
context: ./control-planes
context: '.'
file: ./control-planes/mgmt_api/Dockerfile
platforms: linux/amd64, linux/arm64
tags: |
Expand Down
6 changes: 6 additions & 0 deletions cli/service/resources/default-source-providers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ spec:
default: pg
reactivator:
image: source-debezium-reactivator
deprovisionHandler: true
dapr:
app-port: "80"
config_schema:
type: object
properties:
Expand Down Expand Up @@ -64,6 +67,9 @@ spec:
default: mssql
reactivator:
image: source-debezium-reactivator
deprovisionHandler: true
dapr:
app-port: "80"
config_schema:
type: object
properties:
Expand Down
10 changes: 10 additions & 0 deletions control-planes/kubernetes_provider/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions control-planes/kubernetes_provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ axum = { version = "0.7.5", features = ["macros"] }
kube-derive = "0.87.1"
schemars = "0.8.15"
bytes = "1.7.1"
uuid = { version = "1.10.0", features = ["v4"] }

83 changes: 82 additions & 1 deletion control-planes/kubernetes_provider/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use resource_provider_api::models::ResourceRequest;
use std::collections::BTreeMap;
use std::marker;
use tokio::sync::RwLock;
use uuid::Uuid;

pub mod querycontainer_actor;
pub mod reaction_actor;
Expand Down Expand Up @@ -67,7 +68,26 @@ impl<TSpec, TStatus> ResourceActor<TSpec, TStatus> {
) -> impl IntoResponse {
log::info!("Actor configure - {} {}", self.actor_type, self.id);

let platform_specs = self.spec_builder.build(spec, &self.runtime_config);
let instance_id = self.read_instance_id().await.unwrap_or_else(|e| {
log::error!("Failed to read instance id: {}", e);
None
});

let instance_id = match instance_id {
Some(instance_id) => instance_id,
None => {
let instance_id = Uuid::new_v4().to_string();
if let Err(err) = self.write_instance_id(&instance_id).await {
log::error!("Failed to write instance id: {}", err);
return err.into_response();
}
instance_id
}
};

let platform_specs = self
.spec_builder
.build(spec, &self.runtime_config, &instance_id);

if let Err(err) = self.write_specs(&platform_specs).await {
log::error!("Failed to write specs: {}", err);
Expand Down Expand Up @@ -112,6 +132,10 @@ impl<TSpec, TStatus> ResourceActor<TSpec, TStatus> {
controller.deprovision();
}

self.delete_instance_id().await.unwrap_or_else(|e| {
log::error!("Failed to delete instance id: {}", e);
});

Json(()).into_response()
}

Expand Down Expand Up @@ -157,6 +181,63 @@ impl<TSpec, TStatus> ResourceActor<TSpec, TStatus> {
}
}

async fn read_instance_id(&self) -> Result<Option<String>, ActorError> {
let mut client = self.dapr_client.clone();
match client
.get_actor_state(format!("{}-instance-id", self.resource_type))
.await
{
Ok(result) => {
if result.data.is_empty() {
log::debug!("No actor state (instance id) found");
return Ok(None);
}
match String::from_utf8(result.data) {
Ok(instance_id) => Ok(Some(instance_id)),
Err(e) => {
log::error!("Failed to deserialize actor state: {}", e);
Err(ActorError::MethodError(Box::new(e)))
}
}
}
Err(e) => {
log::error!("Failed to get actor state: {}", e);
Err(ActorError::MethodError(Box::new(e)))
}
}
}

async fn write_instance_id(&self, instance_id: &str) -> Result<(), ActorError> {
log::debug!("Writing instance id {}", self.id);
let mut client = self.dapr_client.clone();
if let Err(e) = client
.execute_actor_state_transaction(vec![ActorStateOperation::Upsert {
key: format!("{}-instance-id", self.resource_type),
value: Some(instance_id.as_bytes().to_vec()),
}])
.await
{
log::error!("Failed to execute actor state transaction: {}", e);
return Err(ActorError::MethodError(Box::new(e)));
}
Ok(())
}

async fn delete_instance_id(&self) -> Result<(), ActorError> {
log::debug!("Deleting instance id {}", self.id);
let mut client = self.dapr_client.clone();
if let Err(e) = client
.execute_actor_state_transaction(vec![ActorStateOperation::Delete {
key: format!("{}-instance-id", self.resource_type),
}])
.await
{
log::error!("Failed to execute actor state transaction: {}", e);
return Err(ActorError::MethodError(Box::new(e)));
}
Ok(())
}

async fn load_controllers(&self, specs: Vec<KubernetesSpec>) {
log::info!("Loading controllers {}", self.id);
let mut controllers = self.controllers.write().await;
Expand Down
1 change: 1 addition & 0 deletions control-planes/kubernetes_provider/src/spec_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub trait SpecBuilder<TSpec> {
&self,
source: ResourceRequest<TSpec>,
runtime_config: &RuntimeConfig,
instance_id: &str,
) -> Vec<KubernetesSpec>;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl SpecBuilder<QueryContainerSpec> for QueryContainerSpecBuilder {
&self,
source: ResourceRequest<QueryContainerSpec>,
runtime_config: &RuntimeConfig,
instance_id: &str,
) -> Vec<KubernetesSpec> {
let mut specs = Vec::new();

Expand All @@ -46,7 +47,8 @@ impl SpecBuilder<QueryContainerSpec> for QueryContainerSpecBuilder {
1,
Some(4000),
hashmap![
"QUERY_NODE_ID" => ConfigValue::Inline { value: source.id.clone() }
"QUERY_NODE_ID" => ConfigValue::Inline { value: source.id.clone() },
"INSTANCE_ID" => ConfigValue::Inline { value: instance_id.to_string() }
],
None,
None,
Expand Down Expand Up @@ -221,7 +223,8 @@ impl SpecBuilder<QueryContainerSpec> for QueryContainerSpecBuilder {
Some(8080),
hashmap![
"QUERY_NODE_ID" => ConfigValue::Inline { value: source.id.clone() },
"VIEW_STORE_TYPE" => ConfigValue::Inline { value: "mongo".to_string() }
"VIEW_STORE_TYPE" => ConfigValue::Inline { value: "mongo".to_string() },
"INSTANCE_ID" => ConfigValue::Inline { value: instance_id.to_string() }
],
Some(hashmap![
"api" => 80
Expand Down
10 changes: 9 additions & 1 deletion control-planes/kubernetes_provider/src/spec_builder/reaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use kube::core::ObjectMeta;
use resource_provider_api::models::{ConfigValue, EndpointSetting, ReactionSpec, ResourceRequest};
use serde::Serialize;
use std::{
collections::{BTreeMap, HashMap},
collections::BTreeMap,
hash::{Hash, Hasher},
};

Expand All @@ -24,6 +24,7 @@ impl SpecBuilder<ReactionSpec> for ReactionSpecBuilder {
&self,
source: ResourceRequest<ReactionSpec>,
runtime_config: &RuntimeConfig,
instance_id: &str,
) -> Vec<KubernetesSpec> {
let mut specs = Vec::new();

Expand All @@ -37,6 +38,13 @@ impl SpecBuilder<ReactionSpec> for ReactionSpecBuilder {
},
);

env.insert(
"INSTANCE_ID".to_string(),
ConfigValue::Inline {
value: instance_id.to_string(),
},
);

let pub_sub_name = format!("drasi-pubsub-{}", source.id);
env.insert(
"PubsubName".to_string(),
Expand Down
18 changes: 15 additions & 3 deletions control-planes/kubernetes_provider/src/spec_builder/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl SpecBuilder<SourceSpec> for SourceSpecBuilder {
&self,
source: ResourceRequest<SourceSpec>,
runtime_config: &RuntimeConfig,
instance_id: &str,
) -> Vec<KubernetesSpec> {
let mut specs = Vec::new();

Expand All @@ -39,7 +40,8 @@ impl SpecBuilder<SourceSpec> for SourceSpecBuilder {
1,
Some(3000),
hashmap![
"SOURCE_ID" => ConfigValue::Inline { value: source.id.clone() }
"SOURCE_ID" => ConfigValue::Inline { value: source.id.clone() },
"INSTANCE_ID" => ConfigValue::Inline { value: instance_id.to_string() }
],
None,
None,
Expand All @@ -65,7 +67,8 @@ impl SpecBuilder<SourceSpec> for SourceSpecBuilder {
1,
Some(3000),
hashmap![
"SOURCE_ID" => ConfigValue::Inline { value: source.id.clone() }
"SOURCE_ID" => ConfigValue::Inline { value: source.id.clone() },
"INSTANCE_ID" => ConfigValue::Inline { value: instance_id.to_string() }
],
None,
None,
Expand All @@ -91,7 +94,8 @@ impl SpecBuilder<SourceSpec> for SourceSpecBuilder {
1,
Some(4001),
hashmap![
"SOURCE_ID" => ConfigValue::Inline { value: source.id.clone() }
"SOURCE_ID" => ConfigValue::Inline { value: source.id.clone() },
"INSTANCE_ID" => ConfigValue::Inline { value: instance_id.to_string() }
],
None,
None,
Expand Down Expand Up @@ -140,6 +144,14 @@ impl SpecBuilder<SourceSpec> for SourceSpecBuilder {
value: source.id.clone(),
},
);

env_var_map.insert(
"INSTANCE_ID".to_string(),
ConfigValue::Inline {
value: instance_id.to_string(),
},
);

if let Some(props) = service_spec.properties {
for (key, value) in props {
env_var_map.insert(key, value);
Expand Down
Loading

0 comments on commit dda3a66

Please sign in to comment.