Skip to content

Commit

Permalink
Re-instated batch span processor cancellation
Browse files Browse the repository at this point in the history
Cancellation now provides the exporter with the opportunity to handle it. An exporter could, for example, cancel activity in flight in the way it knows how.
  • Loading branch information
huntc committed Aug 2, 2020
1 parent d3e3d8d commit f67c7b3
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public CompletableResultCode() {}
private Boolean succeeded = null;

@GuardedBy("lock")
private final ArrayList<Runnable> actions = new ArrayList<>();
private final ArrayList<Runnable> successfulActions = new ArrayList<>();

private final ArrayList<Runnable> failedActions = new ArrayList<>();

private final Object lock = new Object();

Expand All @@ -41,7 +43,7 @@ public CompletableResultCode succeed() {
synchronized (lock) {
if (succeeded == null) {
succeeded = true;
for (Runnable action : actions) {
for (Runnable action : successfulActions) {
action.run();
}
}
Expand All @@ -54,7 +56,7 @@ public CompletableResultCode fail() {
synchronized (lock) {
if (succeeded == null) {
succeeded = false;
for (Runnable action : actions) {
for (Runnable action : failedActions) {
action.run();
}
}
Expand All @@ -75,9 +77,7 @@ public boolean isSuccess() {
}

/**
* Perform an action on completion. Actions are guaranteed to be called only once.
*
* <p>There should only be one action for this class instance.
* Perform an action on successful completion. Actions are guaranteed to be called only once.
*
* @param action the action to perform
* @return this completable result so that it may be further composed
Expand All @@ -87,7 +87,25 @@ public CompletableResultCode thenRun(Runnable action) {
if (succeeded != null) {
action.run();
} else {
this.actions.add(action);
this.successfulActions.add(action);
}
}
return this;
}

/**
* Perform an action on failure completion. Failure can also be due to cancellation. Actions are
* guaranteed to be called only once.
*
* @param failedAction the action to perform on failure
* @return this completable result so that it may be further composed
*/
public CompletableResultCode whenFailed(Runnable failedAction) {
synchronized (lock) {
if (succeeded != null) {
failedAction.run();
} else {
this.failedActions.add(failedAction);
}
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -81,6 +83,8 @@ public final class BatchSpanProcessor implements SpanProcessor {

private static final String WORKER_THREAD_NAME =
BatchSpanProcessor.class.getSimpleName() + "_WorkerThread";
private static final String TIMER_THREAD_NAME =
BatchSpanProcessor.class.getSimpleName() + "_TimerThread";
private final Worker worker;
private final Thread workerThread;
private final boolean sampled;
Expand All @@ -90,8 +94,15 @@ private BatchSpanProcessor(
boolean sampled,
long scheduleDelayMillis,
int maxQueueSize,
int maxExportBatchSize) {
this.worker = new Worker(spanExporter, scheduleDelayMillis, maxQueueSize, maxExportBatchSize);
int maxExportBatchSize,
int exporterTimeoutMillis) {
this.worker =
new Worker(
spanExporter,
scheduleDelayMillis,
maxQueueSize,
maxExportBatchSize,
exporterTimeoutMillis);
this.workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
this.workerThread.start();
this.sampled = sampled;
Expand Down Expand Up @@ -152,13 +163,16 @@ private static final class Worker implements Runnable {

private static final BoundLongCounter droppedSpans;

private final Timer timer = new Timer(TIMER_THREAD_NAME);

private static final Logger logger = Logger.getLogger(Worker.class.getName());
private final SpanExporter spanExporter;
private final long scheduleDelayMillis;
private final int maxQueueSize;
private final int maxExportBatchSize;
private final int halfMaxQueueSize;
private final Object monitor = new Object();
private final int exporterTimeoutMillis;
private final AtomicBoolean exportAvailable = new AtomicBoolean(true);

@GuardedBy("monitor")
Expand All @@ -168,12 +182,14 @@ private Worker(
SpanExporter spanExporter,
long scheduleDelayMillis,
int maxQueueSize,
int maxExportBatchSize) {
int maxExportBatchSize,
int exporterTimeoutMillis) {
this.spanExporter = spanExporter;
this.scheduleDelayMillis = scheduleDelayMillis;
this.maxQueueSize = maxQueueSize;
this.halfMaxQueueSize = maxQueueSize >> 1;
this.maxExportBatchSize = maxExportBatchSize;
this.exporterTimeoutMillis = exporterTimeoutMillis;
this.spansList = new ArrayList<>(maxQueueSize);
}

Expand Down Expand Up @@ -224,6 +240,7 @@ public void run() {

private void shutdown() {
forceFlush();
timer.cancel();
spanExporter.shutdown();
}

Expand Down Expand Up @@ -273,6 +290,14 @@ public void run() {
exportAvailable.set(true);
}
});
timer.schedule(
new TimerTask() {
@Override
public void run() {
result.fail();
}
},
exporterTimeoutMillis);
} catch (Exception e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
}
Expand Down Expand Up @@ -464,12 +489,13 @@ int getMaxExportBatchSize() {
* @throws NullPointerException if the {@code spanExporter} is {@code null}.
*/
public BatchSpanProcessor build() {
/*
* Note that setting an export timeout has no effect - there's no sure way to cancel a
* thread of execution, even by asking an export to cancel any associated threads.
*/
return new BatchSpanProcessor(
spanExporter, exportOnlySampled, scheduleDelayMillis, maxQueueSize, maxExportBatchSize);
spanExporter,
exportOnlySampled,
scheduleDelayMillis,
maxQueueSize,
maxExportBatchSize,
exporterTimeoutMillis);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,16 +305,26 @@ void serviceHandlerThrowsException() {

@Test
@Timeout(5)
public void exporterTimesOut() {
public void exporterTimesOut() throws InterruptedException {
final CountDownLatch interruptMarker = new CountDownLatch(1);
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(1) {
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
CompletableResultCode result = super.export(spans);
Thread exporterThread = Thread.currentThread();
result.whenFailed(
new Runnable() {
@Override
public void run() {
exporterThread.interrupt();
}
});
try {
// sleep longer than the configured timeout of 100ms
Thread.sleep(1000);
} catch (InterruptedException ignored) {
} catch (InterruptedException e) {
interruptMarker.countDown();
}
return result;
}
Expand All @@ -333,6 +343,10 @@ public CompletableResultCode export(Collection<SpanData> spans) {
ReadableSpan span = createSampledEndedSpan(SPAN_NAME_1);
List<SpanData> exported = waitingSpanExporter.waitForExport();
assertThat(exported).containsExactly(span.toSpanData());

// since the interrupt happens outside the execution of the test method, we'll block to make
// sure that the thread was actually interrupted due to the timeout.
interruptMarker.await();
}

@Test
Expand Down

0 comments on commit f67c7b3

Please sign in to comment.