Skip to content

Commit

Permalink
graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgerlag committed Oct 15, 2024
1 parent 7f0d72d commit ec64b98
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 1 deletion.
80 changes: 79 additions & 1 deletion control-planes/kubernetes_provider/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,24 @@ impl<TSpec, TStatus> ResourceActor<TSpec, TStatus> {
DaprJson(spec): DaprJson<ResourceRequest<TSpec>>,
) -> impl IntoResponse {
log::info!("Actor configure - {} {}", self.actor_type, self.id);
let instance_id = uuid::Uuid::new_v4().to_string();

let instance_id = self.read_instance_id().await.unwrap_or_else(|e| {
log::error!("Failed to read instance id: {}", e);
None
});

let instance_id = match instance_id {
Some(instance_id) => instance_id,
None => {
let instance_id = Uuid::new_v4().to_string();
if let Err(err) = self.write_instance_id(&instance_id).await {
log::error!("Failed to write instance id: {}", err);
return err.into_response();
}
instance_id
}
};

let platform_specs = self
.spec_builder
.build(spec, &self.runtime_config, &instance_id);
Expand Down Expand Up @@ -115,6 +132,10 @@ impl<TSpec, TStatus> ResourceActor<TSpec, TStatus> {
controller.deprovision();
}

self.delete_instance_id().await.unwrap_or_else(|e| {
log::error!("Failed to delete instance id: {}", e);
});

Json(()).into_response()
}

Expand Down Expand Up @@ -160,6 +181,63 @@ impl<TSpec, TStatus> ResourceActor<TSpec, TStatus> {
}
}

async fn read_instance_id(&self) -> Result<Option<String>, ActorError> {
let mut client = self.dapr_client.clone();
match client
.get_actor_state(format!("{}-instance-id", self.resource_type))
.await
{
Ok(result) => {
if result.data.is_empty() {
log::debug!("No actor state (instance id) found");
return Ok(None);
}
match String::from_utf8(result.data) {
Ok(instance_id) => Ok(Some(instance_id)),
Err(e) => {
log::error!("Failed to deserialize actor state: {}", e);
Err(ActorError::MethodError(Box::new(e)))
}
}
}
Err(e) => {
log::error!("Failed to get actor state: {}", e);
Err(ActorError::MethodError(Box::new(e)))
}
}
}

async fn write_instance_id(&self, instance_id: &str) -> Result<(), ActorError> {
log::debug!("Writing instance id {}", self.id);
let mut client = self.dapr_client.clone();
if let Err(e) = client
.execute_actor_state_transaction(vec![ActorStateOperation::Upsert {
key: format!("{}-instance-id", self.resource_type),
value: Some(instance_id.as_bytes().to_vec()),
}])
.await
{
log::error!("Failed to execute actor state transaction: {}", e);
return Err(ActorError::MethodError(Box::new(e)));
}
Ok(())
}

async fn delete_instance_id(&self) -> Result<(), ActorError> {
log::debug!("Deleting instance id {}", self.id);
let mut client = self.dapr_client.clone();
if let Err(e) = client
.execute_actor_state_transaction(vec![ActorStateOperation::Delete {
key: format!("{}-instance-id", self.resource_type),
}])
.await
{
log::error!("Failed to execute actor state transaction: {}", e);
return Err(ActorError::MethodError(Box::new(e)));
}
Ok(())
}

async fn load_controllers(&self, specs: Vec<KubernetesSpec>) {
log::info!("Loading controllers {}", self.id);
let mut controllers = self.controllers.write().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public static void main(String[] args) throws IOException, SQLException {
defaultProperties.put("drasi.sourceid", sourceId);
defaultProperties.put("drasi.connector", connector);
defaultProperties.put("drasi.instanceid", instanceId);
defaultProperties.put("server.shutdown", "graceful");
defaultProperties.put("spring.lifecycle.timeout-per-shutdown-phase", "20s");
app.setDefaultProperties(defaultProperties);
app.run(args);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.drasi;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;

@Component
public class ShutdownListener implements ApplicationListener<ContextClosedEvent> {

private static final Logger log = LoggerFactory.getLogger(ShutdownListener.class);

private final ChangeMonitor changeMonitor;

@Autowired
public ShutdownListener(@Qualifier("changeMonitor") ChangeMonitor changeMonitor) {
this.changeMonitor = changeMonitor;
}

@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.info("Reactivator is shutting down.");
try {
changeMonitor.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit ec64b98

Please sign in to comment.