Skip to content

Commit

Permalink
feat: now make use of actual build progress (#2294)
Browse files Browse the repository at this point in the history
Co-authored-by: Ruben Arts <ruben.arts@hotmail.com>
Co-authored-by: Bas Zalmstra <zalmstra.bas@gmail.com>
  • Loading branch information
3 people authored Oct 18, 2024
1 parent 2d085b3 commit 3ea6438
Show file tree
Hide file tree
Showing 16 changed files with 561 additions and 70 deletions.
2 changes: 1 addition & 1 deletion crates/pixi_build_frontend/src/build_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl BuildFrontend {

protocol
.with_backend_override(request.build_tool_override)
.finish(&self.tool_cache)
.finish(&self.tool_cache, request.build_id)
.await
}
}
4 changes: 4 additions & 0 deletions crates/pixi_build_frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub struct SetupRequest {

/// Overrides for the build tool.
pub build_tool_override: Option<BackendOverride>,

/// Identifier for the rest of the requests
/// This is used to identify the requests that belong to the same build.
pub build_id: usize,
}

#[derive(Debug)]
Expand Down
20 changes: 11 additions & 9 deletions crates/pixi_build_frontend/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use miette::Diagnostic;
use std::{path::PathBuf, sync::Arc};

use miette::{Diagnostic, IntoDiagnostic};
use pixi_build_types::procedures::{
conda_build::{CondaBuildParams, CondaBuildResult},
conda_metadata::{CondaMetadataParams, CondaMetadataResult},
};
use std::path::PathBuf;
use std::sync::Arc;

use crate::{conda_build_protocol, pixi_protocol, CondaBuildReporter, CondaMetadataReporter};

Expand Down Expand Up @@ -100,11 +100,10 @@ impl Protocol {
reporter: Arc<dyn CondaMetadataReporter>,
) -> miette::Result<CondaMetadataResult> {
match self {
Self::Pixi(protocol) => {
protocol
.get_conda_metadata(request, reporter.as_ref())
.await
}
Self::Pixi(protocol) => protocol
.get_conda_metadata(request, reporter.as_ref())
.await
.into_diagnostic(),
Self::CondaBuild(protocol) => protocol.get_conda_metadata(request),
}
}
Expand All @@ -115,7 +114,10 @@ impl Protocol {
reporter: Arc<dyn CondaBuildReporter>,
) -> miette::Result<CondaBuildResult> {
match self {
Self::Pixi(protocol) => protocol.conda_build(request, reporter.as_ref()).await,
Self::Pixi(protocol) => protocol
.conda_build(request, reporter.as_ref())
.await
.into_diagnostic(),
Self::CondaBuild(_) => unreachable!(),
}
}
Expand Down
8 changes: 6 additions & 2 deletions crates/pixi_build_frontend/src/protocol_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,15 @@ impl ProtocolBuilder {
}

/// Finish the construction of the protocol and return the protocol object
pub async fn finish(self, tool_cache: &ToolCache) -> Result<Protocol, BuildFrontendError> {
pub async fn finish(
self,
tool_cache: &ToolCache,
build_id: usize,
) -> Result<Protocol, BuildFrontendError> {
match self {
Self::Pixi(protocol) => Ok(Protocol::Pixi(
protocol
.finish(tool_cache)
.finish(tool_cache, build_id)
.await
.map_err(FinishError::Pixi)?,
)),
Expand Down
6 changes: 4 additions & 2 deletions crates/pixi_build_frontend/src/protocols/pixi/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod protocol;
mod stderr;

use std::{
fmt,
Expand All @@ -11,6 +12,7 @@ use pixi_consts::consts;
use pixi_manifest::Manifest;
pub use protocol::{InitializeError, Protocol};
use rattler_conda_types::ChannelConfig;
pub(crate) use stderr::{stderr_null, stderr_stream};
use thiserror::Error;
use which::Error;

Expand Down Expand Up @@ -116,15 +118,15 @@ impl ProtocolBuilder {
Ok(None)
}

pub async fn finish(self, tool: &ToolCache) -> Result<Protocol, FinishError> {
pub async fn finish(self, tool: &ToolCache, build_id: usize) -> Result<Protocol, FinishError> {
let tool_spec = self
.backend_spec
.ok_or(FinishError::NoBuildSection(self.manifest.path.clone()))?;
let tool = tool.instantiate(tool_spec).map_err(FinishError::Tool)?;
Ok(Protocol::setup(
self.source_dir,
self.manifest.path,
self.manifest.parsed.project.name,
build_id,
self.cache_dir,
self.channel_config,
tool,
Expand Down
142 changes: 121 additions & 21 deletions crates/pixi_build_frontend/src/protocols/pixi/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::{ffi::OsStr, path::PathBuf};
use std::{ffi::OsStr, path::PathBuf, sync::Arc};

use futures::TryFutureExt;
use jsonrpsee::{
async_client::{Client, ClientBuilder},
core::client::{ClientT, Error, Error as ClientError, TransportReceiverT, TransportSenderT},
types::ErrorCode,
};
use miette::{diagnostic, Diagnostic, IntoDiagnostic};
use miette::{diagnostic, Diagnostic};
use pixi_build_types::{
procedures,
procedures::{
Expand All @@ -17,7 +18,13 @@ use pixi_build_types::{
};
use rattler_conda_types::ChannelConfig;
use thiserror::Error;
use tokio::{
io::{AsyncBufReadExt, BufReader, Lines},
process::ChildStderr,
sync::{oneshot, Mutex},
};

use super::{stderr_null, stderr_stream};
use crate::{
jsonrpc::{stdio_transport, Receiver, RpcParams, Sender},
protocols::error::BackendError,
Expand Down Expand Up @@ -72,6 +79,9 @@ pub enum ProtocolError {
"This is often caused by the build backend incorrectly reporting certain capabilities. Consider contacting the build backend maintainers for a fix."
))]
MethodNotImplemented(String, String),

#[error("pipe of stderr stopped earlier than expected")]
StdErrPipeStopped,
}

/// A protocol that uses a pixi manifest to invoke a build backend.
Expand All @@ -87,17 +97,20 @@ pub struct Protocol {
/// The path to the manifest relative to the source directory.
relative_manifest_path: PathBuf,

/// Name of the project taken from the manifest
project_name: Option<String>,

_backend_capabilities: BackendCapabilities,

/// The build identifier
build_id: usize,

/// The handle to the stderr of the backend process.
stderr: Option<Arc<Mutex<Lines<BufReader<ChildStderr>>>>>,
}

impl Protocol {
pub(crate) async fn setup(
source_dir: PathBuf,
manifest_path: PathBuf,
project_name: Option<String>,
build_id: usize,
cache_dir: Option<PathBuf>,
channel_config: ChannelConfig,
tool: Tool,
Expand All @@ -108,7 +121,7 @@ impl Protocol {
let mut process = tokio::process::Command::from(tool.command())
.stdout(std::process::Stdio::piped())
.stdin(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit()) // TODO: Capture this?
.stderr(std::process::Stdio::piped()) // TODO: Capture this?
.spawn()?;

let backend_identifier = tool
Expand All @@ -125,18 +138,23 @@ impl Protocol {
let stdout = process
.stdout
.expect("since we piped stdout we expect a valid value here");
let stderr = process
.stderr
.map(|stderr| BufReader::new(stderr).lines())
.expect("since we piped stderr we expect a valid value here");

// Construct a JSON-RPC client to communicate with the backend process.
let (tx, rx) = stdio_transport(stdin, stdout);
Self::setup_with_transport(
backend_identifier,
source_dir,
manifest_path,
project_name,
build_id,
cache_dir,
channel_config,
tx,
rx,
Some(stderr),
)
.await
}
Expand All @@ -145,11 +163,12 @@ impl Protocol {
"<IPC>".to_string(),
source_dir,
manifest_path,
project_name,
build_id,
cache_dir,
channel_config,
Sender::from(ipc.rpc_out),
Receiver::from(ipc.rpc_in),
None,
)
.await
}
Expand All @@ -161,11 +180,12 @@ impl Protocol {
backend_identifier: String,
source_dir: PathBuf,
manifest_path: PathBuf,
project_name: Option<String>,
build_id: usize,
cache_dir: Option<PathBuf>,
channel_config: ChannelConfig,
sender: impl TransportSenderT + Send,
receiver: impl TransportReceiverT + Send,
stderr: Option<Lines<BufReader<ChildStderr>>>,
) -> Result<Self, InitializeError> {
let relative_manifest_path = manifest_path
.strip_prefix(source_dir)
Expand Down Expand Up @@ -199,10 +219,11 @@ impl Protocol {
Ok(Self {
backend_identifier,
_channel_config: channel_config,
project_name,
client,
_backend_capabilities: result.capabilities,
relative_manifest_path,
build_id,
stderr: stderr.map(Mutex::new).map(Arc::new),
})
}

Expand All @@ -220,8 +241,19 @@ impl Protocol {
&self,
request: &CondaMetadataParams,
reporter: &dyn CondaMetadataReporter,
) -> miette::Result<CondaMetadataResult> {
let operation = reporter.on_metadata_start(self.project_name.as_deref().unwrap_or(""));
) -> Result<CondaMetadataResult, ProtocolError> {
// Capture all of stderr and discard it
let stderr = self.stderr.as_ref().map(|stderr| {
// Cancellation signal
let (cancel_tx, cancel_rx) = oneshot::channel();
// Spawn the stderr forwarding task
let handle = tokio::spawn(stderr_null(stderr.clone(), cancel_rx));
(cancel_tx, handle)
});

// Start the metadata operation
let operation = reporter.on_metadata_start(self.build_id);

let result = self
.client
.request(
Expand All @@ -235,8 +267,23 @@ impl Protocol {
err,
procedures::conda_metadata::METHOD_NAME,
)
})
.into_diagnostic();
});

// Wait for the stderr sink to finish, by signaling it to stop
if let Some((cancel_tx, handle)) = stderr {
// Cancel the stderr forwarding
if cancel_tx.send(()).is_err() {
return Err(ProtocolError::StdErrPipeStopped);
}
handle.await.map_or_else(
|e| match e.try_into_panic() {
Ok(panic) => std::panic::resume_unwind(panic),
Err(_) => Err(ProtocolError::StdErrPipeStopped),
},
|e| e.map_err(|_| ProtocolError::StdErrPipeStopped),
)?;
}

reporter.on_metadata_end(operation);
result
}
Expand All @@ -246,23 +293,76 @@ impl Protocol {
&self,
request: &CondaBuildParams,
reporter: &dyn CondaBuildReporter,
) -> miette::Result<CondaBuildResult> {
let operation = reporter.on_build_start(self.project_name.as_deref().unwrap_or(""));
let result = self
) -> Result<CondaBuildResult, ProtocolError> {
// Captures stderr output
let stderr = self.stderr.as_ref().map(|stderr| {
let (sender, receiver) = tokio::sync::mpsc::channel(100);
let (cancel_tx, cancel_rx) = oneshot::channel();
let handle = tokio::spawn(stderr_stream(stderr.clone(), sender, cancel_rx));
(cancel_tx, receiver, handle)
});

let operation = reporter.on_build_start(self.build_id);
let request = self
.client
.request(
procedures::conda_build::METHOD_NAME,
RpcParams::from(request),
)
.await
.map_err(|err| {
ProtocolError::from_client_error(
self.backend_identifier.clone(),
err,
procedures::conda_build::METHOD_NAME,
)
})
.into_diagnostic();
});

// There can be two cases, the stderr is captured or is not captured
// In the case of capturing we need to select between the request and the stderr
// forwarding to drive these two futures concurrently
//
// In the other case we can just wait for the request to finish
let result = if let Some((cancel_tx, receiver, handle)) = stderr {
// This is the case where we capture stderr

// Create a future that will forward stderr to the reporter
let send_stderr = async {
let mut receiver = receiver;
while let Some(line) = receiver.recv().await {
reporter.on_build_output(operation, line);
}
};

// Select between the request and the stderr forwarding
let result = tokio::select! {
result = request => result,
_ = send_stderr => {
Err(ProtocolError::StdErrPipeStopped)
}
};

// Cancel the stderr forwarding
if cancel_tx.send(()).is_err() {
return Err(ProtocolError::StdErrPipeStopped);
}

// Wait for the stderr forwarding to finish, it should because we cancelled
handle.await.map_or_else(
|e| match e.try_into_panic() {
Ok(panic) => std::panic::resume_unwind(panic),
Err(_) => Err(ProtocolError::StdErrPipeStopped),
},
|e| e.map_err(|_| ProtocolError::StdErrPipeStopped),
)?;

// Return the result
result
} else {
// This is the case where we don't capture stderr
request.await
};

// Build has completed
reporter.on_build_end(operation);
result
}
Expand Down
Loading

0 comments on commit 3ea6438

Please sign in to comment.