Skip to content
This repository has been archived by the owner on Mar 27, 2021. It is now read-only.

Improve tracing and logging for write errors. #740

Merged: 2 commits were merged on Jan 12, 2021.
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ public class PubSubConsumer implements Consumer, LifeCycles {

@Inject
PubSubConsumer(
AsyncFramework async, Managed<Connection> connection,
@Named("consuming") AtomicInteger consuming, @Named("total") AtomicInteger total,
@Named("errors") AtomicLong errors, @Named("consumed") LongAdder consumed
AsyncFramework async,
Managed<Connection> connection,
@Named("consuming") AtomicInteger consuming,
@Named("total") AtomicInteger total,
@Named("errors") AtomicLong errors,
@Named("consumed") LongAdder consumed
) {
this.async = async;
this.connection = connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

package com.spotify.heroic.consumer.pubsub;

import static io.opencensus.trace.AttributeValue.longAttributeValue;
import static io.opencensus.trace.AttributeValue.stringAttributeValue;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
Expand Down Expand Up @@ -71,6 +72,7 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer replyConsumer

Span span = tracer.spanBuilder("PubSub.receiveMessage").startSpan();
span.putAttribute("id", stringAttributeValue(messageId));
span.putAttribute("message.size", longAttributeValue(bytes.length));

final FutureReporter.Context consumptionContext = reporter.reportConsumption();

Expand All @@ -79,24 +81,23 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer replyConsumer
consumer.consume(bytes).onDone(consumptionContext).onFinished(() -> {
reporter.reportMessageSize(bytes.length);
replyConsumer.ack();
span.end();
});
} catch (ConsumerSchemaValidationException e) {
reporter.reportConsumerSchemaError();
log.error("ID:{} - {}", messageId, e.getMessage(), e);
span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.toString()));

// The message can never be processed; ack it so that it goes away.
replyConsumer.ack();
span.end();
} catch (Exception e) {
errors.incrementAndGet();
log.error("ID:{} - Failed to consume", messageId, e);
span.setStatus(Status.INTERNAL.withDescription(e.toString()));
reporter.reportMessageError();
replyConsumer.nack();
span.end();
} finally {
consumed.increment();
span.end();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import static io.opencensus.trace.AttributeValue.stringAttributeValue;


import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;
Expand Down Expand Up @@ -51,8 +50,6 @@
import com.spotify.heroic.time.Clock;
import dagger.Component;
import eu.toolchain.async.AsyncFuture;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
Expand Down Expand Up @@ -87,59 +84,59 @@ public Consumer(Clock clock, IngestionGroup ingestion, ConsumerReporter reporter
@Override
public AsyncFuture<Void> consume(final byte[] message) throws ConsumerSchemaException {
final JsonNode tree;
final Span span = tracer.spanBuilder("ConsumerSchema.consume").startSpan();
var scope = tracer.spanBuilder("ConsumerSchema.consume").startScopedSpan();
var span = tracer.getCurrentSpan();
span.putAttribute("schema", stringAttributeValue("Spotify100"));

try (Scope ws = tracer.withSpan(span)) {
try {
tree = mapper.readTree(message);
} catch (final Exception e) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.toString()));
span.end();
throw new ConsumerSchemaValidationException("Invalid metric", e);
}

if (tree.getNodeType() != JsonNodeType.OBJECT) {
span.setStatus(
Status.INVALID_ARGUMENT.withDescription("Metric is not an object"));
span.end();
throw new ConsumerSchemaValidationException(
"Expected object, but got: " + tree.getNodeType());
}

final ObjectNode object = (ObjectNode) tree;

final JsonNode versionNode = object.remove("version");

if (versionNode == null) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription("Missing version"));
span.end();
throw new ConsumerSchemaValidationException(
"Missing version in received object");
}

final Version version;

try {
version = Version.parse(versionNode.asText());
} catch (final Exception e) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription("Bad version"));
span.end();
throw new ConsumerSchemaValidationException("Bad version: " + versionNode);
}

int major = version.getMajor();

if (major == 1) {
return handleVersion1(tree).onFinished(span::end);
} else if (major == 2) {
return handleVersion2(tree).onFinished(span::end);
}

span.setStatus(Status.INVALID_ARGUMENT.withDescription("Unsupported version"));
span.end();
throw new ConsumerSchemaValidationException("Unsupported version: " + version);
try {
tree = mapper.readTree(message);
} catch (final Exception e) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.toString()));
scope.close();
throw new ConsumerSchemaValidationException("Invalid metric", e);
}

if (tree.getNodeType() != JsonNodeType.OBJECT) {
span.setStatus(
Status.INVALID_ARGUMENT.withDescription("Metric is not an object"));
scope.close();
throw new ConsumerSchemaValidationException(
"Expected object, but got: " + tree.getNodeType());
}

final ObjectNode object = (ObjectNode) tree;

final JsonNode versionNode = object.remove("version");

if (versionNode == null) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription("Missing version"));
scope.close();
throw new ConsumerSchemaValidationException(
"Missing version in received object");
}

final Version version;

try {
version = Version.parse(versionNode.asText());
} catch (final Exception e) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription("Bad version"));
scope.close();
throw new ConsumerSchemaValidationException("Bad version: " + versionNode);
}

span.putAttribute("metric.version", stringAttributeValue(version.toString()));
int major = version.getMajor();

if (major == 1) {
return handleVersion1(tree).onFinished(scope::close);
} else if (major == 2) {
return handleVersion2(tree).onFinished(scope::close);
}

span.setStatus(Status.INVALID_ARGUMENT.withDescription("Unsupported version"));
scope.close();
throw new ConsumerSchemaValidationException("Unsupported version: " + version);
}

private AsyncFuture<Void> handleVersion1(final JsonNode tree)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

package com.spotify.heroic.consumer.schemas;

import static io.opencensus.trace.AttributeValue.stringAttributeValue;

import com.google.common.collect.ImmutableList;
import com.spotify.heroic.common.Series;
import com.spotify.heroic.consumer.ConsumerSchema;
Expand All @@ -41,6 +43,10 @@
import dagger.Component;
import eu.toolchain.async.AsyncFramework;
import eu.toolchain.async.AsyncFuture;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -63,6 +69,7 @@ public static class Consumer implements ConsumerSchema.Consumer {
private final IngestionGroup ingestion;
private final ConsumerReporter reporter;
private final AsyncFramework async;
private static final Tracer tracer = Tracing.getTracer();

@Inject
public Consumer(
Expand All @@ -79,53 +86,64 @@ public Consumer(

@Override
public AsyncFuture<Void> consume(final byte[] message) throws ConsumerSchemaException {
var scope = tracer.spanBuilder("ConsumerSchema.consume").startScopedSpan();
var span = tracer.getCurrentSpan();
span.putAttribute("schema", stringAttributeValue("Spotify100Proto"));

final List<Spotify100.Metric> metrics;
try {
metrics = Spotify100.Batch.parseFrom(Snappy.uncompress(message)).getMetricList();
} catch (IOException e) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.getMessage()));
scope.close();
throw new ConsumerSchemaValidationException("Invalid batch of metrics", e);
}

final List<AsyncFuture<Ingestion>> ingestions = new ArrayList<>();
for (Spotify100.Metric metric : metrics) {

if (metric.getTime() <= 0) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription("Negative time set"));
scope.close();
throw new ConsumerSchemaValidationException(
"time: field must be a positive number: " + metric.toString());
}
reporter.reportMessageDrift(clock.currentTimeMillis() - metric.getTime());

final Series s = Series.of(metric.getKey(), metric.getTagsMap(),
final Series series = Series.of(metric.getKey(), metric.getTagsMap(),
metric.getResourceMap());


Spotify100.Value distributionTypeValue = metric.getDistributionTypeValue();
Point point = null;
if (!metric.hasDistributionTypeValue()) {
point = new Point(metric.getTime(), metric.getValue());
} else if (distributionTypeValue.getValueCase()
.equals(Spotify100.Value.ValueCase.DOUBLE_VALUE)) {
.equals(Spotify100.Value.ValueCase.DOUBLE_VALUE)
) {
point = new Point(metric.getTime(), distributionTypeValue.getDoubleValue());
} else if (distributionTypeValue.getValueCase()
.equals(Spotify100.Value.ValueCase.DISTRIBUTION_VALUE)) {
.equals(Spotify100.Value.ValueCase.DISTRIBUTION_VALUE)
) {
Distribution distribution = HeroicDistribution.
create(distributionTypeValue.getDistributionValue());
DistributionPoint distributionPoint =
DistributionPoint.create(distribution, metric.getTime());
final List<DistributionPoint> distributionPoints =
ImmutableList.of(distributionPoint);
ingestions
.add(ingestion.write(new Request(s,
.add(ingestion.write(new Request(series,
MetricCollection.distributionPoints(distributionPoints))));
}
if (point != null) {
List<Point> points = ImmutableList.of(point);
ingestions
.add(ingestion.write(new Request(s, MetricCollection.points(points))));
.add(ingestion.write(new Request(series, MetricCollection.points(points))));
}

}
reporter.reportMetricsIn(metrics.size());
var metricsSize = metrics.size();
reporter.reportMetricsIn(metricsSize);
span.putAttribute("metrics", AttributeValue.longAttributeValue(metricsSize));
// Return a Void future so that no unnecessary information leaks from the backend;
// callers can only monitor when the consumption is done.
return async.collectAndDiscard(ingestions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import java.util.function.Supplier;

public class CoreIngestionGroup implements IngestionGroup {
private final Tracer tracer = Tracing.getTracer();
private static final Tracer tracer = Tracing.getTracer();

private final AsyncFramework async;
private final Supplier<Filter> filter;
Expand Down Expand Up @@ -127,16 +127,15 @@ protected AsyncFuture<Ingestion> syncWrite(final Request request) {
reporter.incrementConcurrentWrites();

try (Scope ws = tracer.withSpan(span)) {
return doWrite(request).onFinished(() -> {
return doWrite(request, span).onFinished(() -> {
writePermits.release();
reporter.decrementConcurrentWrites();
span.end();
});
}
}

protected AsyncFuture<Ingestion> doWrite(final Request request) {
final Span span = tracer.spanBuilder("CoreIngestionGroup.doWrite").startSpan();
protected AsyncFuture<Ingestion> doWrite(final Request request, final Span span) {
final List<AsyncFuture<Ingestion>> futures = new ArrayList<>();

final Supplier<DateRange> range = rangeSupplier(request);
Expand All @@ -146,7 +145,7 @@ protected AsyncFuture<Ingestion> doWrite(final Request request) {
.ifPresent(futures::add);
suggest.map(s -> doSuggestWrite(s, request, range.get(), span)).ifPresent(futures::add);

return async.collect(futures, Ingestion.reduce()).onFinished(span::end);
return async.collect(futures, Ingestion.reduce());
}

protected AsyncFuture<Ingestion> doMetricWrite(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@
import com.spotify.heroic.tracing.EndSpanFutureReporter;
import eu.toolchain.async.AsyncFramework;
import eu.toolchain.async.AsyncFuture;
import eu.toolchain.async.FutureDone;
import eu.toolchain.async.LazyTransform;
import eu.toolchain.async.StreamCollector;
import eu.toolchain.async.FutureDone;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import java.util.ArrayList;
Expand All @@ -71,12 +71,12 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.inject.Inject;
Expand Down Expand Up @@ -472,7 +472,7 @@ public AsyncFuture<FetchData.Result> fetch(

@Override
public AsyncFuture<WriteMetric> write(final WriteMetric.Request request) {
return write(request, io.opencensus.trace.Tracing.getTracer().getCurrentSpan());
return write(request);
}

@Override
Expand Down
Loading