Skip to content

Commit

Permalink
passes timeout to export (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-b committed Aug 29, 2023
1 parent f800114 commit ec42fcd
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private class BatchWorker : Thread {
logRecordsCopy = logRecordList
logRecordList.removeAll()
cond.unlock()
self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: nil)
self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: exportTimeout)
}
} while true
}
Expand All @@ -114,7 +114,7 @@ private class BatchWorker : Thread {


public func shutdown() {
forceFlush(explicitTimeout: nil)
forceFlush(explicitTimeout: exportTimeout)
_ = logRecordExporter.shutdown()
}

Expand Down
238 changes: 119 additions & 119 deletions Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,139 +15,139 @@ import OpenTelemetryApi
/// exports the spans to wake up and start a new export cycle.
/// This batchSpanProcessor can cause high contention in a very high traffic service.
public struct BatchSpanProcessor: SpanProcessor {
fileprivate var worker: BatchWorker

public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil)
{
worker = BatchWorker(spanExporter: spanExporter,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback)
worker.start()
}

public let isStartRequired = false
public let isEndRequired = true

public func onStart(parentContext: SpanContext?, span: ReadableSpan) {}

public func onEnd(span: ReadableSpan) {
if !span.context.traceFlags.sampled {
return
}
worker.addSpan(span: span)
}

public func shutdown() {
worker.cancel()
worker.shutdown()
}

public func forceFlush(timeout: TimeInterval? = nil) {
worker.forceFlush(explicitTimeout: timeout)
fileprivate var worker: BatchWorker

public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil)
{
worker = BatchWorker(spanExporter: spanExporter,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback)
worker.start()
}

public let isStartRequired = false
public let isEndRequired = true

public func onStart(parentContext: SpanContext?, span: ReadableSpan) {}

public func onEnd(span: ReadableSpan) {
if !span.context.traceFlags.sampled {
return
}
worker.addSpan(span: span)
}

public func shutdown() {
worker.cancel()
worker.shutdown()
}

public func forceFlush(timeout: TimeInterval? = nil) {
worker.forceFlush(explicitTimeout: timeout)
}
}

/// BatchWorker is a thread that batches multiple spans and calls the registered SpanExporter to export
/// the data.
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
private class BatchWorker: Thread {
let spanExporter: SpanExporter
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) {
self.spanExporter = spanExporter
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1
let spanExporter: SpanExporter
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) {
self.spanExporter = spanExporter
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1
}

func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }

if spanList.count == maxQueueSize {
// TODO: Record a counter for dropped spans.
return
}

func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }

if spanList.count == maxQueueSize {
// TODO: Record a counter for dropped spans.
return
}
// TODO: Record a gauge for referenced spans.
spanList.append(span)
// Notify the worker thread that at half of the queue is available. It will take
// time anyway for the thread to wake up.
if spanList.count >= halfMaxQueueSize {
cond.broadcast()
}
}

override func main() {
repeat {
autoreleasepool {
var spansCopy: [ReadableSpan]
cond.lock()
if spanList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while spanList.isEmpty
}
spansCopy = spanList
spanList.removeAll()
cond.unlock()
self.exportBatch(spanList: spansCopy, explicitTimeout: nil)
}
} while true
// TODO: Record a gauge for referenced spans.
spanList.append(span)
// Notify the worker thread that at half of the queue is available. It will take
// time anyway for the thread to wake up.
if spanList.count >= halfMaxQueueSize {
cond.broadcast()
}

func shutdown() {
forceFlush(explicitTimeout: nil)
spanExporter.shutdown()
}

public func forceFlush(explicitTimeout: TimeInterval?) {
}

override func main() {
repeat {
autoreleasepool {
var spansCopy: [ReadableSpan]
cond.lock()
if spanList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while spanList.isEmpty
}
spansCopy = spanList
spanList.removeAll()
cond.unlock()
// Execute the batch export outside the synchronized to not block all producers.
exportBatch(spanList: spansCopy, explicitTimeout: explicitTimeout)
}

private func exportBatch(spanList: [ReadableSpan], explicitTimeout: TimeInterval?) {
let exportOperation = BlockOperation { [weak self] in
self?.exportAction(spanList: spanList)
}
let timeoutTimer = DispatchSource.makeTimerSource(queue: DispatchQueue.global())
timeoutTimer.setEventHandler { exportOperation.cancel() }
let maxTimeOut = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, exportTimeout)
timeoutTimer.schedule(deadline: .now() + .milliseconds(Int(maxTimeOut.toMilliseconds)), leeway: .milliseconds(1))
timeoutTimer.activate()
queue.addOperation(exportOperation)
queue.waitUntilAllOperationsAreFinished()
timeoutTimer.cancel()
self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout)
}
} while true
}

func shutdown() {
forceFlush(explicitTimeout: self.exportTimeout)
spanExporter.shutdown()
}

public func forceFlush(explicitTimeout: TimeInterval?) {
var spansCopy: [ReadableSpan]
cond.lock()
spansCopy = spanList
spanList.removeAll()
cond.unlock()
// Execute the batch export outside the synchronized to not block all producers.
exportBatch(spanList: spansCopy, explicitTimeout: explicitTimeout)
}

private func exportBatch(spanList: [ReadableSpan], explicitTimeout: TimeInterval?) {
let exportOperation = BlockOperation { [weak self] in
self?.exportAction(spanList: spanList)
}

private func exportAction(spanList: [ReadableSpan]) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
spanExporter.export(spans: spansToExport)
}
let timeoutTimer = DispatchSource.makeTimerSource(queue: DispatchQueue.global())
timeoutTimer.setEventHandler { exportOperation.cancel() }
let maxTimeOut = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, exportTimeout)
timeoutTimer.schedule(deadline: .now() + .milliseconds(Int(maxTimeOut.toMilliseconds)), leeway: .milliseconds(1))
timeoutTimer.activate()
queue.addOperation(exportOperation)
queue.waitUntilAllOperationsAreFinished()
timeoutTimer.cancel()
}

private func exportAction(spanList: [ReadableSpan]) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
spanExporter.export(spans: spansToExport)
}
}
}

0 comments on commit ec42fcd

Please sign in to comment.