Skip to content

Commit

Permalink
Refactoring Service/Stat Metrics (#14)
Browse files Browse the repository at this point in the history
Signed-off-by: Khushboo Rajput <khushbr@amazon.com>
  • Loading branch information
khushbr authored May 25, 2023
1 parent 73bfeb7 commit b61e646
Show file tree
Hide file tree
Showing 40 changed files with 782 additions and 494 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.commons.stats.CommonStats;
import org.opensearch.performanceanalyzer.commons.stats.measurements.MeasurementSet;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.util.Util;

public abstract class PerformanceAnalyzerMetricsCollector implements Runnable {
Expand All @@ -30,14 +33,24 @@ public enum State {
LogManager.getLogger(PerformanceAnalyzerMetricsCollector.class);
private int timeInterval;
private long startTime;

private String collectorName;
private MeasurementSet statLatencyMetric;
private StatExceptionCode errorMetric;
protected StringBuilder value;

protected State state;
private boolean threadContentionMonitoringEnabled;

/**
* Creates a collector: stores the given interval, name, latency measurement and error code,
* allocates an empty {@code value} buffer and sets the initial state to {@code State.HEALTHY}.
*
* <p>NOTE(review): this diff hunk shows two signatures back to back without +/- markers; the
* one-line (timeInterval, collectorName) form appears to be the removed pre-change signature
* and the four-parameter form the current one. Confirm against the repository.
*
* @param timeInterval collection interval (presumably milliseconds, since StatsCollector passes
*     its samplingIntervalMillis here; TODO confirm)
* @param collectorName name reported in this collector's error log line
* @param statLatencyMetric measurement updated by run() with the elapsed time of collectMetrics
* @param errorMetric exception code that run() logs through StatsCollector when collection throws
*/
protected PerformanceAnalyzerMetricsCollector(int timeInterval, String collectorName) {
protected PerformanceAnalyzerMetricsCollector(
int timeInterval,
String collectorName,
MeasurementSet statLatencyMetric,
StatExceptionCode errorMetric) {
this.timeInterval = timeInterval;
this.collectorName = collectorName;
this.statLatencyMetric = statLatencyMetric;
this.errorMetric = errorMetric;
this.value = new StringBuilder();
this.state = State.HEALTHY;
}
Expand Down Expand Up @@ -65,17 +78,16 @@ public void setStartTime(long startTime) {

public void run() {
try {
long mCurrT = System.currentTimeMillis();
Util.invokePrivileged(() -> collectMetrics(startTime));
CommonStats.WRITER_METRICS_AGGREGATOR.updateStat(
statLatencyMetric, System.currentTimeMillis() - mCurrT);
} catch (Exception ex) {
// - There should not be any exceptions here, but in case one occurs it is absorbed.
// - Ideally we should not be logging here, as it will slow things down and fill up
// the log.
// Need to find a better way to catch these.
LOG.error(
"Error In Collect Metrics: {} with ExceptionCode: {}",
() -> ex.toString(),
() -> StatExceptionCode.OTHER_COLLECTION_ERROR.toString());
StatsCollector.instance().logException(StatExceptionCode.OTHER_COLLECTION_ERROR);
"[{}] Error In Collect Metrics: {}",
() -> getCollectorName(),
() -> ex.toString());
StatsCollector.instance().logException(errorMetric);
} finally {
bInProgress.set(false);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -25,28 +26,33 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.rca.Version;
import org.opensearch.performanceanalyzer.commons.stats.CommonStats;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.WriterMetrics;

public class StatsCollector extends PerformanceAnalyzerMetricsCollector {
private static final Logger STATS_LOGGER = LogManager.getLogger("stats_log");
private static final Logger GENERAL_LOG = LogManager.getLogger(StatsCollector.class);
public static final String COLLECTOR_NAME = "StatsCollector";
public static String STATS_TYPE = "plugin-stats-metadata";

private static final String LOG_ENTRY_INIT =
"------------------------------------------------------------------------";
private static final String LOG_ENTRY_END = "EOE";
private static final String LOG_LINE_BREAK = "\n";
private static final double MILLISECONDS_TO_SECONDS_DIVISOR = 1000D;

private static final Logger STATS_LOGGER = LogManager.getLogger("stats_log");
private static final Logger GENERAL_LOG = LogManager.getLogger(StatsCollector.class);
private static StatsCollector statsCollector = null;
public static String STATS_TYPE = "plugin-stats-metadata";

private final Map<String, String> metadata;
private Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();
private final List<StatExceptionCode> defaultExceptionCodes = new Vector<>();
private Date objectCreationTime = new Date();

private List<StatExceptionCode> defaultExceptionCodes = new Vector<>();

public StatsCollector(String name, int samplingIntervalMillis, Map<String, String> metadata) {
super(samplingIntervalMillis, name);
super(
samplingIntervalMillis,
name,
WriterMetrics.STAT_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.STATS_COLLECTOR_ERROR);
this.metadata = metadata;
addRcaVersionMetadata(this.metadata);
defaultExceptionCodes.add(StatExceptionCode.TOTAL_ERROR);
Expand Down Expand Up @@ -84,6 +90,11 @@ public void collectMetrics(long startTime) {
currentCounters.putIfAbsent(statExceptionCode.toString(), new AtomicInteger(0));
}

/**
* Each run of StatsCollector's collectMetrics (scheduled every 60s) emits 2 entries. The
* first entry, via {@link #writeStats}, writes counters and metrics; the second, via {@link
* #collectAndWriteRcaStats}, writes timers and metrics.
*/
writeStats(
metadata,
currentCounters,
Expand All @@ -104,7 +115,7 @@ private void collectAndWriteRcaStats() {
formatter.getAllMetrics()) {
if (!statsReturn.isEmpty()) {
logStatsRecord(
statsReturn.getCounters(),
counters = null,
statsReturn.getStatsdata(),
statsReturn.getLatencies(),
statsReturn.getStartTimeMillis(),
Expand All @@ -126,11 +137,11 @@ public void logException(StatExceptionCode statExceptionCode) {

/**
* Writes one stats record by forwarding the given maps and time window, together with this
* collector's {@code metadata} field, to {@code writeStats}.
*
* <p>NOTE(review): this diff hunk shows pre- and post-change lines back to back without +/-
* markers; the {@code statsdata}/{@code statsData} parameter lines and the two writeStats calls
* look like a rename of that parameter. Confirm against the repository.
*
* @param counters counter values, passed through unchanged
* @param latencies latency values, passed through unchanged
* @param startTimeMillis start of the reported window, passed through unchanged
* @param endTimeMillis end of the reported window, passed through unchanged
*/
public void logStatsRecord(
Map<String, AtomicInteger> counters,
Map<String, String> statsdata,
Map<String, String> statsData,
Map<String, Double> latencies,
long startTimeMillis,
long endTimeMillis) {
writeStats(metadata, counters, statsdata, latencies, startTimeMillis, endTimeMillis);
writeStats(metadata, counters, statsData, latencies, startTimeMillis, endTimeMillis);
}

private void addRcaVersionMetadata(Map<String, String> metadata) {
Expand All @@ -146,7 +157,6 @@ private static Map<String, String> loadMetadata(String fileLocation) {
try (InputStream input =
new FileInputStream(
PluginSettings.instance().getConfigFolderPath() + fileLocation); ) {
// load properties file
props.load(input);
} catch (Exception ex) {
GENERAL_LOG.error(
Expand Down Expand Up @@ -185,27 +195,22 @@ private static void writeStats(
Map<String, Double> latencies,
long startTimeMillis,
long endTimeMillis) {

StringBuilder builder = new StringBuilder();
builder.append(LOG_ENTRY_INIT + LOG_LINE_BREAK);
logValues(metadata, builder);
// Stats Data
logValues(statsdata, builder);
logTimeMetrics(startTimeMillis, endTimeMillis, builder);

Map<String, Double> tmpLatencies;

if (latencies == null) {
tmpLatencies = new ConcurrentHashMap<>();
} else {
tmpLatencies = new ConcurrentHashMap<>(latencies);
}

tmpLatencies.put("total-time", (double) endTimeMillis - startTimeMillis);
addEntry("Timing", getLatencyMetrics(tmpLatencies), builder);

// Timers and Counters
Optional.ofNullable(latencies)
.ifPresent(
e -> latencies.put("total-time", (double) endTimeMillis - startTimeMillis));
addEntry("Timing", getLatencyMetrics(latencies), builder);
addEntry("Counters", getCountersString(counters), builder);
builder.append(LOG_ENTRY_END); // + LOG_LINE_BREAK);
/* Setting this log level to debug (and not info) to avoid logging noisy plugin stats
logs to the console and opensearch.log file */

builder.append(LOG_ENTRY_END);
STATS_LOGGER.debug(builder.toString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.commons.metrics.ExceptionsAndErrors;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.commons.stats.CommonStats;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.commons.util.Util;

public class EventLogFileHandler {
Expand Down Expand Up @@ -109,8 +110,7 @@ public void renameFromTmpWithPrivilege(long epoch) {
LOG.error("Error moving file {} to {}.", tmpPath.toString(), path.toString(), e);
}
} else {
CommonStats.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
ExceptionsAndErrors.WRITER_FILE_CREATION_SKIPPED, "", 1);
StatsCollector.instance().logException(StatExceptionCode.WRITER_FILE_CREATION_SKIPPED);
}
}

Expand Down Expand Up @@ -190,9 +190,9 @@ public void deleteFiles(List<String> filesToDelete) {
}
long duration = System.currentTimeMillis() - startTime;
CommonStats.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.EVENT_LOG_FILES_DELETION_TIME, "", duration);
WriterMetrics.EVENT_LOG_FILES_DELETION_TIME, duration);
CommonStats.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.EVENT_LOG_FILES_DELETED, "", filesDeletedCount);
WriterMetrics.EVENT_LOG_FILES_DELETED, filesDeletedCount);
LOG.debug("'{}' Old writer files cleaned up.", filesDeletedCount);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,53 @@

package org.opensearch.performanceanalyzer.commons.formatter;

import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatsType.LATENCIES;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.opensearch.performanceanalyzer.commons.metrics.MeasurementSet;
import org.opensearch.performanceanalyzer.commons.stats.Statistics;
import java.util.Objects;
import org.opensearch.performanceanalyzer.commons.stats.eval.Statistics;
import org.opensearch.performanceanalyzer.commons.stats.format.Formatter;
import org.opensearch.performanceanalyzer.commons.stats.measurements.MeasurementSet;

public class StatsCollectorFormatter implements Formatter {
StringBuilder formatted;
String sep = "";
private StringBuilder metricsBuilder;
private Map<String, Double> latencyMap = new HashMap<>();

private String sep = "";
long startTime;
long endTime;

/**
* Creates a formatter with an empty string buffer and an empty latency map.
*
* <p>NOTE(review): this diff hunk mixes pre- and post-change lines without +/- markers; the
* {@code formatted} assignment appears to be the removed line and {@code metricsBuilder} its
* replacement. Confirm against the repository.
*/
public StatsCollectorFormatter() {
formatted = new StringBuilder();
metricsBuilder = new StringBuilder();
// latencyMap is already initialised to a new HashMap at its declaration.
latencyMap.clear();
}

/**
* Records a single measurement value.
*
* <p>In the branch shown here, a measurement whose stats type equals {@code LATENCIES} is
* stored in {@code latencyMap} under the measurement's name as a double; any other measurement
* is appended to {@code metricsBuilder} through {@code formatStat}.
*
* <p>NOTE(review): this diff hunk mixes pre- and post-change lines without +/- markers; the two
* {@code formatted.append} statements appear to be the removed pre-change body. Confirm
* against the repository.
*/
private void format(
MeasurementSet measurementSet, Statistics aggregationType, String name, Number value) {
formatted.append(sep);
formatted.append(measurementSet.getName()).append("=").append(value);
// Latency measurements are kept as numbers in a map instead of being formatted as text.
if (Objects.equals(measurementSet.getStatsType(), LATENCIES)) {
latencyMap.put(measurementSet.getName(), value.doubleValue());
} else {
formatStat(metricsBuilder, measurementSet, aggregationType, name, value);
}
}

/**
* Appends one measurement to the given builder as {@code name=value}, followed by the unit
* when it is non-empty, then {@code aggr|<aggregationType>}, then {@code key|<name>} when
* {@code name} is non-empty. Each entry is preceded by {@code sep}, which is set to "," after
* the entry is written.
*
* <p>NOTE(review): this diff hunk mixes pre- and post-change lines without +/- markers; the
* {@code formatted.append} statements appear to be the removed pre-change lines that the
* {@code metricsBuilder.append} statements replace. Confirm against the repository.
*
* @param metricsBuilder builder to append to; this parameter shadows the field of the same name
* @param measurementSet supplies the entry's name and unit
* @param aggregationType statistic written after the {@code aggr|} marker
* @param name key written after the {@code key|} marker; omitted when empty
* @param value value written after the {@code =} sign
*/
private void formatStat(
StringBuilder metricsBuilder,
MeasurementSet measurementSet,
Statistics aggregationType,
String name,
Number value) {
metricsBuilder.append(sep);
metricsBuilder.append(measurementSet.getName()).append("=").append(value);
if (!measurementSet.getUnit().isEmpty()) {
formatted.append(" ").append(measurementSet.getUnit());
metricsBuilder.append(" ").append(measurementSet.getUnit());
}
formatted.append(" ").append("aggr|").append(aggregationType);
metricsBuilder.append(" ").append("aggr|").append(aggregationType);
if (!name.isEmpty()) {
formatted.append(" ").append("key|").append(name);
metricsBuilder.append(" ").append("key|").append(name);
}
// Every entry after the first is separated from the previous one by a comma.
sep = ",";
}
Expand All @@ -60,31 +78,26 @@ public List<StatsCollectorReturn> getAllMetrics() {
List<StatsCollectorReturn> list = new ArrayList<>();
StatsCollectorReturn statsCollectorReturn =
new StatsCollectorReturn(this.startTime, this.endTime);
statsCollectorReturn.statsdata.put("Metrics", formatted.toString());
list.add(statsCollectorReturn);
statsCollectorReturn.statsdata.put("Metrics", metricsBuilder.toString());
statsCollectorReturn.latencies = new HashMap<>(latencyMap);

list.add(statsCollectorReturn);
return list;
}

public static class StatsCollectorReturn {
private Map<String, AtomicInteger> counters;
private Map<String, String> statsdata;
private Map<String, Double> latencies;
private long startTimeMillis;
private long endTimeMillis;

public StatsCollectorReturn(long startTimeMillis, long endTimeMillis) {
counters = new HashMap<>();
statsdata = new HashMap<>();
latencies = new HashMap<>();
this.startTimeMillis = startTimeMillis;
this.endTimeMillis = endTimeMillis;
}

public Map<String, AtomicInteger> getCounters() {
return counters;
}

public Map<String, String> getStatsdata() {
return statsdata;
}
Expand All @@ -102,7 +115,7 @@ public long getEndTimeMillis() {
}

public boolean isEmpty() {
return counters.isEmpty() && statsdata.isEmpty() && latencies.isEmpty();
return statsdata.isEmpty() && latencies.isEmpty();
}
}
}
Loading

0 comments on commit b61e646

Please sign in to comment.