diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index fb65646b3..e87f0a4ab 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -28,7 +28,6 @@ use manual_future::{ManualFuture, ManualFutureCompleter}; use std::borrow::Cow; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; - use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; @@ -249,71 +248,6 @@ impl SidecarServer { .expect("Unable to acquire lock on sessions") } - async fn get_app( - &self, - instance_id: &InstanceId, - runtime_meta: &RuntimeMetadata, - service_name: &str, - env_name: &str, - initial_actions: Vec, - ) -> Option { - let rt_info = self.get_runtime(instance_id); - - let manual_app_future = rt_info.get_app(service_name, env_name); - - if manual_app_future.completer.is_none() { - return manual_app_future.app_future.await; - } - - let mut builder = TelemetryWorkerBuilder::new_fetch_host( - service_name.to_owned(), - runtime_meta.language_name.to_owned(), - runtime_meta.language_version.to_owned(), - runtime_meta.tracer_version.to_owned(), - ); - builder.runtime_id = Some(instance_id.runtime_id.to_owned()); - builder.application.env = Some(env_name.to_owned()); - let session_info = self.get_session(&instance_id.session_id); - let mut config = session_info - .session_config - .lock() - .expect("Unable to acquire lock on session_config") - .clone() - .unwrap_or_else(ddtelemetry::config::Config::from_env); - config.restartable = true; - - let instance_option = match builder.spawn_with_config(config.clone()).await { - Ok((handle, worker_join)) => { - info!("spawning telemetry worker {config:?}"); - - let instance = AppInstance { - telemetry: handle, - telemetry_worker_shutdown: worker_join.map(Result::ok).boxed().shared(), - telemetry_metrics: Default::default(), - }; - - instance.telemetry.send_msgs(initial_actions).await.ok(); - - instance - .telemetry - .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) - .await - .ok(); - Some(instance) - } - Err(e) => { - error!("could not spawn telemetry worker {:?}", e); - None - } - }; - manual_app_future - .completer - .expect("Completed expected Some ManualFuture for application instance, but found none") - .complete(instance_option) - .await; - manual_app_future.app_future.await - } - fn send_trace_v04(&self, headers: &SerializedTracerHeaderTags, data: &[u8], target: &Endpoint) { let headers: TracerHeaderTags = match headers.try_into() { Ok(headers) => headers, @@ -588,20 +522,77 @@ impl SidecarInterface for SidecarServer { } }; if let Some(AppOrQueue::Queue(mut enqueued_data)) = app_or_queue { + let rt_info = self.get_runtime(&instance_id); + let manual_app_future = rt_info.get_app(&service_name, &env_name); + + let instance_future = if manual_app_future.completer.is_some() { + let mut builder = TelemetryWorkerBuilder::new_fetch_host( + service_name.to_owned(), + runtime_meta.language_name.to_owned(), + runtime_meta.language_version.to_owned(), + runtime_meta.tracer_version.to_owned(), + ); + builder.runtime_id = Some(instance_id.runtime_id.to_owned()); + builder.application.env = Some(env_name.to_owned()); + let session_info = self.get_session(&instance_id.session_id); + let mut config = session_info + .session_config + .lock() + .expect("Unable to acquire lock on session_config") + .clone() + .unwrap_or_else(ddtelemetry::config::Config::from_env); + config.restartable = true; + Some( + builder + .spawn_with_config(config.clone()) + .map(move |result| { + if result.is_ok() { + info!("spawning telemetry worker {config:?}"); + } + result + }), + ) + } else { + None + }; + tokio::spawn(async move { - let mut actions: Vec = vec![]; - enqueued_data.extract_telemetry_actions(&mut actions).await; - - if let Some(mut app) = self - .get_app( - &instance_id, - &runtime_meta, - &service_name, - &env_name, - actions, - ) - .await - { + if let Some(instance_future) = instance_future { + let instance_option = match instance_future.await { + Ok((handle, worker_join)) => { + let instance = AppInstance { + telemetry: handle, + telemetry_worker_shutdown: worker_join + .map(Result::ok) + .boxed() + .shared(), + telemetry_metrics: Default::default(), + }; + + let mut actions: Vec = vec![]; + enqueued_data.extract_telemetry_actions(&mut actions).await; + instance.telemetry.send_msgs(actions).await.ok(); + + instance + .telemetry + .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) + .await + .ok(); + Some(instance) + } + Err(e) => { + error!("could not spawn telemetry worker {:?}", e); + None + } + }; + manual_app_future + .completer + .expect("Completed expected Some ManualFuture for application instance, but found none") + .complete(instance_option) + .await; + } + + if let Some(mut app) = manual_app_future.app_future.await { // Register metrics for metric in std::mem::take(&mut enqueued_data.metrics).into_iter() { app.register_metric(metric); @@ -618,9 +609,14 @@ impl SidecarInterface for SidecarServer { if actions.iter().any(|action| { matches!(action, TelemetryActions::Lifecycle(LifecycleAction::Stop)) }) { - self.get_runtime(&instance_id) - .lock_applications() - .remove(&queue_id); + // Avoid self.get_runtime(), it could create a new one. + if let Some(session) = self.lock_sessions().get(&instance_id.session_id) { + if let Some(runtime) = + session.lock_runtimes().get(&instance_id.runtime_id) + { + runtime.lock_applications().remove(&queue_id); + } + } } app.telemetry.send_msgs(actions).await.ok();