Skip to content

Commit

Permalink
WIP: sidecar server - move logic for processing of interceptor response
Browse files Browse the repository at this point in the history
Refactor into separate function. Also, replace unwraps() with expects().
  • Loading branch information
ekump committed Apr 30, 2024
1 parent 0793ec2 commit ed4b03e
Showing 1 changed file with 97 additions and 42 deletions.
139 changes: 97 additions & 42 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

// TODO: APM-1076 -
use crate::config::get_product_endpoint;
use crate::interface::{
AppOrQueue, RuntimeInfo, SessionConfig, SidecarAction, SidecarStats, TraceFlusher,
Expand Down Expand Up @@ -59,7 +63,6 @@ impl SidecarServer {
},
Transport::from(async_channel),
);

let mut executor = datadog_ipc::sequential::execute_sequential(
server.requests(),
self.clone().serve(),
Expand All @@ -79,38 +82,58 @@ impl SidecarServer {
warn!("Error from executor: {e:?}");
}

if let Ok((sessions, instances)) = session_interceptor.await {
for session in sessions {
let stop = {
let mut counter = self.session_counter.lock().unwrap();
if let Entry::Occupied(mut entry) = counter.entry(session.clone()) {
if entry.insert(entry.get() - 1) == 1 {
entry.remove();
true
self.process_interceptor_response(session_interceptor.await)
.await;
}

pub fn active_session_count(&self) -> usize {
self.session_counter
.lock()
.expect("Unable to acquire lock on session_counter")
.len()
}

async fn process_interceptor_response(
&self,
result: Result<(HashSet<String>, HashSet<InstanceId>), tokio::task::JoinError>,
) {
match result {
Ok((sessions, instances)) => {
for session in sessions {
let stop = {
let mut counter = self
.session_counter
.lock()
.expect("Unable to obtain lock on session_counter");
if let Entry::Occupied(mut entry) = counter.entry(session.clone()) {
if entry.insert(entry.get() - 1) == 1 {
entry.remove();
true
} else {
false
}
} else {
false
}
} else {
false
};
if stop {
self.stop_session(&session).await;
}
};
if stop {
self.stop_session(&session).await;
}
}
for instance_id in instances {
let maybe_session = self.lock_sessions().get(&instance_id.session_id).cloned();
if let Some(session) = maybe_session {
session.shutdown_runtime(&instance_id.runtime_id).await;
for instance_id in instances {
let maybe_session = self.lock_sessions().get(&instance_id.session_id).cloned();
if let Some(session) = maybe_session {
session.shutdown_runtime(&instance_id.runtime_id).await;
}
}
}
Err(e) => {
// TODO: APMSP-1076 - Do we need to do more than just log this error?
debug!("session interceptor encountered an error: {:?}", e);
}
}
}

pub fn active_session_count(&self) -> usize {
self.session_counter.lock().unwrap().len()
}

fn get_session(&self, session_id: &String) -> SessionInfo {
let mut sessions = self.lock_sessions();
match sessions.get(session_id) {
Expand Down Expand Up @@ -145,7 +168,9 @@ impl SidecarServer {
}

fn lock_sessions(&self) -> MutexGuard<HashMap<String, SessionInfo>> {
self.sessions.lock().unwrap()
self.sessions
.lock()
.expect("Unable to acquire lock on sessions")
}

async fn get_app(
Expand Down Expand Up @@ -175,12 +200,12 @@ impl SidecarServer {
let mut config = session_info
.session_config
.lock()
.unwrap()
.expect("Unable to acquire lock on session_config")
.clone()
.unwrap_or_else(ddtelemetry::config::Config::from_env);
config.restartable = true;

// TODO: log errors
// TODO: APMSP-1076 - log errors
let instance_option = match builder.spawn_with_config(config.clone()).await {
Ok((handle, worker_join)) => {
info!("spawning telemetry worker {config:?}");
Expand All @@ -205,7 +230,10 @@ impl SidecarServer {
None
}
};
completer.unwrap().complete(instance_option).await;
completer
.expect("Completed expected Some ManualFuture for application instance, but found none")
.complete(instance_option)
.await;
app_future.await
}

Expand Down Expand Up @@ -266,7 +294,11 @@ impl SidecarServer {
SidecarStats {
trace_flusher: self.trace_flusher.stats(),
sessions: sessions.len() as u32,
session_counter_size: self.session_counter.lock().unwrap().len() as u32,
session_counter_size: self
.session_counter
.lock()
.expect("Unable to acquire lock on session_counter")
.len() as u32,
runtimes: sessions
.values()
.map(|s| s.lock_runtimes().len() as u32)
Expand Down Expand Up @@ -330,7 +362,10 @@ impl SidecarServer {
.values()
.map(|a| {
a.peek().unwrap_or(&None).as_ref().map_or(0, |w| {
w.telemetry_metrics.lock().unwrap().len() as u32
w.telemetry_metrics
.lock()
.expect("Unable to acquire lock on telemetry_metrics")
.len() as u32
})
})
.sum::<u32>()
Expand Down Expand Up @@ -380,7 +415,11 @@ impl SidecarInterface for SidecarServer {
let apps = rt_info.apps.clone();
tokio::spawn(async move {
let service = service_future.await;
let app_future = if let Some(fut) = apps.lock().unwrap().get(&service) {
let app_future = if let Some(fut) = apps
.lock()
.expect("Unable to acquire lock on apps")
.get(&service)
{
fut.clone()
} else {
return;
Expand Down Expand Up @@ -508,18 +547,27 @@ impl SidecarInterface for SidecarServer {
.min_force_drop_size
.store(config.force_drop_size as u32, Ordering::Relaxed);

session.log_guard.lock().unwrap().replace((
log::MULTI_LOG_FILTER.add(config.log_level),
log::MULTI_LOG_WRITER.add(config.log_file),
));

if let Some(completer) = self.self_telemetry_config.lock().unwrap().take() {
session
.log_guard
.lock()
.expect("Unable to acquire lock on session log_guard")
.replace((
log::MULTI_LOG_FILTER.add(config.log_level),
log::MULTI_LOG_WRITER.add(config.log_file),
));

if let Some(completer) = self
.self_telemetry_config
.lock()
.expect("Unable to acquire lock on telemetry_config")
.take()
{
let config = session
.session_config
.lock()
.unwrap()
.expect("Unable to acquire lock on session_config")
.as_ref()
.unwrap()
.expect("Expected session_config to be Some(Config) but received None")
.clone();
tokio::spawn(async move {
completer.complete(config).await;
Expand Down Expand Up @@ -616,14 +664,17 @@ impl SidecarInterface for SidecarServer {
fn stats(self, _: Context) -> Self::StatsFut {
Box::pin(async move {
let stats = self.compute_stats().await;
simd_json::serde::to_string(&stats).unwrap()
simd_json::serde::to_string(&stats).expect("unable to serialize stats to string")
})
}
}

// The session_interceptor function keeps track of session counts and submitted payload counts. It
// also keeps track of RequestIdentifiers and returns hashsets of session and instance ids when the
// rx channel is closed.
async fn session_interceptor(
session_counter: Arc<Mutex<HashMap<String, u32>>>,
submitted_payloads: Arc<AtomicU64>,
submitted_payload_count: Arc<AtomicU64>,
mut rx: tokio::sync::mpsc::Receiver<(
ServeSidecarInterface<SidecarServer>,
InFlightRequest<SidecarInterfaceRequest, SidecarInterfaceResponse>,
Expand All @@ -641,7 +692,7 @@ async fn session_interceptor(
Some(s) => s,
};

submitted_payloads.fetch_add(1, Ordering::Relaxed);
submitted_payload_count.fetch_add(1, Ordering::Relaxed);

let instance: RequestIdentifier = req.get().extract_identifier();
if tx.send((serve, req)).await.is_ok() {
Expand All @@ -655,7 +706,11 @@ async fn session_interceptor(
}) = instance
{
if sessions.insert(session.clone()) {
match session_counter.lock().unwrap().entry(session) {
match session_counter
.lock()
.expect("Unable to obtain lock on session counter")
.entry(session)
{
Entry::Occupied(mut entry) => entry.insert(entry.get() + 1),
Entry::Vacant(entry) => *entry.insert(1),
};
Expand Down

0 comments on commit ed4b03e

Please sign in to comment.