Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring Service/Stat Metrics #14

Merged
merged 1 commit into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.commons.stats.CommonStats;
import org.opensearch.performanceanalyzer.commons.stats.measurements.MeasurementSet;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.util.Util;

public abstract class PerformanceAnalyzerMetricsCollector implements Runnable {
Expand All @@ -30,14 +33,24 @@ public enum State {
LogManager.getLogger(PerformanceAnalyzerMetricsCollector.class);
private int timeInterval;
private long startTime;

private String collectorName;
private MeasurementSet statLatencyMetric;
private StatExceptionCode errorMetric;
protected StringBuilder value;

protected State state;
private boolean threadContentionMonitoringEnabled;

protected PerformanceAnalyzerMetricsCollector(int timeInterval, String collectorName) {
protected PerformanceAnalyzerMetricsCollector(
int timeInterval,
String collectorName,
MeasurementSet statLatencyMetric,
StatExceptionCode errorMetric) {
this.timeInterval = timeInterval;
this.collectorName = collectorName;
this.statLatencyMetric = statLatencyMetric;
this.errorMetric = errorMetric;
this.value = new StringBuilder();
this.state = State.HEALTHY;
}
Expand Down Expand Up @@ -65,17 +78,16 @@ public void setStartTime(long startTime) {

public void run() {
try {
long mCurrT = System.currentTimeMillis();
Util.invokePrivileged(() -> collectMetrics(startTime));
CommonStats.WRITER_METRICS_AGGREGATOR.updateStat(
statLatencyMetric, System.currentTimeMillis() - mCurrT);
} catch (Exception ex) {
// - There should not be any exceptions here, but in case there are, absorb them.
// - Ideally we should not be logging here, as it slows collection down and fills up
// the log.
// We need to find a better way to catch these.
LOG.error(
"Error In Collect Metrics: {} with ExceptionCode: {}",
() -> ex.toString(),
() -> StatExceptionCode.OTHER_COLLECTION_ERROR.toString());
StatsCollector.instance().logException(StatExceptionCode.OTHER_COLLECTION_ERROR);
"[{}] Error In Collect Metrics: {}",
() -> getCollectorName(),
() -> ex.toString());
StatsCollector.instance().logException(errorMetric);
} finally {
bInProgress.set(false);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -25,28 +26,33 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.rca.Version;
import org.opensearch.performanceanalyzer.commons.stats.CommonStats;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.WriterMetrics;

public class StatsCollector extends PerformanceAnalyzerMetricsCollector {
private static final Logger STATS_LOGGER = LogManager.getLogger("stats_log");
private static final Logger GENERAL_LOG = LogManager.getLogger(StatsCollector.class);
public static final String COLLECTOR_NAME = "StatsCollector";
public static String STATS_TYPE = "plugin-stats-metadata";

private static final String LOG_ENTRY_INIT =
"------------------------------------------------------------------------";
private static final String LOG_ENTRY_END = "EOE";
private static final String LOG_LINE_BREAK = "\n";
private static final double MILLISECONDS_TO_SECONDS_DIVISOR = 1000D;

private static final Logger STATS_LOGGER = LogManager.getLogger("stats_log");
private static final Logger GENERAL_LOG = LogManager.getLogger(StatsCollector.class);
private static StatsCollector statsCollector = null;
public static String STATS_TYPE = "plugin-stats-metadata";

private final Map<String, String> metadata;
private Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();
private final List<StatExceptionCode> defaultExceptionCodes = new Vector<>();
private Date objectCreationTime = new Date();

private List<StatExceptionCode> defaultExceptionCodes = new Vector<>();

public StatsCollector(String name, int samplingIntervalMillis, Map<String, String> metadata) {
super(samplingIntervalMillis, name);
super(
samplingIntervalMillis,
name,
WriterMetrics.STAT_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.STATS_COLLECTOR_ERROR);
this.metadata = metadata;
addRcaVersionMetadata(this.metadata);
defaultExceptionCodes.add(StatExceptionCode.TOTAL_ERROR);
Expand Down Expand Up @@ -84,6 +90,11 @@ public void collectMetrics(long startTime) {
currentCounters.putIfAbsent(statExceptionCode.toString(), new AtomicInteger(0));
}

/**
 * Each run of StatsCollector's collectMetrics (scheduled every 60s) emits 2 entries. The
 * first entry, via {@link #writeStats}, writes counters and metrics, and the second, via
 * {@link #collectAndWriteRcaStats}, writes timers and metrics.
 */
writeStats(
metadata,
currentCounters,
Expand All @@ -104,7 +115,7 @@ private void collectAndWriteRcaStats() {
formatter.getAllMetrics()) {
if (!statsReturn.isEmpty()) {
logStatsRecord(
statsReturn.getCounters(),
counters = null,
statsReturn.getStatsdata(),
statsReturn.getLatencies(),
statsReturn.getStartTimeMillis(),
Expand All @@ -126,11 +137,11 @@ public void logException(StatExceptionCode statExceptionCode) {

public void logStatsRecord(
Map<String, AtomicInteger> counters,
Map<String, String> statsdata,
Map<String, String> statsData,
Map<String, Double> latencies,
long startTimeMillis,
long endTimeMillis) {
writeStats(metadata, counters, statsdata, latencies, startTimeMillis, endTimeMillis);
writeStats(metadata, counters, statsData, latencies, startTimeMillis, endTimeMillis);
}

private void addRcaVersionMetadata(Map<String, String> metadata) {
Expand All @@ -146,7 +157,6 @@ private static Map<String, String> loadMetadata(String fileLocation) {
try (InputStream input =
new FileInputStream(
PluginSettings.instance().getConfigFolderPath() + fileLocation); ) {
// load properties file
props.load(input);
} catch (Exception ex) {
GENERAL_LOG.error(
Expand Down Expand Up @@ -185,27 +195,22 @@ private static void writeStats(
Map<String, Double> latencies,
long startTimeMillis,
long endTimeMillis) {

StringBuilder builder = new StringBuilder();
builder.append(LOG_ENTRY_INIT + LOG_LINE_BREAK);
logValues(metadata, builder);
// Stats Data
logValues(statsdata, builder);
logTimeMetrics(startTimeMillis, endTimeMillis, builder);

Map<String, Double> tmpLatencies;

if (latencies == null) {
tmpLatencies = new ConcurrentHashMap<>();
} else {
tmpLatencies = new ConcurrentHashMap<>(latencies);
}

tmpLatencies.put("total-time", (double) endTimeMillis - startTimeMillis);
addEntry("Timing", getLatencyMetrics(tmpLatencies), builder);

// Timers and Counters
Optional.ofNullable(latencies)
.ifPresent(
e -> latencies.put("total-time", (double) endTimeMillis - startTimeMillis));
addEntry("Timing", getLatencyMetrics(latencies), builder);
addEntry("Counters", getCountersString(counters), builder);
builder.append(LOG_ENTRY_END); // + LOG_LINE_BREAK);
/* Setting this log level to debug (and not info) to avoid logging noisy plugin stats
logs to the console and opensearch.log file */

builder.append(LOG_ENTRY_END);
STATS_LOGGER.debug(builder.toString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.commons.metrics.ExceptionsAndErrors;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.commons.stats.CommonStats;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.commons.util.Util;

public class EventLogFileHandler {
Expand Down Expand Up @@ -109,8 +110,7 @@ public void renameFromTmpWithPrivilege(long epoch) {
LOG.error("Error moving file {} to {}.", tmpPath.toString(), path.toString(), e);
}
} else {
CommonStats.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
ExceptionsAndErrors.WRITER_FILE_CREATION_SKIPPED, "", 1);
StatsCollector.instance().logException(StatExceptionCode.WRITER_FILE_CREATION_SKIPPED);
}
}

Expand Down Expand Up @@ -190,9 +190,9 @@ public void deleteFiles(List<String> filesToDelete) {
}
long duration = System.currentTimeMillis() - startTime;
CommonStats.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.EVENT_LOG_FILES_DELETION_TIME, "", duration);
WriterMetrics.EVENT_LOG_FILES_DELETION_TIME, duration);
CommonStats.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.EVENT_LOG_FILES_DELETED, "", filesDeletedCount);
WriterMetrics.EVENT_LOG_FILES_DELETED, filesDeletedCount);
LOG.debug("'{}' Old writer files cleaned up.", filesDeletedCount);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,53 @@

package org.opensearch.performanceanalyzer.commons.formatter;

import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatsType.LATENCIES;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.opensearch.performanceanalyzer.commons.metrics.MeasurementSet;
import org.opensearch.performanceanalyzer.commons.stats.Statistics;
import java.util.Objects;
import org.opensearch.performanceanalyzer.commons.stats.eval.Statistics;
import org.opensearch.performanceanalyzer.commons.stats.format.Formatter;
import org.opensearch.performanceanalyzer.commons.stats.measurements.MeasurementSet;

public class StatsCollectorFormatter implements Formatter {
StringBuilder formatted;
String sep = "";
private StringBuilder metricsBuilder;
private Map<String, Double> latencyMap = new HashMap<>();

private String sep = "";
long startTime;
long endTime;

public StatsCollectorFormatter() {
formatted = new StringBuilder();
metricsBuilder = new StringBuilder();
latencyMap.clear();
}

private void format(
MeasurementSet measurementSet, Statistics aggregationType, String name, Number value) {
formatted.append(sep);
formatted.append(measurementSet.getName()).append("=").append(value);
if (Objects.equals(measurementSet.getStatsType(), LATENCIES)) {
latencyMap.put(measurementSet.getName(), value.doubleValue());
} else {
formatStat(metricsBuilder, measurementSet, aggregationType, name, value);
}
}

private void formatStat(
StringBuilder metricsBuilder,
MeasurementSet measurementSet,
Statistics aggregationType,
String name,
Number value) {
metricsBuilder.append(sep);
metricsBuilder.append(measurementSet.getName()).append("=").append(value);
if (!measurementSet.getUnit().isEmpty()) {
formatted.append(" ").append(measurementSet.getUnit());
metricsBuilder.append(" ").append(measurementSet.getUnit());
}
formatted.append(" ").append("aggr|").append(aggregationType);
metricsBuilder.append(" ").append("aggr|").append(aggregationType);
if (!name.isEmpty()) {
formatted.append(" ").append("key|").append(name);
metricsBuilder.append(" ").append("key|").append(name);
}
sep = ",";
}
Expand All @@ -60,31 +78,26 @@ public List<StatsCollectorReturn> getAllMetrics() {
List<StatsCollectorReturn> list = new ArrayList<>();
StatsCollectorReturn statsCollectorReturn =
new StatsCollectorReturn(this.startTime, this.endTime);
statsCollectorReturn.statsdata.put("Metrics", formatted.toString());
list.add(statsCollectorReturn);
statsCollectorReturn.statsdata.put("Metrics", metricsBuilder.toString());
statsCollectorReturn.latencies = new HashMap<>(latencyMap);

list.add(statsCollectorReturn);
return list;
}

public static class StatsCollectorReturn {
private Map<String, AtomicInteger> counters;
private Map<String, String> statsdata;
private Map<String, Double> latencies;
private long startTimeMillis;
private long endTimeMillis;

public StatsCollectorReturn(long startTimeMillis, long endTimeMillis) {
counters = new HashMap<>();
statsdata = new HashMap<>();
latencies = new HashMap<>();
this.startTimeMillis = startTimeMillis;
this.endTimeMillis = endTimeMillis;
}

public Map<String, AtomicInteger> getCounters() {
return counters;
}

public Map<String, String> getStatsdata() {
return statsdata;
}
Expand All @@ -102,7 +115,7 @@ public long getEndTimeMillis() {
}

public boolean isEmpty() {
return counters.isEmpty() && statsdata.isEmpty() && latencies.isEmpty();
return statsdata.isEmpty() && latencies.isEmpty();
}
}
}
Loading