diff --git a/CHANGELOG.md b/CHANGELOG.md index 1905d34c6ab..68fb78910e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * Fix #6282: Allow annotated types with Pattern, Min, and Max with Lists and Maps and CRD generation * Fix #5480: Move `io.fabric8:zjsonpatch` to KubernetesClient project * Fix #6240: Support for multiple files listed in the KUBECONFIG env var +* Fix #6655: Support removing ResourceEventHandler for informers #### Dependency Upgrade * Fix #2632: Bumped OkHttp from 3.12.12 to 4.12.0 diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index 5fa286e4434..73e8a3dbadf 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -73,6 +73,13 @@ default SharedIndexInformer removeNamespaceIndex() { */ SharedIndexInformer addEventHandler(ResourceEventHandler handler); + /** + * Remove event handler. + * + * @param handler event handler + */ + SharedIndexInformer removeEventHandler(ResourceEventHandler handler); + /** * Adds an event handler to the shared informer using the specified resync period. * Events to a single handler are delivered sequentially, but there is no diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 878cfd1bcfd..2bbad0fe846 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -102,6 +102,19 @@ public DefaultSharedIndexInformer addEventHandler(ResourceEventHandler removeEventHandler(ResourceEventHandler handler) { + var listener = this.processor.removeProcessorListener(handler); + if (!started.get() && listener.isPresent()) { + var listenerResyncPeriod = listener.orElseThrow().getResyncPeriodInMillis(); + if (listenerResyncPeriod != 0 && resyncCheckPeriodMillis == listenerResyncPeriod) { + this.processor.getMinimalNonZeroResyncPeriod() + .ifPresent(l -> this.resyncCheckPeriodMillis = l); + } + } + return this; + } + @Override public SharedIndexInformer addEventHandlerWithResyncPeriod(ResourceEventHandler handler, long resyncPeriodMillis) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorListener.java index d84c2aecee2..66ad24592df 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorListener.java @@ -62,6 +62,10 @@ public boolean shouldResync(ZonedDateTime now) { return this.resyncPeriodInMillis != 0 && (now.isAfter(this.nextResync) || now.equals(this.nextResync)); } + public long getResyncPeriodInMillis() { + return resyncPeriodInMillis; + } + public abstract static class Notification { private final T oldObject; private final T newObject; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java index 55d5ecb0be2..1750b9e849f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.ReadWriteLock; @@ -175,6 +176,32 @@ public ProcessorListener addProcessorListener(ResourceEventHandler } } + public Optional> removeProcessorListener(ResourceEventHandler handler) { + lock.writeLock().lock(); + try { + var targetListener = this.listeners.stream().filter(l -> l.getHandler() == handler).findFirst(); + targetListener.ifPresent(l -> { + this.listeners.remove(l); + if (l.isReSync()) { + this.syncingListeners.remove(l); + } + }); + return targetListener; + } finally { + lock.writeLock().unlock(); + } + } + + public Optional getMinimalNonZeroResyncPeriod() { + lock.readLock().lock(); + try { + return this.listeners.stream().map(ProcessorListener::getResyncPeriodInMillis) + .filter(p -> p > 0L).min(Long::compareTo); + } finally { + lock.readLock().unlock(); + } + } + public void executeIfPossible(Runnable runnable) { try { this.executor.execute(runnable); diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index 0878a3273f8..b639400d8b0 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -45,6 +45,7 @@ import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.SharedInformerFactory; +import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer; import io.fabric8.kubernetes.client.mock.crd.Animal; import io.fabric8.kubernetes.client.mock.crd.AnimalSpec; import io.fabric8.kubernetes.client.mock.crd.CronTab; @@ -64,7 +65,10 @@ import org.junit.jupiter.api.Test; import java.net.HttpURLConnection; +import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -1159,6 +1163,85 @@ void testGenericKubernetesResourceSharedIndexInformerWithNamespaceConfigured() t assertEquals(0, foundExistingAnimal.getCount()); } + @Test + void removeEventHandlerBeforeStartAdjustsResyncPeriod() { + var longResyncPeriod = 3000; + var shorterResyncPeriod = 2000; + var eventHandlerLongResync = emptyEventHandler(); + var eventHandlerShorterResync = emptyEventHandler(); + + SharedIndexInformer podInformer = factory.sharedIndexInformerFor(Pod.class, 4000); + podInformer.addEventHandlerWithResyncPeriod(eventHandlerLongResync, longResyncPeriod); + podInformer.addEventHandlerWithResyncPeriod(eventHandlerShorterResync, shorterResyncPeriod); + + assertThat(((DefaultSharedIndexInformer) podInformer).getFullResyncPeriod()) + .isEqualTo(shorterResyncPeriod); + + podInformer.removeEventHandler(eventHandlerShorterResync); + + assertThat(((DefaultSharedIndexInformer) podInformer).getFullResyncPeriod()) + .isEqualTo(longResyncPeriod); + } + + @Test + void stopReceivingEventsWhenEventHandlerRemoved() { + String startResourceVersion = "1000"; + var eventEmitTimeWait = 500L; + + server.expect() + .withPath("/api/v1/pods?resourceVersion=0") + .andReturn(200, new PodListBuilder().withNewMetadata() + .withResourceVersion(startResourceVersion) + .endMetadata() + .withItems(Collections.emptyList()) + .build()) + .once(); + server.expect() + .withPath("/api/v1/pods?allowWatchBookmarks=true&resourceVersion=" + startResourceVersion + + "&timeoutSeconds=600&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(eventEmitTimeWait) + .andEmit(new WatchEvent(new PodBuilder().withNewMetadata() + .withNamespace("test") + .withName("pod1") + .withResourceVersion("1001") + .endMetadata() + .build(), "ADDED")) + .waitFor(2 * eventEmitTimeWait) + .andEmit(new WatchEvent(new PodBuilder().withNewMetadata() + .withNamespace("test") + .withName("pod2") + .withResourceVersion("1002") + .endMetadata() + .build(), "ADDED")) + .done() + .always(); + + var handler1 = new AddRecordingEventHandler(); + var handler2 = new AddRecordingEventHandler(); + + try (SharedIndexInformer informer = client.pods().inAnyNamespace().runnableInformer(0)) { + informer.run(); + informer.addEventHandler(handler1); + informer.addEventHandler(handler2); + + await().pollInterval(Duration.ofMillis(100)).untilAsserted(() -> { + assertThat(handler1.getAddedPods()).hasSize(1); + assertThat(handler2.getAddedPods()).hasSize(1); + }); + + informer.removeEventHandler(handler2); + + await().pollDelay(Duration.ofMillis(eventEmitTimeWait)) + .pollInterval(Duration.ofMillis(100)).untilAsserted(() -> { + assertThat(handler1.getAddedPods()).hasSize(2); + assertThat(handler2.getAddedPods()).hasSize(1); + }); + } + + } + @Test void testGenericKubernetesResourceSharedIndexInformerWithAdditionalDeserializers() throws InterruptedException { // Given @@ -1350,6 +1433,22 @@ public void onDelete(T obj, boolean deletedFinalStateUnknown) { } } + private static ResourceEventHandler emptyEventHandler() { + return new ResourceEventHandler<>() { + @Override + public void onAdd(Object obj) { + } + + @Override + public void onUpdate(Object oldObj, Object newObj) { + } + + @Override + public void onDelete(Object obj, boolean deletedFinalStateUnknown) { + } + }; + } + private Star getStar(String name, String resourceVersion) { StarSpec starSpec = new StarSpec(); starSpec.setType("G"); @@ -1372,4 +1471,25 @@ private PodSet getPodSet(String name, String resourceVersion) { return podSet; } + private class AddRecordingEventHandler implements ResourceEventHandler { + private List addedPods = new ArrayList<>(); + + @Override + public void onAdd(Pod obj) { + addedPods.add(obj); + } + + @Override + public void onUpdate(Pod oldObj, Pod newObj) { + } + + @Override + public void onDelete(Pod obj, boolean deletedFinalStateUnknown) { + } + + public List getAddedPods() { + return addedPods; + } + } + }