Skip to content

Commit

Permalink
feat: introduce log consumers (#681)
Browse files Browse the repository at this point in the history
Introduce `LogConsumer` trait and ability to pass a consumer into
`ContainerRequest`.

The main advantage of this functionality is the use of background
container output processing throughout the entire lifecycle of the
container, starting from the moment of its creation. This way you can
process logs even if the container was not started due to the wait
strategy.
  • Loading branch information
DDtKey authored Jul 6, 2024
1 parent ae52151 commit b4aee64
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 7 deletions.
23 changes: 21 additions & 2 deletions testcontainers/src/core/containers/async_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,36 @@ where
pub(crate) async fn new(
id: String,
docker_client: Arc<Client>,
image: ContainerRequest<I>,
mut container_req: ContainerRequest<I>,
network: Option<Arc<Network>>,
) -> Result<ContainerAsync<I>> {
let log_consumers = std::mem::take(&mut container_req.log_consumers);
let container = ContainerAsync {
id,
image,
image: container_req,
docker_client,
network,
dropped: false,
};

let mut logs = container.docker_client.logs(&container.id, true);
let container_id = container.id.clone();
tokio::spawn(async move {
while let Some(result) = logs.next().await {
match result {
Ok(record) => {
for consumer in &log_consumers {
consumer.accept(&record).await;
tokio::task::yield_now().await;
}
}
Err(err) => {
log::error!("Failed to read log frame for container {container_id}: {err}",);
}
}
}
});

let ready_conditions = container.image().ready_conditions();
container.block_until_ready(ready_conditions).await?;
Ok(container)
Expand Down
38 changes: 35 additions & 3 deletions testcontainers/src/core/containers/request.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
use std::{borrow::Cow, collections::BTreeMap, net::IpAddr, time::Duration};
use std::{
borrow::Cow,
collections::BTreeMap,
fmt::{Debug, Formatter},
net::IpAddr,
time::Duration,
};

use crate::{
core::{mounts::Mount, ports::ContainerPort, ContainerState, ExecCommand, WaitFor},
core::{
logs::consumer::LogConsumer, mounts::Mount, ports::ContainerPort, ContainerState,
ExecCommand, WaitFor,
},
Image, TestcontainersError,
};

/// Represents a request to start a container, allowing customization of the container.
#[must_use]
#[derive(Debug)]
pub struct ContainerRequest<I: Image> {
pub(crate) image: I,
pub(crate) overridden_cmd: Vec<String>,
Expand All @@ -24,6 +32,7 @@ pub struct ContainerRequest<I: Image> {
pub(crate) cgroupns_mode: Option<CgroupnsMode>,
pub(crate) userns_mode: Option<String>,
pub(crate) startup_timeout: Option<Duration>,
pub(crate) log_consumers: Vec<Box<dyn LogConsumer + 'static>>,
}

/// Represents a port mapping between a host's external port and the internal port of a container.
Expand Down Expand Up @@ -164,6 +173,7 @@ impl<I: Image> From<I> for ContainerRequest<I> {
cgroupns_mode: None,
userns_mode: None,
startup_timeout: None,
log_consumers: vec![],
}
}
}
Expand All @@ -184,3 +194,25 @@ impl PortMapping {
self.container_port
}
}

impl<I: Image + Debug> Debug for ContainerRequest<I> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ContainerRequest")
.field("image", &self.image)
.field("overridden_cmd", &self.overridden_cmd)
.field("image_name", &self.image_name)
.field("image_tag", &self.image_tag)
.field("container_name", &self.container_name)
.field("network", &self.network)
.field("env_vars", &self.env_vars)
.field("hosts", &self.hosts)
.field("mounts", &self.mounts)
.field("ports", &self.ports)
.field("privileged", &self.privileged)
.field("shm_size", &self.shm_size)
.field("cgroupns_mode", &self.cgroupns_mode)
.field("userns_mode", &self.userns_mode)
.field("startup_timeout", &self.startup_timeout)
.finish()
}
}
13 changes: 12 additions & 1 deletion testcontainers/src/core/image/image_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use crate::{
core::{CgroupnsMode, ContainerPort, Host, Mount, PortMapping},
core::{logs::consumer::LogConsumer, CgroupnsMode, ContainerPort, Host, Mount, PortMapping},
ContainerRequest, Image,
};

Expand Down Expand Up @@ -81,6 +81,11 @@ pub trait ImageExt<I: Image> {

/// Sets the startup timeout for the container. The default is 60 seconds.
fn with_startup_timeout(self, timeout: Duration) -> ContainerRequest<I>;

/// Adds the log consumer to the container.
///
/// Allows to follow the container logs for the whole lifecycle of the container, starting from the creation.
fn with_log_consumer(self, log_consumer: impl LogConsumer + 'static) -> ContainerRequest<I>;
}

/// Implements the [`ImageExt`] trait for the every type that can be converted into a [`ContainerRequest`].
Expand Down Expand Up @@ -202,4 +207,10 @@ impl<RI: Into<ContainerRequest<I>>, I: Image> ImageExt<I> for RI {
..container_req
}
}

fn with_log_consumer(self, log_consumer: impl LogConsumer + 'static) -> ContainerRequest<I> {
let mut container_req = self.into();
container_req.log_consumers.push(Box::new(log_consumer));
container_req
}
}
10 changes: 10 additions & 0 deletions testcontainers/src/core/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt};
use memchr::memmem::Finder;

pub mod consumer;
pub(crate) mod stream;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -40,6 +41,15 @@ impl LogSource {
}
}

impl LogFrame {
pub fn bytes(&self) -> &Bytes {
match self {
LogFrame::StdOut(bytes) => bytes,
LogFrame::StdErr(bytes) => bytes,
}
}
}

// TODO: extract caching functionality to a separate wrapper
pub(crate) struct WaitingStreamWrapper {
inner: BoxStream<'static, Result<Bytes, io::Error>>,
Expand Down
22 changes: 22 additions & 0 deletions testcontainers/src/core/logs/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use futures::{future::BoxFuture, FutureExt};

use crate::core::logs::LogFrame;

/// Log consumer is a trait that allows to consume log frames.
/// Consumers will be called for each log frame that is produced by the container for the whole lifecycle of the container.
pub trait LogConsumer: Send + Sync {
fn accept<'a>(&'a self, record: &'a LogFrame) -> BoxFuture<'a, ()>;
}

impl<F> LogConsumer for F
where
F: Fn(&LogFrame) + Send + Sync,
{
fn accept<'a>(&'a self, record: &'a LogFrame) -> BoxFuture<'a, ()> {
// preferably to spawn blocking task
async move {
self(record);
}
.boxed()
}
}
22 changes: 21 additions & 1 deletion testcontainers/tests/async_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::time::Duration;
use bollard::Docker;
use reqwest::StatusCode;
use testcontainers::{
core::{wait::HttpWaitStrategy, CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor},
core::{
logs::LogFrame, wait::HttpWaitStrategy, CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor,
},
runners::AsyncRunner,
GenericImage, *,
};
Expand Down Expand Up @@ -166,3 +168,21 @@ async fn async_run_exec_fails_due_to_unexpected_code() -> anyhow::Result<()> {
assert!(res.is_err());
Ok(())
}

#[tokio::test]
async fn async_run_with_log_consumer() -> anyhow::Result<()> {
let _ = pretty_env_logger::try_init();

let (tx, rx) = std::sync::mpsc::sync_channel(1);
let _container = HelloWorld
.with_log_consumer(move |frame: &LogFrame| {
// notify when the expected message is found
if String::from_utf8_lossy(frame.bytes()) == "Hello from Docker!\n" {
let _ = tx.send(());
}
})
.start()
.await?;
rx.recv()?; // notification from consumer
Ok(())
}

0 comments on commit b4aee64

Please sign in to comment.