From 54566066ad7c45c686285668f2e10f511f0d0520 Mon Sep 17 00:00:00 2001 From: biswassri Date: Tue, 31 Jan 2023 07:28:15 -0800 Subject: [PATCH 1/3] JOSDK and fabric8 upgrade --- .../ManagedKafkaAgentController.java | 6 +----- .../controllers/ManagedKafkaController.java | 17 ++--------------- .../operator/events/ControllerEventFilter.java | 10 ++++------ .../operator/events/ResourceEventSource.java | 2 +- .../events/ControllerEventFilterTest.java | 10 +++++----- pom.xml | 4 ++-- 6 files changed, 15 insertions(+), 34 deletions(-) diff --git a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaAgentController.java b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaAgentController.java index fb71914de..733a441c5 100644 --- a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaAgentController.java +++ b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaAgentController.java @@ -1,6 +1,5 @@ package org.bf2.operator.controllers; -import io.javaoperatorsdk.operator.api.reconciler.Constants; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; @@ -11,7 +10,6 @@ import io.quarkus.scheduler.Scheduled.ConcurrentExecution; import org.bf2.common.ConditionUtils; import org.bf2.common.ManagedKafkaAgentResourceClient; -import org.bf2.operator.events.ControllerEventFilter; import org.bf2.operator.managers.CapacityManager; import org.bf2.operator.managers.InformerManager; import org.bf2.operator.managers.ObservabilityManager; @@ -44,9 +42,7 @@ */ @ApplicationScoped @ControllerConfiguration( - finalizerName = Constants.NO_FINALIZER, - generationAwareEventProcessing = false, - eventFilters = { ControllerEventFilter.class }) + generationAwareEventProcessing = false) public class ManagedKafkaAgentController implements Reconciler { @Inject diff --git a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java index 811f424a0..35880f78d 100644 --- a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java +++ b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java @@ -1,20 +1,14 @@ package org.bf2.operator.controllers; -import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.utils.Serialization; -import io.javaoperatorsdk.operator.api.reconciler.Constants; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; -import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.micrometer.core.annotation.Counted; import io.micrometer.core.annotation.Timed; import org.bf2.common.ConditionUtils; import org.bf2.common.ManagedKafkaResourceClient; -import org.bf2.operator.events.ControllerEventFilter; import org.bf2.operator.events.ResourceEventSource; import org.bf2.operator.managers.CapacityManager; import org.bf2.operator.managers.IngressControllerManager; @@ -41,17 +35,14 @@ import javax.inject.Inject; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; @ControllerConfiguration( - finalizerName = Constants.NO_FINALIZER, - generationAwareEventProcessing = false, - eventFilters = { ControllerEventFilter.class }) -public class ManagedKafkaController implements Reconciler, EventSourceInitializer { + generationAwareEventProcessing = false) +public class ManagedKafkaController implements Reconciler { // 1 for bootstrap URL + 1 for Admin API server private static final int NUM_NON_BROKER_ROUTES = 2; @@ -128,10 +119,6 @@ public UpdateControl reconcile(ManagedKafka managedKafka, Context } } - @Override - public List prepareEventSources(EventSourceContext context) { - return Arrays.asList(eventSource); - } /** * Extract from the current KafkaInstance overall status (Kafka, Canary and AdminServer) diff --git a/operator/src/main/java/org/bf2/operator/events/ControllerEventFilter.java b/operator/src/main/java/org/bf2/operator/events/ControllerEventFilter.java index b5e8aa0ce..e9d115446 100644 --- a/operator/src/main/java/org/bf2/operator/events/ControllerEventFilter.java +++ b/operator/src/main/java/org/bf2/operator/events/ControllerEventFilter.java @@ -6,8 +6,7 @@ import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.client.utils.Serialization; import io.fabric8.zjsonpatch.JsonDiff; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; +import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; import org.jboss.logging.Logger; import java.util.Collections; @@ -29,14 +28,13 @@ * * */ -public class ControllerEventFilter implements ResourceEventFilter { +public class ControllerEventFilter implements OnUpdateFilter { private static Logger log = Logger.getLogger(ControllerEventFilter.class); @Override - public boolean acceptChange(ControllerConfiguration configuration, - HasMetadata oldResource, - HasMetadata newResource) { + public boolean accept(HasMetadata oldResource, + HasMetadata newResource) { Optional oldMeta = meta(oldResource); Optional newMeta = meta(newResource); diff --git a/operator/src/main/java/org/bf2/operator/events/ResourceEventSource.java b/operator/src/main/java/org/bf2/operator/events/ResourceEventSource.java index 5af0bb821..1e3ab3847 100644 --- a/operator/src/main/java/org/bf2/operator/events/ResourceEventSource.java +++ b/operator/src/main/java/org/bf2/operator/events/ResourceEventSource.java @@ -45,7 +45,7 @@ protected void handleEvent(HasMetadata resource, ResourceAction action) { // the operator may not have inited yet if (getEventHandler() != null) { ResourceID.fromFirstOwnerReference(resource).ifPresentOrElse( - ownerId -> getEventHandler().handleEvent(new ResourceEvent(action, ownerId)), + ownerId -> getEventHandler().handleEvent(new ResourceEvent(action, ownerId )), () -> log.warnf("%s %s/%s does not have OwnerReference", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName())); } } diff --git a/operator/src/test/java/org/bf2/operator/events/ControllerEventFilterTest.java b/operator/src/test/java/org/bf2/operator/events/ControllerEventFilterTest.java index 3b2f8927f..467d9f33b 100644 --- a/operator/src/test/java/org/bf2/operator/events/ControllerEventFilterTest.java +++ b/operator/src/test/java/org/bf2/operator/events/ControllerEventFilterTest.java @@ -27,7 +27,7 @@ void setup() { @Test void testAddResourceAccepted() { - assertTrue(target.acceptChange(mockConfiguration, null, new ManagedKafka())); + assertTrue(target.accept(null, new ManagedKafka())); } @Test @@ -43,7 +43,7 @@ void testChangedGenerationAccepted() { .endMetadata() .build(); - assertTrue(target.acceptChange(mockConfiguration, oldMk, newMk)); + assertTrue(target.accept(oldMk, newMk)); } @Test @@ -59,7 +59,7 @@ void testChangedAnnotationsAccepted() { .endMetadata() .build(); - assertTrue(target.acceptChange(mockConfiguration, oldMk, newMk)); + assertTrue(target.accept(oldMk, newMk)); } @Test @@ -75,7 +75,7 @@ void testChangedLabelsAccepted() { .endMetadata() .build(); - assertTrue(target.acceptChange(mockConfiguration, oldMk, newMk)); + assertTrue(target.accept(oldMk, newMk)); } @Test @@ -95,6 +95,6 @@ void testUnchangedNotAccepted() { .endMetadata() .build(); - assertFalse(target.acceptChange(mockConfiguration, oldMk, newMk)); + assertFalse(target.accept(oldMk, newMk)); } } diff --git a/pom.xml b/pom.xml index 916a58546..1c1098e54 100644 --- a/pom.xml +++ b/pom.xml @@ -61,8 +61,8 @@ quarkus-bom ${quarkus.platform.version} 0.29.0 - 5.12.2 - 3.0.9 + 5.12.4 + 4.0.7 io.quarkus 1.7.2 0.50.4 From 43d55dfdffd0b8cced0c39766e2cd4487bc5300e Mon Sep 17 00:00:00 2001 From: biswassri Date: Wed, 1 Feb 2023 08:16:31 -0800 Subject: [PATCH 2/3] Changes from PR review --- .../controllers/ManagedKafkaAgentController.java | 4 +++- .../operator/controllers/ManagedKafkaController.java | 4 +++- .../org/bf2/operator/events/ControllerEventFilter.java | 1 - .../org/bf2/operator/events/ResourceEventSource.java | 4 ++-- .../bf2/operator/events/ControllerEventFilterTest.java | 10 ---------- 5 files changed, 8 insertions(+), 15 deletions(-) diff --git a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaAgentController.java b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaAgentController.java index 733a441c5..e95e25271 100644 --- a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaAgentController.java +++ b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaAgentController.java @@ -10,6 +10,7 @@ import io.quarkus.scheduler.Scheduled.ConcurrentExecution; import org.bf2.common.ConditionUtils; import org.bf2.common.ManagedKafkaAgentResourceClient; +import org.bf2.operator.events.ControllerEventFilter; import org.bf2.operator.managers.CapacityManager; import org.bf2.operator.managers.InformerManager; import org.bf2.operator.managers.ObservabilityManager; @@ -42,7 +43,8 @@ */ @ApplicationScoped @ControllerConfiguration( - generationAwareEventProcessing = false) + generationAwareEventProcessing = false, + onUpdateFilter = ControllerEventFilter.class) public class ManagedKafkaAgentController implements Reconciler { @Inject diff --git a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java index 35880f78d..21b365bf4 100644 --- a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java +++ b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java @@ -9,6 +9,7 @@ import io.micrometer.core.annotation.Timed; import org.bf2.common.ConditionUtils; import org.bf2.common.ManagedKafkaResourceClient; +import org.bf2.operator.events.ControllerEventFilter; import org.bf2.operator.events.ResourceEventSource; import org.bf2.operator.managers.CapacityManager; import org.bf2.operator.managers.IngressControllerManager; @@ -41,7 +42,8 @@ import java.util.Optional; @ControllerConfiguration( - generationAwareEventProcessing = false) + generationAwareEventProcessing = false, + onUpdateFilter = ControllerEventFilter.class) public class ManagedKafkaController implements Reconciler { // 1 for bootstrap URL + 1 for Admin API server diff --git a/operator/src/main/java/org/bf2/operator/events/ControllerEventFilter.java b/operator/src/main/java/org/bf2/operator/events/ControllerEventFilter.java index e9d115446..7c2836026 100644 --- a/operator/src/main/java/org/bf2/operator/events/ControllerEventFilter.java +++ b/operator/src/main/java/org/bf2/operator/events/ControllerEventFilter.java @@ -40,7 +40,6 @@ public boolean accept(HasMetadata oldResource, Optional newMeta = meta(newResource); boolean resourceChanged = - oldResource == null || // Always accept an "add" change changed(oldMeta, newMeta, this::generation) || changed(oldMeta, newMeta, this::annotations) || changed(oldMeta, newMeta, this::labels); diff --git a/operator/src/main/java/org/bf2/operator/events/ResourceEventSource.java b/operator/src/main/java/org/bf2/operator/events/ResourceEventSource.java index 1e3ab3847..d8d078f9b 100644 --- a/operator/src/main/java/org/bf2/operator/events/ResourceEventSource.java +++ b/operator/src/main/java/org/bf2/operator/events/ResourceEventSource.java @@ -45,13 +45,13 @@ protected void handleEvent(HasMetadata resource, ResourceAction action) { // the operator may not have inited yet if (getEventHandler() != null) { ResourceID.fromFirstOwnerReference(resource).ifPresentOrElse( - ownerId -> getEventHandler().handleEvent(new ResourceEvent(action, ownerId )), + ownerId -> getEventHandler().handleEvent(new ResourceEvent(action, ownerId, null )), () -> log.warnf("%s %s/%s does not have OwnerReference", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName())); } } public void handleEvent(CustomResource resource) { if (getEventHandler() != null) { - getEventHandler().handleEvent(new ResourceEvent(ResourceAction.UPDATED, ResourceID.fromResource(resource))); + getEventHandler().handleEvent(new ResourceEvent(ResourceAction.UPDATED, ResourceID.fromResource(resource), null)); } } } diff --git a/operator/src/test/java/org/bf2/operator/events/ControllerEventFilterTest.java b/operator/src/test/java/org/bf2/operator/events/ControllerEventFilterTest.java index 467d9f33b..fa98b575f 100644 --- a/operator/src/test/java/org/bf2/operator/events/ControllerEventFilterTest.java +++ b/operator/src/test/java/org/bf2/operator/events/ControllerEventFilterTest.java @@ -1,12 +1,9 @@ package org.bf2.operator.events; -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import org.bf2.operator.resources.v1alpha1.ManagedKafka; import org.bf2.operator.resources.v1alpha1.ManagedKafkaBuilder; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import java.util.Map; @@ -16,18 +13,11 @@ class ControllerEventFilterTest { ControllerEventFilter target; - ControllerConfiguration mockConfiguration; @BeforeEach @SuppressWarnings("unchecked") void setup() { target = new ControllerEventFilter(); - mockConfiguration = Mockito.mock(ControllerConfiguration.class); - } - - @Test - void testAddResourceAccepted() { - assertTrue(target.accept(null, new ManagedKafka())); } @Test From eff85f7b59b64dbacc1d113a16eb255639f70ad8 Mon Sep 17 00:00:00 2001 From: biswassri Date: Wed, 1 Feb 2023 13:03:07 -0800 Subject: [PATCH 3/3] Fix for Controller --- .../controllers/ManagedKafkaController.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java index 21b365bf4..6daf1c65b 100644 --- a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java +++ b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java @@ -1,10 +1,14 @@ package org.bf2.operator.controllers; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.utils.Serialization; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.micrometer.core.annotation.Counted; import io.micrometer.core.annotation.Timed; import org.bf2.common.ConditionUtils; @@ -38,13 +42,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; @ControllerConfiguration( generationAwareEventProcessing = false, onUpdateFilter = ControllerEventFilter.class) -public class ManagedKafkaController implements Reconciler { +public class ManagedKafkaController implements Reconciler, EventSourceInitializer { // 1 for bootstrap URL + 1 for Admin API server private static final int NUM_NON_BROKER_ROUTES = 2; @@ -120,8 +125,10 @@ public UpdateControl reconcile(ManagedKafka managedKafka, Context } } } - - + @Override + public Map prepareEventSources(EventSourceContext context) { + return Map.of("ownedResources", eventSource); + } /** * Extract from the current KafkaInstance overall status (Kafka, Canary and AdminServer) * a corresponding list of ManagedKafkaCondition(s) to set on the ManagedKafka status @@ -129,6 +136,7 @@ public UpdateControl reconcile(ManagedKafka managedKafka, Context * @param managedKafka ManagedKafka instance * @param invalid */ + private void updateManagedKafkaStatus(ManagedKafka managedKafka, Optional invalid) { ManagedKafkaStatus originalStatus = null; if (managedKafka.getStatus() != null) {