Skip to content

Commit

Permalink
Merge branch 'main' into refactor-span-processor
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored Jan 21, 2025
2 parents 5fce0a5 + 78db32c commit 6d2edf0
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 64 deletions.
14 changes: 12 additions & 2 deletions opentelemetry-otlp/tests/integration_test/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

use anyhow::Result;
use opentelemetry::{otel_debug, otel_info};
use std::fs;
use std::fs::File;
use std::fs::{self, File, OpenOptions};
use std::os::unix::fs::PermissionsExt;
use std::sync::{Arc, Mutex, Once, OnceLock};
use testcontainers::core::wait::HttpWaitStrategy;
Expand Down Expand Up @@ -125,6 +124,17 @@ fn upsert_empty_file(path: &str) -> File {
file
}

/// Cleans up file specificed as argument by truncating its content.
///
/// This function is meant to cleanup the generated json file before a test starts,
/// preventing entries from previous tests from interfering with the current test's results.
pub fn cleanup_file(file_path: &str) {
let _ = OpenOptions::new()
.write(true)
.truncate(true)
.open(file_path); // ignore result, as file may not exist
}

///
/// Shuts down our collector container. This should be run as part of each test
/// suite shutting down!
Expand Down
90 changes: 28 additions & 62 deletions opentelemetry-otlp/tests/integration_test/tests/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

use anyhow::Result;
use ctor::dtor;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use integration_test_runner::test_utils;
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::{logs as sdklogs, Resource};
use std::fs::File;
use std::io::Read;
use std::os::unix::fs::MetadataExt;

fn init_logs(is_simple: bool) -> Result<sdklogs::LoggerProvider> {
let exporter_builder = LogExporter::builder();
Expand Down Expand Up @@ -88,26 +90,26 @@ mod logtests {
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub async fn logs_batch_tokio_multi_thread() -> Result<()> {
logs_batch_tokio_helper().await
logs_tokio_helper(false).await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub async fn logs_batch_tokio_multi_with_one_worker() -> Result<()> {
logs_batch_tokio_helper().await
logs_tokio_helper(false).await
}

#[tokio::test(flavor = "current_thread")]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub async fn logs_batch_tokio_current() -> Result<()> {
logs_batch_tokio_helper().await
logs_tokio_helper(false).await
}

async fn logs_batch_tokio_helper() -> Result<()> {
use crate::{assert_logs_results, init_logs};
async fn logs_tokio_helper(is_simple: bool) -> Result<()> {
use crate::{assert_logs_results_contains, init_logs};
test_utils::start_collector_container().await?;

let logger_provider = init_logs(false).unwrap();
let logger_provider = init_logs(is_simple).unwrap();
let layer = OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
// generate a random uuid and store it to expected guid
Expand All @@ -119,58 +121,37 @@ mod logtests {

let _ = logger_provider.shutdown();
tokio::time::sleep(Duration::from_secs(5)).await;
assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?;
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
pub async fn logs_simple_tokio_multi_thread() -> Result<()> {
logs_simple_tokio_helper().await
logs_tokio_helper(true).await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
pub async fn logs_simple_tokio_multi_with_one_worker() -> Result<()> {
logs_simple_tokio_helper().await
logs_tokio_helper(true).await
}

// Ignored, to be investigated
#[ignore]
#[tokio::test(flavor = "current_thread")]
#[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
pub async fn logs_simple_tokio_current() -> Result<()> {
logs_simple_tokio_helper().await
}

async fn logs_simple_tokio_helper() -> Result<()> {
use crate::{assert_logs_results, init_logs};
test_utils::start_collector_container().await?;

let logger_provider = init_logs(true).unwrap();
let layer = OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
info!("Tracing initialized");
// generate a random uuid and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
}

let _ = logger_provider.shutdown();
tokio::time::sleep(Duration::from_secs(5)).await;
assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
logs_tokio_helper(true).await
}

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_batch_non_tokio_main() -> Result<()> {
logs_batch_non_tokio_helper()
logs_non_tokio_helper(false)
}

fn logs_batch_non_tokio_helper() -> Result<()> {
fn logs_non_tokio_helper(is_simple: bool) -> Result<()> {
// Initialize the logger provider inside a tokio runtime
// as this allows tonic client to capture the runtime,
// but actual export occurs from the dedicated std::thread
Expand All @@ -179,7 +160,7 @@ mod logtests {
let logger_provider = rt.block_on(async {
// While we're here setup our collector container too, as this needs tokio to run
test_utils::start_collector_container().await?;
init_logs(false)
init_logs(is_simple)
})?;
let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
Expand All @@ -192,43 +173,18 @@ mod logtests {

let _ = logger_provider.shutdown();
std::thread::sleep(Duration::from_secs(5));
assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?;
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
}

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_simple_non_tokio_main() -> Result<()> {
logs_simple_non_tokio_helper()
}

fn logs_simple_non_tokio_helper() -> Result<()> {
// Initialize the logger provider inside a tokio runtime
// as this allows tonic client to capture the runtime,
// but actual export occurs from the main non-tokio thread.
let rt = tokio::runtime::Runtime::new()?;
let logger_provider = rt.block_on(async {
// While we're here setup our collector container too, as this needs tokio to run
test_utils::start_collector_container().await?;
init_logs(true)
})?;
let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
// generate a random uuid and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
}

let _ = logger_provider.shutdown();
std::thread::sleep(Duration::from_secs(5));
assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
logs_non_tokio_helper(true)
}
}

pub fn assert_logs_results(result: &str, expected_content: &str) -> Result<()> {
pub fn assert_logs_results_contains(result: &str, expected_content: &str) -> Result<()> {
let file = File::open(result)?;
let mut contents = String::new();
let mut reader = std::io::BufReader::new(&file);
Expand All @@ -237,6 +193,16 @@ pub fn assert_logs_results(result: &str, expected_content: &str) -> Result<()> {
Ok(())
}

pub fn assert_logs_results(result: &str, expected: &str) -> Result<()> {
let left = read_logs_from_json(File::open(expected)?)?;
let right = read_logs_from_json(File::open(result)?)?;

LogsAsserter::new(left, right).assert();

assert!(File::open(result).unwrap().metadata().unwrap().size() > 0);
Ok(())
}

///
/// Make sure we stop the collector container, otherwise it will sit around hogging our
/// ports and subsequent test runs will fail.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#![cfg(unix)]

use anyhow::Result;
use ctor::dtor;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use integration_test_runner::test_utils;
use opentelemetry_appender_tracing::layer;
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::Resource;
use std::fs::File;
use std::os::unix::fs::MetadataExt;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[cfg(feature = "tonic-client")]
pub async fn test_logs() -> Result<()> {
test_utils::start_collector_container().await?;
test_utils::cleanup_file("./actual/logs.json"); // Ensure logs.json is empty before the test
let exporter_builder = LogExporter::builder().with_tonic();
let exporter = exporter_builder.build()?;
let mut logger_provider_builder = LoggerProvider::builder();
logger_provider_builder = logger_provider_builder.with_batch_exporter(exporter);
let logger_provider = logger_provider_builder
.with_resource(
Resource::builder_empty()
.with_service_name("logs-integration-test")
.build(),
)
.build();
let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);

{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
}

let _ = logger_provider.shutdown();
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json")?;
Ok(())
}

fn assert_logs_results(result: &str, expected: &str) -> Result<()> {
let left = read_logs_from_json(File::open(expected)?)?;
let right = read_logs_from_json(File::open(result)?)?;

LogsAsserter::new(left, right).assert();

assert!(File::open(result).unwrap().metadata().unwrap().size() > 0);
Ok(())
}

///
/// Make sure we stop the collector container, otherwise it will sit around hogging our
/// ports and subsequent test runs will fail.
///
#[dtor]
fn shutdown() {
println!("metrics::shutdown");
test_utils::stop_collector_container();
}

0 comments on commit 6d2edf0

Please sign in to comment.