feat(subscriber): resource instrumentation (#77)
This PR adds the pieces needed to get resource instrumentation data
into the console. This includes:

- changes to the proto definitions
- changes to the subscriber

The UI part will come as a follow-up PR. This branch uses a patched
`tokio` that emits these tracing spans and events for the `Sleep`
resource (sketched below).
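
As a rough illustration only, the instrumentation emitted for a `Sleep`-like
resource might look something like the sketch below; the span, target, and
field names here are assumptions for illustration, not the exact contract of
the patched branch.

```
use std::time::Duration;

// Hypothetical sketch: how a timer resource might announce itself to the
// subscriber via `tracing`. The real names live in the patched tokio branch
// and may differ.
fn instrumented_sleep(duration: Duration) {
    // A span covering the resource's lifetime: creating it corresponds to
    // "resource created", dropping it to "resource dropped".
    let resource_span = tracing::trace_span!(
        "runtime.resource",
        concrete_type = "Sleep",
        kind = "timer"
    );
    let _enter = resource_span.enter();

    // An event reflecting a state update on the resource, e.g. the timer's
    // duration being (re)set.
    tracing::trace!(
        target: "runtime::resource::state_update",
        duration_ms = duration.as_millis() as u64
    );
}
```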

You can look at the raw data by running:

```
cargo run --example app
cargo run --example dump
```

The information piped through includes:

- data describing the resource lifecycle, namely when resources are
  created and dropped
- data describing the async operations that take place on these resources
  and their association with tasks
- data reflecting the state updates that take place on resources
  (e.g. resetting a timer's duration, adding permits to a semaphore,
  etc.); a condensed consumer-side sketch follows this list
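
Putting it together, a condensed consumer looks roughly like the updated
`dump` example further down. The listen address below is an assumption;
adjust it to wherever your subscriber is serving.

```
use console_api::instrument::{instrument_client::InstrumentClient, InstrumentRequest};
use futures::stream::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the console subscriber's gRPC endpoint and subscribe to the
    // aggregated stream of task, resource, and async-op updates.
    let target = "http://127.0.0.1:6669".to_string(); // assumed default address
    let mut client = InstrumentClient::connect(target).await?;

    let request = tonic::Request::new(InstrumentRequest {});
    let mut stream = client.watch_updates(request).await?.into_inner();

    while let Some(update) = stream.next().await {
        println!("{:#?}", update?);
    }
    Ok(())
}
```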

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
zaharidichev and hawkw authored Aug 26, 2021
1 parent 5fe4437 commit f4a21ac
Showing 25 changed files with 1,807 additions and 508 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
@@ -4,4 +4,7 @@ members = [
"console-subscriber",
"console-api"
]
resolver = "2"
resolver = "2"

[patch.crates-io]
tokio = { git = 'https://github.com/zaharidichev/tokio', branch = 'zd/instrument-sleep' }
3 changes: 3 additions & 0 deletions console-api/build.rs
@@ -5,6 +5,9 @@ fn main() -> Result<(), Box<dyn Error>> {
"../proto/trace.proto",
"../proto/common.proto",
"../proto/tasks.proto",
"../proto/instrument.proto",
"../proto/resources.proto",
"../proto/async_ops.proto",
];
let dirs = &["../proto"];

1 change: 1 addition & 0 deletions console-api/src/async_ops.rs
@@ -0,0 +1 @@
tonic::include_proto!("rs.tokio.console.async_ops");
54 changes: 45 additions & 9 deletions console-api/src/common.rs
@@ -1,4 +1,5 @@
use std::fmt;
use std::hash::{Hash, Hasher};

tonic::include_proto!("rs.tokio.console.common");

@@ -32,19 +33,11 @@ impl<'a> From<&'a tracing_core::Metadata<'a>> for Metadata {
metadata::Kind::Event
};

let location = Location {
file: meta.file().map(String::from),
module_path: meta.module_path().map(String::from),
line: meta.line(),
column: None, // tracing doesn't support columns yet
};

let field_names = meta.fields().iter().map(|f| f.name().to_string()).collect();

Metadata {
name: meta.name().to_string(),
target: meta.target().to_string(),
location: Some(location),
location: Some(meta.into()),
kind: kind as i32,
level: metadata::Level::from(*meta.level()) as i32,
field_names,
@@ -53,6 +46,17 @@ impl<'a> From<&'a tracing_core::Metadata<'a>> for Metadata {
}
}

impl<'a> From<&'a tracing_core::Metadata<'a>> for Location {
fn from(meta: &'a tracing_core::Metadata<'a>) -> Self {
Location {
file: meta.file().map(String::from),
module_path: meta.module_path().map(String::from),
line: meta.line(),
column: None, // tracing doesn't support columns yet
}
}
}

impl<'a> From<&'a std::panic::Location<'a>> for Location {
fn from(loc: &'a std::panic::Location<'a>) -> Self {
Location {
@@ -185,3 +189,35 @@ impl From<&dyn std::fmt::Debug> for field::Value {
field::Value::DebugVal(format!("{:?}", val))
}
}

// Clippy warns when a type derives `PartialEq` but has a manual `Hash` impl,
// or vice versa. However, this is unavoidable here, because `prost` generates
// a struct with `#[derive(PartialEq)]`, but we cannot add `#[derive(Hash)]` to the
// generated code.
#[allow(clippy::derive_hash_xor_eq)]
impl Hash for field::Name {
fn hash<H: Hasher>(&self, state: &mut H) {
match self {
field::Name::NameIdx(idx) => idx.hash(state),
field::Name::StrName(s) => s.hash(state),
}
}
}

impl Eq for field::Name {}

// === IDs ===

impl From<u64> for Id {
fn from(id: u64) -> Self {
Id { id }
}
}

impl From<Id> for u64 {
fn from(id: Id) -> Self {
id.id
}
}

impl Copy for Id {}
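
For reference, a quick hypothetical usage of the new `Id` conversions and the
`field::Name` hashing support (assuming these types are reachable at the crate
root through lib.rs's `pub use common::*`):

```
use console_api::{field, Id};
use std::collections::HashSet;

fn demo() {
    // `Id` now round-trips to and from the raw u64 carried on the wire.
    let id: Id = 42u64.into();
    let raw: u64 = id.into(); // `Id` is `Copy`, so `id` remains usable
    assert_eq!(raw, 42);

    // `field::Name` is now `Hash + Eq`, so it can key hash-based collections.
    let mut names = HashSet::new();
    names.insert(field::Name::StrName("poll_count".to_string()));
    assert!(names.contains(&field::Name::StrName("poll_count".to_string())));
}
```
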
1 change: 1 addition & 0 deletions console-api/src/instrument.rs
@@ -0,0 +1 @@
tonic::include_proto!("rs.tokio.console.instrument");
3 changes: 3 additions & 0 deletions console-api/src/lib.rs
@@ -1,4 +1,7 @@
pub mod async_ops;
mod common;
pub mod instrument;
pub mod resources;
pub mod tasks;
pub mod trace;
pub use common::*;
1 change: 1 addition & 0 deletions console-api/src/resources.rs
@@ -0,0 +1 @@
tonic::include_proto!("rs.tokio.console.resources");
16 changes: 0 additions & 16 deletions console-api/src/tasks.rs
@@ -1,17 +1 @@
tonic::include_proto!("rs.tokio.console.tasks");

// === IDs ===

impl From<u64> for TaskId {
fn from(id: u64) -> Self {
TaskId { id }
}
}

impl From<TaskId> for u64 {
fn from(id: TaskId) -> Self {
id.id
}
}

impl Copy for TaskId {}
1 change: 1 addition & 0 deletions console-subscriber/Cargo.toml
@@ -13,6 +13,7 @@ parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"]

tokio = { version = "^1.10", features = ["sync", "time", "macros", "tracing"]}
tokio-stream = "0.1"
thread_local = "1.1.3"
console-api = { path = "../console-api", features = ["transport"]}
tonic = { version = "0.5", features = ["transport"] }
tracing-core = "0.1.18"
10 changes: 5 additions & 5 deletions console-subscriber/examples/dump.rs
@@ -1,4 +1,4 @@
use console_api::tasks::{tasks_client::TasksClient, TasksRequest};
use console_api::instrument::{instrument_client::InstrumentClient, InstrumentRequest};
use futures::stream::StreamExt;

#[tokio::main]
@@ -11,16 +11,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

eprintln!("CONNECTING: {}", target);
let mut client = TasksClient::connect(target).await?;
let mut client = InstrumentClient::connect(target).await?;

let request = tonic::Request::new(TasksRequest {});
let mut stream = client.watch_tasks(request).await?.into_inner();
let request = tonic::Request::new(InstrumentRequest {});
let mut stream = client.watch_updates(request).await?.into_inner();

let mut i: usize = 0;
while let Some(update) = stream.next().await {
match update {
Ok(update) => {
eprintln!("UPDATE {}: {:#?}\n", i, update);
println!("UPDATE {}: {:#?}\n", i, update);
i += 1;
}
Err(e) => {
150 changes: 150 additions & 0 deletions console-subscriber/src/aggregator/id_data.rs
@@ -0,0 +1,150 @@
use super::{shrink::ShrinkMap, Closable, Id, Ids, ToProto};
use std::collections::{HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::time::{Duration, SystemTime};

pub(crate) struct IdData<T> {
data: ShrinkMap<Id, (T, bool)>,
}

pub(crate) struct Updating<'a, T>(&'a mut (T, bool));

pub(crate) enum Include {
All,
UpdatedOnly,
}

// === impl IdData ===

impl<T> Default for IdData<T> {
fn default() -> Self {
IdData {
data: ShrinkMap::<Id, (T, bool)>::new(),
}
}
}

impl<T> IdData<T> {
pub(crate) fn update_or_default(&mut self, id: Id) -> Updating<'_, T>
where
T: Default,
{
Updating(self.data.entry(id).or_default())
}

pub(crate) fn update(&mut self, id: &Id) -> Option<Updating<'_, T>> {
self.data.get_mut(id).map(Updating)
}

pub(crate) fn insert(&mut self, id: Id, data: T) {
self.data.insert(id, (data, true));
}

pub(crate) fn since_last_update(&mut self) -> impl Iterator<Item = (&Id, &mut T)> {
self.data.iter_mut().filter_map(|(id, (data, dirty))| {
if *dirty {
*dirty = false;
Some((id, data))
} else {
None
}
})
}

pub(crate) fn all(&self) -> impl Iterator<Item = (&Id, &T)> {
self.data.iter().map(|(id, (data, _))| (id, data))
}

pub(crate) fn get(&self, id: &Id) -> Option<&T> {
self.data.get(id).map(|(data, _)| data)
}

pub(crate) fn as_proto(&mut self, include: Include) -> HashMap<u64, T::Output>
where
T: ToProto,
{
match include {
Include::UpdatedOnly => self
.since_last_update()
.map(|(id, d)| (*id, d.to_proto()))
.collect(),
Include::All => self.all().map(|(id, d)| (*id, d.to_proto())).collect(),
}
}

pub(crate) fn drop_closed<R: Closable>(
&mut self,
stats: &mut IdData<R>,
now: SystemTime,
retention: Duration,
has_watchers: bool,
ids: &mut Ids,
) {
let _span = tracing::debug_span!(
"drop_closed",
entity = %std::any::type_name::<T>(),
stats = %std::any::type_name::<R>(),
)
.entered();

// drop closed entities
tracing::trace!(?retention, has_watchers, "dropping closed");

let mut dropped_ids = HashSet::new();
stats.data.retain_and_shrink(|id, (stats, dirty)| {
if let Some(closed) = stats.closed_at() {
let closed_for = now.duration_since(closed).unwrap_or_default();
let should_drop =
// if there are any clients watching, retain all dirty tasks regardless of age
(*dirty && has_watchers)
|| closed_for > retention;
tracing::trace!(
stats.id = ?id,
stats.closed_at = ?closed,
stats.closed_for = ?closed_for,
stats.dirty = *dirty,
should_drop,
);

if should_drop {
dropped_ids.insert(*id);
}
return !should_drop;
}

true
});

// drop closed entities which no longer have stats.
self.data
.retain_and_shrink(|id, (_, _)| stats.data.contains_key(id));

if !dropped_ids.is_empty() {
// drop closed entities which no longer have stats.
self.data
.retain_and_shrink(|id, (_, _)| stats.data.contains_key(id));
ids.remove_all(&dropped_ids);
}
}
}

// === impl Updating ===

impl<'a, T> Deref for Updating<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0 .0
}
}

impl<'a, T> DerefMut for Updating<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0 .0
}
}

impl<'a, T> Drop for Updating<'a, T> {
fn drop(&mut self) {
self.0 .1 = true;
}
}
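
The `Updating` guard is the key design choice in this file: mutable access
goes through a guard that flips the entry's dirty bit on drop, which is what
lets `since_last_update` (and `as_proto(Include::UpdatedOnly)`) emit only
deltas. A simplified standalone sketch of that pattern, using stand-in types
rather than the crate's actual ones:

```
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};

// Stand-in for `IdData`: each value carries a dirty bit.
struct DirtyMap<T>(HashMap<u64, (T, bool)>);

// Stand-in for `Updating`: marks the entry dirty when dropped.
struct Touch<'a, T>(&'a mut (T, bool));

impl<'a, T> Deref for Touch<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0 .0
    }
}

impl<'a, T> DerefMut for Touch<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.0 .0
    }
}

impl<'a, T> Drop for Touch<'a, T> {
    fn drop(&mut self) {
        self.0 .1 = true;
    }
}

impl<T: Default> DirtyMap<T> {
    fn update_or_default(&mut self, id: u64) -> Touch<'_, T> {
        Touch(self.0.entry(id).or_default())
    }

    // Yields only the entries touched since the last call, clearing their flags.
    fn since_last_update(&mut self) -> impl Iterator<Item = (&u64, &mut T)> {
        self.0.iter_mut().filter_map(|(id, (data, dirty))| {
            if std::mem::replace(dirty, false) {
                Some((id, data))
            } else {
                None
            }
        })
    }
}
```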