Skip to content

Commit

Permalink
refactor: internal log processing structures
Browse files Browse the repository at this point in the history
  • Loading branch information
DDtKey committed Jul 6, 2024
1 parent f80f4cd commit aef72f7
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 98 deletions.
1 change: 1 addition & 0 deletions testcontainers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ futures = "0.3"
log = "0.4"
memchr = "2.7.2"
parse-display = "0.9.0"
pin-project-lite = "0.2.14"
reqwest = { version = "0.12.5", features = ["rustls-tls", "rustls-tls-native-roots", "hickory-dns", "json", "charset", "http2"], default-features = false }
serde = { version = "1", features = ["derive"] }
serde-java-properties = { version = "0.2.0", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion testcontainers/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub(crate) mod client;
pub(crate) mod containers;
pub(crate) mod env;
pub mod error;
pub(crate) mod logs;
pub mod logs;
pub(crate) mod mounts;
pub(crate) mod network;
pub mod ports;
Expand Down
126 changes: 56 additions & 70 deletions testcontainers/src/core/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{io, sync::Arc};
use std::io;

use bollard::{
auth::DockerCredentials,
Expand All @@ -12,13 +12,15 @@ use bollard::{
use bollard_stubs::models::{ContainerInspectResponse, ExecInspectResponse, Network};
use futures::{StreamExt, TryStreamExt};
use tokio::sync::OnceCell;
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::core::{
client::exec::ExecResult,
env,
env::ConfigurationError,
logs::{LogSource, LogStreamAsync},
logs::{
stream::{LogStream, RawLogStream},
LogFrame, LogSource, WaitingStreamWrapper,
},
ports::{PortMappingError, Ports},
};

Expand Down Expand Up @@ -94,12 +96,18 @@ impl Client {
Ok(Client { config, bollard })
}

pub(crate) fn stdout_logs(&self, id: &str, follow: bool) -> LogStreamAsync {
self.logs(id, LogSource::StdOut, follow)
pub(crate) fn stdout_logs(&self, id: &str, follow: bool) -> RawLogStream {
self.logs_stream(id, Some(LogSource::StdOut), follow)
.into_stdout()
}

pub(crate) fn stderr_logs(&self, id: &str, follow: bool) -> RawLogStream {
self.logs_stream(id, Some(LogSource::StdErr), follow)
.into_stderr()
}

pub(crate) fn stderr_logs(&self, id: &str, follow: bool) -> LogStreamAsync {
self.logs(id, LogSource::StdErr, follow)
pub(crate) fn logs(&self, id: &str, follow: bool) -> LogStream {

Check warning on line 109 in testcontainers/src/core/client.rs

View workflow job for this annotation

GitHub Actions / clippy

method `logs` is never used

warning: method `logs` is never used --> testcontainers/src/core/client.rs:109:19 | 91 | impl Client { | ----------- method in this implementation ... 109 | pub(crate) fn logs(&self, id: &str, follow: bool) -> LogStream { | ^^^^ | = note: `#[warn(dead_code)]` on by default
self.logs_stream(id, None, follow)
}

pub(crate) async fn ports(&self, id: &str) -> Result<Ports, ClientError> {
Expand Down Expand Up @@ -184,49 +192,9 @@ impl Client {

match res {
StartExecResults::Attached { output, .. } => {
let (stdout_tx, stdout_rx) = tokio::sync::mpsc::unbounded_channel();
let (stderr_tx, stderr_rx) = tokio::sync::mpsc::unbounded_channel();

tokio::spawn(async move {
macro_rules! handle_error {
($res:expr) => {
if let Err(err) = $res {
log::debug!(
"Receiver has been dropped, stop producing messages: {}",
err
);
break;
}
};
}
let mut output = output;
while let Some(chunk) = output.next().await {
match chunk {
Ok(LogOutput::StdOut { message }) => {
handle_error!(stdout_tx.send(Ok(message)));
}
Ok(LogOutput::StdErr { message }) => {
handle_error!(stderr_tx.send(Ok(message)));
}
Err(err) => {
let err = Arc::new(err);
handle_error!(stdout_tx
.send(Err(io::Error::new(io::ErrorKind::Other, err.clone()))));
handle_error!(
stderr_tx.send(Err(io::Error::new(io::ErrorKind::Other, err)))
);
}
Ok(_) => {
unreachable!("only stdout and stderr are supported")
}
}
}
});

let stdout = LogStreamAsync::new(UnboundedReceiverStream::new(stdout_rx).boxed())
.enable_cache();
let stderr = LogStreamAsync::new(UnboundedReceiverStream::new(stderr_rx).boxed())
.enable_cache();
let (stdout, stderr) = LogStream::from(output).split().await;
let stdout = WaitingStreamWrapper::new(stdout).enable_cache();
let stderr = WaitingStreamWrapper::new(stderr).enable_cache();

Ok(ExecResult {
id: exec.id,
Expand All @@ -248,32 +216,21 @@ impl Client {
.map_err(ClientError::InspectExec)
}

fn logs(&self, container_id: &str, log_source: LogSource, follow: bool) -> LogStreamAsync {
fn logs_stream(
&self,
container_id: &str,
source_filter: Option<LogSource>,
follow: bool,
) -> LogStream {
let options = LogsOptions {
follow,
stdout: log_source.is_stdout(),
stderr: log_source.is_stderr(),
stdout: source_filter.map(LogSource::is_stdout).unwrap_or(true),
stderr: source_filter.map(LogSource::is_stderr).unwrap_or(true),
tail: "all".to_owned(),
..Default::default()
};

let stream = self
.bollard
.logs(container_id, Some(options))
.map_ok(|chunk| chunk.into_bytes())
.map_err(|err| match err {
bollard::errors::Error::DockerResponseServerError {
status_code: 404,
message,
} => io::Error::new(
io::ErrorKind::UnexpectedEof,
format!("Docker container has been dropped: {}", message),
),
bollard::errors::Error::IOError { err } => err,
err => io::Error::new(io::ErrorKind::Other, err),
})
.boxed();
LogStreamAsync::new(stream)
self.bollard.logs(container_id, Some(options)).into()
}

/// Creates a network with given name and returns an ID
Expand Down Expand Up @@ -423,3 +380,32 @@ impl Client {
Some(bollard_credentials)
}
}

impl<BS> From<BS> for LogStream
where
BS: futures::Stream<Item = Result<LogOutput, BollardError>> + Send + 'static,
{
fn from(stream: BS) -> Self {
let stream = stream
.map_ok(|chunk| match chunk {
LogOutput::StdErr { message } => LogFrame::StdErr(message),
LogOutput::StdOut { message } => LogFrame::StdOut(message),
LogOutput::StdIn { .. } | LogOutput::Console { .. } => {
unreachable!("only stdout and stderr are supported")
}
})
.map_err(|err| match err {
BollardError::DockerResponseServerError {
status_code: 404,
message,
} => io::Error::new(
io::ErrorKind::UnexpectedEof,
format!("Docker container has been dropped: {}", message),
),
bollard::errors::Error::IOError { err } => err,
err => io::Error::new(io::ErrorKind::Other, err),
})
.boxed();
LogStream::new(stream)
}
}
10 changes: 5 additions & 5 deletions testcontainers/src/core/client/exec.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use crate::core::logs::LogStreamAsync;
use crate::core::logs::WaitingStreamWrapper;

pub(crate) struct ExecResult {
pub(crate) id: String,
pub(crate) stdout: LogStreamAsync,
pub(crate) stderr: LogStreamAsync,
pub(crate) stdout: WaitingStreamWrapper,
pub(crate) stderr: WaitingStreamWrapper,
}

impl ExecResult {
pub(crate) fn id(&self) -> &str {
&self.id
}

pub(crate) fn stdout(&mut self) -> &mut LogStreamAsync {
pub(crate) fn stdout(&mut self) -> &mut WaitingStreamWrapper {
&mut self.stdout
}

pub(crate) fn stderr(&mut self) -> &mut LogStreamAsync {
pub(crate) fn stderr(&mut self) -> &mut WaitingStreamWrapper {
&mut self.stderr
}
}
5 changes: 3 additions & 2 deletions testcontainers/src/core/containers/async_container.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{fmt, net::IpAddr, pin::Pin, str::FromStr, sync::Arc, time::Duration};

use tokio::io::{AsyncBufRead, AsyncReadExt};
use tokio_stream::StreamExt;

Check warning on line 4 in testcontainers/src/core/containers/async_container.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `tokio_stream::StreamExt`

warning: unused import: `tokio_stream::StreamExt` --> testcontainers/src/core/containers/async_container.rs:4:5 | 4 | use tokio_stream::StreamExt; | ^^^^^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default

use crate::{
core::{
Expand Down Expand Up @@ -264,7 +265,7 @@ where
/// - pass `false` to read logs from startup to present.
pub fn stdout(&self, follow: bool) -> Pin<Box<dyn AsyncBufRead + Send>> {
let stdout = self.docker_client.stdout_logs(&self.id, follow);
Box::pin(tokio_util::io::StreamReader::new(stdout.into_inner()))
Box::pin(tokio_util::io::StreamReader::new(stdout))
}

/// Returns an asynchronous reader for stderr.
Expand All @@ -274,7 +275,7 @@ where
/// - pass `false` to read logs from startup to present.
pub fn stderr(&self, follow: bool) -> Pin<Box<dyn AsyncBufRead + Send>> {
let stderr = self.docker_client.stderr_logs(&self.id, follow);
Box::pin(tokio_util::io::StreamReader::new(stderr.into_inner()))
Box::pin(tokio_util::io::StreamReader::new(stderr))
}

/// Returns stdout as a vector of bytes available at the moment of call (from container startup to present).
Expand Down
21 changes: 15 additions & 6 deletions testcontainers/src/core/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt};
use memchr::memmem::Finder;

pub(crate) mod stream;

#[derive(Debug, Clone)]
pub enum LogFrame {
StdOut(Bytes),
StdErr(Bytes),
}

/// Defines error cases when waiting for a message in a stream.
#[derive(Debug, thiserror::Error)]
pub enum WaitLogError {
Expand All @@ -23,22 +31,23 @@ pub(crate) enum LogSource {
}

impl LogSource {
pub(super) fn is_stdout(&self) -> bool {
pub(super) fn is_stdout(self) -> bool {
matches!(self, Self::StdOut)
}

pub(super) fn is_stderr(&self) -> bool {
pub(super) fn is_stderr(self) -> bool {
matches!(self, Self::StdErr)
}
}

pub(crate) struct LogStreamAsync {
// TODO: extract caching functionality to a separate wrapper
pub(crate) struct WaitingStreamWrapper {
inner: BoxStream<'static, Result<Bytes, io::Error>>,
cache: Vec<Result<Bytes, io::Error>>,
enable_cache: bool,
}

impl LogStreamAsync {
impl WaitingStreamWrapper {
pub fn new(stream: BoxStream<'static, Result<Bytes, io::Error>>) -> Self {
Self {
inner: stream,
Expand Down Expand Up @@ -90,7 +99,7 @@ fn display_bytes(bytes: &[Bytes]) -> Vec<Cow<'_, str>> {
.collect::<Vec<_>>()
}

impl fmt::Debug for LogStreamAsync {
impl fmt::Debug for WaitingStreamWrapper {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LogStreamAsync").finish()
}
Expand All @@ -102,7 +111,7 @@ mod tests {

#[tokio::test(flavor = "multi_thread")]
async fn given_logs_when_line_contains_message_should_find_it() {
let mut log_stream = LogStreamAsync::new(Box::pin(futures::stream::iter([Ok(r"
let mut log_stream = WaitingStreamWrapper::new(Box::pin(futures::stream::iter([Ok(r"
Message one
Message two
Message three
Expand Down
Loading

0 comments on commit aef72f7

Please sign in to comment.