-
Notifications
You must be signed in to change notification settings - Fork 851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DISCUSSION: Add SpanWatcherProcessor (spec#373) #697
Closed
Closed
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
277cd7d
Add SpanWatcherProcessor.
Oberon00 7b366a3
Rename things from their BatchSpansProcessor names.
Oberon00 fb4b99e
Use extended ReadableSpan.
Oberon00 4e4f6e5
Make tests more robust & faster.
Oberon00 1e80012
Fix sending new spans too early.
Oberon00 7a0247e
goJF
Oberon00 16c83ab
Actually check allowed
Oberon00 11e35ab
Fix build after rebasing on master.
Oberon00 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
330 changes: 330 additions & 0 deletions
330
sdk/src/main/java/io/opentelemetry/sdk/trace/export/SpanWatcherProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,330 @@ | ||
/* | ||
* Copyright 2019, OpenTelemetry Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.opentelemetry.sdk.trace.export; | ||
|
||
import com.google.common.util.concurrent.MoreExecutors; | ||
import io.opentelemetry.internal.Utils; | ||
import io.opentelemetry.sdk.trace.ReadableSpan; | ||
import io.opentelemetry.sdk.trace.SpanData; | ||
import io.opentelemetry.sdk.trace.SpanProcessor; | ||
import java.lang.ref.WeakReference; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
import javax.annotation.concurrent.GuardedBy; | ||
|
||
/** | ||
* Implementation of the {@link SpanProcessor} that repeatedly reports active spans that are older | ||
* | ||
* <p>All spans started by the SDK are first added to a synchronized list (with a {@code | ||
* maxQueueSize} maximum size, after the size is reached spans are dropped) and exported every | ||
* {@code reportIntervalMillis} to the exporter pipeline in batches of {@code maxExportBatchSize}. | ||
* | ||
* <p>If the queue gets half full a preemptive notification is sent to the worker thread that | ||
* exports the spans to wake up and start a new export cycle. | ||
* | ||
* <p>This batch {@link SpanProcessor} can cause high contention in a very high traffic service. | ||
* TODO: Add a link to the SpanProcessor that uses Disruptor as alternative with low contention. | ||
*/ | ||
public final class SpanWatcherProcessor implements SpanProcessor { | ||
private static final String WORKER_THREAD_NAME = | ||
SpanWatcherProcessor.class.getSimpleName() + "_WorkerThread"; | ||
private final Worker worker; | ||
private final Thread workerThread; | ||
private final boolean sampled; | ||
|
||
private SpanWatcherProcessor( | ||
SpanExporter spanExporter, | ||
boolean sampled, | ||
long reportIntervalMillis, | ||
int maxQueueSize, | ||
int maxExportBatchSize) { | ||
this.worker = new Worker(spanExporter, reportIntervalMillis, maxQueueSize, maxExportBatchSize); | ||
this.workerThread = newThread(worker); | ||
this.workerThread.start(); | ||
this.sampled = sampled; | ||
} | ||
|
||
@Override | ||
public void onStart(ReadableSpan span) { | ||
if (sampled && !span.getSpanContext().getTraceFlags().isSampled()) { | ||
return; | ||
} | ||
worker.addSpan(span); | ||
} | ||
|
||
@Override | ||
public void onEnd(ReadableSpan span) {} | ||
|
||
@Override | ||
public void shutdown() { | ||
workerThread.interrupt(); | ||
worker.flush(); | ||
} | ||
|
||
/** | ||
* Returns a new Builder for {@link SpanWatcherProcessor}. | ||
* | ||
* @param spanExporter the {@code SpanExporter} to where the Spans are pushed. | ||
* @return a new {@link SpanWatcherProcessor}. | ||
* @throws NullPointerException if the {@code spanExporter} is {@code null}. | ||
*/ | ||
public static Builder newBuilder(SpanExporter spanExporter) { | ||
return new Builder(spanExporter); | ||
} | ||
|
||
/** Builder class for {@link SpanWatcherProcessor}. */ | ||
public static final class Builder { | ||
private static final long REPORT_INTERVAL_MILLIS = 5000; | ||
private static final int MAX_QUEUE_SIZE = 2048; | ||
private static final int MAX_EXPORT_BATCH_SIZE = 512; | ||
private final SpanExporter spanExporter; | ||
private long reportIntervalMillis = REPORT_INTERVAL_MILLIS; | ||
private int maxWatchlistSize = MAX_QUEUE_SIZE; | ||
private int maxExportBatchSize = MAX_EXPORT_BATCH_SIZE; | ||
private boolean reportOnlySampled = true; | ||
|
||
private Builder(SpanExporter spanExporter) { | ||
this.spanExporter = Utils.checkNotNull(spanExporter, "spanExporter"); | ||
} | ||
|
||
// TODO: Consider to add support for constant Attributes and/or Resource. | ||
|
||
/** | ||
* Set whether only sampled spans should be reported. | ||
* | ||
* @param sampled report only sampled spans. | ||
* @return this. | ||
*/ | ||
public Builder reportOnlySampled(boolean sampled) { | ||
this.reportOnlySampled = sampled; | ||
return this; | ||
} | ||
|
||
/** | ||
* Sets the delay interval between two consecutive exports. The actual interval may be shorter | ||
* if the batch size is getting larger than {@code maxQueuedSpans / 2}. | ||
* | ||
* <p>Default value is {@code 5000}ms. | ||
* | ||
* @param reportIntervalMillis the delay interval between two consecutive exports. | ||
* @return this. | ||
*/ | ||
public Builder setReportIntervalMillis(long reportIntervalMillis) { | ||
this.reportIntervalMillis = reportIntervalMillis; | ||
return this; | ||
} | ||
|
||
/** | ||
* Sets the maximum number of Spans that are kept in the watchlist before start dropping. | ||
* | ||
* <p>Default value is {@code 2048}. | ||
* | ||
* @param maxWatchlistSize the maximum number of Spans that are kept in the watchlist before | ||
* spans are dropped. | ||
* @return this. | ||
*/ | ||
public Builder setMaxWatchlistSize(int maxWatchlistSize) { | ||
this.maxWatchlistSize = maxWatchlistSize; | ||
return this; | ||
} | ||
|
||
/** | ||
* Sets the maximum batch size for every export. This must be smaller or equal to {@code | ||
* maxQueuedSpans}. | ||
* | ||
* <p>Default value is {@code 512}. | ||
* | ||
* @param maxExportBatchSize the maximum batch size for every export. | ||
* @return this. | ||
*/ | ||
public Builder setMaxExportBatchSize(int maxExportBatchSize) { | ||
Utils.checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive."); | ||
this.maxExportBatchSize = maxExportBatchSize; | ||
return this; | ||
} | ||
|
||
/** | ||
* Returns a new {@link SpanWatcherProcessor}. | ||
* | ||
* @return a new {@link SpanWatcherProcessor}. | ||
* @throws NullPointerException if the {@code spanExporter} is {@code null}. | ||
*/ | ||
public SpanWatcherProcessor build() { | ||
return new SpanWatcherProcessor( | ||
spanExporter, | ||
reportOnlySampled, | ||
reportIntervalMillis, | ||
maxWatchlistSize, | ||
maxExportBatchSize); | ||
} | ||
} | ||
|
||
private static Thread newThread(Runnable runnable) { | ||
Thread thread = MoreExecutors.platformThreadFactory().newThread(runnable); | ||
try { | ||
thread.setName(WORKER_THREAD_NAME); | ||
} catch (SecurityException e) { | ||
// OK if we can't set the name in this environment. | ||
} | ||
return thread; | ||
} | ||
|
||
// Worker is a thread that batches multiple spans and calls the registered SpanExporter to export | ||
Oberon00 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// the data. | ||
// | ||
// The list of batched data is protected by an explicit monitor object which ensures full | ||
// concurrency. | ||
private static final class Worker implements Runnable { | ||
private static final Logger logger = Logger.getLogger(Worker.class.getName()); | ||
private final SpanExporter spanExporter; | ||
private final long reportIntervalMillis; | ||
private final int maxWatchlistSize; | ||
private final int maxExportBatchSize; | ||
private final Object monitor = new Object(); | ||
|
||
@GuardedBy("monitor") | ||
private final List<WeakReference<ReadableSpan>> spanWatchlist; | ||
|
||
private Worker( | ||
SpanExporter spanExporter, | ||
long reportIntervalMillis, | ||
int maxWatchlistSize, | ||
int maxExportBatchSize) { | ||
this.spanExporter = spanExporter; | ||
this.reportIntervalMillis = reportIntervalMillis; | ||
this.maxWatchlistSize = maxWatchlistSize; | ||
this.maxExportBatchSize = maxExportBatchSize; | ||
this.spanWatchlist = new ArrayList<>(maxWatchlistSize); | ||
} | ||
|
||
private void addSpan(ReadableSpan span) { | ||
synchronized (monitor) { | ||
if (spanWatchlist.size() == maxWatchlistSize) { | ||
// TODO: Record a counter for dropped spans. | ||
return; | ||
} | ||
// TODO: Record a gauge for referenced spans. | ||
spanWatchlist.add(new WeakReference<>(span)); | ||
} | ||
|
||
// TODO: We should keep track of spans that have ended but weren't yet removed | ||
// from spanList to clean up if that's the case. | ||
} | ||
|
||
@Override | ||
public void run() { | ||
while (!Thread.currentThread().isInterrupted()) { | ||
// Copy all the batched spans in a separate list to release the monitor lock asap to | ||
// avoid blocking the producer thread. | ||
ArrayList<ReadableSpan> unfinishedSpans; | ||
synchronized (monitor) { | ||
do { | ||
// In the case of a spurious wakeup we export only if we have at least one span in | ||
// the batch. It is acceptable because batching is a best effort mechanism here. | ||
try { | ||
monitor.wait(reportIntervalMillis); | ||
} catch (InterruptedException ie) { | ||
// Preserve the interruption status as per guidance and stop doing any work. | ||
Thread.currentThread().interrupt(); | ||
return; | ||
} | ||
} while (spanWatchlist.isEmpty()); | ||
unfinishedSpans = getUnfinishedSpans(); | ||
} | ||
// Execute the batch export outside the synchronized to not block all producers. | ||
exportBatches(unfinishedSpans); | ||
} | ||
} | ||
|
||
@GuardedBy("monitor") | ||
private ArrayList<ReadableSpan> getUnfinishedSpans() { | ||
ArrayList<ReadableSpan> unfinishedSpans = new ArrayList<>(); | ||
for (int i = 0; i < spanWatchlist.size(); ) { | ||
ReadableSpan span = spanWatchlist.get(i).get(); | ||
if (span == null || span.hasEnded()) { | ||
dropSpan(i); | ||
continue; | ||
} | ||
|
||
// We could also use the time we add()ed the span, but since only in-band spans are supposed | ||
// to be reported, | ||
// using the start timestamp makes just as much sense. | ||
if (span.getLatencyNanos() > reportIntervalMillis * 1000L * 1000L) { | ||
// Many spans will be end()ed so soon that it won't bring much benefit to report them | ||
// earlier. | ||
unfinishedSpans.add(span); | ||
} | ||
++i; | ||
} | ||
return unfinishedSpans; | ||
} | ||
|
||
@GuardedBy("monitor") | ||
private void dropSpan(int i) { | ||
// We don't care about the order of Spans in spanWatchlist, | ||
// so just this is more efficient than just using remove(i). | ||
|
||
final int lastIdx = spanWatchlist.size() - 1; | ||
if (i != lastIdx) { | ||
spanWatchlist.set(i, spanWatchlist.get(lastIdx)); | ||
} | ||
spanWatchlist.remove(lastIdx); | ||
} | ||
|
||
private void flush() { | ||
ArrayList<ReadableSpan> unfinishedSpans; | ||
synchronized (monitor) { | ||
unfinishedSpans = getUnfinishedSpans(); | ||
} | ||
// Execute the batch export outside the synchronized to not block all producers. | ||
exportBatches(unfinishedSpans); | ||
} | ||
|
||
private void exportBatches(ArrayList<ReadableSpan> spanList) { | ||
// TODO: Record a counter for pushed spans. | ||
for (int i = 0; i < spanList.size(); ) { | ||
int batchSizeLimit = Math.min(i + maxExportBatchSize, spanList.size()); | ||
onBatchExport(createSpanDataForExport(spanList, i, batchSizeLimit)); | ||
i = batchSizeLimit; | ||
} | ||
} | ||
|
||
private static List<SpanData> createSpanDataForExport( | ||
ArrayList<ReadableSpan> spanList, int startIndex, int numberToTake) { | ||
List<SpanData> spanDataBuffer = new ArrayList<>(numberToTake); | ||
for (int i = startIndex; i < numberToTake; i++) { | ||
spanDataBuffer.add(spanList.get(i).toSpanData()); | ||
// Remove the reference to the SpanData to allow GC to free the memory. | ||
spanList.set(i, null); | ||
} | ||
return Collections.unmodifiableList(spanDataBuffer); | ||
} | ||
|
||
// Exports the list of Span protos to all the ServiceHandlers. | ||
private void onBatchExport(List<SpanData> spans) { | ||
// In case of any exception thrown by the service handlers continue to run. | ||
try { | ||
spanExporter.export(spans); | ||
} catch (Throwable t) { | ||
logger.log(Level.WARNING, "Exception thrown by the export.", t); | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is very similar to https://github.com/open-telemetry/opentelemetry-java/blob/master/sdk/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpansProcessor.java. Is it possible to share at least the Worker/Queuing logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as stated in the description, it is probably 80% the same. But the span watcher has no queue, it has a watchlist. Spans are not removed on export. I'm not sure how easily one could share code. Also the types are slightly different since the watcher has to convert to SpanData earlier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As of recently, the conversion to `SpanData` is now at the same point as in the `BatchSpansProcessor`, so the types have become even more similar now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if I understand correctly you will try to avoid duplicated code, is that correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey sorry for not replying, I will leave this PR lying around some more while we are still discussing requirements for this span processor (e.g. if it should also optionally export ended spans).