diff --git a/sdk/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java b/sdk/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java index e591533198a..ffe4f5c1630 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java @@ -32,7 +32,9 @@ public CompletableResultCode() {} private Boolean succeeded = null; @GuardedBy("lock") - private final ArrayList actions = new ArrayList<>(); + private final ArrayList successfulActions = new ArrayList<>(); + + private final ArrayList failedActions = new ArrayList<>(); private final Object lock = new Object(); @@ -41,7 +43,7 @@ public CompletableResultCode succeed() { synchronized (lock) { if (succeeded == null) { succeeded = true; - for (Runnable action : actions) { + for (Runnable action : successfulActions) { action.run(); } } @@ -54,7 +56,7 @@ public CompletableResultCode fail() { synchronized (lock) { if (succeeded == null) { succeeded = false; - for (Runnable action : actions) { + for (Runnable action : failedActions) { action.run(); } } @@ -75,9 +77,7 @@ public boolean isSuccess() { } /** - * Perform an action on completion. Actions are guaranteed to be called only once. - * - *

There should only be one action for this class instance. + * Perform an action on successful completion. Actions are guaranteed to be called only once. * * @param action the action to perform * @return this completable result so that it may be further composed @@ -87,7 +87,25 @@ public CompletableResultCode thenRun(Runnable action) { if (succeeded != null) { action.run(); } else { - this.actions.add(action); + this.successfulActions.add(action); + } + } + return this; + } + + /** + * Perform an action on failure completion. Failure can also be due to cancellation. Actions are + * guaranteed to be called only once. + * + * @param failedAction the action to perform on failure + * @return this completable result so that it may be further composed + */ + public CompletableResultCode whenFailed(Runnable failedAction) { + synchronized (lock) { + if (succeeded != null) { + failedAction.run(); + } else { + this.failedActions.add(failedAction); } } return this; diff --git a/sdk/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index a6458558f07..96539f48f4a 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -33,6 +33,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -81,6 +83,8 @@ public final class BatchSpanProcessor implements SpanProcessor { private static final String WORKER_THREAD_NAME = BatchSpanProcessor.class.getSimpleName() + "_WorkerThread"; + private static final String TIMER_THREAD_NAME = + BatchSpanProcessor.class.getSimpleName() + "_TimerThread"; private final Worker worker; private final Thread workerThread; private final boolean sampled; @@ -90,8 +94,15 @@ private BatchSpanProcessor( boolean sampled, long scheduleDelayMillis, int maxQueueSize, - int maxExportBatchSize) { - this.worker = new Worker(spanExporter, scheduleDelayMillis, maxQueueSize, maxExportBatchSize); + int maxExportBatchSize, + int exporterTimeoutMillis) { + this.worker = + new Worker( + spanExporter, + scheduleDelayMillis, + maxQueueSize, + maxExportBatchSize, + exporterTimeoutMillis); this.workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); this.workerThread.start(); this.sampled = sampled; @@ -152,6 +163,8 @@ private static final class Worker implements Runnable { private static final BoundLongCounter droppedSpans; + private final Timer timer = new Timer(TIMER_THREAD_NAME); + private static final Logger logger = Logger.getLogger(Worker.class.getName()); private final SpanExporter spanExporter; private final long scheduleDelayMillis; @@ -159,6 +172,7 @@ private static final class Worker implements Runnable { private final int maxExportBatchSize; private final int halfMaxQueueSize; private final Object monitor = new Object(); + private final int exporterTimeoutMillis; private final AtomicBoolean exportAvailable = new AtomicBoolean(true); @GuardedBy("monitor") @@ -168,12 +182,14 @@ private Worker( SpanExporter spanExporter, long scheduleDelayMillis, int maxQueueSize, - int maxExportBatchSize) { + int maxExportBatchSize, + int exporterTimeoutMillis) { this.spanExporter = spanExporter; this.scheduleDelayMillis = scheduleDelayMillis; this.maxQueueSize = maxQueueSize; this.halfMaxQueueSize = maxQueueSize >> 1; this.maxExportBatchSize = maxExportBatchSize; + this.exporterTimeoutMillis = exporterTimeoutMillis; this.spansList = new ArrayList<>(maxQueueSize); } @@ -224,6 +240,7 @@ public void run() { private void shutdown() { forceFlush(); + timer.cancel(); spanExporter.shutdown(); } @@ -273,6 +290,14 @@ public void run() { exportAvailable.set(true); } }); + timer.schedule( + new TimerTask() { + @Override + public void run() { + result.fail(); + } + }, + exporterTimeoutMillis); } catch (Exception e) { logger.log(Level.WARNING, "Exporter threw an Exception", e); } @@ -464,12 +489,13 @@ int getMaxExportBatchSize() { * @throws NullPointerException if the {@code spanExporter} is {@code null}. */ public BatchSpanProcessor build() { - /* - * Note that setting an export timeout has no effect - there's no sure way to cancel a - * thread of execution, even by asking an export to cancel any associated threads. - */ return new BatchSpanProcessor( - spanExporter, exportOnlySampled, scheduleDelayMillis, maxQueueSize, maxExportBatchSize); + spanExporter, + exportOnlySampled, + scheduleDelayMillis, + maxQueueSize, + maxExportBatchSize, + exporterTimeoutMillis); } } } diff --git a/sdk/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java b/sdk/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java index 40d44ab2f2e..8e7fca9b88a 100644 --- a/sdk/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java +++ b/sdk/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java @@ -305,16 +305,26 @@ void serviceHandlerThrowsException() { @Test @Timeout(5) - public void exporterTimesOut() { + public void exporterTimesOut() throws InterruptedException { + final CountDownLatch interruptMarker = new CountDownLatch(1); WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(1) { @Override public CompletableResultCode export(Collection spans) { CompletableResultCode result = super.export(spans); + Thread exporterThread = Thread.currentThread(); + result.whenFailed( + new Runnable() { + @Override + public void run() { + exporterThread.interrupt(); + } + }); try { // sleep longer than the configured timeout of 100ms Thread.sleep(1000); - } catch (InterruptedException ignored) { + } catch (InterruptedException e) { + interruptMarker.countDown(); } return result; } @@ -333,6 +343,10 @@ public CompletableResultCode export(Collection spans) { ReadableSpan span = createSampledEndedSpan(SPAN_NAME_1); List exported = waitingSpanExporter.waitForExport(); assertThat(exported).containsExactly(span.toSpanData()); + + // since the interrupt happens outside the execution of the test method, we'll block to make + // sure that the thread was actually interrupted due to the timeout. + interruptMarker.await(); } @Test