diff --git a/src/internal_events/internal_metrics.rs b/src/internal_events/internal_metrics.rs new file mode 100644 index 0000000000000..1e29cb664ccd2 --- /dev/null +++ b/src/internal_events/internal_metrics.rs @@ -0,0 +1,21 @@ +use metrics::counter; +use vector_core::internal_event::InternalEvent; + +#[derive(Debug)] +pub struct InternalMetricsBytesReceived { + pub byte_size: usize, +} + +impl InternalEvent for InternalMetricsBytesReceived { + fn emit(self) { + trace!( + message = "Bytes received.", + byte_size = %self.byte_size, + protocol = "internal", + ); + counter!( + "component_received_bytes_total", self.byte_size as u64, + "protocol" => "internal", + ); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 08d6bb59e1936..024d476f2ae96 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -59,6 +59,8 @@ pub mod http_client; mod http_scrape; #[cfg(feature = "sources-internal_logs")] mod internal_logs; +#[cfg(feature = "sources-internal_metrics")] +mod internal_metrics; #[cfg(all(unix, feature = "sources-journald"))] mod journald; #[cfg(any(feature = "sources-kafka", feature = "sinks-kafka"))] @@ -197,6 +199,8 @@ pub(crate) use self::http::*; pub(crate) use self::http_scrape::*; #[cfg(feature = "sources-internal_logs")] pub(crate) use self::internal_logs::*; +#[cfg(feature = "sources-internal_metrics")] +pub(crate) use self::internal_metrics::*; #[cfg(all(unix, feature = "sources-journald"))] pub(crate) use self::journald::*; #[cfg(any(feature = "sources-kafka", feature = "sinks-kafka"))] diff --git a/src/sources/internal_logs.rs b/src/sources/internal_logs.rs index 6783ea5e25e07..e9a7a68d882b4 100644 --- a/src/sources/internal_logs.rs +++ b/src/sources/internal_logs.rs @@ -119,20 +119,31 @@ mod tests { use vector_core::event::Value; use super::*; - use crate::{event::Event, test_util::collect_ready, trace}; + use crate::{ + event::Event, + test_util::{ + collect_ready, + components::{assert_source_compliance, SOURCE_TAGS}, + }, + trace, + }; #[test] fn generates_config() { crate::test_util::test_generate_config::(); } + // This test is fairly overloaded with different cases. + // + // Unfortunately, this can't be easily split out into separate test + // cases because `consume_early_buffer` (called within the + // `start_source` helper) panics when called more than once. #[tokio::test] async fn receives_logs() { - // This test is fairly overloaded with different cases. - // - // Unfortunately, this can't be easily split out into separate test - // cases because `consume_early_buffer` (called within the - // `start_source` helper) panics when called more than once. + assert_source_compliance(&SOURCE_TAGS, run_test()).await; + } + + async fn run_test() { let test_id: u8 = rand::random(); let start = chrono::Utc::now(); trace::init(false, false, "debug"); diff --git a/src/sources/internal_metrics.rs b/src/sources/internal_metrics.rs index 41208f51e76eb..8dcefb74b5d1f 100644 --- a/src/sources/internal_metrics.rs +++ b/src/sources/internal_metrics.rs @@ -7,7 +7,7 @@ use vector_core::ByteSizeOf; use crate::{ config::{DataType, Output, SourceConfig, SourceContext}, - internal_events::{EventsReceived, StreamClosedError}, + internal_events::{EventsReceived, InternalMetricsBytesReceived, StreamClosedError}, metrics::Controller, shutdown::ShutdownSignal, SourceSender, @@ -127,6 +127,8 @@ impl<'a> InternalMetrics<'a> { let metrics = self.controller.capture_metrics(); let count = metrics.len(); let byte_size = metrics.size_of(); + + emit!(InternalMetricsBytesReceived { byte_size }); emit!(EventsReceived { count, byte_size }); let batch = metrics.into_iter().map(|mut metric| { @@ -170,7 +172,10 @@ mod tests { Event, }, metrics::Controller, - test_util, SourceSender, + test_util::{ + self, + components::{run_and_assert_source_compliance, SOURCE_TAGS}, + }, }; #[test] @@ -249,23 +254,15 @@ mod tests { } async fn event_from_config(config: InternalMetricsConfig) -> Event { - test_util::trace_init(); + let mut events = run_and_assert_source_compliance( + config, + time::Duration::from_millis(100), + &SOURCE_TAGS, + ) + .await; - let (sender, mut recv) = SourceSender::new_test(); - - tokio::spawn(async move { - config - .build(SourceContext::new_test(sender, None)) - .await - .unwrap() - .await - .unwrap() - }); - - time::timeout(time::Duration::from_millis(100), recv.next()) - .await - .expect("fetch metrics timeout") - .expect("failed to get metrics from a stream") + assert!(!events.is_empty()); + events.remove(0) } #[tokio::test]