diff --git a/Sources/OpenTelemetrySdk/Logs/Processors/BatchLogRecordProcessor.swift b/Sources/OpenTelemetrySdk/Logs/Processors/BatchLogRecordProcessor.swift index 8748ff34..1de32955 100644 --- a/Sources/OpenTelemetrySdk/Logs/Processors/BatchLogRecordProcessor.swift +++ b/Sources/OpenTelemetrySdk/Logs/Processors/BatchLogRecordProcessor.swift @@ -97,7 +97,7 @@ private class BatchWorker : Thread { logRecordsCopy = logRecordList logRecordList.removeAll() cond.unlock() - self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: nil) + self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: exportTimeout) } } while true } @@ -114,7 +114,7 @@ private class BatchWorker : Thread { public func shutdown() { - forceFlush(explicitTimeout: nil) + forceFlush(explicitTimeout: exportTimeout) _ = logRecordExporter.shutdown() } diff --git a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift index 69cdc5aa..c0fa1705 100644 --- a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift +++ b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift @@ -15,139 +15,139 @@ import OpenTelemetryApi /// exports the spans to wake up and start a new export cycle. /// This batchSpanProcessor can cause high contention in a very high traffic service. public struct BatchSpanProcessor: SpanProcessor { - fileprivate var worker: BatchWorker - - public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30, - maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil) - { - worker = BatchWorker(spanExporter: spanExporter, - scheduleDelay: scheduleDelay, - exportTimeout: exportTimeout, - maxQueueSize: maxQueueSize, - maxExportBatchSize: maxExportBatchSize, - willExportCallback: willExportCallback) - worker.start() - } - - public let isStartRequired = false - public let isEndRequired = true - - public func onStart(parentContext: SpanContext?, span: ReadableSpan) {} - - public func onEnd(span: ReadableSpan) { - if !span.context.traceFlags.sampled { - return - } - worker.addSpan(span: span) - } - - public func shutdown() { - worker.cancel() - worker.shutdown() - } - - public func forceFlush(timeout: TimeInterval? = nil) { - worker.forceFlush(explicitTimeout: timeout) + fileprivate var worker: BatchWorker + + public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30, + maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil) + { + worker = BatchWorker(spanExporter: spanExporter, + scheduleDelay: scheduleDelay, + exportTimeout: exportTimeout, + maxQueueSize: maxQueueSize, + maxExportBatchSize: maxExportBatchSize, + willExportCallback: willExportCallback) + worker.start() + } + + public let isStartRequired = false + public let isEndRequired = true + + public func onStart(parentContext: SpanContext?, span: ReadableSpan) {} + + public func onEnd(span: ReadableSpan) { + if !span.context.traceFlags.sampled { + return } + worker.addSpan(span: span) + } + + public func shutdown() { + worker.cancel() + worker.shutdown() + } + + public func forceFlush(timeout: TimeInterval? = nil) { + worker.forceFlush(explicitTimeout: timeout) + } } /// BatchWorker is a thread that batches multiple spans and calls the registered SpanExporter to export /// the data. /// The list of batched data is protected by a NSCondition which ensures full concurrency. private class BatchWorker: Thread { - let spanExporter: SpanExporter - let scheduleDelay: TimeInterval - let maxQueueSize: Int - let exportTimeout: TimeInterval - let maxExportBatchSize: Int - let willExportCallback: ((inout [SpanData]) -> Void)? - let halfMaxQueueSize: Int - private let cond = NSCondition() - var spanList = [ReadableSpan]() - var queue: OperationQueue - - init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) { - self.spanExporter = spanExporter - self.scheduleDelay = scheduleDelay - self.exportTimeout = exportTimeout - self.maxQueueSize = maxQueueSize - halfMaxQueueSize = maxQueueSize >> 1 - self.maxExportBatchSize = maxExportBatchSize - self.willExportCallback = willExportCallback - queue = OperationQueue() - queue.name = "BatchWorker Queue" - queue.maxConcurrentOperationCount = 1 + let spanExporter: SpanExporter + let scheduleDelay: TimeInterval + let maxQueueSize: Int + let exportTimeout: TimeInterval + let maxExportBatchSize: Int + let willExportCallback: ((inout [SpanData]) -> Void)? + let halfMaxQueueSize: Int + private let cond = NSCondition() + var spanList = [ReadableSpan]() + var queue: OperationQueue + + init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) { + self.spanExporter = spanExporter + self.scheduleDelay = scheduleDelay + self.exportTimeout = exportTimeout + self.maxQueueSize = maxQueueSize + halfMaxQueueSize = maxQueueSize >> 1 + self.maxExportBatchSize = maxExportBatchSize + self.willExportCallback = willExportCallback + queue = OperationQueue() + queue.name = "BatchWorker Queue" + queue.maxConcurrentOperationCount = 1 + } + + func addSpan(span: ReadableSpan) { + cond.lock() + defer { cond.unlock() } + + if spanList.count == maxQueueSize { + // TODO: Record a counter for dropped spans. + return } - - func addSpan(span: ReadableSpan) { - cond.lock() - defer { cond.unlock() } - - if spanList.count == maxQueueSize { - // TODO: Record a counter for dropped spans. - return - } - // TODO: Record a gauge for referenced spans. - spanList.append(span) - // Notify the worker thread that at half of the queue is available. It will take - // time anyway for the thread to wake up. - if spanList.count >= halfMaxQueueSize { - cond.broadcast() - } - } - - override func main() { - repeat { - autoreleasepool { - var spansCopy: [ReadableSpan] - cond.lock() - if spanList.count < maxExportBatchSize { - repeat { - cond.wait(until: Date().addingTimeInterval(scheduleDelay)) - } while spanList.isEmpty - } - spansCopy = spanList - spanList.removeAll() - cond.unlock() - self.exportBatch(spanList: spansCopy, explicitTimeout: nil) - } - } while true + // TODO: Record a gauge for referenced spans. + spanList.append(span) + // Notify the worker thread that at half of the queue is available. It will take + // time anyway for the thread to wake up. + if spanList.count >= halfMaxQueueSize { + cond.broadcast() } - - func shutdown() { - forceFlush(explicitTimeout: nil) - spanExporter.shutdown() - } - - public func forceFlush(explicitTimeout: TimeInterval?) { + } + + override func main() { + repeat { + autoreleasepool { var spansCopy: [ReadableSpan] cond.lock() + if spanList.count < maxExportBatchSize { + repeat { + cond.wait(until: Date().addingTimeInterval(scheduleDelay)) + } while spanList.isEmpty + } spansCopy = spanList spanList.removeAll() cond.unlock() - // Execute the batch export outside the synchronized to not block all producers. - exportBatch(spanList: spansCopy, explicitTimeout: explicitTimeout) - } - - private func exportBatch(spanList: [ReadableSpan], explicitTimeout: TimeInterval?) { - let exportOperation = BlockOperation { [weak self] in - self?.exportAction(spanList: spanList) - } - let timeoutTimer = DispatchSource.makeTimerSource(queue: DispatchQueue.global()) - timeoutTimer.setEventHandler { exportOperation.cancel() } - let maxTimeOut = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, exportTimeout) - timeoutTimer.schedule(deadline: .now() + .milliseconds(Int(maxTimeOut.toMilliseconds)), leeway: .milliseconds(1)) - timeoutTimer.activate() - queue.addOperation(exportOperation) - queue.waitUntilAllOperationsAreFinished() - timeoutTimer.cancel() + self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout) + } + } while true + } + + func shutdown() { + forceFlush(explicitTimeout: self.exportTimeout) + spanExporter.shutdown() + } + + public func forceFlush(explicitTimeout: TimeInterval?) { + var spansCopy: [ReadableSpan] + cond.lock() + spansCopy = spanList + spanList.removeAll() + cond.unlock() + // Execute the batch export outside the synchronized to not block all producers. + exportBatch(spanList: spansCopy, explicitTimeout: explicitTimeout) + } + + private func exportBatch(spanList: [ReadableSpan], explicitTimeout: TimeInterval?) { + let exportOperation = BlockOperation { [weak self] in + self?.exportAction(spanList: spanList) } - - private func exportAction(spanList: [ReadableSpan]) { - stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { - var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } - willExportCallback?(&spansToExport) - spanExporter.export(spans: spansToExport) - } + let timeoutTimer = DispatchSource.makeTimerSource(queue: DispatchQueue.global()) + timeoutTimer.setEventHandler { exportOperation.cancel() } + let maxTimeOut = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, exportTimeout) + timeoutTimer.schedule(deadline: .now() + .milliseconds(Int(maxTimeOut.toMilliseconds)), leeway: .milliseconds(1)) + timeoutTimer.activate() + queue.addOperation(exportOperation) + queue.waitUntilAllOperationsAreFinished() + timeoutTimer.cancel() + } + + private func exportAction(spanList: [ReadableSpan]) { + stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { + var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } + willExportCallback?(&spansToExport) + spanExporter.export(spans: spansToExport) } + } }