Fix leaking sessions #642

Merged · 1 commit · Sep 19, 2024
160 changes: 78 additions & 82 deletions sidecar/src/service/sidecar_server.rs
@@ -28,7 +28,6 @@ use manual_future::{ManualFuture, ManualFutureCompleter};
 use std::borrow::Cow;
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
-
 use std::pin::Pin;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Mutex, MutexGuard};
@@ -249,71 +248,6 @@ impl SidecarServer {
.expect("Unable to acquire lock on sessions")
}

async fn get_app(
&self,
instance_id: &InstanceId,
runtime_meta: &RuntimeMetadata,
service_name: &str,
env_name: &str,
initial_actions: Vec<TelemetryActions>,
) -> Option<AppInstance> {
let rt_info = self.get_runtime(instance_id);

let manual_app_future = rt_info.get_app(service_name, env_name);

if manual_app_future.completer.is_none() {
return manual_app_future.app_future.await;
}

let mut builder = TelemetryWorkerBuilder::new_fetch_host(
service_name.to_owned(),
runtime_meta.language_name.to_owned(),
runtime_meta.language_version.to_owned(),
runtime_meta.tracer_version.to_owned(),
);
builder.runtime_id = Some(instance_id.runtime_id.to_owned());
builder.application.env = Some(env_name.to_owned());
let session_info = self.get_session(&instance_id.session_id);
let mut config = session_info
.session_config
.lock()
.expect("Unable to acquire lock on session_config")
.clone()
.unwrap_or_else(ddtelemetry::config::Config::from_env);
config.restartable = true;

let instance_option = match builder.spawn_with_config(config.clone()).await {
Ok((handle, worker_join)) => {
info!("spawning telemetry worker {config:?}");

let instance = AppInstance {
telemetry: handle,
telemetry_worker_shutdown: worker_join.map(Result::ok).boxed().shared(),
telemetry_metrics: Default::default(),
};

instance.telemetry.send_msgs(initial_actions).await.ok();

instance
.telemetry
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
.await
.ok();
Some(instance)
}
Err(e) => {
error!("could not spawn telemetry worker {:?}", e);
None
}
};
manual_app_future
.completer
.expect("Completed expected Some ManualFuture for application instance, but found none")
.complete(instance_option)
.await;
manual_app_future.app_future.await
}

fn send_trace_v04(&self, headers: &SerializedTracerHeaderTags, data: &[u8], target: &Endpoint) {
let headers: TracerHeaderTags = match headers.try_into() {
Ok(headers) => headers,
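For context on the pattern the removed `get_app` relied on: `rt_info.get_app` hands the `ManualFutureCompleter` to exactly one caller, which must spawn the worker and complete the shared future; every other caller sees `completer == None` and simply awaits. Below is a minimal sketch of that hand-off using the `manual_future` crate this file imports; the spawned task and `String` payload are stand-ins for the real `AppInstance` initialization, not sidecar types.

```rust
use manual_future::ManualFuture;

#[tokio::main]
async fn main() {
    // One shared slot per (service, env): new() yields the future plus its
    // one-shot completer. In the sidecar, only the first caller receives the
    // completer; later callers get None and merely await app_future.
    let (app_future, completer) = ManualFuture::<Option<String>>::new();

    // The "first caller" runs the expensive initialization and completes the
    // future with Some(instance) on success or None on failure.
    tokio::spawn(async move {
        let instance = Some("telemetry-worker".to_string()); // stand-in for AppInstance
        completer.complete(instance).await;
    });

    // Every caller observes the same completed value.
    assert_eq!(app_future.await.as_deref(), Some("telemetry-worker"));
}
```

The key invariant is that whoever holds the completer must eventually call `complete`, or all awaiting callers hang; this is why the diff moves the completion into the same task that awaits the spawn result.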
@@ -588,20 +522,77 @@ impl SidecarInterface for SidecarServer {
             }
         };
         if let Some(AppOrQueue::Queue(mut enqueued_data)) = app_or_queue {
+            let rt_info = self.get_runtime(&instance_id);
+            let manual_app_future = rt_info.get_app(&service_name, &env_name);
+
+            let instance_future = if manual_app_future.completer.is_some() {
+                let mut builder = TelemetryWorkerBuilder::new_fetch_host(
+                    service_name.to_owned(),
+                    runtime_meta.language_name.to_owned(),
+                    runtime_meta.language_version.to_owned(),
+                    runtime_meta.tracer_version.to_owned(),
+                );
+                builder.runtime_id = Some(instance_id.runtime_id.to_owned());
+                builder.application.env = Some(env_name.to_owned());
+                let session_info = self.get_session(&instance_id.session_id);
+                let mut config = session_info
+                    .session_config
+                    .lock()
+                    .expect("Unable to acquire lock on session_config")
+                    .clone()
+                    .unwrap_or_else(ddtelemetry::config::Config::from_env);
+                config.restartable = true;
+                Some(
+                    builder
+                        .spawn_with_config(config.clone())
+                        .map(move |result| {
+                            if result.is_ok() {
+                                info!("spawning telemetry worker {config:?}");
+                            }
+                            result
+                        }),
+                )
+            } else {
+                None
+            };
+
             tokio::spawn(async move {
-                let mut actions: Vec<TelemetryActions> = vec![];
-                enqueued_data.extract_telemetry_actions(&mut actions).await;
-
-                if let Some(mut app) = self
-                    .get_app(
-                        &instance_id,
-                        &runtime_meta,
-                        &service_name,
-                        &env_name,
-                        actions,
-                    )
-                    .await
-                {
+                if let Some(instance_future) = instance_future {
+                    let instance_option = match instance_future.await {
+                        Ok((handle, worker_join)) => {
+                            let instance = AppInstance {
+                                telemetry: handle,
+                                telemetry_worker_shutdown: worker_join
+                                    .map(Result::ok)
+                                    .boxed()
+                                    .shared(),
+                                telemetry_metrics: Default::default(),
+                            };
+
+                            let mut actions: Vec<TelemetryActions> = vec![];
+                            enqueued_data.extract_telemetry_actions(&mut actions).await;
+                            instance.telemetry.send_msgs(actions).await.ok();
+
+                            instance
+                                .telemetry
+                                .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
+                                .await
+                                .ok();
+                            Some(instance)
+                        }
+                        Err(e) => {
+                            error!("could not spawn telemetry worker {:?}", e);
+                            None
+                        }
+                    };
+                    manual_app_future
+                        .completer
+                        .expect("Completed expected Some ManualFuture for application instance, but found none")
+                        .complete(instance_option)
+                        .await;
+                }
+
+                if let Some(mut app) = manual_app_future.app_future.await {
                     // Register metrics
                     for metric in std::mem::take(&mut enqueued_data.metrics).into_iter() {
                         app.register_metric(metric);
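The restructured path builds the `spawn_with_config` future before entering `tokio::spawn`, so the completer decision is made synchronously, and decorates it with `FutureExt::map` so the success log can move the captured `config` without re-locking anything. A tiny sketch of that decoration pattern follows; `start_worker` and its `Result<u32, String>` return type are stand-ins for the builder's real spawn call.

```rust
use futures::FutureExt;

// Stand-in for TelemetryWorkerBuilder::spawn_with_config().
async fn start_worker() -> Result<u32, String> {
    Ok(42)
}

#[tokio::main]
async fn main() {
    // Decorate the pending future: the closure runs once it resolves,
    // logging on success while passing the result through unchanged.
    let instance_future = start_worker().map(|result| {
        if result.is_ok() {
            println!("spawning telemetry worker");
        }
        result
    });

    // The decorated future is just another future: it can be stored in an
    // Option and awaited later inside a spawned task, as the diff does.
    let result = tokio::spawn(instance_future).await.unwrap();
    assert_eq!(result, Ok(42));
}
```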
@@ -618,9 +609,14 @@
                     if actions.iter().any(|action| {
                         matches!(action, TelemetryActions::Lifecycle(LifecycleAction::Stop))
                     }) {
-                        self.get_runtime(&instance_id)
-                            .lock_applications()
-                            .remove(&queue_id);
+                        // Avoid self.get_runtime(), it could create a new one.
+                        if let Some(session) = self.lock_sessions().get(&instance_id.session_id) {
+                            if let Some(runtime) =
+                                session.lock_runtimes().get(&instance_id.runtime_id)
+                            {
+                                runtime.lock_applications().remove(&queue_id);
+                            }
+                        }
                     }

                     app.telemetry.send_msgs(actions).await.ok();
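The leak itself is addressed in the last hunk: on a `Stop` action the old code reached the runtime through `self.get_runtime(&instance_id)`, a get-or-create accessor, so a `Stop` that raced with (or arrived after) session teardown would re-create the very session and runtime entries it was trying to clean up. The fix walks `lock_sessions()` / `lock_runtimes()` with plain `get`, which can only observe existing state. A simplified sketch of the two access patterns follows; `Registry`, `peek_session`, and the string keys are illustrative, not the sidecar's real types.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct Session;

#[derive(Default)]
struct Registry {
    sessions: Mutex<HashMap<String, Arc<Session>>>,
}

impl Registry {
    // Get-or-create: fine on the hot path, dangerous during teardown,
    // because a late caller silently resurrects a removed session.
    fn get_session(&self, id: &str) -> Arc<Session> {
        self.sessions
            .lock()
            .unwrap()
            .entry(id.to_string())
            .or_default()
            .clone()
    }

    // Lookup-only: returns None instead of allocating, so it can never leak.
    fn peek_session(&self, id: &str) -> Option<Arc<Session>> {
        self.sessions.lock().unwrap().get(id).cloned()
    }
}

fn main() {
    let registry = Registry::default();
    let s1 = registry.get_session("sid-1"); // hot path: creates on first use
    registry.sessions.lock().unwrap().clear(); // teardown
    // A late Stop sees None rather than re-creating the session.
    assert!(registry.peek_session("sid-1").is_none());
    drop(s1);
}
```

With the lookup-only path, a `Stop` arriving after teardown becomes a no-op instead of leaving a freshly re-created session behind, which is exactly the behavior the PR title asks for.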