Skip to content

Commit

Permalink
Refactoring Service/Stat Metrics (#376) (#387)
Browse files Browse the repository at this point in the history
* Updating the code for service metric change in commons

Signed-off-by: Khushboo Rajput <khushbr@amazon.com>

* Refactoring code, fixing UTs

Signed-off-by: Khushboo Rajput <khushbr@amazon.com>

---------

Signed-off-by: Khushboo Rajput <khushbr@amazon.com>
(cherry picked from commit 7ba9df5)

Co-authored-by: Khushboo Rajput <59671881+khushbr@users.noreply.github.com>
  • Loading branch information
opensearch-trigger-bot[bot] and khushbr authored May 30, 2023
1 parent 22c824a commit fde2f23
Show file tree
Hide file tree
Showing 109 changed files with 750 additions and 1,442 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,24 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.net.httpserver.HttpServer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.collectors.*;
import org.opensearch.performanceanalyzer.commons.collectors.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.config.ConfigStatus;
import org.opensearch.performanceanalyzer.commons.config.PluginSettings;
import org.opensearch.performanceanalyzer.commons.metrics.ExceptionsAndErrors;
import org.opensearch.performanceanalyzer.commons.metrics.MeasurementSet;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.stats.CommonStats;
import org.opensearch.performanceanalyzer.commons.stats.IListener;
import org.opensearch.performanceanalyzer.commons.stats.SampleAggregator;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics;
import org.opensearch.performanceanalyzer.commons.stats.collectors.SampleAggregator;
import org.opensearch.performanceanalyzer.commons.stats.emitters.ISampler;
import org.opensearch.performanceanalyzer.commons.stats.emitters.PeriodicSamplers;
import org.opensearch.performanceanalyzer.commons.stats.listeners.IListener;
import org.opensearch.performanceanalyzer.commons.stats.measurements.MeasurementSet;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.config.TroubleshootingConfig;
import org.opensearch.performanceanalyzer.core.Util;
import org.opensearch.performanceanalyzer.jvm.GCMetrics;
import org.opensearch.performanceanalyzer.jvm.HeapMetrics;
import org.opensearch.performanceanalyzer.jvm.ThreadList;
Expand All @@ -54,8 +48,6 @@
import org.opensearch.performanceanalyzer.rca.samplers.BatchMetricsEnabledSampler;
import org.opensearch.performanceanalyzer.rca.samplers.MetricsDBFileSampler;
import org.opensearch.performanceanalyzer.rca.samplers.RcaStateSamplers;
import org.opensearch.performanceanalyzer.rca.stats.emitters.ISampler;
import org.opensearch.performanceanalyzer.rca.stats.emitters.PeriodicSamplers;
import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor;
import org.opensearch.performanceanalyzer.rest.QueryBatchRequestHandler;
import org.opensearch.performanceanalyzer.rest.QueryMetricsRequestHandler;
Expand All @@ -64,8 +56,9 @@

public class PerformanceAnalyzerApp {

private static final int EXCEPTION_QUEUE_LENGTH = 1;
private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerApp.class);

private static final int EXCEPTION_QUEUE_LENGTH = 1;
private static final ScheduledMetricCollectorsExecutor METRIC_COLLECTOR_EXECUTOR =
new ScheduledMetricCollectorsExecutor(1, false);
private static final ScheduledExecutorService netOperationsExecutor =
Expand All @@ -75,33 +68,29 @@ public class PerformanceAnalyzerApp {
private static RcaController rcaController = null;
private static final ThreadProvider THREAD_PROVIDER = new ThreadProvider();

public static IListener MISBEHAVING_NODES_LISTENER =
new MisbehavingGraphOperateMethodListener();

public static void initAggregators() {
if (CommonStats.RCA_STATS_REPORTER != null) {
return;
}
CommonStats.RCA_GRAPH_METRICS_AGGREGATOR = new SampleAggregator(RcaGraphMetrics.values());
CommonStats.RCA_RUNTIME_METRICS_AGGREGATOR =
ServiceMetrics.READER_METRICS_AGGREGATOR = new SampleAggregator(ReaderMetrics.values());
ServiceMetrics.RCA_GRAPH_METRICS_AGGREGATOR =
new SampleAggregator(RcaGraphMetrics.values());
ServiceMetrics.RCA_RUNTIME_METRICS_AGGREGATOR =
new SampleAggregator(RcaRuntimeMetrics.values());
CommonStats.RCA_VERTICES_METRICS_AGGREGATOR =
ServiceMetrics.RCA_VERTICES_METRICS_AGGREGATOR =
new SampleAggregator(RcaVerticesMetrics.values());
CommonStats.READER_METRICS_AGGREGATOR = new SampleAggregator(ReaderMetrics.values());

CommonStats.ERRORS_AND_EXCEPTIONS_AGGREGATOR =
final IListener MISBEHAVING_NODES_LISTENER = new MisbehavingGraphOperateMethodListener();
ServiceMetrics.ERRORS_AND_EXCEPTIONS_AGGREGATOR =
new SampleAggregator(
MISBEHAVING_NODES_LISTENER.getMeasurementsListenedTo(),
MISBEHAVING_NODES_LISTENER,
ExceptionsAndErrors.values());

CommonStats.PERIODIC_SAMPLE_AGGREGATOR = new SampleAggregator(getPeriodicMeasurementSets());

CommonStats.initStatsReporter();
ServiceMetrics.PERIODIC_SAMPLE_AGGREGATOR =
new SampleAggregator(getPeriodicMeasurementSets());
ServiceMetrics.initStatsReporter();
}

static {
initAggregators();
Objects.requireNonNull(
ServiceMetrics.STATS_REPORTER, "Service Metrics(Stat) Reporter should not be null");
}

public static PeriodicSamplers PERIODIC_SAMPLERS;
Expand Down Expand Up @@ -143,7 +132,7 @@ public static void main(String[] args) {
AppContext appContext = new AppContext();
PERIODIC_SAMPLERS =
new PeriodicSamplers(
CommonStats.PERIODIC_SAMPLE_AGGREGATOR,
ServiceMetrics.PERIODIC_SAMPLE_AGGREGATOR,
getAllSamplers(appContext),
(MetricsConfiguration.CONFIG_MAP.get(StatsCollector.class)
.samplingInterval)
Expand All @@ -166,8 +155,8 @@ public static void main(String[] args) {
startRcaTopLevelThread(clientServers, connectionManager, appContext, THREAD_PROVIDER);
} else {
LOG.error("Performance analyzer app stopped due to invalid config status.");
CommonStats.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
ExceptionsAndErrors.INVALID_CONFIG_RCA_AGENT_STOPPED, "", 1);
StatsCollector.instance()
.logException(StatExceptionCode.INVALID_CONFIG_RCA_AGENT_STOPPED);
}
}

Expand Down Expand Up @@ -219,12 +208,13 @@ public static Thread startErrorHandlingThread(
final PAThreadException exception = errorQueue.take();
handle(exception);
} catch (InterruptedException e) {
CommonStats.READER_METRICS_AGGREGATOR.updateStat(
ReaderMetrics.ERROR_HANDLER_THREAD_STOPPED, "", 1);
LOG.error(
"Exception handling thread interrupted. Reason: {}",
e.getMessage(),
e);
StatsCollector.instance()
.logException(
StatExceptionCode.ERROR_HANDLER_THREAD_STOPPED);
break;
}
}
Expand All @@ -241,17 +231,17 @@ public static Thread startErrorHandlingThread(
* @param exception The exception thrown from the thread.
*/
private static void handle(PAThreadException exception) {
// Currently this will only log an exception and increment a metric indicating that the
// Currently, this will only log an exception and increment a metric indicating that the
// thread has died.
// As an improvement to this functionality, once we know what exceptions are retryable, we
// can have each thread also register an error handler for itself. This handler will know
// what to do when the thread has stopped due to an unexpected exception.
CommonStats.READER_METRICS_AGGREGATOR.updateStat(ReaderMetrics.OTHER, "", 1);
LOG.error(
"Thread: {} ran into an uncaught exception: {}",
exception.getPaThreadName(),
exception.getInnerThrowable(),
exception);
StatsCollector.instance().logException(StatExceptionCode.READER_METRICS_PROCESSOR_ERROR);
}

public static Thread startWebServerThread(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,25 @@
package org.opensearch.performanceanalyzer;


import org.opensearch.performanceanalyzer.commons.metrics.MeasurementSet;
import org.opensearch.performanceanalyzer.rca.framework.metrics.ReaderMetrics;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;

/**
* Enum of threads that are spawned by the Performance Analyzer agent. Each enum value encapsulates
* two properties of a PA thread: 1) the name of the thread, and 2) the exception code against
* which an error needs to be recorded when the thread runs into an unhandled exception.
*/
public enum PerformanceAnalyzerThreads {
PA_READER("pa-reader", ReaderMetrics.READER_THREAD_STOPPED),
PA_ERROR_HANDLER("pa-error-handler", ReaderMetrics.ERROR_HANDLER_THREAD_STOPPED),
GRPC_SERVER("grpc-server", ReaderMetrics.GRPC_SERVER_THREAD_STOPPED),
WEB_SERVER("web-server", ReaderMetrics.WEB_SERVER_THREAD_STOPPED),
RCA_CONTROLLER("rca-controller", ReaderMetrics.RCA_CONTROLLER_THREAD_STOPPED),
RCA_SCHEDULER("rca-scheduler", ReaderMetrics.RCA_SCHEDULER_THREAD_STOPPED);
PA_READER("pa-reader", StatExceptionCode.READER_THREAD_STOPPED),
PA_ERROR_HANDLER("pa-error-handler", StatExceptionCode.ERROR_HANDLER_THREAD_STOPPED),
GRPC_SERVER("grpc-server", StatExceptionCode.GRPC_SERVER_THREAD_STOPPED),
WEB_SERVER("web-server", StatExceptionCode.WEB_SERVER_THREAD_STOPPED),
RCA_CONTROLLER("rca-controller", StatExceptionCode.RCA_CONTROLLER_THREAD_STOPPED),
RCA_SCHEDULER("rca-scheduler", StatExceptionCode.RCA_SCHEDULER_THREAD_STOPPED);

private final String value;
private final MeasurementSet threadExceptionCode;
private final StatExceptionCode threadExceptionCode;

PerformanceAnalyzerThreads(final String value, final MeasurementSet threadExceptionCode) {
PerformanceAnalyzerThreads(final String value, final StatExceptionCode threadExceptionCode) {
this.value = value;
this.threadExceptionCode = threadExceptionCode;
}
Expand All @@ -47,7 +46,7 @@ public String toString() {
*
* @return the exception code associated with this thread.
*/
public MeasurementSet getThreadExceptionCode() {
public StatExceptionCode getThreadExceptionCode() {
return threadExceptionCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.commons.stats.CommonStats;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.performanceanalyzer.metrics_generator.DiskMetricsGenerator;
import org.opensearch.performanceanalyzer.metrics_generator.OSMetricsGenerator;

Expand All @@ -25,7 +25,11 @@ public class DisksCollector extends PerformanceAnalyzerMetricsCollector
MetricsConfiguration.CONFIG_MAP.get(DisksCollector.class).samplingInterval;

public DisksCollector() {
super(sTimeInterval, "DisksCollector");
super(
sTimeInterval,
"DisksCollector",
StatMetrics.DISKS_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.DISK_METRICS_COLLECTOR_ERROR);
}

@Override
Expand All @@ -45,15 +49,10 @@ public void collectMetrics(long startTime) {
if (generator == null) {
return;
}
long mCurrT = System.currentTimeMillis();
DiskMetricsGenerator diskMetricsGenerator = generator.getDiskMetricsGenerator();
diskMetricsGenerator.addSample();

saveMetricValues(getMetrics(diskMetricsGenerator), startTime);
CommonStats.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.DISKS_COLLECTOR_EXECUTION_TIME,
"",
System.currentTimeMillis() - mCurrT);
}

private Map<String, DiskMetrics> getMetricsMap(DiskMetricsGenerator diskMetricsGenerator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.commons.stats.CommonStats;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.performanceanalyzer.jvm.GarbageCollectorInfo;

/**
Expand All @@ -30,12 +30,15 @@ public class GCInfoCollector extends PerformanceAnalyzerMetricsCollector
private static final int EXPECTED_KEYS_PATH_LENGTH = 0;

public GCInfoCollector() {
super(SAMPLING_TIME_INTERVAL, "GCInfo");
super(
SAMPLING_TIME_INTERVAL,
"GCInfo",
StatMetrics.GC_INFO_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.GC_INFO_COLLECTOR_ERROR);
}

@Override
public void collectMetrics(long startTime) {
long mCurrT = System.currentTimeMillis();
// Zero the string builder
value.setLength(0);

Expand All @@ -50,10 +53,6 @@ public void collectMetrics(long startTime) {
}

saveMetricValues(value.toString(), startTime);
CommonStats.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.GC_INFO_COLLECTOR_EXECUTION_TIME,
"",
System.currentTimeMillis() - mCurrT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.commons.stats.CommonStats;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.performanceanalyzer.jvm.GCMetrics;
import org.opensearch.performanceanalyzer.jvm.HeapMetrics;

Expand All @@ -33,12 +33,15 @@ public class HeapMetricsCollector extends PerformanceAnalyzerMetricsCollector
private static final int KEYS_PATH_LENGTH = 0;

public HeapMetricsCollector() {
super(SAMPLING_TIME_INTERVAL, "HeapMetrics");
super(
SAMPLING_TIME_INTERVAL,
"HeapMetrics",
StatMetrics.HEAP_METRICS_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.HEAP_METRICS_COLLECTOR_ERROR);
}

@Override
public void collectMetrics(long startTime) {
long mCurrT = System.currentTimeMillis();
GCMetrics.runGCMetrics();

value.setLength(0);
Expand Down Expand Up @@ -76,10 +79,6 @@ public void collectMetrics(long startTime) {
}

saveMetricValues(value.toString(), startTime);
CommonStats.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.HEAP_METRICS_COLLECTOR_EXECUTION_TIME,
"",
System.currentTimeMillis() - mCurrT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.performanceanalyzer.metrics_generator.MountedPartitionMetricsGenerator;
import org.opensearch.performanceanalyzer.metrics_generator.OSMetricsGenerator;

Expand All @@ -24,7 +26,11 @@ public class MountedPartitionMetricsCollector extends PerformanceAnalyzerMetrics
private static final int EXPECTED_KEYS_PATH_LENGTH = 0;

public MountedPartitionMetricsCollector() {
super(SAMPLING_TIME_INTERVAL, "MountedPartition");
super(
SAMPLING_TIME_INTERVAL,
"MountedPartition",
StatMetrics.MOUNTED_PARTITION_METRICS_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.MOUNTED_PARTITION_METRICS_COLLECTOR_ERROR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.performanceanalyzer.metrics_generator.OSMetricsGenerator;
import org.opensearch.performanceanalyzer.metrics_generator.TCPMetricsGenerator;

Expand All @@ -22,7 +24,11 @@ public class NetworkE2ECollector extends PerformanceAnalyzerMetricsCollector
MetricsConfiguration.CONFIG_MAP.get(NetworkE2ECollector.class).samplingInterval;

public NetworkE2ECollector() {
super(sTimeInterval, "NetworkE2ECollector");
super(
sTimeInterval,
"NetworkE2ECollector",
StatMetrics.NETWORK_E2E_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.NETWORK_COLLECTION_ERROR);
}

@Override
Expand Down
Loading

0 comments on commit fde2f23

Please sign in to comment.