From d3e3d8d0cab3ad409b156bdb0a63a142d44aee18 Mon Sep 17 00:00:00 2001 From: huntc Date: Fri, 17 Jul 2020 14:27:17 +1000 Subject: [PATCH] Makes Export methods async This commit recognises that the export and flush methods of span and trace exporters can be, and often are, implemented with long-lived operations over networks. We therefore see that these method return types are represented using Java's Future type to account for this common behaviour. --- .../inmemory/InMemoryMetricExporter.java | 11 ++- .../inmemory/InMemorySpanExporter.java | 11 ++- .../inmemory/InMemoryMetricExporterTest.java | 13 ++- .../inmemory/InMemorySpanExporterTest.java | 10 +- .../jaeger/JaegerGrpcSpanExporter.java | 11 ++- .../logging/LoggingMetricExporter.java | 13 +-- .../logging/LoggingSpanExporter.java | 13 +-- .../logging/LoggingSpanExporterTest.java | 7 +- .../otlp/OtlpGrpcMetricExporter.java | 11 ++- .../exporters/otlp/OtlpGrpcSpanExporter.java | 11 ++- .../otlp/OtlpGrpcMetricExporterTest.java | 37 ++++---- .../otlp/OtlpGrpcSpanExporterTest.java | 35 ++++--- .../exporters/zipkin/ZipkinSpanExporter.java | 11 ++- .../ZipkinSpanExporterEndToEndHttpTest.java | 10 +- .../zipkin/ZipkinSpanExporterTest.java | 10 +- .../sdk/trace/SpanPipelineBenchmark.java | 9 +- .../export/MultiSpanExporterBenchmark.java | 12 +-- .../common/export/CompletableResultCode.java | 95 +++++++++++++++++++ .../metrics/export/IntervalMetricReader.java | 33 +++++-- .../sdk/metrics/export/MetricExporter.java | 23 +---- .../sdk/trace/export/BatchSpanProcessor.java | 70 +++++--------- .../sdk/trace/export/MultiSpanExporter.java | 69 +++++++++----- .../sdk/trace/export/SimpleSpanProcessor.java | 34 +++++-- .../sdk/trace/export/SpanExporter.java | 9 +- .../export/IntervalMetricReaderTest.java | 9 +- .../sdk/trace/TracerSdkTest.java | 9 +- .../trace/export/BatchSpanProcessorTest.java | 33 +++---- .../trace/export/MultiSpanExporterTest.java | 94 +++++++++--------- 28 files changed, 415 insertions(+), 298 deletions(-) create mode 100644 sdk/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java diff --git a/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporter.java b/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporter.java index 5ef1c17b4d1..23596e7f1de 100644 --- a/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporter.java +++ b/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporter.java @@ -16,6 +16,7 @@ package io.opentelemetry.exporters.inmemory; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.util.ArrayList; @@ -103,12 +104,12 @@ public void reset() { *

If this is called after {@code shutdown}, this will return {@code ResultCode.FAILURE}. */ @Override - public ResultCode export(Collection metrics) { + public CompletableResultCode export(Collection metrics) { if (isStopped) { - return ResultCode.FAILURE; + return CompletableResultCode.ofFailure(); } finishedMetricItems.addAll(metrics); - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } /** @@ -118,8 +119,8 @@ public ResultCode export(Collection metrics) { * @return always Success */ @Override - public ResultCode flush() { - return ResultCode.SUCCESS; + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); } /** diff --git a/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporter.java b/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporter.java index 1120b958a14..83c6872d567 100644 --- a/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporter.java +++ b/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporter.java @@ -16,6 +16,7 @@ package io.opentelemetry.exporters.inmemory; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.util.ArrayList; @@ -85,14 +86,14 @@ public void reset() { } @Override - public ResultCode export(Collection spans) { + public CompletableResultCode export(Collection spans) { synchronized (this) { if (isStopped) { - return ResultCode.FAILURE; + return CompletableResultCode.ofFailure(); } finishedSpanItems.addAll(spans); } - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } /** @@ -102,8 +103,8 @@ public ResultCode export(Collection spans) { * @return always Success */ @Override - public ResultCode flush() { - return ResultCode.SUCCESS; + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); } @Override diff --git a/exporters/inmemory/src/test/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporterTest.java b/exporters/inmemory/src/test/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporterTest.java index b63126d8072..5b7c9d4a67b 100644 --- a/exporters/inmemory/src/test/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporterTest.java +++ b/exporters/inmemory/src/test/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporterTest.java @@ -23,7 +23,6 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricData.Descriptor; import io.opentelemetry.sdk.metrics.data.MetricData.LongPoint; -import io.opentelemetry.sdk.metrics.export.MetricExporter.ResultCode; import io.opentelemetry.sdk.resources.Resource; import java.util.ArrayList; import java.util.Collections; @@ -54,7 +53,7 @@ void test_getFinishedMetricItems() { metrics.add(generateFakeMetric()); metrics.add(generateFakeMetric()); - assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS); + assertThat(exporter.export(metrics).isSuccess()).isTrue(); List metricItems = exporter.getFinishedMetricItems(); assertThat(metricItems).isNotNull(); assertThat(metricItems.size()).isEqualTo(3); @@ -67,7 +66,7 @@ void test_reset() { metrics.add(generateFakeMetric()); metrics.add(generateFakeMetric()); - assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS); + assertThat(exporter.export(metrics).isSuccess()).isTrue(); List metricItems = exporter.getFinishedMetricItems(); assertThat(metricItems).isNotNull(); assertThat(metricItems.size()).isEqualTo(3); @@ -84,7 +83,7 @@ void test_shutdown() { metrics.add(generateFakeMetric()); metrics.add(generateFakeMetric()); - assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS); + assertThat(exporter.export(metrics).isSuccess()).isTrue(); exporter.shutdown(); List metricItems = exporter.getFinishedMetricItems(); assertThat(metricItems).isNotNull(); @@ -98,13 +97,13 @@ void testShutdown_export() { metrics.add(generateFakeMetric()); metrics.add(generateFakeMetric()); - assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS); + assertThat(exporter.export(metrics).isSuccess()).isTrue(); exporter.shutdown(); - assertThat(exporter.export(metrics)).isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(metrics).isSuccess()).isFalse(); } @Test void test_flush() { - assertThat(exporter.flush()).isEqualTo(ResultCode.SUCCESS); + assertThat(exporter.flush().isSuccess()).isTrue(); } } diff --git a/exporters/inmemory/src/test/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporterTest.java b/exporters/inmemory/src/test/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporterTest.java index 9d0822e8f05..e6833dbe418 100644 --- a/exporters/inmemory/src/test/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporterTest.java +++ b/exporters/inmemory/src/test/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporterTest.java @@ -22,7 +22,6 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.test.TestSpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; -import io.opentelemetry.sdk.trace.export.SpanExporter.ResultCode; import io.opentelemetry.trace.Span.Kind; import io.opentelemetry.trace.SpanId; import io.opentelemetry.trace.Status; @@ -89,16 +88,13 @@ void shutdown() { @Test void export_ReturnCode() { - assertThat(exporter.export(Collections.singletonList(makeBasicSpan()))) - .isEqualTo(ResultCode.SUCCESS); + assertThat(exporter.export(Collections.singletonList(makeBasicSpan())).isSuccess()).isTrue(); exporter.shutdown(); // After shutdown no more export. - assertThat(exporter.export(Collections.singletonList(makeBasicSpan()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(makeBasicSpan())).isSuccess()).isFalse(); exporter.reset(); // Reset does not do anything if already shutdown. - assertThat(exporter.export(Collections.singletonList(makeBasicSpan()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(makeBasicSpan())).isSuccess()).isFalse(); } static SpanData makeBasicSpan() { diff --git a/exporters/jaeger/src/main/java/io/opentelemetry/exporters/jaeger/JaegerGrpcSpanExporter.java b/exporters/jaeger/src/main/java/io/opentelemetry/exporters/jaeger/JaegerGrpcSpanExporter.java index 059efc77569..6f941123afe 100644 --- a/exporters/jaeger/src/main/java/io/opentelemetry/exporters/jaeger/JaegerGrpcSpanExporter.java +++ b/exporters/jaeger/src/main/java/io/opentelemetry/exporters/jaeger/JaegerGrpcSpanExporter.java @@ -21,6 +21,7 @@ import io.opentelemetry.exporters.jaeger.proto.api_v2.Collector; import io.opentelemetry.exporters.jaeger.proto.api_v2.CollectorServiceGrpc; import io.opentelemetry.exporters.jaeger.proto.api_v2.Model; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; @@ -107,7 +108,7 @@ private JaegerGrpcSpanExporter(String serviceName, ManagedChannel channel, long * @return the result of the operation */ @Override - public ResultCode export(Collection spans) { + public CompletableResultCode export(Collection spans) { Collector.PostSpansRequest request = Collector.PostSpansRequest.newBuilder() .setBatch( @@ -126,10 +127,10 @@ public ResultCode export(Collection spans) { // for now, there's nothing to check in the response object //noinspection ResultOfMethodCallIgnored stub.postSpans(request); - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } catch (Throwable e) { logger.log(Level.WARNING, "Failed to export spans", e); - return ResultCode.FAILURE; + return CompletableResultCode.ofFailure(); } } @@ -139,8 +140,8 @@ public ResultCode export(Collection spans) { * @return always Success */ @Override - public ResultCode flush() { - return ResultCode.SUCCESS; + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); } /** diff --git a/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingMetricExporter.java b/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingMetricExporter.java index 265179676cb..6a51939b8e2 100644 --- a/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingMetricExporter.java +++ b/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingMetricExporter.java @@ -16,6 +16,7 @@ package io.opentelemetry.exporters.logging; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.util.Collection; @@ -27,12 +28,12 @@ public class LoggingMetricExporter implements MetricExporter { private static final Logger logger = Logger.getLogger(LoggingMetricExporter.class.getName()); @Override - public ResultCode export(Collection metrics) { + public CompletableResultCode export(Collection metrics) { logger.info("Received a collection of " + metrics.size() + " metrics for export."); for (MetricData metricData : metrics) { logger.log(Level.INFO, "metric: {0}", metricData); } - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } /** @@ -41,16 +42,16 @@ public ResultCode export(Collection metrics) { * @return the result of the operation */ @Override - public ResultCode flush() { - ResultCode resultCode = ResultCode.SUCCESS; + public CompletableResultCode flush() { + CompletableResultCode resultCode = new CompletableResultCode(); for (Handler handler : logger.getHandlers()) { try { handler.flush(); } catch (Throwable t) { - resultCode = ResultCode.FAILURE; + resultCode.fail(); } } - return resultCode; + return resultCode.succeed(); } @Override diff --git a/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingSpanExporter.java b/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingSpanExporter.java index 2923ae7dcd7..77dbf9b2d79 100644 --- a/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingSpanExporter.java +++ b/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingSpanExporter.java @@ -16,6 +16,7 @@ package io.opentelemetry.exporters.logging; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.util.Collection; @@ -28,11 +29,11 @@ public class LoggingSpanExporter implements SpanExporter { private static final Logger logger = Logger.getLogger(LoggingSpanExporter.class.getName()); @Override - public ResultCode export(Collection spans) { + public CompletableResultCode export(Collection spans) { for (SpanData span : spans) { logger.log(Level.INFO, "span: {0}", span); } - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } /** @@ -41,16 +42,16 @@ public ResultCode export(Collection spans) { * @return the result of the operation */ @Override - public ResultCode flush() { - ResultCode resultCode = ResultCode.SUCCESS; + public CompletableResultCode flush() { + CompletableResultCode resultCode = new CompletableResultCode(); for (Handler handler : logger.getHandlers()) { try { handler.flush(); } catch (Throwable t) { - resultCode = ResultCode.FAILURE; + resultCode.fail(); } } - return resultCode; + return resultCode.succeed(); } @Override diff --git a/exporters/logging/src/test/java/io/opentelemetry/exporters/logging/LoggingSpanExporterTest.java b/exporters/logging/src/test/java/io/opentelemetry/exporters/logging/LoggingSpanExporterTest.java index a55e5bc8d2b..527f12b194b 100644 --- a/exporters/logging/src/test/java/io/opentelemetry/exporters/logging/LoggingSpanExporterTest.java +++ b/exporters/logging/src/test/java/io/opentelemetry/exporters/logging/LoggingSpanExporterTest.java @@ -18,14 +18,13 @@ import static com.google.common.truth.Truth.assertThat; import static java.util.Collections.singletonList; -import static org.junit.jupiter.api.Assertions.assertEquals; import io.opentelemetry.common.AttributeValue; import io.opentelemetry.common.Attributes; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.trace.data.EventImpl; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.test.TestSpanData; -import io.opentelemetry.sdk.trace.export.SpanExporter.ResultCode; import io.opentelemetry.trace.Span.Kind; import io.opentelemetry.trace.SpanId; import io.opentelemetry.trace.Status; @@ -77,8 +76,8 @@ void returnCode() { .setTotalRecordedEvents(1) .setTotalRecordedLinks(0) .build(); - ResultCode resultCode = exporter.export(singletonList(spanData)); - assertEquals(ResultCode.SUCCESS, resultCode); + CompletableResultCode resultCode = exporter.export(singletonList(spanData)); + assertThat(resultCode.isSuccess()).isTrue(); } @Test diff --git a/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporter.java b/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporter.java index 2c9e9bb1cc9..ca42f2f358b 100644 --- a/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporter.java +++ b/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporter.java @@ -25,6 +25,7 @@ import io.grpc.stub.MetadataUtils; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; @@ -89,7 +90,7 @@ private OtlpGrpcMetricExporter(ManagedChannel channel, long deadlineMs) { * @return the result of the operation */ @Override - public ResultCode export(Collection metrics) { + public CompletableResultCode export(Collection metrics) { ExportMetricsServiceRequest exportMetricsServiceRequest = ExportMetricsServiceRequest.newBuilder() .addAllResourceMetrics(MetricAdapter.toProtoResourceMetrics(metrics)) @@ -104,10 +105,10 @@ public ResultCode export(Collection metrics) { // for now, there's nothing to check in the response object // noinspection ResultOfMethodCallIgnored stub.export(exportMetricsServiceRequest); - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } catch (Throwable e) { logger.log(Level.WARNING, "Failed to export metrics", e); - return ResultCode.FAILURE; + return CompletableResultCode.ofFailure(); } } @@ -117,8 +118,8 @@ public ResultCode export(Collection metrics) { * @return always Success */ @Override - public ResultCode flush() { - return ResultCode.SUCCESS; + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); } /** diff --git a/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporter.java b/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporter.java index 4633c75e1ec..25edadfaccd 100644 --- a/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporter.java +++ b/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporter.java @@ -25,6 +25,7 @@ import io.grpc.stub.MetadataUtils; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; @@ -97,7 +98,7 @@ private OtlpGrpcSpanExporter(ManagedChannel channel, long deadlineMs) { * @return the result of the operation */ @Override - public ResultCode export(Collection spans) { + public CompletableResultCode export(Collection spans) { ExportTraceServiceRequest exportTraceServiceRequest = ExportTraceServiceRequest.newBuilder() .addAllResourceSpans(SpanAdapter.toProtoResourceSpans(spans)) @@ -112,10 +113,10 @@ public ResultCode export(Collection spans) { // for now, there's nothing to check in the response object // noinspection ResultOfMethodCallIgnored stub.export(exportTraceServiceRequest); - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } catch (Throwable e) { logger.log(Level.WARNING, "Failed to export spans", e); - return ResultCode.FAILURE; + return CompletableResultCode.ofFailure(); } } @@ -125,8 +126,8 @@ public ResultCode export(Collection spans) { * @return always Success */ @Override - public ResultCode flush() { - return ResultCode.SUCCESS; + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); } /** diff --git a/exporters/otlp/src/test/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporterTest.java b/exporters/otlp/src/test/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporterTest.java index 8bbcf3f9b4e..e375d917c85 100644 --- a/exporters/otlp/src/test/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporterTest.java +++ b/exporters/otlp/src/test/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporterTest.java @@ -35,7 +35,6 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricData.Descriptor; import io.opentelemetry.sdk.metrics.data.MetricData.LongPoint; -import io.opentelemetry.sdk.metrics.export.MetricExporter.ResultCode; import io.opentelemetry.sdk.resources.Resource; import java.io.IOException; import java.util.ArrayList; @@ -91,7 +90,7 @@ public void testExport() { OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(span))).isEqualTo(ResultCode.SUCCESS); + assertThat(exporter.export(Collections.singletonList(span)).isSuccess()).isTrue(); assertThat(fakeCollector.getReceivedMetrics()) .isEqualTo(MetricAdapter.toProtoResourceMetrics(Collections.singletonList(span))); } finally { @@ -108,7 +107,7 @@ public void testExport_MultipleMetrics() { OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(spans)).isEqualTo(ResultCode.SUCCESS); + assertThat(exporter.export(spans).isSuccess()).isTrue(); assertThat(fakeCollector.getReceivedMetrics()) .isEqualTo(MetricAdapter.toProtoResourceMetrics(spans)); } finally { @@ -122,7 +121,7 @@ public void testExport_AfterShutdown() { OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.newBuilder().setChannel(inProcessChannel).build(); exporter.shutdown(); - assertThat(exporter.export(Collections.singletonList(span))).isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(span)).isSuccess()).isFalse(); } @Test @@ -131,8 +130,8 @@ public void testExport_Cancelled() { OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeMetric()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeMetric())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -144,8 +143,8 @@ public void testExport_DeadlineExceeded() { OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeMetric()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeMetric())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -157,8 +156,8 @@ public void testExport_ResourceExhausted() { OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeMetric()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeMetric())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -170,8 +169,8 @@ public void testExport_OutOfRange() { OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeMetric()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeMetric())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -183,8 +182,8 @@ public void testExport_Unavailable() { OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeMetric()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeMetric())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -196,8 +195,8 @@ public void testExport_DataLoss() { OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeMetric()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeMetric())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -209,8 +208,8 @@ public void testExport_PermissionDenied() { OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeMetric()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeMetric())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -221,7 +220,7 @@ public void testExport_flush() { OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.flush()).isEqualTo(ResultCode.SUCCESS); + assertThat(exporter.flush().isSuccess()).isTrue(); } finally { exporter.shutdown(); } diff --git a/exporters/otlp/src/test/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporterTest.java b/exporters/otlp/src/test/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporterTest.java index db91bea8f22..f169aa021b8 100644 --- a/exporters/otlp/src/test/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporterTest.java +++ b/exporters/otlp/src/test/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporterTest.java @@ -31,7 +31,6 @@ import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.test.TestSpanData; -import io.opentelemetry.sdk.trace.export.SpanExporter.ResultCode; import io.opentelemetry.trace.Span.Kind; import io.opentelemetry.trace.SpanId; import io.opentelemetry.trace.Status; @@ -93,7 +92,7 @@ public void testExport() { OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(span))).isEqualTo(ResultCode.SUCCESS); + assertThat(exporter.export(Collections.singletonList(span)).isSuccess()).isTrue(); assertThat(fakeCollector.getReceivedSpans()) .isEqualTo(SpanAdapter.toProtoResourceSpans(Collections.singletonList(span))); } finally { @@ -110,7 +109,7 @@ public void testExport_MultipleSpans() { OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(spans)).isEqualTo(ResultCode.SUCCESS); + assertThat(exporter.export(spans).isSuccess()).isTrue(); assertThat(fakeCollector.getReceivedSpans()) .isEqualTo(SpanAdapter.toProtoResourceSpans(spans)); } finally { @@ -125,7 +124,7 @@ public void testExport_AfterShutdown() { OtlpGrpcSpanExporter.newBuilder().setChannel(inProcessChannel).build(); exporter.shutdown(); // TODO: This probably should not be retryable because we never restart the channel. - assertThat(exporter.export(Collections.singletonList(span))).isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(span)).isSuccess()).isFalse(); } @Test @@ -134,8 +133,8 @@ public void testExport_Cancelled() { OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeSpan()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeSpan())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -147,8 +146,8 @@ public void testExport_DeadlineExceeded() { OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeSpan()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeSpan())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -160,8 +159,8 @@ public void testExport_ResourceExhausted() { OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeSpan()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeSpan())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -173,8 +172,8 @@ public void testExport_OutOfRange() { OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeSpan()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeSpan())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -186,8 +185,8 @@ public void testExport_Unavailable() { OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeSpan()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeSpan())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -199,8 +198,8 @@ public void testExport_DataLoss() { OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeSpan()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeSpan())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } @@ -212,8 +211,8 @@ public void testExport_PermissionDenied() { OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.newBuilder().setChannel(inProcessChannel).build(); try { - assertThat(exporter.export(Collections.singletonList(generateFakeSpan()))) - .isEqualTo(ResultCode.FAILURE); + assertThat(exporter.export(Collections.singletonList(generateFakeSpan())).isSuccess()) + .isFalse(); } finally { exporter.shutdown(); } diff --git a/exporters/zipkin/src/main/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporter.java b/exporters/zipkin/src/main/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporter.java index 49893c7f61c..927639a9d5f 100644 --- a/exporters/zipkin/src/main/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporter.java +++ b/exporters/zipkin/src/main/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporter.java @@ -21,6 +21,7 @@ import io.opentelemetry.common.AttributeValue; import io.opentelemetry.common.ReadableAttributes; import io.opentelemetry.common.ReadableKeyValuePairs.KeyValueConsumer; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.resources.ResourceConstants; import io.opentelemetry.sdk.trace.data.SpanData; @@ -240,7 +241,7 @@ private static String commaSeparated(List values) { } @Override - public ResultCode export(final Collection spanDataList) { + public CompletableResultCode export(final Collection spanDataList) { List encodedSpans = new ArrayList<>(spanDataList.size()); for (SpanData spanData : spanDataList) { encodedSpans.add(encoder.encode(generateSpan(spanData, localEndpoint))); @@ -249,15 +250,15 @@ public ResultCode export(final Collection spanDataList) { sender.sendSpans(encodedSpans).execute(); } catch (Exception e) { logger.log(Level.WARNING, "Failed to export spans", e); - return ResultCode.FAILURE; + return CompletableResultCode.ofFailure(); } - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } @Override - public ResultCode flush() { + public CompletableResultCode flush() { // nothing required here - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } @Override diff --git a/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterEndToEndHttpTest.java b/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterEndToEndHttpTest.java index a252e9c1be9..56f524c571c 100644 --- a/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterEndToEndHttpTest.java +++ b/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterEndToEndHttpTest.java @@ -20,11 +20,11 @@ import com.google.common.collect.ImmutableList; import io.opentelemetry.common.Attributes; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.trace.data.EventImpl; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.SpanData.Event; import io.opentelemetry.sdk.trace.data.test.TestSpanData; -import io.opentelemetry.sdk.trace.export.SpanExporter; import io.opentelemetry.trace.Span.Kind; import io.opentelemetry.trace.SpanId; import io.opentelemetry.trace.Status; @@ -115,9 +115,9 @@ public void testExportFailedAsWrongEncoderUsed() { zipkin.httpUrl() + ENDPOINT_V2_SPANS, Encoding.JSON, SpanBytesEncoder.PROTO3); SpanData spanData = buildStandardSpan().build(); - SpanExporter.ResultCode resultCode = zipkinSpanExporter.export(Collections.singleton(spanData)); + CompletableResultCode resultCode = zipkinSpanExporter.export(Collections.singleton(spanData)); - assertThat(resultCode).isEqualTo(SpanExporter.ResultCode.FAILURE); + assertThat(resultCode.isSuccess()).isFalse(); List zipkinSpans = zipkin.getTrace(TRACE_ID); assertThat(zipkinSpans).isNotNull(); assertThat(zipkinSpans).isEmpty(); @@ -139,9 +139,9 @@ private static ZipkinSpanExporter buildZipkinExporter( private void exportAndVerify(ZipkinSpanExporter zipkinSpanExporter) { SpanData spanData = buildStandardSpan().build(); - SpanExporter.ResultCode resultCode = zipkinSpanExporter.export(Collections.singleton(spanData)); + CompletableResultCode resultCode = zipkinSpanExporter.export(Collections.singleton(spanData)); - assertThat(resultCode).isEqualTo(SpanExporter.ResultCode.SUCCESS); + assertThat(resultCode.isSuccess()).isTrue(); List zipkinSpans = zipkin.getTrace(TRACE_ID); assertThat(zipkinSpans).isNotNull(); diff --git a/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterTest.java b/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterTest.java index c144e467a0e..7974c90a86f 100644 --- a/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterTest.java +++ b/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import io.opentelemetry.common.AttributeValue; import io.opentelemetry.common.Attributes; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.resources.ResourceConstants; @@ -31,7 +32,6 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.SpanData.Event; import io.opentelemetry.sdk.trace.data.test.TestSpanData; -import io.opentelemetry.sdk.trace.export.SpanExporter.ResultCode; import io.opentelemetry.trace.Span.Kind; import io.opentelemetry.trace.SpanId; import io.opentelemetry.trace.Status; @@ -225,11 +225,11 @@ void testExport() throws IOException { byte[] someBytes = new byte[0]; when(mockEncoder.encode(buildZipkinSpan(Span.Kind.SERVER))).thenReturn(someBytes); when(mockSender.sendSpans(Collections.singletonList(someBytes))).thenReturn(mockZipkinCall); - ResultCode resultCode = + CompletableResultCode resultCode = zipkinSpanExporter.export(Collections.singleton(buildStandardSpan().build())); verify(mockZipkinCall).execute(); - assertThat(resultCode).isEqualTo(ResultCode.SUCCESS); + assertThat(resultCode.isSuccess()).isTrue(); } @Test @@ -242,10 +242,10 @@ void testExport_failed() throws IOException { when(mockSender.sendSpans(Collections.singletonList(someBytes))).thenReturn(mockZipkinCall); when(mockZipkinCall.execute()).thenThrow(new IOException()); - ResultCode resultCode = + CompletableResultCode resultCode = zipkinSpanExporter.export(Collections.singleton(buildStandardSpan().build())); - assertThat(resultCode).isEqualTo(ResultCode.FAILURE); + assertThat(resultCode.isSuccess()).isFalse(); } @Test diff --git a/sdk/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java b/sdk/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java index 83222192e14..df8236197d1 100644 --- a/sdk/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java +++ b/sdk/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java @@ -21,6 +21,7 @@ import io.opentelemetry.common.Attributes; import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; @@ -87,13 +88,13 @@ private void doWork() { private static class NoOpSpanExporter implements SpanExporter { @Override - public ResultCode export(Collection spans) { - return ResultCode.SUCCESS; + public CompletableResultCode export(Collection spans) { + return CompletableResultCode.ofSuccess(); } @Override - public ResultCode flush() { - return ResultCode.SUCCESS; + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); } @Override diff --git a/sdk/src/jmh/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterBenchmark.java b/sdk/src/jmh/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterBenchmark.java index 72063cd6467..03742bc5224 100644 --- a/sdk/src/jmh/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterBenchmark.java +++ b/sdk/src/jmh/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterBenchmark.java @@ -16,9 +16,9 @@ package io.opentelemetry.sdk.trace.export; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.test.TestSpanData; -import io.opentelemetry.sdk.trace.export.SpanExporter.ResultCode; import io.opentelemetry.trace.Span; import io.opentelemetry.trace.SpanId; import io.opentelemetry.trace.Status; @@ -44,13 +44,13 @@ public class MultiSpanExporterBenchmark { private static class NoopSpanExporter implements SpanExporter { @Override - public ResultCode export(Collection spans) { - return ResultCode.SUCCESS; + public CompletableResultCode export(Collection spans) { + return CompletableResultCode.ofSuccess(); } @Override - public ResultCode flush() { - return ResultCode.SUCCESS; + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); } @Override @@ -95,7 +95,7 @@ public final void setup() { @Warmup(iterations = 5, time = 1) @Measurement(iterations = 10, time = 1) @OutputTimeUnit(TimeUnit.MILLISECONDS) - public ResultCode export() { + public CompletableResultCode export() { return exporter.export(spans); } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java b/sdk/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java new file mode 100644 index 00000000000..e591533198a --- /dev/null +++ b/sdk/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java @@ -0,0 +1,95 @@ +package io.opentelemetry.sdk.common.export; + +import java.util.ArrayList; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +/** + * The implementation of Export operations are often asynchronous in nature, hence the need to + * convey a result at a later time. CompletableResultCode facilitates this. + * + *

This class models JDK 8's CompletableFuture to afford migration should Open Telemetry's SDK + * select JDK 8 or greater as a baseline, and also to offer familiarity to developers. + */ +public class CompletableResultCode { + /** A convenience for declaring success */ + public static CompletableResultCode ofSuccess() { + return SUCCESS; + } + + /** A convenience for declaring failure */ + public static CompletableResultCode ofFailure() { + return FAILURE; + } + + private static final CompletableResultCode SUCCESS = new CompletableResultCode().succeed(); + private static final CompletableResultCode FAILURE = new CompletableResultCode().fail(); + + public CompletableResultCode() {} + + @Nullable + @GuardedBy("lock") + private Boolean succeeded = null; + + @GuardedBy("lock") + private final ArrayList actions = new ArrayList<>(); + + private final Object lock = new Object(); + + /** The export operation finished successfully. */ + public CompletableResultCode succeed() { + synchronized (lock) { + if (succeeded == null) { + succeeded = true; + for (Runnable action : actions) { + action.run(); + } + } + } + return this; + } + + /** The export operation finished with an error. */ + public CompletableResultCode fail() { + synchronized (lock) { + if (succeeded == null) { + succeeded = false; + for (Runnable action : actions) { + action.run(); + } + } + } + return this; + } + + /** + * Obtain the current state of completion. Generally call once completion is achieved via the + * thenRun method. + * + * @return the current state of completion + */ + public boolean isSuccess() { + synchronized (lock) { + return succeeded != null && succeeded; + } + } + + /** + * Perform an action on completion. Actions are guaranteed to be called only once. + * + *

There should only be one action for this class instance. + * + * @param action the action to perform + * @return this completable result so that it may be further composed + */ + public CompletableResultCode thenRun(Runnable action) { + synchronized (lock) { + if (succeeded != null) { + action.run(); + } else { + this.actions.add(action); + } + } + return this; + } +} diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReader.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReader.java index 6a5ff423216..dc621edef21 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReader.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReader.java @@ -19,6 +19,7 @@ import com.google.auto.value.AutoValue; import io.opentelemetry.internal.Utils; import io.opentelemetry.sdk.common.DaemonThreadFactory; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.metrics.data.MetricData; import java.util.ArrayList; @@ -29,6 +30,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.concurrent.Immutable; @@ -199,21 +201,38 @@ private IntervalMetricReader(InternalState internalState) { private static final class Exporter implements Runnable { private final InternalState internalState; + private final AtomicBoolean exportAvailable = new AtomicBoolean(true); private Exporter(InternalState internalState) { this.internalState = internalState; } @Override + @SuppressWarnings("BooleanParameter") public void run() { - try { - List metricsList = new ArrayList<>(); - for (MetricProducer metricProducer : internalState.getMetricProducers()) { - metricsList.addAll(metricProducer.collectAllMetrics()); + if (exportAvailable.compareAndSet(true, false)) { + try { + List metricsList = new ArrayList<>(); + for (MetricProducer metricProducer : internalState.getMetricProducers()) { + metricsList.addAll(metricProducer.collectAllMetrics()); + } + final CompletableResultCode result = + internalState.getMetricExporter().export(Collections.unmodifiableList(metricsList)); + result.thenRun( + new Runnable() { + @Override + public void run() { + if (!result.isSuccess()) { + logger.log(Level.FINE, "Exporter failed"); + } + exportAvailable.set(true); + } + }); + } catch (Exception e) { + logger.log(Level.WARNING, "Exporter threw an Exception", e); } - internalState.getMetricExporter().export(Collections.unmodifiableList(metricsList)); - } catch (Exception e) { - logger.log(Level.WARNING, "Metric Exporter threw an Exception", e); + } else { + logger.log(Level.FINE, "Exporter busy. Dropping metrics."); } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java index f04d80a8bcb..2759f7bc1f0 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java @@ -16,6 +16,7 @@ package io.opentelemetry.sdk.metrics.export; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.metrics.data.MetricData; import java.util.Collection; @@ -29,36 +30,22 @@ */ public interface MetricExporter { - /** - * The possible results for the export method. - * - * @since 0.1.0 - */ - // TODO: extract this enum and unify it with SpanExporter.ResultCode - enum ResultCode { - /** The export operation finished successfully. */ - SUCCESS, - - /** The export operation finished with an error. */ - FAILURE - } - /** * Exports the collection of given {@link MetricData}. * * @param metrics the collection of {@link MetricData} to be exported. - * @return the result of the export. + * @return the result of the export, which is often an asynchronous operation. * @since 0.1.0 */ - ResultCode export(Collection metrics); + CompletableResultCode export(Collection metrics); /** * Exports the collection of {@link MetricData} that have not yet been exported. * - * @return the result of the flush. + * @return the result of the flush, which is often an asynchronous operation. * @since 0.4.0 */ - ResultCode flush(); + CompletableResultCode flush(); /** Called when the associated IntervalMetricReader is shutdown. */ void shutdown(); diff --git a/sdk/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 246ea98c15a..a6458558f07 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -24,6 +24,7 @@ import io.opentelemetry.metrics.LongCounter.BoundLongCounter; import io.opentelemetry.metrics.Meter; import io.opentelemetry.sdk.common.DaemonThreadFactory; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; @@ -32,12 +33,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; @@ -85,8 +81,6 @@ public final class BatchSpanProcessor implements SpanProcessor { private static final String WORKER_THREAD_NAME = BatchSpanProcessor.class.getSimpleName() + "_WorkerThread"; - private static final String EXPORTER_THREAD_NAME = - BatchSpanProcessor.class.getSimpleName() + "_ExporterThread"; private final Worker worker; private final Thread workerThread; private final boolean sampled; @@ -96,15 +90,8 @@ private BatchSpanProcessor( boolean sampled, long scheduleDelayMillis, int maxQueueSize, - int maxExportBatchSize, - int exporterTimeoutMillis) { - this.worker = - new Worker( - spanExporter, - scheduleDelayMillis, - maxQueueSize, - maxExportBatchSize, - exporterTimeoutMillis); + int maxExportBatchSize) { + this.worker = new Worker(spanExporter, scheduleDelayMillis, maxQueueSize, maxExportBatchSize); this.workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); this.workerThread.start(); this.sampled = sampled; @@ -165,9 +152,6 @@ private static final class Worker implements Runnable { private static final BoundLongCounter droppedSpans; - private final ExecutorService executorService = - Executors.newSingleThreadExecutor(new DaemonThreadFactory(EXPORTER_THREAD_NAME)); - private static final Logger logger = Logger.getLogger(Worker.class.getName()); private final SpanExporter spanExporter; private final long scheduleDelayMillis; @@ -175,7 +159,7 @@ private static final class Worker implements Runnable { private final int maxExportBatchSize; private final int halfMaxQueueSize; private final Object monitor = new Object(); - private final int exporterTimeoutMillis; + private final AtomicBoolean exportAvailable = new AtomicBoolean(true); @GuardedBy("monitor") private final List spansList; @@ -184,15 +168,13 @@ private Worker( SpanExporter spanExporter, long scheduleDelayMillis, int maxQueueSize, - int maxExportBatchSize, - int exporterTimeoutMillis) { + int maxExportBatchSize) { this.spanExporter = spanExporter; this.scheduleDelayMillis = scheduleDelayMillis; this.maxQueueSize = maxQueueSize; this.halfMaxQueueSize = maxQueueSize >> 1; this.maxExportBatchSize = maxExportBatchSize; this.spansList = new ArrayList<>(maxQueueSize); - this.exporterTimeoutMillis = exporterTimeoutMillis; } private void addSpan(ReadableSpan span) { @@ -242,7 +224,6 @@ public void run() { private void shutdown() { forceFlush(); - executorService.shutdown(); spanExporter.shutdown(); } @@ -277,28 +258,26 @@ private static List createSpanDataForExport( } // Exports the list of SpanData to the SpanExporter. + @SuppressWarnings("BooleanParameter") private void onBatchExport(final List spans) { - Future submission = - executorService.submit( + if (exportAvailable.compareAndSet(true, false)) { + try { + final CompletableResultCode result = spanExporter.export(spans); + result.thenRun( new Runnable() { @Override public void run() { - // In case of any exception thrown by the service handlers catch and log. - try { - spanExporter.export(spans); - } catch (Throwable t) { - logger.log(Level.WARNING, "Exception thrown by the export.", t); + if (!result.isSuccess()) { + logger.log(Level.FINE, "Exporter failed"); } + exportAvailable.set(true); } }); - try { - // wait at most for the configured timeout. - submission.get(exporterTimeoutMillis, TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException e) { - logger.log(Level.WARNING, "Exception thrown by the export.", e); - } catch (TimeoutException e) { - logger.log(Level.WARNING, "Export timed out. Cancelling execution.", e); - submission.cancel(true); + } catch (Exception e) { + logger.log(Level.WARNING, "Exporter threw an Exception", e); + } + } else { + logger.log(Level.FINE, "Exporter busy. Dropping spans."); } } } @@ -485,13 +464,12 @@ int getMaxExportBatchSize() { * @throws NullPointerException if the {@code spanExporter} is {@code null}. */ public BatchSpanProcessor build() { + /* + * Note that setting an export timeout has no effect - there's no sure way to cancel a + * thread of execution, even by asking an export to cancel any associated threads. + */ return new BatchSpanProcessor( - spanExporter, - exportOnlySampled, - scheduleDelayMillis, - maxQueueSize, - maxExportBatchSize, - exporterTimeoutMillis); + spanExporter, exportOnlySampled, scheduleDelayMillis, maxQueueSize, maxExportBatchSize); } } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/trace/export/MultiSpanExporter.java b/sdk/src/main/java/io/opentelemetry/sdk/trace/export/MultiSpanExporter.java index babe18e0753..4b19bd01429 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/trace/export/MultiSpanExporter.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/trace/export/MultiSpanExporter.java @@ -16,12 +16,11 @@ package io.opentelemetry.sdk.trace.export; -import static io.opentelemetry.sdk.trace.export.SpanExporter.ResultCode.FAILURE; -import static io.opentelemetry.sdk.trace.export.SpanExporter.ResultCode.SUCCESS; - +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.Collection; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.logging.Level; import java.util.logging.Logger; @@ -47,18 +46,20 @@ public static SpanExporter create(List spanExporters) { } @Override - public ResultCode export(Collection spans) { - ResultCode currentResultCode = SUCCESS; + public CompletableResultCode export(Collection spans) { + final CompletableResultCode compositeResultCode = new CompletableResultCode(); + final CountDownLatch completionsToProcess = new CountDownLatch(spanExporters.length); for (SpanExporter spanExporter : spanExporters) { try { - currentResultCode = mergeResultCode(currentResultCode, spanExporter.export(spans)); - } catch (Throwable t) { + final CompletableResultCode singleResult = spanExporter.export(spans); + mergeResultCode(compositeResultCode, singleResult, completionsToProcess); + } catch (Exception e) { // If an exception was thrown by the exporter - logger.log(Level.WARNING, "Exception thrown by the export.", t); - currentResultCode = FAILURE; + logger.log(Level.WARNING, "Exception thrown by the export.", e); + compositeResultCode.fail(); } } - return currentResultCode; + return compositeResultCode; } /** @@ -67,18 +68,19 @@ public ResultCode export(Collection spans) { * @return the result of the operation */ @Override - public ResultCode flush() { - ResultCode currentResultCode = SUCCESS; + public CompletableResultCode flush() { + final CompletableResultCode compositeResultCode = new CompletableResultCode(); + final CountDownLatch completionsToProcess = new CountDownLatch(spanExporters.length); for (SpanExporter spanExporter : spanExporters) { try { - currentResultCode = mergeResultCode(currentResultCode, spanExporter.flush()); - } catch (Throwable t) { + mergeResultCode(compositeResultCode, spanExporter.flush(), completionsToProcess); + } catch (Exception e) { // If an exception was thrown by the exporter - logger.log(Level.WARNING, "Exception thrown by the export.", t); - currentResultCode = FAILURE; + logger.log(Level.WARNING, "Exception thrown by the flush.", e); + compositeResultCode.fail(); } } - return currentResultCode; + return compositeResultCode; } @Override @@ -88,15 +90,30 @@ public void shutdown() { } } - // Returns a merged error code, see the rules in the code. - private static ResultCode mergeResultCode( - ResultCode currentResultCode, ResultCode newResultCode) { - // If both errors are success then return success. - if (currentResultCode == SUCCESS && newResultCode == SUCCESS) { - return SUCCESS; - } - - return FAILURE; + private static void mergeResultCode( + final CompletableResultCode compositeResultCode, + final CompletableResultCode singleResultCode, + final CountDownLatch completionsToProcess) { + singleResultCode.thenRun( + new Runnable() { + @Override + public void run() { + compositeResultCode.thenRun( + new Runnable() { + @Override + public void run() { + completionsToProcess.countDown(); + if (singleResultCode.isSuccess()) { + if (completionsToProcess.getCount() == 0) { + compositeResultCode.succeed(); + } + } else { + compositeResultCode.fail(); + } + } + }); + } + }); } private MultiSpanExporter(List spanExporters) { diff --git a/sdk/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java b/sdk/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java index 58ea113e64d..9bfb10ab27e 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java @@ -17,6 +17,7 @@ package io.opentelemetry.sdk.trace.export; import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; @@ -25,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -54,6 +56,7 @@ public final class SimpleSpanProcessor implements SpanProcessor { private final SpanExporter spanExporter; private final boolean sampled; + private final AtomicBoolean exportAvailable = new AtomicBoolean(true); private SimpleSpanProcessor(SpanExporter spanExporter, boolean sampled) { this.spanExporter = Objects.requireNonNull(spanExporter, "spanExporter"); @@ -71,15 +74,30 @@ public boolean isStartRequired() { } @Override + @SuppressWarnings("BooleanParameter") public void onEnd(ReadableSpan span) { - if (sampled && !span.getSpanContext().getTraceFlags().isSampled()) { - return; - } - try { - List spans = Collections.singletonList(span.toSpanData()); - spanExporter.export(spans); - } catch (Throwable e) { - logger.log(Level.WARNING, "Exception thrown by the export.", e); + if (exportAvailable.compareAndSet(true, false)) { + if (sampled && !span.getSpanContext().getTraceFlags().isSampled()) { + return; + } + try { + List spans = Collections.singletonList(span.toSpanData()); + final CompletableResultCode result = spanExporter.export(spans); + result.thenRun( + new Runnable() { + @Override + public void run() { + if (!result.isSuccess()) { + logger.log(Level.FINE, "Exporter failed"); + } + exportAvailable.set(true); + } + }); + } catch (Exception e) { + logger.log(Level.WARNING, "Exporter threw an Exception", e); + } + } else { + logger.log(Level.FINE, "Exporter busy. Dropping spans."); } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/trace/export/SpanExporter.java b/sdk/src/main/java/io/opentelemetry/sdk/trace/export/SpanExporter.java index 40bb6dbb343..3f65e92453b 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/trace/export/SpanExporter.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/trace/export/SpanExporter.java @@ -16,6 +16,7 @@ package io.opentelemetry.sdk.trace.export; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.trace.TracerSdkProvider; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.Collection; @@ -42,17 +43,17 @@ enum ResultCode { * Called to export sampled {@code Span}s. * * @param spans the collection of sampled Spans to be exported. - * @return the result of the export. + * @return the result of the export, which is often an asynchronous operation. */ - ResultCode export(Collection spans); + CompletableResultCode export(Collection spans); /** * Exports the collection of sampled {@code Span}s that have not yet been exported. * - * @return the result of the flush. + * @return the result of the flush, which is often an asynchronous operation. * @since 0.4.0 */ - ResultCode flush(); + CompletableResultCode flush(); /** * Called when {@link TracerSdkProvider#shutdown()} is called, if this {@code SpanExporter} is diff --git a/sdk/src/test/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReaderTest.java b/sdk/src/test/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReaderTest.java index 5c5db2893e5..9f191a4559a 100644 --- a/sdk/src/test/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReaderTest.java +++ b/sdk/src/test/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReaderTest.java @@ -21,6 +21,7 @@ import io.opentelemetry.common.Labels; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilderTest.ConfigTester; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricData.Descriptor; @@ -164,7 +165,7 @@ private WaitingMetricExporter(boolean shouldThrow) { } @Override - public ResultCode export(Collection metricList) { + public CompletableResultCode export(Collection metricList) { synchronized (monitor) { this.exportedMetrics.add(new ArrayList<>(metricList)); monitor.notifyAll(); @@ -172,12 +173,12 @@ public ResultCode export(Collection metricList) { if (shouldThrow) { throw new RuntimeException("Export Failed!"); } - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } @Override - public ResultCode flush() { - return ResultCode.SUCCESS; + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); } @Override diff --git a/sdk/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java b/sdk/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java index 7e7ec3f57b9..59f111ed061 100644 --- a/sdk/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java +++ b/sdk/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java @@ -22,6 +22,7 @@ import io.opentelemetry.common.AttributeValue; import io.opentelemetry.context.Scope; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.trace.StressTestRunner.OperationUpdater; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; @@ -212,14 +213,14 @@ private static class CountingSpanExporter implements SpanExporter { public AtomicLong numberOfSpansExported = new AtomicLong(); @Override - public ResultCode export(Collection spans) { + public CompletableResultCode export(Collection spans) { numberOfSpansExported.addAndGet(spans.size()); - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } @Override - public ResultCode flush() { - return ResultCode.SUCCESS; + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); } @Override diff --git a/sdk/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java b/sdk/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java index 8189e125784..40d44ab2f2e 100644 --- a/sdk/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java +++ b/sdk/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java @@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.Mockito.doThrow; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilderTest.ConfigTester; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.Samplers; @@ -304,18 +305,16 @@ void serviceHandlerThrowsException() { @Test @Timeout(5) - public void exporterTimesOut() throws Exception { - final CountDownLatch interruptMarker = new CountDownLatch(1); + public void exporterTimesOut() { WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(1) { @Override - public ResultCode export(Collection spans) { - ResultCode result = super.export(spans); + public CompletableResultCode export(Collection spans) { + CompletableResultCode result = super.export(spans); try { - // sleep longer than the configured timout of 100ms + // sleep longer than the configured timeout of 100ms Thread.sleep(1000); - } catch (InterruptedException e) { - interruptMarker.countDown(); + } catch (InterruptedException ignored) { } return result; } @@ -334,10 +333,6 @@ public ResultCode export(Collection spans) { ReadableSpan span = createSampledEndedSpan(SPAN_NAME_1); List exported = waitingSpanExporter.waitForExport(); assertThat(exported).containsExactly(span.toSpanData()); - - // since the interrupt happens outside the execution of the test method, we'll block to make - // sure that the thread was actually interrupted due to the timeout. - interruptMarker.await(); } @Test @@ -428,7 +423,7 @@ private enum State { State state = State.WAIT_TO_BLOCK; @Override - public ResultCode export(Collection spanDataList) { + public CompletableResultCode export(Collection spanDataList) { synchronized (monitor) { while (state != State.UNBLOCKED) { try { @@ -441,12 +436,12 @@ public ResultCode export(Collection spanDataList) { } } } - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } @Override - public ResultCode flush() { - return ResultCode.SUCCESS; + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); } private void waitUntilIsBlocked() { @@ -514,17 +509,17 @@ List waitForExport() { } @Override - public ResultCode export(Collection spans) { + public CompletableResultCode export(Collection spans) { this.spanDataList.addAll(spans); for (int i = 0; i < spans.size(); i++) { countDownLatch.countDown(); } - return ResultCode.SUCCESS; + return CompletableResultCode.ofSuccess(); } @Override - public ResultCode flush() { - return ResultCode.SUCCESS; + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); } @Override diff --git a/sdk/src/test/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterTest.java b/sdk/src/test/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterTest.java index f12772f0328..adf98cf4e0e 100644 --- a/sdk/src/test/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterTest.java +++ b/sdk/src/test/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterTest.java @@ -18,13 +18,10 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.trace.TestUtils; import io.opentelemetry.sdk.trace.data.SpanData; -import io.opentelemetry.sdk.trace.export.SpanExporter.ResultCode; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -32,6 +29,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; /** Unit tests for {@link MultiSpanExporterTest}. */ @@ -58,16 +56,17 @@ void oneSpanExporter() { SpanExporter multiSpanExporter = MultiSpanExporter.create(Collections.singletonList(spanExporter1)); - when(spanExporter1.export(same(SPAN_LIST))).thenReturn(ResultCode.SUCCESS); - assertThat(multiSpanExporter.export(SPAN_LIST)).isEqualTo(ResultCode.SUCCESS); - verify(spanExporter1).export(same(SPAN_LIST)); + Mockito.when(spanExporter1.export(same(SPAN_LIST))) + .thenReturn(CompletableResultCode.ofSuccess()); + assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isTrue(); + Mockito.verify(spanExporter1).export(same(SPAN_LIST)); - when(spanExporter1.flush()).thenReturn(ResultCode.SUCCESS); - assertThat(multiSpanExporter.flush()).isEqualTo(ResultCode.SUCCESS); - verify(spanExporter1).flush(); + Mockito.when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess()); + assertThat(multiSpanExporter.flush().isSuccess()).isTrue(); + Mockito.verify(spanExporter1).flush(); multiSpanExporter.shutdown(); - verify(spanExporter1).shutdown(); + Mockito.verify(spanExporter1).shutdown(); } @Test @@ -75,21 +74,23 @@ void twoSpanExporter() { SpanExporter multiSpanExporter = MultiSpanExporter.create(Arrays.asList(spanExporter1, spanExporter2)); - when(spanExporter1.export(same(SPAN_LIST))).thenReturn(ResultCode.SUCCESS); - when(spanExporter2.export(same(SPAN_LIST))).thenReturn(ResultCode.SUCCESS); - assertThat(multiSpanExporter.export(SPAN_LIST)).isEqualTo(ResultCode.SUCCESS); - verify(spanExporter1).export(same(SPAN_LIST)); - verify(spanExporter2).export(same(SPAN_LIST)); + Mockito.when(spanExporter1.export(same(SPAN_LIST))) + .thenReturn(CompletableResultCode.ofSuccess()); + Mockito.when(spanExporter2.export(same(SPAN_LIST))) + .thenReturn(CompletableResultCode.ofSuccess()); + assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isTrue(); + Mockito.verify(spanExporter1).export(same(SPAN_LIST)); + Mockito.verify(spanExporter2).export(same(SPAN_LIST)); - when(spanExporter1.flush()).thenReturn(ResultCode.SUCCESS); - when(spanExporter2.flush()).thenReturn(ResultCode.SUCCESS); - assertThat(multiSpanExporter.flush()).isEqualTo(ResultCode.SUCCESS); - verify(spanExporter1).flush(); - verify(spanExporter2).flush(); + Mockito.when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess()); + Mockito.when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofSuccess()); + assertThat(multiSpanExporter.flush().isSuccess()).isTrue(); + Mockito.verify(spanExporter1).flush(); + Mockito.verify(spanExporter2).flush(); multiSpanExporter.shutdown(); - verify(spanExporter1).shutdown(); - verify(spanExporter2).shutdown(); + Mockito.verify(spanExporter1).shutdown(); + Mockito.verify(spanExporter2).shutdown(); } @Test @@ -97,17 +98,19 @@ void twoSpanExporter_OneReturnFailure() { SpanExporter multiSpanExporter = MultiSpanExporter.create(Arrays.asList(spanExporter1, spanExporter2)); - when(spanExporter1.export(same(SPAN_LIST))).thenReturn(ResultCode.SUCCESS); - when(spanExporter2.export(same(SPAN_LIST))).thenReturn(ResultCode.FAILURE); - assertThat(multiSpanExporter.export(SPAN_LIST)).isEqualTo(ResultCode.FAILURE); - verify(spanExporter1).export(same(SPAN_LIST)); - verify(spanExporter2).export(same(SPAN_LIST)); - - when(spanExporter1.flush()).thenReturn(ResultCode.SUCCESS); - when(spanExporter2.flush()).thenReturn(ResultCode.FAILURE); - assertThat(multiSpanExporter.flush()).isEqualTo(ResultCode.FAILURE); - verify(spanExporter1).flush(); - verify(spanExporter2).flush(); + Mockito.when(spanExporter1.export(same(SPAN_LIST))) + .thenReturn(CompletableResultCode.ofSuccess()); + Mockito.when(spanExporter2.export(same(SPAN_LIST))) + .thenReturn(CompletableResultCode.ofFailure()); + assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isFalse(); + Mockito.verify(spanExporter1).export(same(SPAN_LIST)); + Mockito.verify(spanExporter2).export(same(SPAN_LIST)); + + Mockito.when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess()); + Mockito.when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofFailure()); + assertThat(multiSpanExporter.flush().isSuccess()).isFalse(); + Mockito.verify(spanExporter1).flush(); + Mockito.verify(spanExporter2).flush(); } @Test @@ -115,18 +118,19 @@ void twoSpanExporter_FirstThrows() { SpanExporter multiSpanExporter = MultiSpanExporter.create(Arrays.asList(spanExporter1, spanExporter2)); - doThrow(new IllegalArgumentException("No export for you.")) + Mockito.doThrow(new IllegalArgumentException("No export for you.")) .when(spanExporter1) .export(ArgumentMatchers.anyList()); - when(spanExporter2.export(same(SPAN_LIST))).thenReturn(ResultCode.SUCCESS); - assertThat(multiSpanExporter.export(SPAN_LIST)).isEqualTo(ResultCode.FAILURE); - verify(spanExporter1).export(same(SPAN_LIST)); - verify(spanExporter2).export(same(SPAN_LIST)); - - doThrow(new IllegalArgumentException("No flush for you.")).when(spanExporter1).flush(); - when(spanExporter2.flush()).thenReturn(ResultCode.SUCCESS); - assertThat(multiSpanExporter.flush()).isEqualTo(ResultCode.FAILURE); - verify(spanExporter1).flush(); - verify(spanExporter2).flush(); + Mockito.when(spanExporter2.export(same(SPAN_LIST))) + .thenReturn(CompletableResultCode.ofSuccess()); + assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isFalse(); + Mockito.verify(spanExporter1).export(same(SPAN_LIST)); + Mockito.verify(spanExporter2).export(same(SPAN_LIST)); + + Mockito.doThrow(new IllegalArgumentException("No flush for you.")).when(spanExporter1).flush(); + Mockito.when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofSuccess()); + assertThat(multiSpanExporter.flush().isSuccess()).isFalse(); + Mockito.verify(spanExporter1).flush(); + Mockito.verify(spanExporter2).flush(); } }