Skip to content

Commit

Permalink
Fix 1295 reactive (#1320)
Browse files Browse the repository at this point in the history
  • Loading branch information
wind57 authored Apr 24, 2023
1 parent 061709a commit ad3196b
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.cloud.kubernetes.fabric8.discovery;

import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.logging.LogFactory;

import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
Expand All @@ -40,6 +41,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.log.LogAccessor;

/**
* Auto configuration for discovery clients.
Expand All @@ -56,6 +58,9 @@
@AutoConfigureAfter({ Fabric8AutoConfiguration.class, KubernetesDiscoveryPropertiesAutoConfiguration.class })
public class KubernetesDiscoveryClientAutoConfiguration {

private static final LogAccessor LOG = new LogAccessor(
LogFactory.getLog(KubernetesDiscoveryClientAutoConfiguration.class));

@Bean
@ConditionalOnMissingBean
public KubernetesClientServicesFunction servicesFunction(KubernetesDiscoveryProperties properties,
Expand All @@ -77,6 +82,7 @@ public KubernetesDiscoveryClient kubernetesDiscoveryClient(KubernetesClient clie
@ConditionalOnDiscoveryHealthIndicatorEnabled
public KubernetesDiscoveryClientHealthIndicatorInitializer indicatorInitializer(
ApplicationEventPublisher applicationEventPublisher, PodUtils<?> podUtils) {
LOG.debug(() -> "Will publish InstanceRegisteredEvent from blocking implementation");
return new KubernetesDiscoveryClientHealthIndicatorInitializer(podUtils, applicationEventPublisher);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public KubernetesReactiveDiscoveryClient(KubernetesClient client, KubernetesDisc

@Override
public String description() {
return "Kubernetes Reactive Discovery Client";
return "Fabric8 Kubernetes Reactive Discovery Client";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.cloud.kubernetes.fabric8.discovery.reactive;

import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.logging.LogFactory;

import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
Expand All @@ -32,15 +33,19 @@
import org.springframework.cloud.client.discovery.health.DiscoveryClientHealthIndicatorProperties;
import org.springframework.cloud.client.discovery.health.reactive.ReactiveDiscoveryClientHealthIndicator;
import org.springframework.cloud.client.discovery.simple.reactive.SimpleReactiveDiscoveryClientAutoConfiguration;
import org.springframework.cloud.kubernetes.commons.PodUtils;
import org.springframework.cloud.kubernetes.commons.discovery.ConditionalOnKubernetesDiscoveryEnabled;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryClientHealthIndicatorInitializer;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryPropertiesAutoConfiguration;
import org.springframework.cloud.kubernetes.fabric8.discovery.KubernetesClientServicesFunction;
import org.springframework.cloud.kubernetes.fabric8.discovery.KubernetesClientServicesFunctionProvider;
import org.springframework.cloud.kubernetes.fabric8.discovery.KubernetesDiscoveryClientAutoConfiguration;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.log.LogAccessor;

/**
* Auto configuration for reactive discovery client.
Expand All @@ -58,6 +63,9 @@
KubernetesDiscoveryClientAutoConfiguration.class, KubernetesDiscoveryPropertiesAutoConfiguration.class })
public class KubernetesReactiveDiscoveryClientAutoConfiguration {

private static final LogAccessor LOG = new LogAccessor(
LogFactory.getLog(KubernetesReactiveDiscoveryClientAutoConfiguration.class));

@Bean
@ConditionalOnMissingBean
public KubernetesClientServicesFunction servicesFunction(KubernetesDiscoveryProperties properties,
Expand All @@ -73,6 +81,18 @@ public KubernetesReactiveDiscoveryClient kubernetesReactiveDiscoveryClient(Kuber
return new KubernetesReactiveDiscoveryClient(client, properties, kubernetesClientServicesFunction);
}

/**
* Post an event so that health indicator is initialized.
*/
@Bean
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.ReactiveHealthIndicator")
@ConditionalOnDiscoveryHealthIndicatorEnabled
KubernetesDiscoveryClientHealthIndicatorInitializer reactiveIndicatorInitializer(
ApplicationEventPublisher applicationEventPublisher, PodUtils<?> podUtils) {
LOG.debug(() -> "Will publish InstanceRegisteredEvent from reactive implementation");
return new KubernetesDiscoveryClientHealthIndicatorInitializer(podUtils, applicationEventPublisher);
}

@Bean
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.ReactiveHealthIndicator")
@ConditionalOnDiscoveryHealthIndicatorEnabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void afterEach() {
void verifyDefaults() {
ReactiveDiscoveryClient client = new KubernetesReactiveDiscoveryClient(kubernetesClient,
KubernetesDiscoveryProperties.DEFAULT, KubernetesClient::services);
assertThat(client.description()).isEqualTo("Kubernetes Reactive Discovery Client");
assertThat(client.description()).isEqualTo("Fabric8 Kubernetes Reactive Discovery Client");
assertThat(client.getOrder()).isEqualTo(ReactiveDiscoveryClient.DEFAULT_ORDER);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.kubernetes.fabric8.discovery;

import io.fabric8.kubernetes.api.model.Pod;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryClientHealthIndicatorInitializer;
import org.springframework.context.ApplicationListener;
import org.springframework.core.log.LogAccessor;
import org.springframework.stereotype.Component;

/**
* @author wind57
*/
@Component
public class Fabric8ApplicationDiscoveryListener implements ApplicationListener<InstanceRegisteredEvent<?>> {

private static final LogAccessor LOG = new LogAccessor(
LogFactory.getLog(Fabric8ApplicationDiscoveryListener.class));

@Override
public void onApplicationEvent(InstanceRegisteredEvent<?> event) {
Pod pod = (Pod) ((KubernetesDiscoveryClientHealthIndicatorInitializer.RegisteredEventSource) event.getSource())
.pod();
LOG.info(() -> "received InstanceRegisteredEvent from pod with 'app' label value : "
+ pod.getMetadata().getLabels().get("app"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.fabric8.kubernetes.api.model.Endpoints;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
Expand All @@ -33,8 +34,10 @@ public class Fabric8DiscoveryController {

private final KubernetesDiscoveryClient discoveryClient;

public Fabric8DiscoveryController(KubernetesDiscoveryClient discoveryClient) {
this.discoveryClient = discoveryClient;
public Fabric8DiscoveryController(ObjectProvider<KubernetesDiscoveryClient> discoveryClient) {
KubernetesDiscoveryClient[] local = new KubernetesDiscoveryClient[1];
discoveryClient.ifAvailable(x -> local[0] = x);
this.discoveryClient = local[0];
}

@GetMapping("/services")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.kubernetes.fabric8.discovery;

import java.util.List;

import reactor.core.publisher.Mono;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.kubernetes.fabric8.discovery.reactive.KubernetesReactiveDiscoveryClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
* @author wind57
*/
@RestController
public class Fabric8ReactiveDiscoveryController {

private final KubernetesReactiveDiscoveryClient reactiveDiscoveryClient;

public Fabric8ReactiveDiscoveryController(
ObjectProvider<KubernetesReactiveDiscoveryClient> reactiveDiscoveryClient) {
KubernetesReactiveDiscoveryClient[] local = new KubernetesReactiveDiscoveryClient[1];
reactiveDiscoveryClient.ifAvailable(x -> local[0] = x);
this.reactiveDiscoveryClient = local[0];
}

@GetMapping("/reactive/services")
public Mono<List<String>> allServices() {
return reactiveDiscoveryClient.getServices().collectList();
}

@GetMapping("/reactive/service-instances/{serviceId}")
public Mono<List<ServiceInstance>> serviceInstances(@PathVariable("serviceId") String serviceId) {
return reactiveDiscoveryClient.getInstances(serviceId).collectList();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
management:
endpoint:
health:
show-details: always
endpoints:
web:
exposure:
include: "*"
Loading

0 comments on commit ad3196b

Please sign in to comment.