Skip to content

Commit

Permalink
WIP: Refactor sidecar::interface::SessionInfo to its own file.
Browse files Browse the repository at this point in the history
Also, add rustdoc comments and tests. Uncovered a bug in the
shtudown_running_instances function where it never shutdown the running
instances.
  • Loading branch information
ekump committed Apr 17, 2024
1 parent 580ae4e commit b658e1c
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 117 deletions.
123 changes: 6 additions & 117 deletions sidecar/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,12 @@ use ddtelemetry::{
},
};

use crate::log::{
MultiEnvFilterGuard, MultiWriterGuard, TemporarilyRetainedMapStats, MULTI_LOG_FILTER,
MULTI_LOG_WRITER,
};
use crate::{config, log, tracer};
use crate::log::{TemporarilyRetainedMapStats, MULTI_LOG_FILTER, MULTI_LOG_WRITER};
use crate::{config, log};

use crate::service::{
InstanceId, QueueId, RequestIdentification, RequestIdentifier, RuntimeMetadata,
SerializedTracerHeaderTags, SidecarInterface, SidecarInterfaceRequest,
SerializedTracerHeaderTags, SessionInfo, SidecarInterface, SidecarInterfaceRequest,
SidecarInterfaceResponse,
};

Expand Down Expand Up @@ -94,114 +91,6 @@ pub enum SidecarAction {
PhpComposerTelemetryFile(PathBuf),
}

#[derive(Default, Clone)]
struct SessionInfo {
runtimes: Arc<Mutex<HashMap<String, RuntimeInfo>>>,
session_config: Arc<Mutex<Option<ddtelemetry::config::Config>>>,
tracer_config: Arc<Mutex<tracer::Config>>,
log_guard: Arc<Mutex<Option<(MultiEnvFilterGuard<'static>, MultiWriterGuard<'static>)>>>,
#[cfg(feature = "tracing")]
session_id: String,
}

impl SessionInfo {
fn get_runtime(&self, runtime_id: &String) -> RuntimeInfo {
let mut runtimes = self.lock_runtimes();
match runtimes.get(runtime_id) {
Some(runtime) => runtime.clone(),
None => {
let mut runtime = RuntimeInfo::default();
runtimes.insert(runtime_id.clone(), runtime.clone());
#[cfg(feature = "tracing")]
if enabled!(Level::INFO) {
runtime.instance_id = InstanceId {
session_id: self.session_id.clone(),
runtime_id: runtime_id.clone(),
};
info!(
"Registering runtime_id {} for session {}",
runtime_id, self.session_id
);
}
runtime
}
}
}

async fn shutdown(&self) {
let runtimes: Vec<RuntimeInfo> = self
.lock_runtimes()
.drain()
.map(|(_, instance)| instance)
.collect();

let runtimes_shutting_down: Vec<_> = runtimes
.into_iter()
.map(|rt| tokio::spawn(async move { rt.shutdown().await }))
.collect();

future::join_all(runtimes_shutting_down).await;
}

async fn shutdown_running_instances(&self) {
let runtimes: Vec<RuntimeInfo> = self
.lock_runtimes()
.iter()
.map(|(_, instance)| instance.clone())
.collect();

let instances_shutting_down: Vec<_> = runtimes
.into_iter()
.map(|rt| tokio::spawn(async move { rt.shutdown().await }))
.collect();

future::join_all(instances_shutting_down).await;
}

async fn shutdown_runtime(self, runtime_id: &String) {
let runtime = match self.lock_runtimes().remove(runtime_id) {
Some(rt) => rt,
None => return,
};

runtime.shutdown().await
}

fn lock_runtimes(&self) -> MutexGuard<HashMap<String, RuntimeInfo>> {
self.runtimes.lock().unwrap()
}

fn get_telemetry_config(&self) -> MutexGuard<Option<ddtelemetry::config::Config>> {
let mut cfg = self.session_config.lock().unwrap();

if (*cfg).is_none() {
*cfg = Some(ddtelemetry::config::Config::from_env())
}

cfg
}

fn modify_telemetry_config<F>(&self, mut f: F)
where
F: FnMut(&mut ddtelemetry::config::Config),
{
if let Some(cfg) = &mut *self.get_telemetry_config() {
f(cfg)
}
}

fn get_trace_config(&self) -> MutexGuard<tracer::Config> {
self.tracer_config.lock().unwrap()
}

fn modify_trace_config<F>(&self, mut f: F)
where
F: FnMut(&mut tracer::Config),
{
f(&mut self.get_trace_config());
}
}

#[allow(clippy::large_enum_variant)]
enum AppOrQueue {
App(Shared<ManualFuture<(String, String)>>),
Expand All @@ -210,11 +99,11 @@ enum AppOrQueue {

#[allow(clippy::type_complexity)]
#[derive(Clone, Default)]
struct RuntimeInfo {
pub struct RuntimeInfo {
apps: Arc<Mutex<HashMap<(String, String), Shared<ManualFuture<Option<AppInstance>>>>>>,
app_or_actions: Arc<Mutex<HashMap<QueueId, AppOrQueue>>>,
#[cfg(feature = "tracing")]
instance_id: InstanceId,
pub instance_id: InstanceId,
}

impl RuntimeInfo {
Expand All @@ -239,7 +128,7 @@ impl RuntimeInfo {
}
}

async fn shutdown(self) {
pub async fn shutdown(self) {
#[cfg(feature = "tracing")]
info!(
"Shutting down runtime_id {} for session {}",
Expand Down
2 changes: 2 additions & 0 deletions sidecar/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub use queue_id::QueueId;
pub use request_identification::{RequestIdentification, RequestIdentifier};
pub use runtime_metadata::RuntimeMetadata;
pub use serialized_tracer_header_tags::SerializedTracerHeaderTags;
pub use session_info::SessionInfo;
pub use sidecar_interface::{
SidecarInterface, SidecarInterfaceClient, SidecarInterfaceRequest, SidecarInterfaceResponse,
};
Expand All @@ -15,4 +16,5 @@ pub mod queue_id;
mod request_identification;
mod runtime_metadata;
mod serialized_tracer_header_tags;
mod session_info;
mod sidecar_interface;
Loading

0 comments on commit b658e1c

Please sign in to comment.