Skip to content

Commit

Permalink
feat: make it possible to remove event handler from informers (6655)
Browse files Browse the repository at this point in the history
feat: make it possible to remove event handler from informers

Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
---
javadoc

Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
---
format

Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
---
unit test

Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
---
unit test

Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
---
changelog

Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
---
remove not used vars

Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
  • Loading branch information
csviri authored Nov 27, 2024
1 parent 606d359 commit 5aa3d9b
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Fix #6282: Allow annotated types with Pattern, Min, and Max with Lists and Maps and CRD generation
* Fix #5480: Move `io.fabric8:zjsonpatch` to KubernetesClient project
* Fix #6240: Support for multiple files listed in the KUBECONFIG env var
* Fix #6655: Support removing ResourceEventHandler for informers

#### Dependency Upgrade
* Fix #2632: Bumped OkHttp from 3.12.12 to 4.12.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ default SharedIndexInformer<T> removeNamespaceIndex() {
*/
SharedIndexInformer<T> addEventHandler(ResourceEventHandler<? super T> handler);

/**
* Remove event handler.
*
* @param handler event handler
*/
SharedIndexInformer<T> removeEventHandler(ResourceEventHandler<? super T> handler);

/**
* Adds an event handler to the shared informer using the specified resync period.
* Events to a single handler are delivered sequentially, but there is no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ public DefaultSharedIndexInformer<T, L> addEventHandler(ResourceEventHandler<? s
return this;
}

@Override
public SharedIndexInformer<T> removeEventHandler(ResourceEventHandler<? super T> handler) {
var listener = this.processor.removeProcessorListener(handler);
if (!started.get() && listener.isPresent()) {
var listenerResyncPeriod = listener.orElseThrow().getResyncPeriodInMillis();
if (listenerResyncPeriod != 0 && resyncCheckPeriodMillis == listenerResyncPeriod) {
this.processor.getMinimalNonZeroResyncPeriod()
.ifPresent(l -> this.resyncCheckPeriodMillis = l);
}
}
return this;
}

@Override
public SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handler,
long resyncPeriodMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public boolean shouldResync(ZonedDateTime now) {
return this.resyncPeriodInMillis != 0 && (now.isAfter(this.nextResync) || now.equals(this.nextResync));
}

public long getResyncPeriodInMillis() {
return resyncPeriodInMillis;
}

public abstract static class Notification<T> {
private final T oldObject;
private final T newObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -175,6 +176,32 @@ public ProcessorListener<T> addProcessorListener(ResourceEventHandler<? super T>
}
}

public Optional<ProcessorListener<T>> removeProcessorListener(ResourceEventHandler<? super T> handler) {
lock.writeLock().lock();
try {
var targetListener = this.listeners.stream().filter(l -> l.getHandler() == handler).findFirst();
targetListener.ifPresent(l -> {
this.listeners.remove(l);
if (l.isReSync()) {
this.syncingListeners.remove(l);
}
});
return targetListener;
} finally {
lock.writeLock().unlock();
}
}

public Optional<Long> getMinimalNonZeroResyncPeriod() {
lock.readLock().lock();
try {
return this.listeners.stream().map(ProcessorListener::getResyncPeriodInMillis)
.filter(p -> p > 0L).min(Long::compareTo);
} finally {
lock.readLock().unlock();
}
}

public void executeIfPossible(Runnable runnable) {
try {
this.executor.execute(runnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer;
import io.fabric8.kubernetes.client.mock.crd.Animal;
import io.fabric8.kubernetes.client.mock.crd.AnimalSpec;
import io.fabric8.kubernetes.client.mock.crd.CronTab;
Expand All @@ -64,7 +65,10 @@
import org.junit.jupiter.api.Test;

import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -1159,6 +1163,85 @@ void testGenericKubernetesResourceSharedIndexInformerWithNamespaceConfigured() t
assertEquals(0, foundExistingAnimal.getCount());
}

@Test
void removeEventHandlerBeforeStartAdjustsResyncPeriod() {
var longResyncPeriod = 3000;
var shorterResyncPeriod = 2000;
var eventHandlerLongResync = emptyEventHandler();
var eventHandlerShorterResync = emptyEventHandler();

SharedIndexInformer<Pod> podInformer = factory.sharedIndexInformerFor(Pod.class, 4000);
podInformer.addEventHandlerWithResyncPeriod(eventHandlerLongResync, longResyncPeriod);
podInformer.addEventHandlerWithResyncPeriod(eventHandlerShorterResync, shorterResyncPeriod);

assertThat(((DefaultSharedIndexInformer<?, ?>) podInformer).getFullResyncPeriod())
.isEqualTo(shorterResyncPeriod);

podInformer.removeEventHandler(eventHandlerShorterResync);

assertThat(((DefaultSharedIndexInformer<?, ?>) podInformer).getFullResyncPeriod())
.isEqualTo(longResyncPeriod);
}

@Test
void stopReceivingEventsWhenEventHandlerRemoved() {
String startResourceVersion = "1000";
var eventEmitTimeWait = 500L;

server.expect()
.withPath("/api/v1/pods?resourceVersion=0")
.andReturn(200, new PodListBuilder().withNewMetadata()
.withResourceVersion(startResourceVersion)
.endMetadata()
.withItems(Collections.emptyList())
.build())
.once();
server.expect()
.withPath("/api/v1/pods?allowWatchBookmarks=true&resourceVersion=" + startResourceVersion
+ "&timeoutSeconds=600&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(eventEmitTimeWait)
.andEmit(new WatchEvent(new PodBuilder().withNewMetadata()
.withNamespace("test")
.withName("pod1")
.withResourceVersion("1001")
.endMetadata()
.build(), "ADDED"))
.waitFor(2 * eventEmitTimeWait)
.andEmit(new WatchEvent(new PodBuilder().withNewMetadata()
.withNamespace("test")
.withName("pod2")
.withResourceVersion("1002")
.endMetadata()
.build(), "ADDED"))
.done()
.always();

var handler1 = new AddRecordingEventHandler();
var handler2 = new AddRecordingEventHandler();

try (SharedIndexInformer<Pod> informer = client.pods().inAnyNamespace().runnableInformer(0)) {
informer.run();
informer.addEventHandler(handler1);
informer.addEventHandler(handler2);

await().pollInterval(Duration.ofMillis(100)).untilAsserted(() -> {
assertThat(handler1.getAddedPods()).hasSize(1);
assertThat(handler2.getAddedPods()).hasSize(1);
});

informer.removeEventHandler(handler2);

await().pollDelay(Duration.ofMillis(eventEmitTimeWait))
.pollInterval(Duration.ofMillis(100)).untilAsserted(() -> {
assertThat(handler1.getAddedPods()).hasSize(2);
assertThat(handler2.getAddedPods()).hasSize(1);
});
}

}

@Test
void testGenericKubernetesResourceSharedIndexInformerWithAdditionalDeserializers() throws InterruptedException {
// Given
Expand Down Expand Up @@ -1350,6 +1433,22 @@ public void onDelete(T obj, boolean deletedFinalStateUnknown) {
}
}

private static ResourceEventHandler<Object> emptyEventHandler() {
return new ResourceEventHandler<>() {
@Override
public void onAdd(Object obj) {
}

@Override
public void onUpdate(Object oldObj, Object newObj) {
}

@Override
public void onDelete(Object obj, boolean deletedFinalStateUnknown) {
}
};
}

private Star getStar(String name, String resourceVersion) {
StarSpec starSpec = new StarSpec();
starSpec.setType("G");
Expand All @@ -1372,4 +1471,25 @@ private PodSet getPodSet(String name, String resourceVersion) {
return podSet;
}

private class AddRecordingEventHandler implements ResourceEventHandler<Pod> {
private List<Pod> addedPods = new ArrayList<>();

@Override
public void onAdd(Pod obj) {
addedPods.add(obj);
}

@Override
public void onUpdate(Pod oldObj, Pod newObj) {
}

@Override
public void onDelete(Pod obj, boolean deletedFinalStateUnknown) {
}

public List<Pod> getAddedPods() {
return addedPods;
}
}

}

0 comments on commit 5aa3d9b

Please sign in to comment.