Skip to content

Commit

Permalink
feat: use custom actuator endpoint to stop kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
mherwig committed Aug 30, 2024
1 parent 76dc76e commit 49d5ff6
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.telekom.horizon.comet.actuator;

import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.WriteOperation;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

@Component
@Endpoint(id = "horizon-prestop")
public class HorizonPreStopActuatorEndpoint {

private final ApplicationEventPublisher applicationEventPublisher;

public HorizonPreStopActuatorEndpoint(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

@WriteOperation
public void handlePreStop() {
var event = new HorizonPreStopEvent(this, "Got PreStop request. Terminating all connections...");
applicationEventPublisher.publishEvent(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package de.telekom.horizon.comet.actuator;

import org.springframework.context.ApplicationEvent;

public class HorizonPreStopEvent extends ApplicationEvent {
private String message;

public HorizonPreStopEvent(Object source, String message) {
super(source);
this.message = message;
}
public String getMessage() {
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0

package de.telekom.horizon.comet.health;
package de.telekom.horizon.comet.actuator;

import de.telekom.horizon.comet.auth.OAuth2TokenCache;
import org.springframework.boot.actuate.health.Health;
Expand Down
47 changes: 25 additions & 22 deletions src/main/java/de/telekom/horizon/comet/service/CometService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@

package de.telekom.horizon.comet.service;

import de.telekom.horizon.comet.actuator.HorizonPreStopEvent;
import de.telekom.horizon.comet.client.TokenFetchingFailureEvent;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.ContainerStoppedEvent;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
Expand All @@ -32,7 +31,7 @@ public class CometService {

private final ApplicationContext context;

private final AtomicBoolean teardownOngoing = new AtomicBoolean(false);
private final AtomicBoolean isPreStop = new AtomicBoolean(false);


/**
Expand All @@ -47,14 +46,6 @@ public CometService(ConcurrentMessageListenerContainer<String, String> messageLi
this.context = context;
}

@PreDestroy
public void tearDown() {
if (teardownOngoing.compareAndSet(false, true)) {
stopMessageListenerContainer();
}
}


/**
* Starts the Kafka message listener container if it is not null.
* This method is designed to initiate the consumption of Kafka messages.
Expand All @@ -80,19 +71,33 @@ private void stopMessageListenerContainer() {
}

/**
* Handles the application stopping event by stopping the Kafka message listener container.
* Handles an PreStop event usually triggered via container platform which will stop the Kafka message listener container in an expected way.
*
* @param event The application event triggering the handler.
* It should be an instance of {@code ContextClosedEvent} or {@code ExitCodeEvent}.
* It will be an instance of {@code TokenFetchingFailureEvent}.
*/
@EventListener
public void applicationEventHandler(ApplicationEvent event) {
if (event instanceof TokenFetchingFailureEvent) {
log.error(((TokenFetchingFailureEvent) event).getMessage());
public void handleHorizonPreStopEvent(HorizonPreStopEvent event) {
log.info(event.getMessage());

if (isPreStop.compareAndSet(false, true)) {
stopMessageListenerContainer();
}
}

/**
* Handles the event of a token fetching failure which will stop the Kafka message listener container in an unexpected way.
*
* @param event The application event triggering the handler.
* It will be an instance of {@code TokenFetchingFailureEvent}.
*/
@EventListener
public void handleTokenFetchingFailureEvent(TokenFetchingFailureEvent event) {
log.error(event.getMessage());

stopMessageListenerContainer();
}

/**
* Handles the event when the Kafka message listener container is stopped.
* This method gracefully stops the message listener container and initiates the application exit process.
Expand All @@ -101,14 +106,12 @@ public void applicationEventHandler(ApplicationEvent event) {
*/
@EventListener
public void containerStoppedHandler(ContainerStoppedEvent event) {
if (!teardownOngoing.get()) {
if (!isPreStop.get()) {
log.error("MessageListenerContainer stopped unexpectedly with event {}. Exiting...", event.toString());

var exitCode= SpringApplication.exit(context, () -> 1);

System.exit(exitCode);
} else {
System.exit(SpringApplication.exit(context, () -> 1));
} else { // we don't need to shut down, since SIGTERM will usually send by the platform after preStop endpoint has been invoked
log.info("MessageListenerContainer stopped with event {}.", event.toString());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ public DeliveryService(CometConfig cometConfig, HorizonTracer tracer, CallbackUr
private void stopTaskExecutorWithTimeout() {
deliveryTaskExecutor.shutdown();
redeliveryTaskExecutor.shutdown();

}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ spring:
name: Horizon Comet

lifecycle:
timeout-per-shutdown-phase: 30s
timeout-per-shutdown-phase: 60s

server:
shutdown: graceful
Expand Down Expand Up @@ -55,7 +55,7 @@ management:
endpoints:
web:
exposure:
include: health,info,prometheus,heapdump,shutdown
include: health,info,prometheus,heapdump,shutdown,horizon-prestop
endpoint:
health:
show-details: always
Expand Down

0 comments on commit 49d5ff6

Please sign in to comment.