Skip to content

Commit

Permalink
Makes Export methods async
Browse files Browse the repository at this point in the history
This commit recognises that the export and flush methods of span and trace exporters can be, and often are, implemented with long-lived operations over networks. We therefore see that these method return types are represented using Java's Future type to account for this common behaviour.
  • Loading branch information
huntc committed Aug 2, 2020
1 parent 6dffbb8 commit d3e3d8d
Show file tree
Hide file tree
Showing 28 changed files with 415 additions and 298 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.opentelemetry.exporters.inmemory;

import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.ArrayList;
Expand Down Expand Up @@ -103,12 +104,12 @@ public void reset() {
* <p>If this is called after {@code shutdown}, this will return {@code ResultCode.FAILURE}.
*/
@Override
public ResultCode export(Collection<MetricData> metrics) {
public CompletableResultCode export(Collection<MetricData> metrics) {
if (isStopped) {
return ResultCode.FAILURE;
return CompletableResultCode.ofFailure();
}
finishedMetricItems.addAll(metrics);
return ResultCode.SUCCESS;
return CompletableResultCode.ofSuccess();
}

/**
Expand All @@ -118,8 +119,8 @@ public ResultCode export(Collection<MetricData> metrics) {
* @return always Success
*/
@Override
public ResultCode flush() {
return ResultCode.SUCCESS;
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.opentelemetry.exporters.inmemory;

import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.ArrayList;
Expand Down Expand Up @@ -85,14 +86,14 @@ public void reset() {
}

@Override
public ResultCode export(Collection<SpanData> spans) {
public CompletableResultCode export(Collection<SpanData> spans) {
synchronized (this) {
if (isStopped) {
return ResultCode.FAILURE;
return CompletableResultCode.ofFailure();
}
finishedSpanItems.addAll(spans);
}
return ResultCode.SUCCESS;
return CompletableResultCode.ofSuccess();
}

/**
Expand All @@ -102,8 +103,8 @@ public ResultCode export(Collection<SpanData> spans) {
* @return always Success
*/
@Override
public ResultCode flush() {
return ResultCode.SUCCESS;
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricData.Descriptor;
import io.opentelemetry.sdk.metrics.data.MetricData.LongPoint;
import io.opentelemetry.sdk.metrics.export.MetricExporter.ResultCode;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -54,7 +53,7 @@ void test_getFinishedMetricItems() {
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());

assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS);
assertThat(exporter.export(metrics).isSuccess()).isTrue();
List<MetricData> metricItems = exporter.getFinishedMetricItems();
assertThat(metricItems).isNotNull();
assertThat(metricItems.size()).isEqualTo(3);
Expand All @@ -67,7 +66,7 @@ void test_reset() {
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());

assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS);
assertThat(exporter.export(metrics).isSuccess()).isTrue();
List<MetricData> metricItems = exporter.getFinishedMetricItems();
assertThat(metricItems).isNotNull();
assertThat(metricItems.size()).isEqualTo(3);
Expand All @@ -84,7 +83,7 @@ void test_shutdown() {
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());

assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS);
assertThat(exporter.export(metrics).isSuccess()).isTrue();
exporter.shutdown();
List<MetricData> metricItems = exporter.getFinishedMetricItems();
assertThat(metricItems).isNotNull();
Expand All @@ -98,13 +97,13 @@ void testShutdown_export() {
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());

assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS);
assertThat(exporter.export(metrics).isSuccess()).isTrue();
exporter.shutdown();
assertThat(exporter.export(metrics)).isEqualTo(ResultCode.FAILURE);
assertThat(exporter.export(metrics).isSuccess()).isFalse();
}

@Test
void test_flush() {
assertThat(exporter.flush()).isEqualTo(ResultCode.SUCCESS);
assertThat(exporter.flush().isSuccess()).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.test.TestSpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter.ResultCode;
import io.opentelemetry.trace.Span.Kind;
import io.opentelemetry.trace.SpanId;
import io.opentelemetry.trace.Status;
Expand Down Expand Up @@ -89,16 +88,13 @@ void shutdown() {

@Test
void export_ReturnCode() {
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())))
.isEqualTo(ResultCode.SUCCESS);
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())).isSuccess()).isTrue();
exporter.shutdown();
// After shutdown no more export.
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())))
.isEqualTo(ResultCode.FAILURE);
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())).isSuccess()).isFalse();
exporter.reset();
// Reset does not do anything if already shutdown.
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())))
.isEqualTo(ResultCode.FAILURE);
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())).isSuccess()).isFalse();
}

static SpanData makeBasicSpan() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.exporters.jaeger.proto.api_v2.Collector;
import io.opentelemetry.exporters.jaeger.proto.api_v2.CollectorServiceGrpc;
import io.opentelemetry.exporters.jaeger.proto.api_v2.Model;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
Expand Down Expand Up @@ -107,7 +108,7 @@ private JaegerGrpcSpanExporter(String serviceName, ManagedChannel channel, long
* @return the result of the operation
*/
@Override
public ResultCode export(Collection<SpanData> spans) {
public CompletableResultCode export(Collection<SpanData> spans) {
Collector.PostSpansRequest request =
Collector.PostSpansRequest.newBuilder()
.setBatch(
Expand All @@ -126,10 +127,10 @@ public ResultCode export(Collection<SpanData> spans) {
// for now, there's nothing to check in the response object
//noinspection ResultOfMethodCallIgnored
stub.postSpans(request);
return ResultCode.SUCCESS;
return CompletableResultCode.ofSuccess();
} catch (Throwable e) {
logger.log(Level.WARNING, "Failed to export spans", e);
return ResultCode.FAILURE;
return CompletableResultCode.ofFailure();
}
}

Expand All @@ -139,8 +140,8 @@ public ResultCode export(Collection<SpanData> spans) {
* @return always Success
*/
@Override
public ResultCode flush() {
return ResultCode.SUCCESS;
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.opentelemetry.exporters.logging;

import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.Collection;
Expand All @@ -27,12 +28,12 @@ public class LoggingMetricExporter implements MetricExporter {
private static final Logger logger = Logger.getLogger(LoggingMetricExporter.class.getName());

@Override
public ResultCode export(Collection<MetricData> metrics) {
public CompletableResultCode export(Collection<MetricData> metrics) {
logger.info("Received a collection of " + metrics.size() + " metrics for export.");
for (MetricData metricData : metrics) {
logger.log(Level.INFO, "metric: {0}", metricData);
}
return ResultCode.SUCCESS;
return CompletableResultCode.ofSuccess();
}

/**
Expand All @@ -41,16 +42,16 @@ public ResultCode export(Collection<MetricData> metrics) {
* @return the result of the operation
*/
@Override
public ResultCode flush() {
ResultCode resultCode = ResultCode.SUCCESS;
public CompletableResultCode flush() {
CompletableResultCode resultCode = new CompletableResultCode();
for (Handler handler : logger.getHandlers()) {
try {
handler.flush();
} catch (Throwable t) {
resultCode = ResultCode.FAILURE;
resultCode.fail();
}
}
return resultCode;
return resultCode.succeed();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.opentelemetry.exporters.logging;

import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;
Expand All @@ -28,11 +29,11 @@ public class LoggingSpanExporter implements SpanExporter {
private static final Logger logger = Logger.getLogger(LoggingSpanExporter.class.getName());

@Override
public ResultCode export(Collection<SpanData> spans) {
public CompletableResultCode export(Collection<SpanData> spans) {
for (SpanData span : spans) {
logger.log(Level.INFO, "span: {0}", span);
}
return ResultCode.SUCCESS;
return CompletableResultCode.ofSuccess();
}

/**
Expand All @@ -41,16 +42,16 @@ public ResultCode export(Collection<SpanData> spans) {
* @return the result of the operation
*/
@Override
public ResultCode flush() {
ResultCode resultCode = ResultCode.SUCCESS;
public CompletableResultCode flush() {
CompletableResultCode resultCode = new CompletableResultCode();
for (Handler handler : logger.getHandlers()) {
try {
handler.flush();
} catch (Throwable t) {
resultCode = ResultCode.FAILURE;
resultCode.fail();
}
}
return resultCode;
return resultCode.succeed();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@

import static com.google.common.truth.Truth.assertThat;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.opentelemetry.common.AttributeValue;
import io.opentelemetry.common.Attributes;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.EventImpl;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.test.TestSpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter.ResultCode;
import io.opentelemetry.trace.Span.Kind;
import io.opentelemetry.trace.SpanId;
import io.opentelemetry.trace.Status;
Expand Down Expand Up @@ -77,8 +76,8 @@ void returnCode() {
.setTotalRecordedEvents(1)
.setTotalRecordedLinks(0)
.build();
ResultCode resultCode = exporter.export(singletonList(spanData));
assertEquals(ResultCode.SUCCESS, resultCode);
CompletableResultCode resultCode = exporter.export(singletonList(spanData));
assertThat(resultCode.isSuccess()).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
Expand Down Expand Up @@ -89,7 +90,7 @@ private OtlpGrpcMetricExporter(ManagedChannel channel, long deadlineMs) {
* @return the result of the operation
*/
@Override
public ResultCode export(Collection<MetricData> metrics) {
public CompletableResultCode export(Collection<MetricData> metrics) {
ExportMetricsServiceRequest exportMetricsServiceRequest =
ExportMetricsServiceRequest.newBuilder()
.addAllResourceMetrics(MetricAdapter.toProtoResourceMetrics(metrics))
Expand All @@ -104,10 +105,10 @@ public ResultCode export(Collection<MetricData> metrics) {
// for now, there's nothing to check in the response object
// noinspection ResultOfMethodCallIgnored
stub.export(exportMetricsServiceRequest);
return ResultCode.SUCCESS;
return CompletableResultCode.ofSuccess();
} catch (Throwable e) {
logger.log(Level.WARNING, "Failed to export metrics", e);
return ResultCode.FAILURE;
return CompletableResultCode.ofFailure();
}
}

Expand All @@ -117,8 +118,8 @@ public ResultCode export(Collection<MetricData> metrics) {
* @return always Success
*/
@Override
public ResultCode flush() {
return ResultCode.SUCCESS;
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
Expand Down Expand Up @@ -97,7 +98,7 @@ private OtlpGrpcSpanExporter(ManagedChannel channel, long deadlineMs) {
* @return the result of the operation
*/
@Override
public ResultCode export(Collection<SpanData> spans) {
public CompletableResultCode export(Collection<SpanData> spans) {
ExportTraceServiceRequest exportTraceServiceRequest =
ExportTraceServiceRequest.newBuilder()
.addAllResourceSpans(SpanAdapter.toProtoResourceSpans(spans))
Expand All @@ -112,10 +113,10 @@ public ResultCode export(Collection<SpanData> spans) {
// for now, there's nothing to check in the response object
// noinspection ResultOfMethodCallIgnored
stub.export(exportTraceServiceRequest);
return ResultCode.SUCCESS;
return CompletableResultCode.ofSuccess();
} catch (Throwable e) {
logger.log(Level.WARNING, "Failed to export spans", e);
return ResultCode.FAILURE;
return CompletableResultCode.ofFailure();
}
}

Expand All @@ -125,8 +126,8 @@ public ResultCode export(Collection<SpanData> spans) {
* @return always Success
*/
@Override
public ResultCode flush() {
return ResultCode.SUCCESS;
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

/**
Expand Down
Loading

0 comments on commit d3e3d8d

Please sign in to comment.