Skip to content

Commit

Permalink
enhancement(internal_logs, internal_metrics source): Add test helpers…
Browse files Browse the repository at this point in the history
… to assert source compliance (#14133)
  • Loading branch information
neuronull committed Aug 29, 2022
1 parent e7437df commit 4b65039
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 24 deletions.
21 changes: 21 additions & 0 deletions src/internal_events/internal_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use metrics::counter;
use vector_core::internal_event::InternalEvent;

#[derive(Debug)]
pub struct InternalMetricsBytesReceived {
pub byte_size: usize,
}

impl InternalEvent for InternalMetricsBytesReceived {
fn emit(self) {
trace!(
message = "Bytes received.",
byte_size = %self.byte_size,
protocol = "internal",
);
counter!(
"component_received_bytes_total", self.byte_size as u64,
"protocol" => "internal",
);
}
}
4 changes: 4 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub mod http_client;
mod http_scrape;
#[cfg(feature = "sources-internal_logs")]
mod internal_logs;
#[cfg(feature = "sources-internal_metrics")]
mod internal_metrics;
#[cfg(all(unix, feature = "sources-journald"))]
mod journald;
#[cfg(any(feature = "sources-kafka", feature = "sinks-kafka"))]
Expand Down Expand Up @@ -197,6 +199,8 @@ pub(crate) use self::http::*;
pub(crate) use self::http_scrape::*;
#[cfg(feature = "sources-internal_logs")]
pub(crate) use self::internal_logs::*;
#[cfg(feature = "sources-internal_metrics")]
pub(crate) use self::internal_metrics::*;
#[cfg(all(unix, feature = "sources-journald"))]
pub(crate) use self::journald::*;
#[cfg(any(feature = "sources-kafka", feature = "sinks-kafka"))]
Expand Down
23 changes: 17 additions & 6 deletions src/sources/internal_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,31 @@ mod tests {
use vector_core::event::Value;

use super::*;
use crate::{event::Event, test_util::collect_ready, trace};
use crate::{
event::Event,
test_util::{
collect_ready,
components::{assert_source_compliance, SOURCE_TAGS},
},
trace,
};

#[test]
fn generates_config() {
crate::test_util::test_generate_config::<InternalLogsConfig>();
}

// This test is fairly overloaded with different cases.
//
// Unfortunately, this can't be easily split out into separate test
// cases because `consume_early_buffer` (called within the
// `start_source` helper) panics when called more than once.
#[tokio::test]
async fn receives_logs() {
// This test is fairly overloaded with different cases.
//
// Unfortunately, this can't be easily split out into separate test
// cases because `consume_early_buffer` (called within the
// `start_source` helper) panics when called more than once.
assert_source_compliance(&SOURCE_TAGS, run_test()).await;
}

async fn run_test() {
let test_id: u8 = rand::random();
let start = chrono::Utc::now();
trace::init(false, false, "debug");
Expand Down
33 changes: 15 additions & 18 deletions src/sources/internal_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use vector_core::ByteSizeOf;

use crate::{
config::{DataType, Output, SourceConfig, SourceContext},
internal_events::{EventsReceived, StreamClosedError},
internal_events::{EventsReceived, InternalMetricsBytesReceived, StreamClosedError},
metrics::Controller,
shutdown::ShutdownSignal,
SourceSender,
Expand Down Expand Up @@ -127,6 +127,8 @@ impl<'a> InternalMetrics<'a> {
let metrics = self.controller.capture_metrics();
let count = metrics.len();
let byte_size = metrics.size_of();

emit!(InternalMetricsBytesReceived { byte_size });
emit!(EventsReceived { count, byte_size });

let batch = metrics.into_iter().map(|mut metric| {
Expand Down Expand Up @@ -170,7 +172,10 @@ mod tests {
Event,
},
metrics::Controller,
test_util, SourceSender,
test_util::{
self,
components::{run_and_assert_source_compliance, SOURCE_TAGS},
},
};

#[test]
Expand Down Expand Up @@ -249,23 +254,15 @@ mod tests {
}

async fn event_from_config(config: InternalMetricsConfig) -> Event {
test_util::trace_init();
let mut events = run_and_assert_source_compliance(
config,
time::Duration::from_millis(100),
&SOURCE_TAGS,
)
.await;

let (sender, mut recv) = SourceSender::new_test();

tokio::spawn(async move {
config
.build(SourceContext::new_test(sender, None))
.await
.unwrap()
.await
.unwrap()
});

time::timeout(time::Duration::from_millis(100), recv.next())
.await
.expect("fetch metrics timeout")
.expect("failed to get metrics from a stream")
assert!(!events.is_empty());
events.remove(0)
}

#[tokio::test]
Expand Down

0 comments on commit 4b65039

Please sign in to comment.