Skip to content

Commit

Permalink
buck2: event log: use async_cleanup_context up the stack
Browse files Browse the repository at this point in the history
Summary: is cleaner

Reviewed By: stepancheg

Differential Revision: D58788618

fbshipit-source-id: 64c2cd6f358d23d680866a27f3d2931774fb6be8
  • Loading branch information
iguridi authored and facebook-github-bot committed Jun 20, 2024
1 parent 493369b commit d835d86
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 24 deletions.
18 changes: 16 additions & 2 deletions app/buck2_client_ctx/src/subscribers/event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ use buck2_core::fs::working_dir::WorkingDir;
use buck2_event_log::write::WriteEventLog;
use buck2_events::BuckEvent;
use buck2_util::cleanup_ctx::AsyncCleanupContext;
use futures::FutureExt;

use crate::subscribers::subscriber::EventSubscriber;
use crate::subscribers::subscriber::Tick;

/// This EventLog lets us to events emitted by Buck and log them to a file. The events are
/// serialized as JSON and logged one per line.
pub(crate) struct EventLog<'a> {
writer: WriteEventLog<'a>,
async_cleanup_context: Option<AsyncCleanupContext<'a>>,
writer: WriteEventLog,
}

impl<'a> EventLog<'a> {
Expand All @@ -41,13 +43,13 @@ impl<'a> EventLog<'a> {
allow_vpnless: bool,
) -> anyhow::Result<EventLog> {
Ok(Self {
async_cleanup_context: Some(async_cleanup_context),
writer: WriteEventLog::new(
logdir,
working_dir,
extra_path,
extra_user_event_log_path,
sanitized_argv,
async_cleanup_context,
command_name,
log_size_counter_bytes,
allow_vpnless,
Expand Down Expand Up @@ -94,3 +96,15 @@ impl<'a> EventSubscriber for EventLog<'a> {
Ok(())
}
}

impl<'a> Drop for EventLog<'a> {
fn drop(&mut self) {
let exit = self.writer.exit();
match self.async_cleanup_context.as_ref() {
Some(async_cleanup_context) => {
async_cleanup_context.register("event log upload", exit.boxed());
}
None => (),
}
}
}
26 changes: 4 additions & 22 deletions app/buck2_event_log/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ use buck2_core::fs::paths::abs_norm_path::AbsNormPathBuf;
use buck2_core::fs::paths::abs_path::AbsPathBuf;
use buck2_core::fs::working_dir::WorkingDir;
use buck2_events::BuckEvent;
use buck2_util::cleanup_ctx::AsyncCleanupContext;
use buck2_wrapper_common::invocation_id::TraceId;
use futures::future::Future;
use futures::FutureExt;
use prost::Message;
use serde::Serialize;
use tokio::fs::OpenOptions;
Expand Down Expand Up @@ -55,9 +53,8 @@ enum LogWriterState {
Closed,
}

pub struct WriteEventLog<'a> {
pub struct WriteEventLog {
state: LogWriterState,
async_cleanup_context: Option<AsyncCleanupContext<'a>>,
sanitized_argv: SanitizedArgv,
command_name: String,
working_dir: WorkingDir,
Expand All @@ -67,14 +64,13 @@ pub struct WriteEventLog<'a> {
allow_vpnless: bool,
}

impl<'a> WriteEventLog<'a> {
impl WriteEventLog {
pub fn new(
logdir: AbsNormPathBuf,
working_dir: WorkingDir,
extra_path: Option<AbsPathBuf>,
extra_user_event_log_path: Option<AbsPathBuf>,
sanitized_argv: SanitizedArgv,
async_cleanup_context: AsyncCleanupContext<'a>,
command_name: String,
log_size_counter_bytes: Option<Arc<AtomicU64>>,
allow_vpnless: bool,
Expand All @@ -85,7 +81,6 @@ impl<'a> WriteEventLog<'a> {
extra_path,
extra_user_event_log_path,
},
async_cleanup_context: Some(async_cleanup_context),
sanitized_argv,
command_name,
working_dir,
Expand Down Expand Up @@ -244,18 +239,6 @@ impl<'a> WriteEventLog<'a> {
}
}

impl<'a> Drop for WriteEventLog<'a> {
fn drop(&mut self) {
let exit = self.exit();
match self.async_cleanup_context.as_ref() {
Some(async_cleanup_context) => {
async_cleanup_context.register("event log upload", exit.boxed());
}
None => (),
}
}
}

async fn start_persist_event_log_subprocess(
path: EventLogPathBuf,
trace_id: TraceId,
Expand Down Expand Up @@ -342,7 +325,7 @@ async fn open_event_log_for_writing(
))
}

impl<'a> WriteEventLog<'a> {
impl WriteEventLog {
pub async fn write_events(&mut self, events: &[Arc<BuckEvent>]) -> anyhow::Result<()> {
let mut event_refs = Vec::new();
let mut first = true;
Expand Down Expand Up @@ -474,7 +457,7 @@ mod tests {
use crate::stream_value::StreamValue;
use crate::utils::Compression;

impl WriteEventLog<'static> {
impl WriteEventLog {
async fn new_test(log: EventLogPathBuf) -> anyhow::Result<Self> {
Ok(Self {
state: LogWriterState::Opened {
Expand All @@ -486,7 +469,6 @@ mod tests {
argv: vec!["buck2".to_owned()],
expanded_argv: vec!["buck2".to_owned()],
},
async_cleanup_context: None,
command_name: "testtest".to_owned(),
working_dir: WorkingDir::current_dir()?,
buf: Vec::new(),
Expand Down

0 comments on commit d835d86

Please sign in to comment.