Skip to content

Commit

Permalink
Adds audit logging to bean refresh to catch any gaps in bean subscrip…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
scottopell committed Apr 12, 2023
1 parent 9e624cf commit c6c4270
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 34 deletions.
29 changes: 29 additions & 0 deletions src/main/java/org/datadog/jmxfetch/BeanSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.datadog.jmxfetch;

import java.util.concurrent.CompletableFuture;
import java.util.List;

public class BeanSubscriber implements Runnable {
private List<String> beanScopes;
private Connection connection;
private BeanListener listener;
public CompletableFuture<Boolean> subscriptionSuccessful;

BeanSubscriber(List<String> beanScopes, Connection connection, BeanListener listener) {
this.beanScopes = beanScopes;
this.connection = connection;
this.listener = listener;
this.subscriptionSuccessful = new CompletableFuture<Boolean>();
}

public void run() {
try {
connection.subscribeToBeanScopes(beanScopes, this.listener);
this.subscriptionSuccessful.complete(true);

Thread.currentThread().join();
} catch (Exception e) {
this.subscriptionSuccessful.complete(false);
}
}
}
70 changes: 36 additions & 34 deletions src/main/java/org/datadog/jmxfetch/Instance.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.datadog.jmxfetch.service.ServiceNameProvider;
import org.yaml.snakeyaml.Yaml;

import java.util.concurrent.CompletableFuture;
import static java.util.concurrent.TimeUnit.*;

import java.io.File;
Expand Down Expand Up @@ -263,6 +262,7 @@ public Instance(
log.info("collect_default_jvm_metrics is false - not collecting default JVM metrics");
}

this.beans = new HashSet<>();
Boolean enableBeanSubscription = (Boolean) instanceMap.get("enable_bean_subscription");
this.enableBeanSubscription = enableBeanSubscription != null && enableBeanSubscription;
}
Expand Down Expand Up @@ -426,7 +426,7 @@ public synchronized void init(boolean forceNewConnection)
log.info("Trying to connect to JMX Server at " + this.toString());
connection = getConnection(instanceMap, forceNewConnection);
if (this.enableBeanSubscription) {
log.info("Subscribing for bean notifications before init");
log.info("Subscribing for bean notifications before initial bean refresh");
this.subscribeToBeans();
} else {
log.info("Bean subscription not enabled.");
Expand Down Expand Up @@ -723,33 +723,6 @@ public synchronized void beanUnregistered(ObjectName mBeanName) {
log.info("Bean unregistered event. {}", mBeanName);
}

private class BeanSubscriber implements Runnable {
private List<String> beanScopes;
private Connection connection;
private BeanListener listener;
public CompletableFuture<Boolean> subscriptionSuccessful;

BeanSubscriber(List<String> beanScopes, Connection connection, BeanListener listener) {
this.beanScopes = beanScopes;
this.connection = connection;
this.listener = listener;
this.subscriptionSuccessful = new CompletableFuture<Boolean>();
}

public void run() {
try {
log.info("Subscribing to {} bean scopes", beanScopes.size());

connection.subscribeToBeanScopes(beanScopes, this.listener);
this.subscriptionSuccessful.complete(true);

Thread.currentThread().join();
} catch (Exception e) {
log.warn("Exception while subscribing to beans {}", e);
this.subscriptionSuccessful.complete(false);
}
}
}

private void subscribeToBeans() {
List<String> beanScopes = this.getBeansScopes();
Expand All @@ -758,10 +731,13 @@ private void subscribeToBeans() {
try {
new Thread(subscriber).start();
if (subscriber.subscriptionSuccessful.get(1, SECONDS)) {
log.info("Subscribed successfully!");
log.info("Subscribed to {} bean scopes successfully!", beanScopes.size());
}
} catch (Exception e) {
log.warn("Exception while subscribing to beans {}", e);
log.warn("Bean subscription failed! Will rely on bean_refresh, ensure it is set "
+ " to an appropriate value (currently {} seconds). Exception: {}",
this.refreshBeansPeriod, e);
this.enableBeanSubscription = false;
}
}

Expand All @@ -778,27 +754,53 @@ public List<String> getBeansScopes() {
* certain actions, and fallback if necessary.
*/
private synchronized void refreshBeansList() throws IOException {
this.beans = new HashSet<ObjectName>();
Set<ObjectName> newBeans = new HashSet<>();
String action = appConfig.getAction();
boolean limitQueryScopes =
!action.equals(AppConfig.ACTION_LIST_EVERYTHING)
&& !action.equals(AppConfig.ACTION_LIST_NOT_MATCHING);

boolean fullBeanQueryNeeded = false;
if (limitQueryScopes) {
try {
List<String> beanScopes = getBeansScopes();
for (String scope : beanScopes) {
ObjectName name = new ObjectName(scope);
this.beans.addAll(connection.queryNames(name));
newBeans.addAll(connection.queryNames(name));
}
} catch (Exception e) {
fullBeanQueryNeeded = true;
log.error(
"Unable to compute a common bean scope, querying all beans as a fallback",
e);
}
}

this.beans = (this.beans.isEmpty()) ? connection.queryNames(null) : this.beans;
if (fullBeanQueryNeeded) {
newBeans = connection.queryNames(null);
}
if (this.enableBeanSubscription && !fullBeanQueryNeeded) {
// This code exists to validate the bean-subscription is working properly
// If every new bean and de-registered bean properly fires an event, then
// this.beans (current set that has been updated via subscription) should
// always equal the new set of beans queried (unless it was a full bean query)
Set<ObjectName> beansNotSeen = new HashSet<>();
if (this.beans.containsAll(newBeans)) {
beansNotSeen.addAll(newBeans);
beansNotSeen.removeAll(this.beans);
log.error("Bean refresh found {} new beans that were not already known via subscription", beansNotSeen.size());
}
if (!newBeans.containsAll(this.beans)){
beansNotSeen.addAll(this.beans);
beansNotSeen.removeAll(newBeans);
log.error("Bean refresh found {} FEWER beans than already known via subscription", beansNotSeen.size());
}

for (ObjectName b : beansNotSeen) {
log.error("Bean {} is one that has never been seen before, see why subscription did not detect this bean.", b.toString());
}
}
this.beans = newBeans;
this.lastRefreshTime = System.currentTimeMillis();
}

Expand Down

0 comments on commit c6c4270

Please sign in to comment.