Skip to content

Commit

Permalink
buck2: event log: writer to module
Browse files Browse the repository at this point in the history
Summary: Now fields in NamedEventLogWriter can't be accessed outside this module, which is nice, I think.

Reviewed By: stepancheg

Differential Revision: D58788229

fbshipit-source-id: 8c8f0f0445faf32c1168936010d02363fe836af1
  • Loading branch information
iguridi authored and facebook-github-bot committed Jun 20, 2024
1 parent e9e0d30 commit 493369b
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 196 deletions.
1 change: 1 addition & 0 deletions app/buck2_event_log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod stream_value;
pub mod user_event_types;
pub mod utils;
pub mod write;
pub mod writer;

pub fn should_upload_log() -> anyhow::Result<bool> {
if buck2_core::is_open_source() {
Expand Down
202 changes: 6 additions & 196 deletions app/buck2_event_log/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,14 @@
* of this source tree.
*/

use std::io;
use std::mem;
use std::pin::Pin;
use std::process::Stdio;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use anyhow::Context as _;
use async_compression::tokio::write::GzipEncoder;
use async_compression::tokio::write::ZstdEncoder;
use buck2_cli_proto::*;
use buck2_common::argv::SanitizedArgv;
use buck2_core::fs::paths::abs_norm_path::AbsNormPathBuf;
use buck2_core::fs::paths::abs_path::AbsPathBuf;
use buck2_core::fs::working_dir::WorkingDir;
Expand All @@ -29,205 +23,26 @@ use buck2_util::cleanup_ctx::AsyncCleanupContext;
use buck2_wrapper_common::invocation_id::TraceId;
use futures::future::Future;
use futures::FutureExt;
use pin_project::pin_project;
use prost::Message;
use serde::Serialize;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;

use crate::file_names::get_logfile_name;
use crate::file_names::remove_old_logs;
use crate::read::EventLogPathBuf;
use crate::should_block_on_log_upload;
use crate::should_upload_log;
use crate::utils::Compression;
use crate::user_event_types::try_get_user_event;
use crate::utils::Encoding;
use crate::utils::EventLogErrors;
use crate::utils::Invocation;
use crate::utils::LogMode;
use crate::utils::NoInference;
use crate::wait_for_child_and_log;
use crate::writer::EventLogType;
use crate::writer::NamedEventLogWriter;
use crate::writer::SerializeForLog;
use crate::FutureChildOutput;

/// Boxed, type-erased asynchronous writer; this is the type of the `file` field of
/// `NamedEventLogWriter`, i.e. the sink that serialized event log bytes are written to.
type EventLogWriter = Box<dyn AsyncWrite + Send + Sync + Unpin + 'static>;

/// Private module holding the `CountingReader` wrapper type. Its fields are `pub(super)`,
/// so they are visible to the parent module but not exported any further.
mod counting_reader {
use super::*;

/// Pairs an inner value with an optional shared byte counter.
///
/// NOTE(review): despite the "Reader" in the name, the trait implemented for this type in
/// this file is `AsyncWrite`, and the counter is bumped by the number of bytes *written*.
#[pin_project]
pub struct CountingReader<T> {
/// The wrapped value that all operations are forwarded to (structurally pinned).
#[pin]
pub(super) inner: T,
/// Optional shared counter; when present, it is incremented by the byte count of each
/// successful write to `inner`.
pub(super) stats: Option<Arc<AtomicU64>>,
}
}

use buck2_common::argv::SanitizedArgv;
use counting_reader::CountingReader;

use super::user_event_types::try_get_user_event;

impl<T> CountingReader<T> {
fn new(inner: T, stats: Option<Arc<AtomicU64>>) -> Self {
Self { inner, stats }
}
}

/// Forwards every `AsyncWrite` operation to the wrapped writer, additionally recording
/// the number of bytes accepted by each successful `poll_write` in the optional counter.
impl<T> AsyncWrite for CountingReader<T>
where
    T: AsyncWrite,
{
    /// Writes through to the inner writer; on success, adds the number of bytes written
    /// to `stats` (if any) before returning it. Pending and error results pass through
    /// untouched and do not affect the counter.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let projected = self.project();
        match projected.inner.poll_write(cx, buf) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Ready(Ok(written)) => {
                if let Some(counter) = projected.stats {
                    counter.fetch_add(written as u64, Ordering::Relaxed);
                }
                Poll::Ready(Ok(written))
            }
        }
    }

    /// Delegates flushing to the inner writer.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let projected = self.project();
        projected.inner.poll_flush(cx)
    }

    /// Delegates shutdown to the inner writer.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let projected = self.project();
        projected.inner.poll_shutdown(cx)
    }
}

/// Which flavour of event log a writer produces.
#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) enum EventLogType {
    /// Events are serialized in the log's configured mode (JSON or protobuf).
    System,
    /// Events are serialized through `maybe_serialize_user_event`.
    User,
}

/// An event log sink bundled with the path it was opened for and the kind of log it holds.
struct NamedEventLogWriter {
/// Path of the log; its `encoding` (compression and mode) decides how the writer is
/// wrapped and how events are serialized, and the path is included in error messages.
path: EventLogPathBuf,
/// The writer that serialized bytes are sent to.
file: EventLogWriter,
/// Whether events are serialized as system events or as user events.
event_log_type: EventLogType,
/// If this writing is done by a subprocess, that process's output, assuming we intend to wait
/// for it to exit.
process_to_wait_for: Option<FutureChildOutput>,
}

impl NamedEventLogWriter {
fn new(
path: EventLogPathBuf,
file: impl AsyncWrite + std::marker::Send + std::marker::Unpin + std::marker::Sync + 'static,
bytes_written: Option<Arc<AtomicU64>>,
event_log_type: EventLogType,
process_to_wait_for: Option<FutureChildOutput>,
) -> Self {
let file = match path.encoding.compression {
Compression::None => {
Box::new(CountingReader::new(file, bytes_written)) as EventLogWriter
}
Compression::Gzip => Box::new(GzipEncoder::with_quality(
CountingReader::new(file, bytes_written),
async_compression::Level::Fastest,
)) as EventLogWriter,
Compression::Zstd => Box::new(ZstdEncoder::with_quality(
CountingReader::new(file, bytes_written),
async_compression::Level::Default,
)) as EventLogWriter,
};
Self {
path,
file,
event_log_type,
process_to_wait_for,
}
}

async fn flush(&mut self) -> anyhow::Result<()> {
match self.file.flush().await {
Ok(_) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
// The subprocess exited with some kind of error. That is logged separately, so
// here we just ignore it.
Ok(())
}
Err(e) => Err(anyhow::Error::from(e).context(format!(
"Error flushing log file at {}",
self.path.path.display()
))),
}
}

async fn shutdown(&mut self) {
if let Err(e) = self.file.shutdown().await {
tracing::warn!("Failed to flush log file at `{}`: {:#}", self.path.path, e);
}
}

fn child(mut self) -> Option<FutureChildOutput> {
self.process_to_wait_for.take()
}

fn serialize_event<'b, T>(&self, mut buf: &mut Vec<u8>, event: &T) -> anyhow::Result<()>
where
T: SerializeForLog + 'b,
{
match self.event_log_type {
EventLogType::System => {
match self.path.encoding.mode {
LogMode::Json => {
event.serialize_to_json(&mut buf)?;
buf.push(b'\n');
}
LogMode::Protobuf => event.serialize_to_protobuf_length_delimited(&mut buf)?,
};
}
EventLogType::User => {
if event.maybe_serialize_user_event(&mut buf)? {
buf.push(b'\n');
}
}
}
Ok(())
}

async fn write_all(&mut self, buf: &[u8]) -> anyhow::Result<()> {
match self.file.write_all(buf).await {
Ok(_) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
// The subprocess exited with some kind of error. That is logged separately, so
// here we just ignore it.
Ok(())
}
Err(e) => Err(anyhow::Error::from(e).context("Failed to write event")),
}
}

async fn write_events<'b, T, I>(
&mut self,
mut buf: &mut Vec<u8>,
events: &I,
) -> Result<(), anyhow::Error>
where
T: SerializeForLog + 'b,
I: IntoIterator<Item = &'b T> + Clone + 'b,
{
for event in events.clone() {
self.serialize_event(&mut buf, event)?;
}
self.write_all(&buf).await?;
Ok(())
}
}

enum LogWriterState {
Unopened {
logdir: AbsNormPathBuf,
Expand Down Expand Up @@ -582,12 +397,6 @@ impl<'a> WriteEventLog<'a> {
}
}

/// Serialization hooks for values that can be written to an event log. Each method is
/// handed the output buffer that the serialized bytes go into.
pub(crate) trait SerializeForLog {
/// Serializes `self` as JSON into `buf`. No trailing newline is expected from the
/// implementation: the system-log caller in this file pushes `\n` itself afterwards.
fn serialize_to_json(&self, buf: &mut Vec<u8>) -> anyhow::Result<()>;
/// Serializes `self` into `buf` as protobuf.
/// NOTE(review): per the name, presumably length-delimited framing — confirm against the impls.
fn serialize_to_protobuf_length_delimited(&self, buf: &mut Vec<u8>) -> anyhow::Result<()>;
/// Serializes `self` as a user event into `buf`, if applicable. The user-log caller in
/// this file appends `\n` only when this returns `Ok(true)`.
/// NOTE(review): presumably `false` means nothing was written — confirm against the impls.
fn maybe_serialize_user_event(&self, buf: &mut Vec<u8>) -> anyhow::Result<bool>;
}

impl SerializeForLog for Invocation {
fn serialize_to_json(&self, buf: &mut Vec<u8>) -> anyhow::Result<()> {
serde_json::to_writer(buf, &self).context("Failed to serialize event")
Expand Down Expand Up @@ -663,6 +472,7 @@ mod tests {

use super::*;
use crate::stream_value::StreamValue;
use crate::utils::Compression;

impl WriteEventLog<'static> {
async fn new_test(log: EventLogPathBuf) -> anyhow::Result<Self> {
Expand Down
Loading

0 comments on commit 493369b

Please sign in to comment.