Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group Logs and Spans by Resource and Instrumentation Scope in OTLP Exporter #1873

Merged
merged 21 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ now use `.with_resource(RESOURCE::default())` to configure Resource when using
These methods would also no longer set the global tracer provider. It would now be the responsibility of users to set it by calling `global::set_tracer_provider(tracer_provider.clone());`. Refer to the [basic-otlp](https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-otlp/examples/basic-otlp/src/main.rs) and [basic-otlp-http](https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs) examples on how to initialize OTLP Trace Exporter.
- **Breaking** Correct the misspelling of "webkpi" to "webpki" in features [#1842](https://github.com/open-telemetry/opentelemetry-rust/pull/1842)
- Bump MSRV to 1.70 [#1840](https://github.com/open-telemetry/opentelemetry-rust/pull/1840)
- Group log and Span batch by their resource and instrumentation scope before exporting [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873).
lalitb marked this conversation as resolved.
Show resolved Hide resolved
- This optimization reduces redundancy and improves the efficiency of log export.
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
- The OTLP compliant Collector or Agents would be able to succesfuuly parse these events.
lalitb marked this conversation as resolved.
Show resolved Hide resolved

## v0.16.0

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let (body, content_type) = { self.build_logs_export_body(owned_batch, &self.resource)? };
let (body, content_type) = { self.build_logs_export_body(owned_batch)? };

Check warning on line 28 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L28

Added line #L28 was not covered by tests
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
Expand Down
22 changes: 7 additions & 15 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
use http::{HeaderName, HeaderValue, Uri};
use opentelemetry_http::HttpClient;
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;

#[cfg(feature = "logs")]
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
#[cfg(feature = "trace")]
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
#[cfg(feature = "logs")]
use opentelemetry_sdk::export::logs::LogData;
#[cfg(feature = "trace")]
Expand Down Expand Up @@ -307,16 +310,9 @@
fn build_trace_export_body(
&self,
spans: Vec<SpanData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
) -> opentelemetry::trace::TraceResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::{
collector::trace::v1::ExportTraceServiceRequest, trace::v1::ResourceSpans,
};

let resource_spans = spans
.into_iter()
.map(|span| ResourceSpans::new(span, resource))
.collect::<Vec<_>>();
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
let resource_spans = group_spans_by_resource_and_scope(spans, &self.resource);

Check warning on line 315 in opentelemetry-otlp/src/exporter/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/mod.rs#L314-L315

Added lines #L314 - L315 were not covered by tests

let req = ExportTraceServiceRequest { resource_spans };
match self.protocol {
Expand All @@ -333,13 +329,9 @@
fn build_logs_export_body(
&self,
logs: Vec<LogData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
) -> opentelemetry::logs::LogResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = logs
.into_iter()
.map(|log_event| (log_event, resource).into())
.collect::<Vec<_>>();
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);

Check warning on line 334 in opentelemetry-otlp/src/exporter/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/mod.rs#L334

Added line #L334 was not covered by tests
let req = ExportLogsServiceRequest { resource_logs };

match self.protocol {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
Err(err) => return Box::pin(std::future::ready(Err(err))),
};

let (body, content_type) = match self.build_trace_export_body(batch, &self.resource) {
let (body, content_type) = match self.build_trace_export_body(batch) {

Check warning on line 24 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L24

Added line #L24 was not covered by tests
Ok(body) => body,
Err(e) => return Box::pin(std::future::ready(Err(e))),
};
Expand Down
18 changes: 9 additions & 9 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use opentelemetry_sdk::export::logs::{LogData, LogExporter};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;

pub(crate) struct TonicLogsClient {
Expand Down Expand Up @@ -65,15 +67,13 @@
None => return Err(LogError::Other("exporter is already shut down".into())),
};

// TODO: Avoid cloning here.
let resource_logs = {
batch
.into_iter()
.map(|log_data_cow| (log_data_cow.into_owned()))
.map(|log_data| (log_data, &self.resource))
.map(Into::into)
.collect()
};
//TODO: avoid cloning here.
let owned_batch = batch
.into_iter()
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let resource_logs = group_logs_by_resource_and_scope(owned_batch, &self.resource);

Check warning on line 76 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L70-L76

Added lines #L70 - L76 were not covered by tests

client
.export(Request::from_parts(
Expand Down
11 changes: 3 additions & 8 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use opentelemetry::trace::TraceError;
use opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
};
use opentelemetry_proto::tonic::trace::v1::ResourceSpans;
use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;

use super::BoxInterceptor;

pub(crate) struct TonicTracesClient {
Expand Down Expand Up @@ -71,13 +72,7 @@ impl SpanExporter for TonicTracesClient {
}
};

// TODO: Avoid cloning here.
let resource_spans = {
batch
.into_iter()
.map(|log_data| ResourceSpans::new(log_data, &self.resource))
.collect()
};
let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);

Box::pin(async move {
client
Expand Down
3 changes: 3 additions & 0 deletions opentelemetry-proto/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
## vNext

- Bump MSRV to 1.70 [1864](https://github.com/open-telemetry/opentelemetry-rust/pull/1874)
- **BREAKING** Group log and Span batch by their resource and instrumentation scope before exporting [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873).
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
- Introduced `group_logs_by_resource_and_scope()` and `group_spans_by_resource_and_scope()` methods to group logs and spans by the resource and scope respectively.
- This is a breaking change for exporters consuming the OTLP format. Refer to the OTLP Log and Span exporter in the opentelemetry-otlp crate for the required changes.

## v0.6.0

Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ path = "tests/json_deserialize.rs"


[features]
default = []
default = ["full"]

full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde"]

Expand All @@ -42,6 +42,7 @@ trace = ["opentelemetry/trace", "opentelemetry_sdk/trace"]
metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics"]
logs = ["opentelemetry/logs", "opentelemetry_sdk/logs"]
zpages = ["trace"]
testing = ["opentelemetry/testing"]

# add ons
with-schemars = ["schemars"]
Expand All @@ -57,7 +58,9 @@ serde = { workspace = true, optional = true, features = ["serde_derive"] }
hex = { version = "0.4.3", optional = true }

[dev-dependencies]
opentelemetry = { version = "0.23", features = ["testing"], path = "../opentelemetry" }
tonic-build = { workspace = true }
prost-build = { workspace = true }
tempfile = "3.3.0"
serde_json = "1.0"

113 changes: 113 additions & 0 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod tonic {
transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
};
use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};
use std::collections::HashMap;

impl From<LogsAnyValue> for AnyValue {
fn from(value: LogsAnyValue) -> Self {
Expand Down Expand Up @@ -143,4 +144,116 @@ pub mod tonic {
}
}
}

pub fn group_logs_by_resource_and_scope(
logs: Vec<opentelemetry_sdk::export::logs::LogData>,
resource: &ResourceAttributesWithSchema,
) -> Vec<ResourceLogs> {
// no need of explicit grouping by resource, as all the logs belong to single resource.
let scope_map = logs.iter().fold(
HashMap::new(),
|mut scope_map: HashMap<
&opentelemetry_sdk::InstrumentationLibrary,
Vec<&opentelemetry_sdk::export::logs::LogData>,
>,
log| {
let instrumentation = &log.instrumentation;
scope_map.entry(instrumentation).or_default().push(log);
scope_map
},
);

let scope_logs = scope_map
.into_iter()
.map(|(instrumentation, log_records)| ScopeLogs {
scope: Some(instrumentation.into()),
schema_url: resource.schema_url.clone().unwrap_or_default(),
log_records: log_records
.into_iter()
.map(|log_data| log_data.record.clone().into())
.collect(),
})
.collect();

vec![ResourceLogs {
resource: Some(Resource {
attributes: resource.attributes.0.clone(),
dropped_attributes_count: 0,
}),
scope_logs,
schema_url: resource.schema_url.clone().unwrap_or_default(),
}]
}
}

#[cfg(test)]
mod tests {
use crate::transform::common::tonic::ResourceAttributesWithSchema;
use opentelemetry::logs::LogRecord as _;
use opentelemetry_sdk::export::logs::LogData;
use opentelemetry_sdk::{logs::LogRecord, Resource};
use std::time::SystemTime;

fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData {
let mut logrecord = LogRecord::default();
logrecord.set_timestamp(SystemTime::now());
logrecord.set_observed_timestamp(SystemTime::now());
LogData {
instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder(
instrumentation_name.to_string(),
)
.build(),
record: logrecord,
}
}

#[test]
fn test_group_logs_by_resource_and_scope_single_scope() {
let resource = Resource::default();
let log1 = create_test_log_data("test-lib", "Log 1");
let log2 = create_test_log_data("test-lib", "Log 2");

let logs = vec![log1, log2];
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema

let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];
assert_eq!(resource_logs.scope_logs.len(), 1);

let scope_logs = &resource_logs.scope_logs[0];
assert_eq!(scope_logs.log_records.len(), 2);
}

#[test]
fn test_group_logs_by_resource_and_scope_multiple_scopes() {
let resource = Resource::default();
let log1 = create_test_log_data("lib1", "Log 1");
let log2 = create_test_log_data("lib2", "Log 2");

let logs = vec![log1, log2];
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];
assert_eq!(resource_logs.scope_logs.len(), 2);

let scope_logs_1 = &resource_logs
.scope_logs
.iter()
.find(|scope| scope.scope.as_ref().unwrap().name == "lib1")
.unwrap();
let scope_logs_2 = &resource_logs
.scope_logs
.iter()
.find(|scope| scope.scope.as_ref().unwrap().name == "lib2")
.unwrap();

assert_eq!(scope_logs_1.log_records.len(), 1);
assert_eq!(scope_logs_2.log_records.len(), 1);
}
}
Loading
Loading