Skip to content

Commit

Permalink
enhancement(observability): Convert BytesReceived to a registered e…
Browse files Browse the repository at this point in the history
…vent (#13934)

This change moves the common `BytesReceived` into `vector-common` and converts
it and all uses of it to a registered event.
  • Loading branch information
bruceg committed Aug 26, 2022
1 parent 0e0d746 commit 573b309
Show file tree
Hide file tree
Showing 31 changed files with 262 additions and 170 deletions.
7 changes: 4 additions & 3 deletions lib/vector-common/src/event_test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ pub fn debug_print_events() {
/// events contain the right fields, etc.
pub fn record_internal_event(event: &str) {
// Remove leading '&'
// Remove trailing '{fields…}'
let event = event.strip_prefix('&').unwrap_or(event);
// Remove trailing '{fields…}'
let event = event.find('{').map_or(event, |par| &event[..par]);
let event = event.trim();
// Remove trailing '::from…'
let event = event.find(':').map_or(event, |colon| &event[..colon]);

EVENTS_RECORDED.with(|er| er.borrow_mut().insert(event.into()));
EVENTS_RECORDED.with(|er| er.borrow_mut().insert(event.trim().into()));
}
41 changes: 41 additions & 0 deletions lib/vector-common/src/internal_event/bytes_received.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use metrics::{register_counter, Counter};

use super::{ByteSize, InternalEventHandle, Protocol, RegisterInternalEvent, SharedString};

pub struct BytesReceived {
pub protocol: SharedString,
}

impl RegisterInternalEvent for BytesReceived {
type Handle = Handle;

fn register(self) -> Self::Handle {
Handle {
received_bytes: register_counter!("component_received_bytes_total", "protocol" => self.protocol.clone()),
protocol: self.protocol,
}
}
}

impl From<Protocol> for BytesReceived {
fn from(protocol: Protocol) -> Self {
Self {
protocol: protocol.0,
}
}
}

#[derive(Clone)]
pub struct Handle {
protocol: SharedString,
received_bytes: Counter,
}

impl InternalEventHandle for Handle {
type Data = ByteSize;

fn emit(&self, data: Self::Data) {
self.received_bytes.increment(data.0 as u64);
trace!(message = "Bytes received.", byte_size = %data.0, protocol = %self.protocol);
}
}
8 changes: 8 additions & 0 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod bytes_received;
mod bytes_sent;
mod events_received;
mod events_sent;

pub use metrics::SharedString;

pub use bytes_received::BytesReceived;
pub use bytes_sent::BytesSent;
pub use events_received::{EventsReceived, OldEventsReceived};
pub use events_sent::{EventsSent, DEFAULT_OUTPUT};
Expand Down Expand Up @@ -102,3 +104,9 @@ impl Protocol {
pub const UDP: Protocol = Protocol(SharedString::const_str("udp"));
pub const UNIX: Protocol = Protocol(SharedString::const_str("unix"));
}

impl From<&'static str> for Protocol {
fn from(s: &'static str) -> Self {
Self(SharedString::const_str(s))
}
}
2 changes: 1 addition & 1 deletion scripts/check-events
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ Find.find('.') do |path|

if trait == 'InternalEvent'
# Look-aside internal events that defer their implementation to a registered event.
if ! block.include? '.register('
if ! block.include? 'register('
event.impl_internal_event = true
event.scan_metrics(block)
event.scan_logs(block)
Expand Down
13 changes: 0 additions & 13 deletions src/internal_events/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,6 @@ pub use vector_core::internal_event::{EventsReceived, OldEventsReceived};

use super::prelude::{error_stage, error_type};

#[derive(Debug)]
pub struct BytesReceived<'a> {
pub byte_size: usize,
pub protocol: &'a str,
}

impl<'a> InternalEvent for BytesReceived<'a> {
fn emit(self) {
trace!(message = "Bytes received.", byte_size = %self.byte_size, protocol = %self.protocol);
counter!("component_received_bytes_total", self.byte_size as u64, "protocol" => self.protocol.to_string());
}
}

#[derive(Debug)]
pub struct EndpointBytesReceived<'a> {
pub byte_size: usize,
Expand Down
9 changes: 4 additions & 5 deletions src/sources/aws_ecs_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::StreamExt;
use hyper::{Body, Client, Request};
use tokio::time;
use tokio_stream::wrappers::IntervalStream;
use vector_common::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
use vector_config::configurable_component;
use vector_core::config::LogNamespace;
use vector_core::ByteSizeOf;
Expand All @@ -12,7 +13,7 @@ use crate::{
config::{self, GenerateConfig, Output, SourceConfig, SourceContext},
internal_events::{
AwsEcsMetricsEventsReceived, AwsEcsMetricsHttpError, AwsEcsMetricsParseError,
AwsEcsMetricsResponseError, BytesReceived, RequestCompleted, StreamClosedError,
AwsEcsMetricsResponseError, RequestCompleted, StreamClosedError,
},
shutdown::ShutdownSignal,
SourceSender,
Expand Down Expand Up @@ -165,6 +166,7 @@ async fn aws_ecs_metrics(
) -> Result<(), ()> {
let interval = time::Duration::from_secs(interval);
let mut interval = IntervalStream::new(time::interval(interval)).take_until(shutdown);
let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
while interval.next().await.is_some() {
let client = Client::new();

Expand All @@ -183,10 +185,7 @@ async fn aws_ecs_metrics(
end: Instant::now()
});

emit!(BytesReceived {
byte_size: body.len(),
protocol: "http",
});
bytes_received.emit(ByteSize(body.len()));

match parser::parse(body.as_ref(), namespace.clone()) {
Ok(metrics) => {
Expand Down
14 changes: 10 additions & 4 deletions src/sources/aws_kinesis_firehose/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bytes::{Buf, Bytes};
use chrono::Utc;
use flate2::read::MultiGzDecoder;
use snafu::ResultExt;
use vector_common::internal_event::{BytesReceived, Protocol};
use warp::{http::StatusCode, Filter};

use super::{
Expand All @@ -26,6 +27,14 @@ pub fn firehose(
acknowledgements: bool,
out: SourceSender,
) -> impl Filter<Extract = impl warp::Reply, Error = Infallible> + Clone {
let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
let context = handlers::Context {
compression: record_compression,
decoder,
acknowledgements,
bytes_received,
out,
};
warp::post()
.and(emit_received())
.and(authenticate(access_key))
Expand All @@ -44,10 +53,7 @@ pub fn firehose(
.untuple_one(),
)
.and(parse_body())
.and(warp::any().map(move || record_compression))
.and(warp::any().map(move || decoder.clone()))
.and(warp::any().map(move || acknowledgements))
.and(warp::any().map(move || out.clone()))
.and(warp::any().map(move || context.clone()))
.and_then(handlers::firehose)
.recover(handle_firehose_rejection)
}
Expand Down
36 changes: 21 additions & 15 deletions src/sources/aws_kinesis_firehose/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use lookup::path;
use snafu::{ResultExt, Snafu};
use tokio_util::codec::FramedRead;
use vector_common::finalization::AddBatchNotifier;
use vector_common::internal_event::{
ByteSize, BytesReceived, InternalEventHandle as _, Registered,
};
use vector_core::{event::BatchNotifier, ByteSizeOf};
use warp::reject;

Expand All @@ -22,34 +25,36 @@ use crate::{
config::log_schema,
event::{BatchStatus, Event},
internal_events::{
AwsKinesisFirehoseAutomaticRecordDecodeError, BytesReceived, EventsReceived,
StreamClosedError,
AwsKinesisFirehoseAutomaticRecordDecodeError, EventsReceived, StreamClosedError,
},
SourceSender,
};

#[derive(Clone)]
pub(super) struct Context {
pub(super) compression: Compression,
pub(super) decoder: Decoder,
pub(super) acknowledgements: bool,
pub(super) bytes_received: Registered<BytesReceived>,
pub(super) out: SourceSender,
}

/// Publishes decoded events from the FirehoseRequest to the pipeline
pub async fn firehose(
pub(super) async fn firehose(
request_id: String,
source_arn: String,
request: FirehoseRequest,
compression: Compression,
decoder: Decoder,
acknowledgements: bool,
mut out: SourceSender,
mut context: Context,
) -> Result<impl warp::Reply, reject::Rejection> {
for record in request.records {
let bytes = decode_record(&record, compression)
let bytes = decode_record(&record, context.compression)
.with_context(|_| ParseRecordsSnafu {
request_id: request_id.clone(),
})
.map_err(reject::custom)?;
emit!(BytesReceived {
byte_size: bytes.len(),
protocol: "http",
});
context.bytes_received.emit(ByteSize(bytes.len()));

let mut stream = FramedRead::new(bytes.as_ref(), decoder.clone());
let mut stream = FramedRead::new(bytes.as_ref(), context.decoder.clone());
loop {
match stream.next().await {
Some(Ok((mut events, _byte_size))) => {
Expand All @@ -58,7 +63,8 @@ pub async fn firehose(
byte_size: events.size_of(),
});

let (batch, receiver) = acknowledgements
let (batch, receiver) = context
.acknowledgements
.then(|| {
let (batch, receiver) = BatchNotifier::new_with_receiver();
(Some(batch), Some(receiver))
Expand All @@ -81,7 +87,7 @@ pub async fn firehose(
}

let count = events.len();
if let Err(error) = out.send_batch(events).await {
if let Err(error) = context.out.send_batch(events).await {
emit!(StreamClosedError {
error: error.clone(),
count,
Expand Down
13 changes: 8 additions & 5 deletions src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use snafu::{ResultExt, Snafu};
use tokio::{pin, select};
use tokio_util::codec::FramedRead;
use tracing::Instrument;
use vector_common::internal_event::{
ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered,
};
use vector_config::configurable_component;
use vector_core::ByteSizeOf;

Expand All @@ -26,7 +29,7 @@ use crate::{
config::{log_schema, AcknowledgementsConfig, SourceContext},
event::{BatchNotifier, BatchStatus, LogEvent},
internal_events::{
BytesReceived, OldEventsReceived, SqsMessageDeleteBatchError, SqsMessageDeletePartialError,
OldEventsReceived, SqsMessageDeleteBatchError, SqsMessageDeletePartialError,
SqsMessageDeleteSucceeded, SqsMessageProcessingError, SqsMessageProcessingSucceeded,
SqsMessageReceiveError, SqsMessageReceiveSucceeded, SqsS3EventRecordInvalidEventIgnored,
StreamClosedError,
Expand Down Expand Up @@ -255,6 +258,7 @@ pub struct IngestorProcess {
out: SourceSender,
shutdown: ShutdownSignal,
acknowledgements: bool,
bytes_received: Registered<BytesReceived>,
}

impl IngestorProcess {
Expand All @@ -269,6 +273,7 @@ impl IngestorProcess {
out,
shutdown,
acknowledgements,
bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
}
}

Expand Down Expand Up @@ -464,14 +469,12 @@ impl IngestorProcess {
// the offset of the object that has been read, but this would only be relevant in
// the case that the same vector instance processes the same message.
let mut read_error = None;
let bytes_received = self.bytes_received.clone();
let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
FramedRead::new(object_reader, CharacterDelimitedDecoder::new(b'\n'))
.map(|res| {
res.map(|bytes| {
emit!(BytesReceived {
byte_size: bytes.len(),
protocol: "http",
});
bytes_received.emit(ByteSize(bytes.len()));
bytes
})
.map_err(|err| {
Expand Down
10 changes: 5 additions & 5 deletions src/sources/demo_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ use snafu::Snafu;
use std::task::Poll;
use tokio::time::{self, Duration};
use tokio_util::codec::FramedRead;
use vector_common::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
use vector_config::configurable_component;
use vector_core::config::LogNamespace;
use vector_core::ByteSizeOf;

use crate::{
codecs::{Decoder, DecodingConfig},
config::{log_schema, Output, SourceConfig, SourceContext},
internal_events::{BytesReceived, DemoLogsEventProcessed, EventsReceived, StreamClosedError},
internal_events::{DemoLogsEventProcessed, EventsReceived, StreamClosedError},
serde::{default_decoding, default_framing_message_based},
shutdown::ShutdownSignal,
SourceSender,
Expand Down Expand Up @@ -185,6 +186,8 @@ async fn demo_logs_source(

let mut interval = maybe_interval.map(|i| time::interval(Duration::from_secs_f64(i)));

let bytes_received = register!(BytesReceived::from(Protocol::NONE));

for n in 0..count {
if matches!(futures::poll!(&mut shutdown), Poll::Ready(_)) {
break;
Expand All @@ -193,10 +196,7 @@ async fn demo_logs_source(
if let Some(interval) = &mut interval {
interval.tick().await;
}
emit!(BytesReceived {
byte_size: 0,
protocol: "none",
});
bytes_received.emit(ByteSize(0));

let line = format.generate_line(n);

Expand Down
13 changes: 8 additions & 5 deletions src/sources/dnstap/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::path::PathBuf;

use bytes::Bytes;
use vector_common::internal_event::{
ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered,
};
use vector_config::configurable_component;
use vector_core::ByteSizeOf;

use super::util::framestream::{build_framestream_unix_source, FrameHandler};
use crate::{
config::{log_schema, DataType, Output, SourceConfig, SourceContext},
event::{Event, LogEvent},
internal_events::{BytesReceived, DnstapParseError, EventsReceived},
internal_events::{DnstapParseError, EventsReceived},
Result,
};

Expand Down Expand Up @@ -138,6 +141,7 @@ pub struct DnstapFrameHandler {
socket_send_buffer_size: Option<usize>,
host_key: String,
timestamp_key: String,
bytes_received: Registered<BytesReceived>,
}

impl DnstapFrameHandler {
Expand Down Expand Up @@ -167,6 +171,7 @@ impl DnstapFrameHandler {
socket_send_buffer_size: config.socket_send_buffer_size,
host_key,
timestamp_key: timestamp_key.to_string(),
bytes_received: register!(BytesReceived::from(Protocol::from("protobuf"))),
}
}
}
Expand All @@ -185,10 +190,8 @@ impl FrameHandler for DnstapFrameHandler {
* Takes a data frame from the unix socket and turns it into a Vector Event.
**/
fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
emit!(BytesReceived {
byte_size: frame.len(),
protocol: "protobuf",
});
self.bytes_received.emit(ByteSize(frame.len()));

let mut log_event = LogEvent::default();

if let Some(host) = received_from {
Expand Down
Loading

0 comments on commit 573b309

Please sign in to comment.