Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support removing ResourceEventHandler for informers #6655

Merged
merged 7 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Fix #6282: Allow annotated types with Pattern, Min, and Max with Lists and Maps and CRD generation
* Fix #5480: Move `io.fabric8:zjsonpatch` to KubernetesClient project
* Fix #6240: Support for multiple files listed in the KUBECONFIG env var
* Fix #6655: Support removing ResourceEventHandler for informers

#### Dependency Upgrade
* Fix #2632: Bumped OkHttp from 3.12.12 to 4.12.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ default SharedIndexInformer<T> removeNamespaceIndex() {
*/
SharedIndexInformer<T> addEventHandler(ResourceEventHandler<? super T> handler);

/**
* Remove event handler.
*
* @param handler event handler
*/
SharedIndexInformer<T> removeEventHandler(ResourceEventHandler<? super T> handler);

/**
* Adds an event handler to the shared informer using the specified resync period.
* Events to a single handler are delivered sequentially, but there is no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ public DefaultSharedIndexInformer<T, L> addEventHandler(ResourceEventHandler<? s
return this;
}

@Override
public SharedIndexInformer<T> removeEventHandler(ResourceEventHandler<? super T> handler) {
var listener = this.processor.removeProcessorListener(handler);
if (!started.get() && listener.isPresent()) {
var listenerResyncPeriod = listener.orElseThrow().getResyncPeriodInMillis();
if (listenerResyncPeriod != 0 && resyncCheckPeriodMillis == listenerResyncPeriod) {
this.processor.getMinimalNonZeroResyncPeriod()
.ifPresent(l -> this.resyncCheckPeriodMillis = l);
}
}
return this;
}

@Override
public SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handler,
long resyncPeriodMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public boolean shouldResync(ZonedDateTime now) {
return this.resyncPeriodInMillis != 0 && (now.isAfter(this.nextResync) || now.equals(this.nextResync));
}

public long getResyncPeriodInMillis() {
return resyncPeriodInMillis;
}

public abstract static class Notification<T> {
private final T oldObject;
private final T newObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -175,6 +176,32 @@ public ProcessorListener<T> addProcessorListener(ResourceEventHandler<? super T>
}
}

public Optional<ProcessorListener<T>> removeProcessorListener(ResourceEventHandler<? super T> handler) {
lock.writeLock().lock();
try {
var targetListener = this.listeners.stream().filter(l -> l.getHandler() == handler).findFirst();
targetListener.ifPresent(l -> {
this.listeners.remove(l);
if (l.isReSync()) {
this.syncingListeners.remove(l);
}
});
return targetListener;
} finally {
lock.writeLock().unlock();
}
}

public Optional<Long> getMinimalNonZeroResyncPeriod() {
lock.readLock().lock();
try {
return this.listeners.stream().map(ProcessorListener::getResyncPeriodInMillis)
.filter(p -> p > 0L).min(Long::compareTo);
} finally {
lock.readLock().unlock();
}
}

public void executeIfPossible(Runnable runnable) {
try {
this.executor.execute(runnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer;
import io.fabric8.kubernetes.client.mock.crd.Animal;
import io.fabric8.kubernetes.client.mock.crd.AnimalSpec;
import io.fabric8.kubernetes.client.mock.crd.CronTab;
Expand All @@ -64,7 +65,10 @@
import org.junit.jupiter.api.Test;

import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -1158,6 +1162,85 @@ void testGenericKubernetesResourceSharedIndexInformerWithNamespaceConfigured() t
assertEquals(0, foundExistingAnimal.getCount());
}

@Test
void removeEventHandlerBeforeStartAdjustsResyncPeriod() {
var longResyncPeriod = 3000;
var shorterResyncPeriod = 2000;
var eventHandlerLongResync = emptyEventHandler();
var eventHandlerShorterResync = emptyEventHandler();

SharedIndexInformer<Pod> podInformer = factory.sharedIndexInformerFor(Pod.class, 4000);
podInformer.addEventHandlerWithResyncPeriod(eventHandlerLongResync, longResyncPeriod);
podInformer.addEventHandlerWithResyncPeriod(eventHandlerShorterResync, shorterResyncPeriod);

assertThat(((DefaultSharedIndexInformer<?, ?>) podInformer).getFullResyncPeriod())
.isEqualTo(shorterResyncPeriod);

podInformer.removeEventHandler(eventHandlerShorterResync);

assertThat(((DefaultSharedIndexInformer<?, ?>) podInformer).getFullResyncPeriod())
.isEqualTo(longResyncPeriod);
}

@Test
void stopReceivingEventsWhenEventHandlerRemoved() {
String startResourceVersion = "1000";
var eventEmitTimeWait = 500L;

server.expect()
.withPath("/api/v1/pods?resourceVersion=0")
.andReturn(200, new PodListBuilder().withNewMetadata()
.withResourceVersion(startResourceVersion)
.endMetadata()
.withItems(Collections.emptyList())
.build())
.once();
server.expect()
.withPath("/api/v1/pods?allowWatchBookmarks=true&resourceVersion=" + startResourceVersion
+ "&timeoutSeconds=600&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(eventEmitTimeWait)
.andEmit(new WatchEvent(new PodBuilder().withNewMetadata()
.withNamespace("test")
.withName("pod1")
.withResourceVersion("1001")
.endMetadata()
.build(), "ADDED"))
.waitFor(2 * eventEmitTimeWait)
.andEmit(new WatchEvent(new PodBuilder().withNewMetadata()
.withNamespace("test")
.withName("pod2")
.withResourceVersion("1002")
.endMetadata()
.build(), "ADDED"))
.done()
.always();

var handler1 = new AddRecordingEventHandler();
var handler2 = new AddRecordingEventHandler();

try (SharedIndexInformer<Pod> informer = client.pods().inAnyNamespace().runnableInformer(0)) {
informer.run();
informer.addEventHandler(handler1);
informer.addEventHandler(handler2);

await().pollInterval(Duration.ofMillis(100)).untilAsserted(() -> {
assertThat(handler1.getAddedPods()).hasSize(1);
assertThat(handler2.getAddedPods()).hasSize(1);
});

informer.removeEventHandler(handler2);

await().pollDelay(Duration.ofMillis(eventEmitTimeWait))
.pollInterval(Duration.ofMillis(100)).untilAsserted(() -> {
assertThat(handler1.getAddedPods()).hasSize(2);
assertThat(handler2.getAddedPods()).hasSize(1);
});
}

}

@Test
void testGenericKubernetesResourceSharedIndexInformerWithAdditionalDeserializers() throws InterruptedException {
// Given
Expand Down Expand Up @@ -1352,6 +1435,22 @@ public void onDelete(T obj, boolean deletedFinalStateUnknown) {
}
}

private static ResourceEventHandler<Object> emptyEventHandler() {
return new ResourceEventHandler<>() {
@Override
public void onAdd(Object obj) {
}

@Override
public void onUpdate(Object oldObj, Object newObj) {
}

@Override
public void onDelete(Object obj, boolean deletedFinalStateUnknown) {
}
};
}

private Star getStar(String name, String resourceVersion) {
StarSpec starSpec = new StarSpec();
starSpec.setType("G");
Expand All @@ -1374,4 +1473,25 @@ private PodSet getPodSet(String name, String resourceVersion) {
return podSet;
}

private class AddRecordingEventHandler implements ResourceEventHandler<Pod> {
private List<Pod> addedPods = new ArrayList<>();

@Override
public void onAdd(Pod obj) {
addedPods.add(obj);
}

@Override
public void onUpdate(Pod oldObj, Pod newObj) {
}

@Override
public void onDelete(Pod obj, boolean deletedFinalStateUnknown) {
}

public List<Pod> getAddedPods() {
return addedPods;
}
}

}
Loading