diff --git a/consumer/pubsub/src/main/java/com/spotify/heroic/consumer/pubsub/PubSubConsumer.java b/consumer/pubsub/src/main/java/com/spotify/heroic/consumer/pubsub/PubSubConsumer.java index 9da6893fd..878d18c55 100644 --- a/consumer/pubsub/src/main/java/com/spotify/heroic/consumer/pubsub/PubSubConsumer.java +++ b/consumer/pubsub/src/main/java/com/spotify/heroic/consumer/pubsub/PubSubConsumer.java @@ -47,9 +47,12 @@ public class PubSubConsumer implements Consumer, LifeCycles { @Inject PubSubConsumer( - AsyncFramework async, Managed connection, - @Named("consuming") AtomicInteger consuming, @Named("total") AtomicInteger total, - @Named("errors") AtomicLong errors, @Named("consumed") LongAdder consumed + AsyncFramework async, + Managed connection, + @Named("consuming") AtomicInteger consuming, + @Named("total") AtomicInteger total, + @Named("errors") AtomicLong errors, + @Named("consumed") LongAdder consumed ) { this.async = async; this.connection = connection; diff --git a/consumer/pubsub/src/main/java/com/spotify/heroic/consumer/pubsub/Receiver.java b/consumer/pubsub/src/main/java/com/spotify/heroic/consumer/pubsub/Receiver.java index 596fb385d..bb5565dbb 100644 --- a/consumer/pubsub/src/main/java/com/spotify/heroic/consumer/pubsub/Receiver.java +++ b/consumer/pubsub/src/main/java/com/spotify/heroic/consumer/pubsub/Receiver.java @@ -21,6 +21,7 @@ package com.spotify.heroic.consumer.pubsub; +import static io.opencensus.trace.AttributeValue.longAttributeValue; import static io.opencensus.trace.AttributeValue.stringAttributeValue; import com.google.cloud.pubsub.v1.AckReplyConsumer; @@ -71,6 +72,7 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer replyConsumer Span span = tracer.spanBuilder("PubSub.receiveMessage").startSpan(); span.putAttribute("id", stringAttributeValue(messageId)); + span.putAttribute("message.size", longAttributeValue(bytes.length)); final FutureReporter.Context consumptionContext = reporter.reportConsumption(); @@ -79,24 +81,23 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer replyConsumer consumer.consume(bytes).onDone(consumptionContext).onFinished(() -> { reporter.reportMessageSize(bytes.length); replyConsumer.ack(); - span.end(); }); } catch (ConsumerSchemaValidationException e) { reporter.reportConsumerSchemaError(); log.error("ID:{} - {}", messageId, e.getMessage(), e); + span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.toString())); // The message will never be processable, ack it to make it go away replyConsumer.ack(); - span.end(); } catch (Exception e) { errors.incrementAndGet(); log.error("ID:{} - Failed to consume", messageId, e); span.setStatus(Status.INTERNAL.withDescription(e.toString())); reporter.reportMessageError(); replyConsumer.nack(); - span.end(); } finally { consumed.increment(); + span.end(); } } diff --git a/heroic-core/src/main/java/com/spotify/heroic/consumer/schemas/Spotify100.java b/heroic-core/src/main/java/com/spotify/heroic/consumer/schemas/Spotify100.java index 865db756e..cfa6cdfd0 100644 --- a/heroic-core/src/main/java/com/spotify/heroic/consumer/schemas/Spotify100.java +++ b/heroic-core/src/main/java/com/spotify/heroic/consumer/schemas/Spotify100.java @@ -23,7 +23,6 @@ import static io.opencensus.trace.AttributeValue.stringAttributeValue; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeType; @@ -51,8 +50,6 @@ import com.spotify.heroic.time.Clock; import dagger.Component; import eu.toolchain.async.AsyncFuture; -import io.opencensus.common.Scope; -import io.opencensus.trace.Span; import io.opencensus.trace.Status; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; @@ -87,59 +84,59 @@ public Consumer(Clock clock, IngestionGroup ingestion, ConsumerReporter reporter @Override public AsyncFuture consume(final byte[] message) throws ConsumerSchemaException { final JsonNode tree; - final Span span = tracer.spanBuilder("ConsumerSchema.consume").startSpan(); + var scope = tracer.spanBuilder("ConsumerSchema.consume").startScopedSpan(); + var span = tracer.getCurrentSpan(); span.putAttribute("schema", stringAttributeValue("Spotify100")); - try (Scope ws = tracer.withSpan(span)) { - try { - tree = mapper.readTree(message); - } catch (final Exception e) { - span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.toString())); - span.end(); - throw new ConsumerSchemaValidationException("Invalid metric", e); - } - - if (tree.getNodeType() != JsonNodeType.OBJECT) { - span.setStatus( - Status.INVALID_ARGUMENT.withDescription("Metric is not an object")); - span.end(); - throw new ConsumerSchemaValidationException( - "Expected object, but got: " + tree.getNodeType()); - } - - final ObjectNode object = (ObjectNode) tree; - - final JsonNode versionNode = object.remove("version"); - - if (versionNode == null) { - span.setStatus(Status.INVALID_ARGUMENT.withDescription("Missing version")); - span.end(); - throw new ConsumerSchemaValidationException( - "Missing version in received object"); - } - - final Version version; - - try { - version = Version.parse(versionNode.asText()); - } catch (final Exception e) { - span.setStatus(Status.INVALID_ARGUMENT.withDescription("Bad version")); - span.end(); - throw new ConsumerSchemaValidationException("Bad version: " + versionNode); - } - - int major = version.getMajor(); - - if (major == 1) { - return handleVersion1(tree).onFinished(span::end); - } else if (major == 2) { - return handleVersion2(tree).onFinished(span::end); - } - - span.setStatus(Status.INVALID_ARGUMENT.withDescription("Unsupported version")); - span.end(); - throw new ConsumerSchemaValidationException("Unsupported version: " + version); + try { + tree = mapper.readTree(message); + } catch (final Exception e) { + span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.toString())); + scope.close(); + throw new ConsumerSchemaValidationException("Invalid metric", e); + } + + if (tree.getNodeType() != JsonNodeType.OBJECT) { + span.setStatus( + Status.INVALID_ARGUMENT.withDescription("Metric is not an object")); + scope.close(); + throw new ConsumerSchemaValidationException( + "Expected object, but got: " + tree.getNodeType()); } + + final ObjectNode object = (ObjectNode) tree; + + final JsonNode versionNode = object.remove("version"); + + if (versionNode == null) { + span.setStatus(Status.INVALID_ARGUMENT.withDescription("Missing version")); + scope.close(); + throw new ConsumerSchemaValidationException( + "Missing version in received object"); + } + + final Version version; + + try { + version = Version.parse(versionNode.asText()); + } catch (final Exception e) { + span.setStatus(Status.INVALID_ARGUMENT.withDescription("Bad version")); + scope.close(); + throw new ConsumerSchemaValidationException("Bad version: " + versionNode); + } + + span.putAttribute("metric.version", stringAttributeValue(version.toString())); + int major = version.getMajor(); + + if (major == 1) { + return handleVersion1(tree).onFinished(scope::close); + } else if (major == 2) { + return handleVersion2(tree).onFinished(scope::close); + } + + span.setStatus(Status.INVALID_ARGUMENT.withDescription("Unsupported version")); + scope.close(); + throw new ConsumerSchemaValidationException("Unsupported version: " + version); } private AsyncFuture handleVersion1(final JsonNode tree) diff --git a/heroic-core/src/main/java/com/spotify/heroic/consumer/schemas/Spotify100Proto.java b/heroic-core/src/main/java/com/spotify/heroic/consumer/schemas/Spotify100Proto.java index a2e22d94e..681c32dd6 100644 --- a/heroic-core/src/main/java/com/spotify/heroic/consumer/schemas/Spotify100Proto.java +++ b/heroic-core/src/main/java/com/spotify/heroic/consumer/schemas/Spotify100Proto.java @@ -21,6 +21,8 @@ package com.spotify.heroic.consumer.schemas; +import static io.opencensus.trace.AttributeValue.stringAttributeValue; + import com.google.common.collect.ImmutableList; import com.spotify.heroic.common.Series; import com.spotify.heroic.consumer.ConsumerSchema; @@ -41,6 +43,10 @@ import dagger.Component; import eu.toolchain.async.AsyncFramework; import eu.toolchain.async.AsyncFuture; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Status; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -63,6 +69,7 @@ public static class Consumer implements ConsumerSchema.Consumer { private final IngestionGroup ingestion; private final ConsumerReporter reporter; private final AsyncFramework async; + private static final Tracer tracer = Tracing.getTracer(); @Inject public Consumer( @@ -79,10 +86,16 @@ public Consumer( @Override public AsyncFuture consume(final byte[] message) throws ConsumerSchemaException { + var scope = tracer.spanBuilder("ConsumerSchema.consume").startScopedSpan(); + var span = tracer.getCurrentSpan(); + span.putAttribute("schema", stringAttributeValue("Spotify100Proto")); + final List metrics; try { metrics = Spotify100.Batch.parseFrom(Snappy.uncompress(message)).getMetricList(); } catch (IOException e) { + span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.getMessage())); + scope.close(); throw new ConsumerSchemaValidationException("Invalid batch of metrics", e); } @@ -90,24 +103,27 @@ public AsyncFuture consume(final byte[] message) throws ConsumerSchemaExce for (Spotify100.Metric metric : metrics) { if (metric.getTime() <= 0) { + span.setStatus(Status.INVALID_ARGUMENT.withDescription("Negative time set")); + scope.close(); throw new ConsumerSchemaValidationException( "time: field must be a positive number: " + metric.toString()); } reporter.reportMessageDrift(clock.currentTimeMillis() - metric.getTime()); - final Series s = Series.of(metric.getKey(), metric.getTagsMap(), + final Series series = Series.of(metric.getKey(), metric.getTagsMap(), metric.getResourceMap()); - Spotify100.Value distributionTypeValue = metric.getDistributionTypeValue(); Point point = null; if (!metric.hasDistributionTypeValue()) { point = new Point(metric.getTime(), metric.getValue()); } else if (distributionTypeValue.getValueCase() - .equals(Spotify100.Value.ValueCase.DOUBLE_VALUE)) { + .equals(Spotify100.Value.ValueCase.DOUBLE_VALUE) + ) { point = new Point(metric.getTime(), distributionTypeValue.getDoubleValue()); } else if (distributionTypeValue.getValueCase() - .equals(Spotify100.Value.ValueCase.DISTRIBUTION_VALUE)) { + .equals(Spotify100.Value.ValueCase.DISTRIBUTION_VALUE) + ) { Distribution distribution = HeroicDistribution. create(distributionTypeValue.getDistributionValue()); DistributionPoint distributionPoint = @@ -115,17 +131,19 @@ public AsyncFuture consume(final byte[] message) throws ConsumerSchemaExce final List distributionPoints = ImmutableList.of(distributionPoint); ingestions - .add(ingestion.write(new Request(s, + .add(ingestion.write(new Request(series, MetricCollection.distributionPoints(distributionPoints)))); } if (point != null) { List points = ImmutableList.of(point); ingestions - .add(ingestion.write(new Request(s, MetricCollection.points(points)))); + .add(ingestion.write(new Request(series, MetricCollection.points(points)))); } } - reporter.reportMetricsIn(metrics.size()); + var metricsSize = metrics.size(); + reporter.reportMetricsIn(metricsSize); + span.putAttribute("metrics", AttributeValue.longAttributeValue(metricsSize)); // Return Void future, to not leak unnecessary information from the backend but just // allow monitoring of when the consumption is done. return async.collectAndDiscard(ingestions); diff --git a/heroic-core/src/main/java/com/spotify/heroic/ingestion/CoreIngestionGroup.java b/heroic-core/src/main/java/com/spotify/heroic/ingestion/CoreIngestionGroup.java index a1cda931a..e58f1a95c 100644 --- a/heroic-core/src/main/java/com/spotify/heroic/ingestion/CoreIngestionGroup.java +++ b/heroic-core/src/main/java/com/spotify/heroic/ingestion/CoreIngestionGroup.java @@ -51,7 +51,7 @@ import java.util.function.Supplier; public class CoreIngestionGroup implements IngestionGroup { - private final Tracer tracer = Tracing.getTracer(); + private static final Tracer tracer = Tracing.getTracer(); private final AsyncFramework async; private final Supplier filter; @@ -127,7 +127,7 @@ protected AsyncFuture syncWrite(final Request request) { reporter.incrementConcurrentWrites(); try (Scope ws = tracer.withSpan(span)) { - return doWrite(request).onFinished(() -> { + return doWrite(request, span).onFinished(() -> { writePermits.release(); reporter.decrementConcurrentWrites(); span.end(); @@ -135,8 +135,7 @@ protected AsyncFuture syncWrite(final Request request) { } } - protected AsyncFuture doWrite(final Request request) { - final Span span = tracer.spanBuilder("CoreIngestionGroup.doWrite").startSpan(); + protected AsyncFuture doWrite(final Request request, final Span span) { final List> futures = new ArrayList<>(); final Supplier range = rangeSupplier(request); @@ -146,7 +145,7 @@ protected AsyncFuture doWrite(final Request request) { .ifPresent(futures::add); suggest.map(s -> doSuggestWrite(s, request, range.get(), span)).ifPresent(futures::add); - return async.collect(futures, Ingestion.reduce()).onFinished(span::end); + return async.collect(futures, Ingestion.reduce()); } protected AsyncFuture doMetricWrite( diff --git a/heroic-core/src/main/java/com/spotify/heroic/metric/LocalMetricManager.java b/heroic-core/src/main/java/com/spotify/heroic/metric/LocalMetricManager.java index 29de801d7..b43a162af 100644 --- a/heroic-core/src/main/java/com/spotify/heroic/metric/LocalMetricManager.java +++ b/heroic-core/src/main/java/com/spotify/heroic/metric/LocalMetricManager.java @@ -60,9 +60,9 @@ import com.spotify.heroic.tracing.EndSpanFutureReporter; import eu.toolchain.async.AsyncFramework; import eu.toolchain.async.AsyncFuture; +import eu.toolchain.async.FutureDone; import eu.toolchain.async.LazyTransform; import eu.toolchain.async.StreamCollector; -import eu.toolchain.async.FutureDone; import io.opencensus.trace.Span; import io.opencensus.trace.Tracer; import java.util.ArrayList; @@ -71,12 +71,12 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.Function; import javax.inject.Inject; @@ -472,7 +472,7 @@ public AsyncFuture fetch( @Override public AsyncFuture write(final WriteMetric.Request request) { - return write(request, io.opencensus.trace.Tracing.getTracer().getCurrentSpan()); + return write(request); } @Override diff --git a/heroic-core/src/test/java/com/spotify/heroic/consumer/schemas/Spotify100ProtoTest.java b/heroic-core/src/test/java/com/spotify/heroic/consumer/schemas/Spotify100ProtoTest.java index 22fd23110..2eb7d2320 100644 --- a/heroic-core/src/test/java/com/spotify/heroic/consumer/schemas/Spotify100ProtoTest.java +++ b/heroic-core/src/test/java/com/spotify/heroic/consumer/schemas/Spotify100ProtoTest.java @@ -21,7 +21,7 @@ package com.spotify.heroic.consumer.schemas; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -44,7 +44,6 @@ import com.spotify.proto.Spotify100.Metric; import eu.toolchain.async.AsyncFramework; import eu.toolchain.async.AsyncFuture; -import io.opencensus.metrics.export.Value; import java.util.List; import org.junit.Before; import org.junit.Rule; @@ -52,7 +51,7 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.junit.MockitoJUnitRunner; import org.xerial.snappy.Snappy; @RunWith(MockitoJUnitRunner.class) @@ -73,6 +72,9 @@ public class Spotify100ProtoTest { @Mock private AsyncFuture resolved; + @Mock + private AsyncFuture discardedFuture; + private Spotify100Proto.Consumer consumer; @Rule @@ -81,6 +83,7 @@ public class Spotify100ProtoTest { @Before public void setup() { when(clock.currentTimeMillis()).thenReturn(1542830485000L); + when(async.collectAndDiscard(any())).thenReturn(discardedFuture); when(ingestion.write(any(Request.class))).thenReturn(resolved); consumer = new Spotify100Proto.Consumer(clock, ingestion, reporter, async); } diff --git a/heroic-core/src/test/java/com/spotify/heroic/ingestion/CoreIngestionGroupTest.java b/heroic-core/src/test/java/com/spotify/heroic/ingestion/CoreIngestionGroupTest.java index 7575765ac..46eb7655c 100644 --- a/heroic-core/src/test/java/com/spotify/heroic/ingestion/CoreIngestionGroupTest.java +++ b/heroic-core/src/test/java/com/spotify/heroic/ingestion/CoreIngestionGroupTest.java @@ -24,8 +24,8 @@ import com.spotify.heroic.suggest.SuggestBackend; import eu.toolchain.async.AsyncFramework; import eu.toolchain.async.AsyncFuture; -import eu.toolchain.async.FutureDone; import eu.toolchain.async.FutureFinished; +import io.opencensus.trace.BlankSpan; import java.util.List; import java.util.Optional; import java.util.concurrent.Semaphore; @@ -64,10 +64,6 @@ public class CoreIngestionGroupTest { @Mock private AsyncFuture expected; @Mock - private AsyncFuture resolved; - @Mock - private AsyncFuture failed; - @Mock private AsyncFuture other; @Mock private Series series; @@ -87,15 +83,14 @@ public void setup() { } private CoreIngestionGroup setupIngestionGroup( - final Optional metric, final Optional metadata, + final Optional metric, + final Optional metadata, final Optional suggest ) { - // @formatter:off final CoreIngestionGroup group = new CoreIngestionGroup( async, filterSupplier, writePermits, reporter, ingested, metric, metadata, suggest ); - // @formatter:on return spy(group); } @@ -121,7 +116,7 @@ public void testSyncWrite() throws Exception { doReturn(true).when(filter).apply(series); doNothing().when(writePermits).acquire(); doNothing().when(writePermits).release(); - doReturn(expected).when(group).doWrite(request); + doReturn(expected).when(group).doWrite(eq(request), any()); assertEquals(expected, group.syncWrite(request)); @@ -131,7 +126,7 @@ public void testSyncWrite() throws Exception { verify(writePermits).release(); verify(reporter).incrementConcurrentWrites(); verify(reporter).decrementConcurrentWrites(); - verify(group).doWrite(request); + verify(group).doWrite(eq(request), any()); verify(expected).onFinished(any(FutureFinished.class)); } @@ -152,7 +147,7 @@ public void testSyncWriteFiltered() throws Exception { verify(reporter, never()).incrementConcurrentWrites(); verify(reporter, never()).decrementConcurrentWrites(); verify(reporter).reportDroppedByFilter(); - verify(group, never()).doWrite(request); + verify(group, never()).doWrite(eq(request), any()); verify(other, never()).onFinished(any(FutureFinished.class)); } @@ -175,7 +170,7 @@ public void testSyncWriteAcquireThrows() throws Exception { verify(writePermits, never()).release(); verify(reporter, never()).incrementConcurrentWrites(); verify(reporter, never()).decrementConcurrentWrites(); - verify(group, never()).doWrite(request); + verify(group, never()).doWrite(eq(request), any()); verify(other, never()).onFinished(any(FutureFinished.class)); } @@ -192,7 +187,7 @@ public void testDoWrite() { doReturn(other).when(group).doMetadataWrite(eq(metadata), eq(request), eq(range), any()); doReturn(other).when(group).doSuggestWrite(eq(suggest), eq(request), eq(range), any()); - assertEquals(expected, group.doWrite(request)); + assertEquals(expected, group.doWrite(request, BlankSpan.INSTANCE)); verify(group).rangeSupplier(request); verify(group).doMetricWrite(eq(metric), eq(request), any()); @@ -213,7 +208,7 @@ public void testDoWriteSome() { doReturn(other).when(group).doMetricWrite(eq(metric), eq(request), any()); doReturn(other).when(group).doSuggestWrite(eq(suggest), eq(request), eq(range), any()); - assertEquals(expected, group.doWrite(request)); + assertEquals(expected, group.doWrite(request, BlankSpan.INSTANCE)); verify(group).rangeSupplier(request); verify(group).doMetricWrite(eq(metric), eq(request), any()); diff --git a/heroic-dist/src/test/java/com/spotify/heroic/analytics/bigtable/HeroicMetricsConfigurationTest.java b/heroic-dist/src/test/java/com/spotify/heroic/analytics/bigtable/HeroicMetricsConfigurationTest.java index 437156163..bc591ff85 100644 --- a/heroic-dist/src/test/java/com/spotify/heroic/analytics/bigtable/HeroicMetricsConfigurationTest.java +++ b/heroic-dist/src/test/java/com/spotify/heroic/analytics/bigtable/HeroicMetricsConfigurationTest.java @@ -3,16 +3,9 @@ import static com.spotify.heroic.HeroicConfigurationTestUtils.testConfiguration; import static org.junit.Assert.assertEquals; -import com.spotify.heroic.dagger.DaggerCoreComponent; import com.spotify.heroic.metric.LocalMetricManager; import com.spotify.heroic.metric.bigtable.BigtableBackend; import com.spotify.heroic.metric.bigtable.BigtableMetricModule; -import com.spotify.heroic.metric.bigtable.MetricsRowKeySerializer; -import eu.toolchain.async.TinyAsync; -import eu.toolchain.serializer.TinySerializer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.jetbrains.annotations.NotNull; import org.junit.Test; /** @@ -29,16 +22,6 @@ public class HeroicMetricsConfigurationTest { public static final int EXPECTED_MAX_WRITE_BATCH_SIZE = 250; - @NotNull - private static BigtableBackend getBigtableBackend(int maxWriteBatchSize) { - final ExecutorService executor = Executors.newSingleThreadExecutor(); - final TinyAsync async = TinyAsync.builder().executor(executor).build(); - var serializer = TinySerializer.builder().build(); - - var bigtableBackend = new BigtableBackend(async, serializer, new MetricsRowKeySerializer(), null, null, "bananas", false, maxWriteBatchSize, null, null); - return bigtableBackend; - } - private static BigtableMetricModule getBigtableMetricModule(int maxWriteBatchSize) { return new BigtableMetricModule.Builder() .maxWriteBatchSize(maxWriteBatchSize) @@ -57,7 +40,7 @@ public void testMaxWriteBatchSizeConfig() throws Exception { instance.inject( coreComponent -> { var metricManager = - (LocalMetricManager) ((DaggerCoreComponent) coreComponent).metricManager(); + (LocalMetricManager) coreComponent.metricManager(); var analyticsBackend = metricManager .groupSet() diff --git a/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetricBackendIT.java b/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetricBackendIT.java index be9a88ccb..1cb1e9e1e 100644 --- a/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetricBackendIT.java +++ b/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetricBackendIT.java @@ -22,6 +22,7 @@ package com.spotify.heroic.test; import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeNotNull; import static org.junit.Assume.assumeTrue; import com.google.common.collect.ImmutableList; @@ -82,7 +83,7 @@ public abstract class AbstractMetricBackendIT { * See: https://github.com/spotify/heroic/pull/208 */ protected boolean brokenSegmentsPr208 = false; protected boolean eventSupport = false; - protected Optional maxBatchSize = Optional.empty(); + protected Integer maxBatchSize = null; protected boolean hugeRowKey = true; @Rule @@ -165,8 +166,7 @@ public void testOne() throws Exception { */ @Test public void testMaxBatchSize() throws Exception { - assumeTrue("max batch size", maxBatchSize.isPresent()); - final int maxBatchSize = this.maxBatchSize.get(); + assumeNotNull("max batch size", maxBatchSize); DateRange range = new DateRange(99L, 100L + (maxBatchSize * 4)); new TestCase() diff --git a/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendKV.java b/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendKV.java index a9e494e6d..c895c4439 100644 --- a/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendKV.java +++ b/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendKV.java @@ -22,6 +22,7 @@ package com.spotify.heroic.metadata.elasticsearch; import static com.spotify.heroic.elasticsearch.ResourceLoader.loadJson; +import static io.opencensus.trace.AttributeValue.booleanAttributeValue; import static java.util.Optional.ofNullable; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.prefixQuery; @@ -117,11 +118,14 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ElasticsearchScope public class MetadataBackendKV extends AbstractElasticsearchMetadataBackend implements MetadataBackend, LifeCycles { + private static final Logger LOGGER = LoggerFactory.getLogger(MetadataBackendKV.class); private static final Tracer tracer = Tracing.getTracer(); private static final String WRITE_CACHE_SIZE = "write-cache-size"; @@ -292,7 +296,12 @@ public void onFailure(Exception e) { .catchFailed(handleVersionConflict(WriteMetadata::new, reporter::reportWriteDroppedByDuplicate)) .onDone(writeContext) - .onFinished(writeSpan::end); + .onFinished(writeSpan::end) + .onFailed(error -> { + LOGGER.error("Metadata write failed: ", error); + writeSpan.putAttribute("error", booleanAttributeValue(true)); + writeSpan.addAnnotation(error.getMessage()); + }); writes.add(writeMetadataAsyncFuture); diff --git a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableBackend.java b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableBackend.java index 767eaeb1e..8150f0de6 100644 --- a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableBackend.java +++ b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableBackend.java @@ -23,9 +23,8 @@ import static io.opencensus.trace.AttributeValue.booleanAttributeValue; import static io.opencensus.trace.AttributeValue.longAttributeValue; +import static io.opencensus.trace.AttributeValue.stringAttributeValue; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.cloud.bigtable.grpc.scanner.FlatRow; import com.google.cloud.bigtable.util.RowKeyUtil; import com.google.common.base.Function; @@ -69,11 +68,10 @@ import eu.toolchain.async.Managed; import eu.toolchain.async.RetryPolicy; import eu.toolchain.async.RetryResult; -import eu.toolchain.serializer.Serializer; -import eu.toolchain.serializer.SerializerFramework; import io.opencensus.common.Scope; import io.opencensus.trace.AttributeValue; import io.opencensus.trace.Span; +import io.opencensus.trace.Status; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; import java.io.IOException; @@ -100,62 +98,49 @@ public class BigtableBackend extends AbstractMetricBackend implements LifeCycles private static final Logger log = LoggerFactory.getLogger(BigtableBackend.class); /* maximum number of bytes of BigTable row key size allowed*/ - public static final int MAX_KEY_ROW_SIZE = 4000; - - public static final QueryTrace.Identifier FETCH_SEGMENT = + private static final int MAX_KEY_ROW_SIZE = 4000; + private static final QueryTrace.Identifier FETCH_SEGMENT = QueryTrace.identifier(BigtableBackend.class, "fetch_segment"); - public static final QueryTrace.Identifier FETCH = + private static final QueryTrace.Identifier FETCH = QueryTrace.identifier(BigtableBackend.class, "fetch"); + private static final String POINTS = "points"; + private static final String DISTRIBUTION_POINTS = "distributionPoints"; + private static final String EVENTS = "events"; - public static final String POINTS = "points"; - public static final String DISTRIBUTION_POINTS = "distributionPoints"; - public static final String EVENTS = "events"; public static final long PERIOD = 0x100_000_000L; private final AsyncFramework async; - private final SerializerFramework serializer; private final RowKeySerializer rowKeySerializer; - private final Serializer> sortedMapSerializer; private final Managed connection; private final Groups groups; private final String table; private final boolean configure; private final MetricBackendReporter reporter; - private final ObjectMapper mapper; private final Tracer tracer = Tracing.getTracer(); - private static final TypeReference> PAYLOAD_TYPE = - new TypeReference>() { - }; - private final Meter written = new Meter(); private final int maxWriteBatchSize; @Inject public BigtableBackend( final AsyncFramework async, - @Named("common") final SerializerFramework serializer, final RowKeySerializer rowKeySerializer, final Managed connection, final Groups groups, @Named("table") final String table, @Named("configure") final boolean configure, @Named("maxWriteBatchSize") final int maxWriteBatchSize, - MetricBackendReporter reporter, - @Named("application/json") ObjectMapper mapper + MetricBackendReporter reporter ) { super(async); this.async = async; - this.serializer = serializer; this.rowKeySerializer = rowKeySerializer; - this.sortedMapSerializer = serializer.sortedMap(serializer.string(), serializer.string()); this.connection = connection; this.maxWriteBatchSize = maxWriteBatchSize; this.groups = groups; this.table = table; this.configure = configure; this.reporter = reporter; - this.mapper = mapper; } @Override @@ -250,17 +235,15 @@ public AsyncFuture write(final WriteMetric.Request request) { } @Override - public AsyncFuture write( - final WriteMetric.Request request, final Span parentSpan - ) { - return connection.doto(c -> { + public AsyncFuture write(final WriteMetric.Request request, final Span span) { + return connection.doto(conn -> { final Series series = request.getSeries(); final List> results = new ArrayList<>(); - final BigtableDataClient client = c.getDataClient(); + final BigtableDataClient client = conn.getDataClient(); - final MetricCollection g = request.getData(); - results.add(writeTyped(series, client, g, parentSpan)); + final MetricCollection collection = request.getData(); + results.add(writeTyped(series, client, collection, span)); return async.collect(results, WriteMetric.reduce()); }); } @@ -336,20 +319,24 @@ private AsyncFuture stop() { private AsyncFuture writeTyped( final Series series, final BigtableDataClient client, - final MetricCollection g, - final Span parentSpan + final MetricCollection collection, + final Span span ) throws IOException { - switch (g.getType()) { + var type = collection.getType(); + span.putAttribute("metric_type", stringAttributeValue(type.toString())); + switch (type) { case POINT: - return writeBatch(POINTS, series, client, g.getDataAs(Point.class), - d -> serializeValue(d.getValue()), parentSpan); + return writeBatch(POINTS, series, client, collection.getDataAs(Point.class), + d -> serializeValue(d.getValue()), span); case DISTRIBUTION_POINTS: - return writeBatch(DISTRIBUTION_POINTS, series, client, g.getDataAs( + return writeBatch(DISTRIBUTION_POINTS, series, client, collection.getDataAs( DistributionPoint.class), - d -> d.value().getValue(), parentSpan); + d -> d.value().getValue(), span); default: + span.setStatus( + Status.INVALID_ARGUMENT.withDescription("Unsupported metric type")); return async.resolved(new WriteMetric( - new QueryError("Unsupported metric type: " + g.getType()))); + new QueryError("Unsupported metric type: " + collection.getType()))); } } @@ -363,6 +350,7 @@ private AsyncFuture writeBatch( final Scope scope = tracer.withSpan(span); span.putAttribute("type", AttributeValue.stringAttributeValue(columnFamily)); span.putAttribute("batchSize", longAttributeValue(batch.size())); + span.putAttribute("table", stringAttributeValue(table)); // common case for consumers if (batch.size() == 1) { @@ -371,6 +359,11 @@ private AsyncFuture writeBatch( .onFinished(() -> { written.mark(); span.end(); + }) + .onFailed(error -> { + log.error("Bigtable writeOne failed: ", error); + span.putAttribute("error", booleanAttributeValue(true)); + span.addAnnotation(error.getMessage()); }); scope.close(); return future; @@ -415,8 +408,8 @@ private AsyncFuture writeBatch( final RequestTimer timer = WriteMetric.timer(); - for (final Pair e : saved) { - final ByteString rowKeyBytes = rowKeySerializer.serializeFull(e.getKey()); + for (final Pair mutationsPair : saved) { + final ByteString rowKeyBytes = rowKeySerializer.serializeFull(mutationsPair.getKey()); if (rowKeyBytes.size() >= MAX_KEY_ROW_SIZE) { reporter.reportWritesDroppedBySize(); @@ -426,12 +419,18 @@ private AsyncFuture writeBatch( } writes.add(client - .mutateRow(table, rowKeyBytes, e.getValue()) + .mutateRow(table, rowKeyBytes, mutationsPair.getValue()) .directTransform(result -> timer.end())); } scope.close(); - return async.collect(writes.build(), WriteMetric.reduce()).onFinished(span::end); + return async.collect(writes.build(), WriteMetric.reduce()) + .onFinished(span::end) + .onFailed(error -> { + log.error("Bigtable write batch failed: ", error); + span.putAttribute("error", booleanAttributeValue(true)); + span.addAnnotation(error.getMessage()); + }); } private AsyncFuture writeOne( diff --git a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/api/BigtableDataClientImpl.java b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/api/BigtableDataClientImpl.java index cedefd321..14038b860 100644 --- a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/api/BigtableDataClientImpl.java +++ b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/api/BigtableDataClientImpl.java @@ -103,7 +103,7 @@ public AsyncObservable readRowsObserved( final ResultScanner s = session.getDataClient().readRows(request.toPb(Table.toURI(clusterUri, tableName))); - final ResultScanner scanner = new ResultScanner() { + final ResultScanner scanner = new ResultScanner<>() { @Override public void close() throws IOException { s.close(); diff --git a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/api/BigtableMutatorImpl.java b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/api/BigtableMutatorImpl.java index 08075e602..824438ec7 100644 --- a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/api/BigtableMutatorImpl.java +++ b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/api/BigtableMutatorImpl.java @@ -32,10 +32,6 @@ import eu.toolchain.async.AsyncFramework; import eu.toolchain.async.AsyncFuture; import eu.toolchain.async.ResolvableFuture; -import io.opencensus.common.Scope; -import io.opencensus.trace.AttributeValue; -import io.opencensus.trace.Span; -import io.opencensus.trace.Status; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; import java.util.HashMap; @@ -110,55 +106,37 @@ public AsyncFuture close() { private AsyncFuture mutateSingleRow( String tableName, ByteString rowKey, Mutations mutations ) { - final Span span = tracer.spanBuilder("BigtableMutator.mutateSingleRow").startSpan(); - try (Scope ws = tracer.withSpan(span)) { - return convertVoid( - session - .getDataClient() - .mutateRowAsync(toMutateRowRequest(tableName, rowKey, mutations))) - .onFinished(span::end); - } + return convertVoid( + session + .getDataClient() + .mutateRowAsync(toMutateRowRequest(tableName, rowKey, mutations))); } private AsyncFuture mutateBatchRow( String tableName, ByteString rowKey, Mutations mutations ) { - final Span span = tracer.spanBuilder("BigtableMutator.mutateBatchRow").startSpan(); - try (Scope ws = tracer.withSpan(span)) { - span.putAttribute("table", AttributeValue.stringAttributeValue(tableName)); - - final BulkMutation bulkMutation = getOrAddBulkMutation(tableName); - - span.addAnnotation("Adding rows to bulk mutation"); - return convertVoid(bulkMutation.add(toMutateRowRequest(tableName, rowKey, mutations))) - .onFinished(span::end); - } + final BulkMutation bulkMutation = getOrAddBulkMutation(tableName); + return convertVoid(bulkMutation.add(toMutateRowRequest(tableName, rowKey, mutations))); } private BulkMutation getOrAddBulkMutation(String tableName) { - final Span span = tracer.spanBuilder("BigtableMutator.getOrAddBulkMutation").startSpan(); - try (Scope ws = tracer.withSpan(span)) { - span.addAnnotation("Acquiring lock"); - synchronized (tableAccessLock) { - span.addAnnotation("Lock acquired"); - - if (tableToBulkMutation.containsKey(tableName)) { - span.setStatus(Status.ALREADY_EXISTS.withDescription("Mutation exists in map")); - span.end(); - return tableToBulkMutation.get(tableName); - } + var span = tracer.getCurrentSpan(); + synchronized (tableAccessLock) { + if (tableToBulkMutation.containsKey(tableName)) { + span.addAnnotation("Mutation exists in map"); + return tableToBulkMutation.get(tableName); + } - final BulkMutation bulkMutation = session.createBulkMutation( - session - .getOptions() - .getInstanceName() - .toTableName(tableName)); + final BulkMutation bulkMutation = session.createBulkMutation( + session + .getOptions() + .getInstanceName() + .toTableName(tableName)); - tableToBulkMutation.put(tableName, bulkMutation); + tableToBulkMutation.put(tableName, bulkMutation); + span.addAnnotation("Created new mutation"); - span.end(); - return bulkMutation; - } + return bulkMutation; } } @@ -178,7 +156,7 @@ private MutateRowRequest toMutateRowRequest( private AsyncFuture convertVoid(final ListenableFuture request) { final ResolvableFuture future = async.future(); - Futures.addCallback(request, new FutureCallback() { + Futures.addCallback(request, new FutureCallback<>() { @Override public void onSuccess(T result) { future.resolve(null); diff --git a/metric/bigtable/src/test/java/com/spotify/heroic/metric/bigtable/BigtableBackendIT.java b/metric/bigtable/src/test/java/com/spotify/heroic/metric/bigtable/BigtableBackendIT.java index 1ba20442f..654768126 100644 --- a/metric/bigtable/src/test/java/com/spotify/heroic/metric/bigtable/BigtableBackendIT.java +++ b/metric/bigtable/src/test/java/com/spotify/heroic/metric/bigtable/BigtableBackendIT.java @@ -25,7 +25,7 @@ protected void setupSupport() { super.setupSupport(); this.eventSupport = true; - this.maxBatchSize = Optional.of(BigtableMetricModule.DEFAULT_MUTATION_BATCH_SIZE); + this.maxBatchSize = BigtableMetricModule.DEFAULT_MUTATION_BATCH_SIZE; this.brokenSegmentsPr208 = true; } diff --git a/statistics/semantic/src/main/java/com/spotify/heroic/statistics/semantic/SemanticMetricBackendReporter.java b/statistics/semantic/src/main/java/com/spotify/heroic/statistics/semantic/SemanticMetricBackendReporter.java index 3ab69ad30..764edce32 100644 --- a/statistics/semantic/src/main/java/com/spotify/heroic/statistics/semantic/SemanticMetricBackendReporter.java +++ b/statistics/semantic/src/main/java/com/spotify/heroic/statistics/semantic/SemanticMetricBackendReporter.java @@ -46,7 +46,6 @@ import com.spotify.metrics.core.SemanticMetricRegistry; import eu.toolchain.async.AsyncFuture; import io.opencensus.trace.Span; -import io.opencensus.trace.Tracing; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -255,7 +254,7 @@ public AsyncFuture configure() { @Override public AsyncFuture write(final WriteMetric.Request request) { - return write(request, Tracing.getTracer().getCurrentSpan()); + return write(request); } @Override diff --git a/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/SuggestBackendKV.java b/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/SuggestBackendKV.java index 32a0dbd66..b56e75f1c 100644 --- a/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/SuggestBackendKV.java +++ b/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/SuggestBackendKV.java @@ -22,6 +22,7 @@ package com.spotify.heroic.suggest.elasticsearch; import static com.spotify.heroic.elasticsearch.ResourceLoader.loadJson; +import static io.opencensus.trace.AttributeValue.booleanAttributeValue; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.prefixQuery; @@ -120,13 +121,15 @@ import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ElasticsearchScope public class SuggestBackendKV extends AbstractElasticsearchBackend implements SuggestBackend, Grouped, LifeCycles { - private NumSuggestionsLimit numSuggestionsLimit = NumSuggestionsLimit.of(); - private final Tracer tracer = Tracing.getTracer(); + private static final Logger LOGGER = LoggerFactory.getLogger(SuggestBackendKV.class); + private static final Tracer tracer = Tracing.getTracer(); private static final String WRITE_CACHE_SIZE = "write-cache-size"; private static final String TAG_TYPE = "tag"; @@ -154,6 +157,7 @@ public class SuggestBackendKV extends AbstractElasticsearchBackend private static final String[] KEY_SUGGEST_SOURCES = new String[] {KEY}; private static final String[] TAG_SUGGEST_SOURCES = new String[] {TAG_SKEY, TAG_SVAL}; + private final NumSuggestionsLimit numSuggestionsLimit; private final Managed connection; private final SuggestBackendReporter reporter; @@ -457,9 +461,9 @@ public AsyncFuture write( errors.add(NodeError.internalError(r.getFailureMessage())); if (r.getFailure().getCause() - instanceof VersionConflictEngineException || + instanceof VersionConflictEngineException || r.getFailure().getMessage() - .contains("version_conflict_engine_exception")) { + .contains("version_conflict_engine_exception")) { reporter.reportWriteDroppedByDuplicate(); } else if (addFailureAnnotation) { rootSpan.addAnnotation(r.getFailureMessage()); @@ -473,8 +477,13 @@ public AsyncFuture write( ImmutableList.of(response.getTook().getMillis()), ImmutableList.of()); }) - .onDone(writeContext) - .onFinished(rootSpan::end); + .onDone(writeContext) + .onFinished(rootSpan::end) + .onFailed(error -> { + LOGGER.error("Metadata write failed: ", error); + rootSpan.putAttribute("error", booleanAttributeValue(true)); + rootSpan.addAnnotation(error.getMessage()); + }); }); }