From 23ae339127062d0dd576c318db4d0ee10328ca61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 18 Nov 2024 04:09:41 +0100 Subject: [PATCH 1/7] feat: make it possible to remove event handler from informers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../client/informers/SharedIndexInformer.java | 3 +++ .../impl/DefaultSharedIndexInformer.java | 13 +++++++++ .../impl/cache/ProcessorListener.java | 4 +++ .../informers/impl/cache/SharedProcessor.java | 27 +++++++++++++++++++ 4 files changed, 47 insertions(+) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index 5fa286e4434..d29f3f7c2ae 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -73,6 +73,9 @@ default SharedIndexInformer removeNamespaceIndex() { */ SharedIndexInformer addEventHandler(ResourceEventHandler handler); + SharedIndexInformer removeEventHandler(ResourceEventHandler handler); + + /** * Adds an event handler to the shared informer using the specified resync period. * Events to a single handler are delivered sequentially, but there is no diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 878cfd1bcfd..91e5457ef8d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -102,6 +102,19 @@ public DefaultSharedIndexInformer addEventHandler(ResourceEventHandler removeEventHandler(ResourceEventHandler handler) { + var listener = this.processor.removeProcessorListener(handler); + if (!started.get() && listener.isPresent()) { + var listenerResyncPeriod = listener.orElseThrow().getResyncPeriodInMillis(); + if (listenerResyncPeriod != 0 && resyncCheckPeriodMillis == listenerResyncPeriod) { + this.processor.getMinimalNonZeroResyncPeriod() + .ifPresent(aLong -> this.resyncCheckPeriodMillis = aLong); + } + } + return this; + } + @Override public SharedIndexInformer addEventHandlerWithResyncPeriod(ResourceEventHandler handler, long resyncPeriodMillis) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorListener.java index d84c2aecee2..66ad24592df 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorListener.java @@ -62,6 +62,10 @@ public boolean shouldResync(ZonedDateTime now) { return this.resyncPeriodInMillis != 0 && (now.isAfter(this.nextResync) || now.equals(this.nextResync)); } + public long getResyncPeriodInMillis() { + return resyncPeriodInMillis; + } + public abstract static class Notification { private final T oldObject; private final T newObject; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java index 55d5ecb0be2..73d5cda3c47 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.ReadWriteLock; @@ -175,6 +176,32 @@ public ProcessorListener addProcessorListener(ResourceEventHandler } } + public Optional> removeProcessorListener(ResourceEventHandler handler) { + lock.writeLock().lock(); + try { + var targetListener = this.listeners.stream().filter(l->l.getHandler() == handler).findFirst(); + targetListener.ifPresent(l->{ + this.listeners.remove(l); + if (l.isReSync()) { + this.syncingListeners.remove(l); + } + }); + return targetListener; + } finally { + lock.writeLock().unlock(); + } + } + + public Optional getMinimalNonZeroResyncPeriod() { + lock.readLock().lock(); + try { + return this.listeners.stream().map(ProcessorListener::getResyncPeriodInMillis) + .filter(p->p > 0L).min(Long::compareTo); + } finally { + lock.readLock().unlock(); + } + } + public void executeIfPossible(Runnable runnable) { try { this.executor.execute(runnable); From d3e07be8455192781778ed634e7731a409083942 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 18 Nov 2024 04:21:51 +0100 Subject: [PATCH 2/7] javadoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../kubernetes/client/informers/SharedIndexInformer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index d29f3f7c2ae..86cfa96855a 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -73,6 +73,11 @@ default SharedIndexInformer removeNamespaceIndex() { */ SharedIndexInformer addEventHandler(ResourceEventHandler handler); + /** + * Remove event handler. + * + * @param handler event handler + */ SharedIndexInformer removeEventHandler(ResourceEventHandler handler); From 145cbd833ec509e2627d5a5fd8d272af33daee15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 21 Nov 2024 15:23:35 +0100 Subject: [PATCH 3/7] format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../client/informers/SharedIndexInformer.java | 1 - .../informers/impl/DefaultSharedIndexInformer.java | 10 +++++----- .../client/informers/impl/cache/SharedProcessor.java | 8 ++++---- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index 86cfa96855a..73e8a3dbadf 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -80,7 +80,6 @@ default SharedIndexInformer removeNamespaceIndex() { */ SharedIndexInformer removeEventHandler(ResourceEventHandler handler); - /** * Adds an event handler to the shared informer using the specified resync period. * Events to a single handler are delivered sequentially, but there is no diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 91e5457ef8d..2bbad0fe846 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -106,11 +106,11 @@ public DefaultSharedIndexInformer addEventHandler(ResourceEventHandler removeEventHandler(ResourceEventHandler handler) { var listener = this.processor.removeProcessorListener(handler); if (!started.get() && listener.isPresent()) { - var listenerResyncPeriod = listener.orElseThrow().getResyncPeriodInMillis(); - if (listenerResyncPeriod != 0 && resyncCheckPeriodMillis == listenerResyncPeriod) { - this.processor.getMinimalNonZeroResyncPeriod() - .ifPresent(aLong -> this.resyncCheckPeriodMillis = aLong); - } + var listenerResyncPeriod = listener.orElseThrow().getResyncPeriodInMillis(); + if (listenerResyncPeriod != 0 && resyncCheckPeriodMillis == listenerResyncPeriod) { + this.processor.getMinimalNonZeroResyncPeriod() + .ifPresent(l -> this.resyncCheckPeriodMillis = l); + } } return this; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java index 73d5cda3c47..1750b9e849f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java @@ -179,8 +179,8 @@ public ProcessorListener addProcessorListener(ResourceEventHandler public Optional> removeProcessorListener(ResourceEventHandler handler) { lock.writeLock().lock(); try { - var targetListener = this.listeners.stream().filter(l->l.getHandler() == handler).findFirst(); - targetListener.ifPresent(l->{ + var targetListener = this.listeners.stream().filter(l -> l.getHandler() == handler).findFirst(); + targetListener.ifPresent(l -> { this.listeners.remove(l); if (l.isReSync()) { this.syncingListeners.remove(l); @@ -195,8 +195,8 @@ public Optional> removeProcessorListener(ResourceEventHandl public Optional getMinimalNonZeroResyncPeriod() { lock.readLock().lock(); try { - return this.listeners.stream().map(ProcessorListener::getResyncPeriodInMillis) - .filter(p->p > 0L).min(Long::compareTo); + return this.listeners.stream().map(ProcessorListener::getResyncPeriodInMillis) + .filter(p -> p > 0L).min(Long::compareTo); } finally { lock.readLock().unlock(); } From 7069c49d28d08c227685c0df7121ff1ee22f87b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Fri, 22 Nov 2024 17:40:07 +0100 Subject: [PATCH 4/7] unit test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../mock/DefaultSharedIndexInformerTest.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index 5cc338c9e2a..bf2be1b8065 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -45,6 +45,7 @@ import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.SharedInformerFactory; +import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer; import io.fabric8.kubernetes.client.mock.crd.Animal; import io.fabric8.kubernetes.client.mock.crd.AnimalSpec; import io.fabric8.kubernetes.client.mock.crd.CronTab; @@ -74,6 +75,7 @@ import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -1158,6 +1160,31 @@ void testGenericKubernetesResourceSharedIndexInformerWithNamespaceConfigured() t assertEquals(0, foundExistingAnimal.getCount()); } + @Test + void removeEventHandlerBeforeStartAdjustsResyncPeriod() { + var longResyncPeriod = 3000; + var shorterResyncPeriod = 2000; + var eventHandlerLongResync = emptyEventHandler(); + var eventHandlerShorterResync = emptyEventHandler(); + + SharedIndexInformer podInformer = factory.sharedIndexInformerFor(Pod.class, 4000); + podInformer.addEventHandlerWithResyncPeriod(eventHandlerLongResync, longResyncPeriod); + podInformer.addEventHandlerWithResyncPeriod(eventHandlerShorterResync, shorterResyncPeriod); + + assertThat(((DefaultSharedIndexInformer) podInformer).getFullResyncPeriod()) + .isEqualTo(shorterResyncPeriod); + + podInformer.removeEventHandler(eventHandlerShorterResync); + + assertThat(((DefaultSharedIndexInformer) podInformer).getFullResyncPeriod()) + .isEqualTo(longResyncPeriod); + } + + @Test + void stopReceivingEventsWhenEventHandlerRemoved() { + fail(); + } + @Test void testGenericKubernetesResourceSharedIndexInformerWithAdditionalDeserializers() throws InterruptedException { // Given @@ -1352,6 +1379,22 @@ public void onDelete(T obj, boolean deletedFinalStateUnknown) { } } + private static ResourceEventHandler emptyEventHandler() { + return new ResourceEventHandler<>() { + @Override + public void onAdd(Object obj) { + } + + @Override + public void onUpdate(Object oldObj, Object newObj) { + } + + @Override + public void onDelete(Object obj, boolean deletedFinalStateUnknown) { + } + }; + } + private Star getStar(String name, String resourceVersion) { StarSpec starSpec = new StarSpec(); starSpec.setType("G"); From 831a0ed7bc33bec2d3c0cd26b03e3dbe261b77c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 25 Nov 2024 10:14:48 +0100 Subject: [PATCH 5/7] unit test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../mock/DefaultSharedIndexInformerTest.java | 83 ++++++++++++++++++- 1 file changed, 81 insertions(+), 2 deletions(-) diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index bf2be1b8065..e89f73577ff 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -65,7 +65,10 @@ import org.junit.jupiter.api.Test; import java.net.HttpURLConnection; +import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -75,7 +78,6 @@ import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -1182,7 +1184,63 @@ void removeEventHandlerBeforeStartAdjustsResyncPeriod() { @Test void stopReceivingEventsWhenEventHandlerRemoved() { - fail(); + String startResourceVersion = "1000"; + var firstPod = new Pod(); + var secondPod = new Pod(); + var eventEmitTimeWait = 500L; + + server.expect() + .withPath("/api/v1/pods?resourceVersion=0") + .andReturn(200, new PodListBuilder().withNewMetadata() + .withResourceVersion(startResourceVersion) + .endMetadata() + .withItems(Collections.emptyList()) + .build()) + .once(); + server.expect() + .withPath("/api/v1/pods?allowWatchBookmarks=true&resourceVersion=" + startResourceVersion + + "&timeoutSeconds=600&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(eventEmitTimeWait) + .andEmit(new WatchEvent(new PodBuilder().withNewMetadata() + .withNamespace("test") + .withName("pod1") + .withResourceVersion("1001") + .endMetadata() + .build(), "ADDED")) + .waitFor(2 * eventEmitTimeWait) + .andEmit(new WatchEvent(new PodBuilder().withNewMetadata() + .withNamespace("test") + .withName("pod2") + .withResourceVersion("1002") + .endMetadata() + .build(), "ADDED")) + .done() + .always(); + + var handler1 = new AddRecordingEventHandler(); + var handler2 = new AddRecordingEventHandler(); + + try (SharedIndexInformer informer = client.pods().inAnyNamespace().runnableInformer(0)) { + informer.run(); + informer.addEventHandler(handler1); + informer.addEventHandler(handler2); + + await().pollInterval(Duration.ofMillis(100)).untilAsserted(() -> { + assertThat(handler1.getAddedPods()).hasSize(1); + assertThat(handler2.getAddedPods()).hasSize(1); + }); + + informer.removeEventHandler(handler2); + + await().pollDelay(Duration.ofMillis(eventEmitTimeWait)) + .pollInterval(Duration.ofMillis(100)).untilAsserted(() -> { + assertThat(handler1.getAddedPods()).hasSize(2); + assertThat(handler2.getAddedPods()).hasSize(1); + }); + } + } @Test @@ -1417,4 +1475,25 @@ private PodSet getPodSet(String name, String resourceVersion) { return podSet; } + private class AddRecordingEventHandler implements ResourceEventHandler { + private List addedPods = new ArrayList<>(); + + @Override + public void onAdd(Pod obj) { + addedPods.add(obj); + } + + @Override + public void onUpdate(Pod oldObj, Pod newObj) { + } + + @Override + public void onDelete(Pod obj, boolean deletedFinalStateUnknown) { + } + + public List getAddedPods() { + return addedPods; + } + } + } From 79955e7adb4cf06fe414c5e6b419f48c70f9c477 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 25 Nov 2024 10:23:32 +0100 Subject: [PATCH 6/7] changelog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7aa509bb68..2f7730e2492 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * Fix #6282: Allow annotated types with Pattern, Min, and Max with Lists and Maps and CRD generation * Fix #5480: Move `io.fabric8:zjsonpatch` to KubernetesClient project * Fix #6240: Support for multiple files listed in the KUBECONFIG env var +* Fix #6655: Support removing ResourceEventHandler for informers #### Dependency Upgrade * Fix #2632: Bumped OkHttp from 3.12.12 to 4.12.0 From ade36c066d29ad7f597be37990aea2fca81ef459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 25 Nov 2024 10:26:10 +0100 Subject: [PATCH 7/7] remove not used vars MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../kubernetes/client/mock/DefaultSharedIndexInformerTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index e89f73577ff..4cedef3b595 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -1185,8 +1185,6 @@ void removeEventHandlerBeforeStartAdjustsResyncPeriod() { @Test void stopReceivingEventsWhenEventHandlerRemoved() { String startResourceVersion = "1000"; - var firstPod = new Pod(); - var secondPod = new Pod(); var eventEmitTimeWait = 500L; server.expect()