Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Graphene can search metrics by tags #38

Merged
merged 15 commits into from
Jan 12, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
public class TimeSeries {

private String name;
private String pathExpression;
private Long from;
private Long to;
private int step;
private Double[] values;
private Map<TimeSeriesOption, Object> options = new HashMap<>();
private Map<String, String> tags = new HashMap<>();

public TimeSeries(String name, Long from, Long to, int step) {
this(name, from, to, step, new Double[0]);
Expand All @@ -26,6 +28,15 @@ public TimeSeries(String name, Long from, Long to, int step, Double[] values) {
this.values = values;
}

public TimeSeries(String name, String pathExpression, Long from, Long to, int step, Double[] values) {
this.name = name;
this.pathExpression = pathExpression;
this.from = from;
this.to = to;
this.step = step;
this.values = values;
}

public String getName() {
return name;
}
Expand All @@ -34,6 +45,14 @@ public void setName(String name) {
this.name = name;
}

public String getPathExpression() {
return pathExpression;
}

public void setPathExpression(String pathExpression) {
this.pathExpression = pathExpression;
}

public Long getFrom() {
return from;
}
Expand Down Expand Up @@ -82,6 +101,14 @@ public boolean hasOption(TimeSeriesOption option) {
return options.containsKey(option);
}

public Map<String, String> getTags() {
return tags;
}

public void setTags(Map<String, String> tags) {
this.tags = tags;
}

@Override
public String toString() {
return "TimeSeries{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ public class EvaluationException extends Exception{
public EvaluationException() {
}

public EvaluationException(String message) {
super(message);
}

public EvaluationException(Throwable cause) {
super(cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ public static Double percentile(Collection<Double> values, double percentile, bo
return result;
}

public static Double div(List<Double> values) {
if (values.size() != 2) {
return null;
}
Double dividend = values.get(0);
Double divisor = values.get(1);
if (null == dividend || null == divisor || divisor == 0L) {
return null;
}
return dividend / divisor;
}

public static Double last(Collection<Double> values) {
List<Double> filteredValues = filterNulls(values);

Expand Down
113 changes: 79 additions & 34 deletions graphene-common/src/main/java/com/graphene/reader/utils/Grouper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.graphene.reader.beans.TimeSeries;
import com.graphene.reader.exceptions.EvaluationException;
import com.graphene.reader.utils.CollectionUtils;

import java.util.ArrayList;
Expand All @@ -19,10 +22,13 @@ interface AggregationMethod {

private static ImmutableMap<String, AggregationMethod> aggregationMap;
static {
AggregationMethod avg = new AggregationMethod() { public Double apply(List<Double> points) { return CollectionUtils.average(points);}};
AggregationMethod sum = new AggregationMethod() { public Double apply(List<Double> points) { return CollectionUtils.sum(points);}};
AggregationMethod min = new AggregationMethod() { public Double apply(List<Double> points) { return CollectionUtils.min(points);}};
AggregationMethod max = new AggregationMethod() { public Double apply(List<Double> points) { return CollectionUtils.max(points);}};
AggregationMethod avg = CollectionUtils::average;
AggregationMethod sum = CollectionUtils::sum;
AggregationMethod min = CollectionUtils::min;
AggregationMethod max = CollectionUtils::max;
AggregationMethod median = CollectionUtils::median;
AggregationMethod div = CollectionUtils::div;
AggregationMethod count = points -> (double) points.size();

aggregationMap = ImmutableMap.<String, AggregationMethod>builder()
.put("sum", sum)
Expand All @@ -33,50 +39,58 @@ interface AggregationMethod {
.put("minSeries", min)
.put("max", max)
.put("maxSeries", max)
.put("med", median)
.put("median", median)
.put("div", div)
.put("divide", div)
.put("count", count)
.build();
}

private List<TimeSeries> timeSeries;
private String aggregator;
private Long from;
private Long to;

public Grouper(List<TimeSeries> ts, String aggregatorName, Long from, Long to) {
this.timeSeries = ts;
this.aggregator = aggregatorName;
this.from = from;
this.to = to;
}

public List<TimeSeries> byTagKeys(List<String> groupTagKeys) throws EvaluationException {
Map<String, List<TimeSeries>> buckets = Maps.newHashMap();

for (TimeSeries ts : timeSeries) {
List<String> bucketNameParts = Lists.newArrayList();
for (String groupTagKey : groupTagKeys) {
for (Map.Entry<String, String> entry : ts.getTags().entrySet()) {
if (entry.getKey().equalsIgnoreCase(groupTagKey)) {
bucketNameParts.add(entry.getValue());
}
}
}
String bucketName = Joiner.on(".").join(bucketNameParts);
if (buckets.containsKey(bucketName)) {
buckets.get(bucketName).add(ts);
} else {
buckets.put(bucketName, Lists.newArrayList(ts));
}
}

public Grouper(List<TimeSeries> ts, String aggregatorName) {
timeSeries = ts;
aggregator = aggregatorName;
return reduceFromBucket(buckets);
}

public List<TimeSeries> byNodesIndex(int[] indexes) {
public List<TimeSeries> byNodesIndex(int[] indexes) throws EvaluationException {
Map<String, List<TimeSeries>> buckets = new HashMap<>();

for (TimeSeries ts : timeSeries) {
String bucketName = getBucketName(ts.getName(), indexes);
if (!buckets.containsKey(bucketName)) buckets.put(bucketName, new ArrayList<TimeSeries>());
if (!buckets.containsKey(bucketName)) buckets.put(bucketName, new ArrayList<>());
buckets.get(bucketName).add(ts);
}

long from = timeSeries.get(0).getFrom();
long to = timeSeries.get(0).getTo();
int step = timeSeries.get(0).getStep();
int length = timeSeries.get(0).getValues().length;

List<TimeSeries> resultTimeSeries = new ArrayList<>();

for (Map.Entry<String, List<TimeSeries>> bucket : buckets.entrySet()) {
TimeSeries timeSeries = new TimeSeries(bucket.getKey(), from, to, step);
Double[] values = new Double[length];

for (int i = 0; i < length; i++) {
List<Double> points = new ArrayList<>();
for (TimeSeries ts : bucket.getValue()) {
points.add(ts.getValues()[i]);
}
values[i] = aggregationMap.get(aggregator).apply(points);
}
timeSeries.setValues(values);
timeSeries.setName(bucket.getKey());
resultTimeSeries.add(timeSeries);
}

return resultTimeSeries;
return reduceFromBucket(buckets);
}

public static boolean hasAggregationMethod(String name) {
Expand All @@ -95,4 +109,35 @@ private String getBucketName(String name, int[] positions) {
}
return Joiner.on(".").skipNulls().join(parts);
}

private List<TimeSeries> reduceFromBucket(Map<String, List<TimeSeries>> buckets) throws EvaluationException {
long from = this.from;
long to = this.to;
int step = timeSeries.get(0).getStep();
int length = timeSeries.get(0).getValues().length;

List<TimeSeries> resultTimeSeries = new ArrayList<>();
try {
for (Map.Entry<String, List<TimeSeries>> bucket : buckets.entrySet()) {
TimeSeries timeSeries = new TimeSeries(bucket.getKey(), from, to, step);
Double[] values = new Double[length];

for (int i = 0; i < length; i++) {
List<Double> points = new ArrayList<>();
for (TimeSeries ts : bucket.getValue()) {
points.add(ts.getValues()[i]);
}
values[i] = aggregationMap.get(aggregator).apply(points);
}
timeSeries.setValues(values);
timeSeries.setName(bucket.getKey());
timeSeries.setPathExpression(bucket.getKey());
timeSeries.setTags(bucket.getValue().get(0).getTags());
resultTimeSeries.add(timeSeries);
}
return resultTimeSeries;
} catch (Exception e) {
throw new EvaluationException("Cannot reduce series with aggregator = " + aggregator);
}
}
}
30 changes: 30 additions & 0 deletions graphene-common/src/main/kotlin/com/graphene/common/beans/Path.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.graphene.common.beans

import com.google.common.collect.Maps

class Path(var path: String) {
private var tags: MutableMap<String, String> = Maps.newHashMap()

fun setTags(tags: MutableMap<String, String>) {
this.tags = tags
}

fun getTags(): Map<String, String> {
return tags
}

fun addTag(tagKey: String, tagValue: String) {
tags[tagKey] = tagValue
}

override fun equals(obj: Any?): Boolean {
if (obj !is Path) {
return false
}
return path == obj.path
}

override fun hashCode(): Int {
return path.hashCode()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.ObjectArrays;
import com.graphene.common.beans.Path;
import com.graphene.reader.beans.TimeSeries;
import com.graphene.reader.config.Rollup;
import com.graphene.reader.exceptions.EvaluationException;
Expand All @@ -20,7 +21,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;

/** @author Andrei Ivanov */
Expand All @@ -39,14 +39,32 @@ public List<TimeSeries> eval(Target target) throws EvaluationException {
return target.evaluate(this);
}

public List<TimeSeries> evalByTags(String tenant, List<String> tagExpressions, Long from, Long to) throws EvaluationException {
try {
List<Path> paths =
(List<Path>) keySearchHandler.getPathsByTags(tenant, tagExpressions, from, to);

return metricService.getMetricsAsList(
tenant,
paths,
from,
to
);
} catch (ExecutionException | InterruptedException | TooMuchDataExpectedException e) {
logger.error(e.getMessage());
logger.debug(e);
throw new EvaluationException(e);
}
}

public List<List<TimeSeries>> evalByGroup(Target target) throws EvaluationException {
return target.evalByGroup(this);
}

public List<TimeSeries> visit(PathTarget pathTarget) throws EvaluationException {
try {
Set<String> paths =
keySearchHandler.getPaths(pathTarget.getTenant(), Lists.newArrayList(pathTarget.getPath()), pathTarget.getFrom(), pathTarget.getTo());
List<Path> paths =
(List<Path>) keySearchHandler.getPaths(pathTarget.getTenant(), Lists.newArrayList(pathTarget.getPath()), pathTarget.getFrom(), pathTarget.getTo());

logger.debug("resolved paths : " + paths);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public Target visitExpressionPathExpression(GraphiteParser.ExpressionPathExpress
public Target visitExpressionCall(GraphiteParser.ExpressionCallContext ctx) {
GraphiteParser.CallContext call = ctx.call();
try {
GrapheneFunction function = FunctionRegistry.getFunction(context, call.FunctionName().getText(), from, to);
GrapheneFunction function = FunctionRegistry.getFunction(context, call.FunctionName().getText(), tenant, from, to);
function.setText(ctx.getText());

for(GraphiteParser.ArgContext arg : call.args().arg()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public abstract class GrapheneFunction extends Target {
protected String name;
protected Long from;
protected Long to;
protected String tenant;

public GrapheneFunction(String text, String name) {
super(text);
Expand Down Expand Up @@ -66,14 +67,22 @@ public void setTo(Long to) {
this.to = to;
}

public String getTenant() {
return tenant;
}

public void setTenant(String tenant) {
this.tenant = tenant;
}

protected String getResultingName(TimeSeries timeSeries) {
return name + "(" + timeSeries.getName() + ")";
}

@Override
public Target shiftBy(long shift) {
try {
GrapheneFunction function = FunctionRegistry.getFunction(getContext(), name, from + shift, to + shift);
GrapheneFunction function = FunctionRegistry.getFunction(getContext(), name, tenant, from + shift, to + shift);

for (Object argument : arguments) {
if (argument instanceof Target) {
Expand All @@ -95,7 +104,7 @@ public Target shiftBy(long shift) {
@Override
public Target previous(long period) {
try {
GrapheneFunction function = FunctionRegistry.getFunction(getContext(), name, from - period , from - 1);
GrapheneFunction function = FunctionRegistry.getFunction(getContext(), name, tenant,from - period , from - 1);
Dark0096 marked this conversation as resolved.
Show resolved Hide resolved

for (Object argument : arguments) {
if (argument instanceof Target) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public List<TimeSeries> evaluate(TargetEvaluator evaluator) throws EvaluationExc

int[] positions = {((Double) arguments.get(1)).intValue()};

return new Grouper(processedArguments, callbackName).byNodesIndex(positions);
return new Grouper(processedArguments, callbackName, from, to).byNodesIndex(positions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public List<TimeSeries> evaluate(TargetEvaluator evaluator) throws EvaluationExc
positions[i - 2] = ((Double) arguments.get(i)).intValue();
}

return new Grouper(processedArguments, callbackName).byNodesIndex(positions);
return new Grouper(processedArguments, callbackName, from, to).byNodesIndex(positions);
}

@Override
Expand Down
Loading