Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[span processor] Add force_flush method. #358

Merged
merged 4 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 53 additions & 6 deletions opentelemetry/src/sdk/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// already set). This method is called synchronously within the `Span::end`
/// API, therefore it should not block or throw an exception.
fn on_end(&self, span: SpanData);
/// Force the spans lying in the cache to be exported.
fn force_flush(&self);
/// Shuts down the processor. Called when SDK is shut down. This is an
/// opportunity for processor to do any cleanup required.
/// opportunity for processors to do any cleanup required.
fn shutdown(&mut self);
}

Expand Down Expand Up @@ -123,6 +125,10 @@ impl SpanProcessor for SimpleSpanProcessor {
}
}

fn force_flush(&self) {
// Ignored since all spans in Simple Processor will be exported as they ended.
}

fn shutdown(&mut self) {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Expand Down Expand Up @@ -196,6 +202,12 @@ impl SpanProcessor for BatchSpanProcessor {
}
}

fn force_flush(&self) {
if let Ok(mut sender) = self.message_sender.lock() {
let _ = sender.try_send(BatchMessage::Flush);
}
}

fn shutdown(&mut self) {
if let Ok(mut sender) = self.message_sender.lock() {
// Send shutdown message to worker future
Expand All @@ -212,7 +224,7 @@ impl SpanProcessor for BatchSpanProcessor {
#[derive(Debug)]
enum BatchMessage {
ExportSpan(SpanData),
Tick,
Flush,
Shutdown,
}

Expand All @@ -230,7 +242,7 @@ impl BatchSpanProcessor {
IS: Stream<Item = ISI> + Send + 'static,
{
let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size);
let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Tick);
let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush);

// Spawn worker process via user-defined spawn function.
let worker_handle = spawn(Box::pin(async move {
Expand All @@ -245,8 +257,8 @@ impl BatchSpanProcessor {
spans.push(span);
}
}
// Span batch interval time reached, export current spans.
BatchMessage::Tick => {
// Span batch interval time reached or a force flush has been invoked, export current spans.
BatchMessage::Flush => {
while !spans.is_empty() {
let batch = spans.split_off(
spans.len().saturating_sub(config.max_export_batch_size),
Expand Down Expand Up @@ -450,8 +462,12 @@ mod tests {
OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT,
};
use crate::exporter::trace::stdout;
use crate::testing::trace::{new_test_export_span_data, new_test_exporter};
use crate::sdk::trace::BatchConfig;
use crate::testing::trace::{
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
};
use std::time;
use tokio::time::Duration;

#[test]
fn simple_span_processor_on_end_calls_export() {
Expand Down Expand Up @@ -500,4 +516,35 @@ mod tests {
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
}

#[tokio::test]
async fn test_batch_span_processor() {
let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
let mut config = BatchConfig::default();
config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported vis force_flush
TommyCpp marked this conversation as resolved.
Show resolved Hide resolved
let processor = BatchSpanProcessor::new(
Box::new(exporter),
tokio::spawn,
tokio::time::interval,
config,
);
let handle = tokio::spawn(async move {
loop {
if let Some(span) = export_receiver.recv().await {
assert_eq!(span.span_context, new_test_export_span_data().span_context);
break;
}
}
});
tokio::time::delay_for(Duration::from_secs(1)).await; // skip the first
processor.on_end(new_test_export_span_data());
processor.force_flush();

assert!(
tokio::time::timeout(Duration::from_secs(5), handle)
.await
.is_ok(),
"timed out in 5 seconds. force_flush may not export any data when called"
);
}
}
35 changes: 35 additions & 0 deletions opentelemetry/src/testing/trace.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::exporter::trace::SpanData;
use crate::{
exporter::trace::{self as exporter, ExportResult, SpanExporter},
sdk::{
Expand Down Expand Up @@ -82,3 +83,37 @@ pub fn new_test_exporter() -> (TestSpanExporter, Receiver<exporter::SpanData>, R
};
(exporter, rx_export, rx_shutdown)
}

#[derive(Debug)]
pub struct TokioSpanExporter {
tx_export: tokio::sync::mpsc::UnboundedSender<exporter::SpanData>,
tx_shutdown: tokio::sync::mpsc::UnboundedSender<()>,
}

#[async_trait]
impl SpanExporter for TokioSpanExporter {
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
for span_data in batch {
self.tx_export.send(span_data)?;
}
Ok(())
}

fn shutdown(&mut self) {
self.tx_shutdown.send(()).unwrap();
}
}

pub fn new_tokio_test_exporter() -> (
TokioSpanExporter,
tokio::sync::mpsc::UnboundedReceiver<exporter::SpanData>,
tokio::sync::mpsc::UnboundedReceiver<()>,
) {
let (tx_export, rx_export) = tokio::sync::mpsc::unbounded_channel();
let (tx_shutdown, rx_shutdown) = tokio::sync::mpsc::unbounded_channel();
let exporter = TokioSpanExporter {
tx_export,
tx_shutdown,
};
(exporter, rx_export, rx_shutdown)
}