From 780fcda89161c036982739a83a0e4b1973b0a467 Mon Sep 17 00:00:00 2001 From: Jesse White Date: Thu, 6 Oct 2016 15:29:04 -0400 Subject: [PATCH] Sleep after an exception is thrown too. --- .../k8s/KubernetesDiscoveryAgent.java | 53 +++++++++---------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/apache/activemq/transport/discovery/k8s/KubernetesDiscoveryAgent.java b/src/main/java/org/apache/activemq/transport/discovery/k8s/KubernetesDiscoveryAgent.java index 7eba251..4f40db8 100755 --- a/src/main/java/org/apache/activemq/transport/discovery/k8s/KubernetesDiscoveryAgent.java +++ b/src/main/java/org/apache/activemq/transport/discovery/k8s/KubernetesDiscoveryAgent.java @@ -98,48 +98,43 @@ private class KubernetesPodEnumerator implements Runnable { @Override public void run() { while(running.get()) { - Set services = null; try { LOG.info("Enumerating pods with label key: {} label value: {}", podLabelKey, podLabelValue); - services = client.pods().inNamespace(namespace) + final Set availableServices = client.pods().inNamespace(namespace) .withLabel(podLabelKey, podLabelValue) .list().getItems().stream() .map(pod -> String.format(serviceUrlFormat, pod.getStatus().getPodIP())) .collect(Collectors.toSet()); - } catch (RuntimeException e) { - LOG.warn("Failed to enumerate the pods. Will try again later.", e); - continue; - } - final Set availableServices = services; - LOG.debug("Found services: {}", availableServices); + // Determine the list of service we need to add + final List servicesToAdd = availableServices.stream() + .filter(svc -> !knownServices.contains(svc)) + .collect(Collectors.toList()); - // Determine the list of service we need to add - final List servicesToAdd = availableServices.stream() - .filter(svc -> !knownServices.contains(svc)) - .collect(Collectors.toList()); - - // Add them - for (String service : servicesToAdd) { - LOG.info("Adding service: {}", service); - listener.onServiceAdd(new SimpleDiscoveryEvent(service)); - knownServices.add(service); - } + // Add them + for (String service : servicesToAdd) { + LOG.info("Adding service: {}", service); + listener.onServiceAdd(new SimpleDiscoveryEvent(service)); + knownServices.add(service); + } - // Determine the list of services we need to remove - final List servicesToRemove = knownServices.stream() - .filter(svc -> !availableServices.contains(svc)) - .collect(Collectors.toList()); + // Determine the list of services we need to remove + final List servicesToRemove = knownServices.stream() + .filter(svc -> !availableServices.contains(svc)) + .collect(Collectors.toList()); - // Remove them - for (String service : servicesToRemove) { - LOG.info("Removing service: {}", service); - listener.onServiceRemove(new SimpleDiscoveryEvent(service)); - knownServices.remove(service); + // Remove them + for (String service : servicesToRemove) { + LOG.info("Removing service: {}", service); + listener.onServiceRemove(new SimpleDiscoveryEvent(service)); + knownServices.remove(service); + } + } catch (RuntimeException e) { + LOG.warn("Failed to enumerate the pods. Will try again later.", e); } - // Wait before enumerating again + // Sleep before trying again synchronized(k8sSleepMutex) { try { k8sSleepMutex.wait(sleepDelay);