Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use REAPI batch API for small blob writes #12537

Merged
merged 6 commits into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def __init__(
store_chunk_upload_timeout=execution_options.remote_store_chunk_upload_timeout_seconds,
store_rpc_retries=execution_options.remote_store_rpc_retries,
store_rpc_concurrency=execution_options.remote_store_rpc_concurrency,
store_batch_api_size_limit=execution_options.remote_store_batch_api_size_limit,
cache_warnings_behavior=execution_options.remote_cache_warnings.value,
cache_eager_fetch=execution_options.remote_cache_eager_fetch,
cache_rpc_concurrency=execution_options.remote_cache_rpc_concurrency,
Expand Down
10 changes: 10 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ class ExecutionOptions:
remote_store_chunk_upload_timeout_seconds: int
remote_store_rpc_retries: int
remote_store_rpc_concurrency: int
remote_store_batch_api_size_limit: int

remote_cache_eager_fetch: bool
remote_cache_warnings: RemoteCacheWarningsBehavior
Expand Down Expand Up @@ -377,6 +378,7 @@ def from_options(
remote_store_chunk_upload_timeout_seconds=bootstrap_options.remote_store_chunk_upload_timeout_seconds,
remote_store_rpc_retries=bootstrap_options.remote_store_rpc_retries,
remote_store_rpc_concurrency=dynamic_remote_options.store_rpc_concurrency,
remote_store_batch_api_size_limit=bootstrap_options.remote_store_batch_api_size_limit,
# Remote cache setup.
remote_cache_eager_fetch=bootstrap_options.remote_cache_eager_fetch,
remote_cache_warnings=bootstrap_options.remote_cache_warnings,
Expand Down Expand Up @@ -455,6 +457,7 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions:
remote_store_chunk_upload_timeout_seconds=60,
remote_store_rpc_retries=2,
remote_store_rpc_concurrency=128,
remote_store_batch_api_size_limit=4194304,
# Remote cache setup.
remote_cache_eager_fetch=True,
remote_cache_warnings=RemoteCacheWarningsBehavior.first_only,
Expand Down Expand Up @@ -1202,6 +1205,13 @@ def register_bootstrap_options(cls, register):
default=DEFAULT_EXECUTION_OPTIONS.remote_store_rpc_concurrency,
help="The number of concurrent requests allowed to the remote store service.",
)
register(
"--remote-store-batch-api-size-limit",
type=int,
advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_store_batch_api_size_limit,
help="The maximum total size of blobs allowed to be sent in a single batch API call to the remote store.",
)

register(
"--remote-cache-warnings",
Expand Down
12 changes: 11 additions & 1 deletion src/rust/engine/fs/brfs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,15 @@ async fn main() {
.long("rpc-concurrency-limit")
.required(false)
.default_value("128")
).get_matches();
).arg(
Arg::with_name("batch-api-size-limit")
.help("Maximum total size of blobs allowed to be sent in a single batch API call to the remote store.")
.takes_value(true)
.long("batch-api-size-limit")
.required(false)
.default_value("4194304")
)
.get_matches();

let mount_path = args.value_of("mount-path").unwrap();
let store_path = args.value_of("local-store-path").unwrap();
Expand Down Expand Up @@ -756,6 +764,8 @@ async fn main() {
value_t!(args.value_of("rpc-concurrency-limit"), usize)
.expect("Bad rpc-concurrency-limit flag"),
None,
value_t!(args.value_of("batch-api-size-limit"), usize)
.expect("Bad batch-api-size-limit flag"),
)
.expect("Error making remote store"),
None => local_only_store,
Expand Down
9 changes: 9 additions & 0 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,13 @@ to this directory.",
.required(false)
.default_value("128")
)
.arg(
Arg::with_name("batch-api-size-limit")
.help("Maximum total size of blobs allowed to be sent in a single batch API call to the remote store.")
.takes_value(true)
.long("batch-api-size-limit")
.required(false)
.default_value("4194304"))
.get_matches(),
).await {
Ok(_) => {}
Expand Down Expand Up @@ -327,6 +334,8 @@ async fn execute(top_match: &clap::ArgMatches<'_>) -> Result<(), ExitError> {
value_t!(top_match.value_of("rpc-concurrency-limit"), usize)
.expect("Bad rpc-concurrency-limit flag"),
None,
value_t!(top_match.value_of("batch-api-size-limit"), usize)
.expect("Bad batch-api-size-limit flag"),
),
true,
)
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ impl Store {
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<Store, String> {
Ok(Store {
local: self.local,
Expand All @@ -340,6 +341,7 @@ impl Store {
rpc_retries,
rpc_concurrency_limit,
capabilities_cell_opt,
batch_api_size_limit,
)?)),
})
}
Expand Down
65 changes: 61 additions & 4 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use hashing::Digest;
use log::Level;
use remexec::{
capabilities_client::CapabilitiesClient,
content_addressable_storage_client::ContentAddressableStorageClient, ServerCapabilities,
content_addressable_storage_client::ContentAddressableStorageClient, BatchUpdateBlobsRequest,
ServerCapabilities,
};
use tonic::{Code, Request, Status};
use workunit_store::{in_workunit, Metric, ObservationMetric, WorkunitMetadata};
Expand All @@ -33,6 +34,7 @@ pub struct ByteStore {
cas_client: Arc<ContentAddressableStorageClient<LayeredService>>,
capabilities_cell: Arc<DoubleCheckedCell<ServerCapabilities>>,
capabilities_client: Arc<CapabilitiesClient<LayeredService>>,
batch_api_size_limit: usize,
}

impl fmt::Debug for ByteStore {
Expand Down Expand Up @@ -75,6 +77,7 @@ impl ByteStore {
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<ByteStore, String> {
let tls_client_config = if cas_address.starts_with("https://") {
Some(grpc_util::create_tls_config(root_ca_certs)?)
Expand Down Expand Up @@ -107,6 +110,7 @@ impl ByteStore {
capabilities_cell: capabilities_cell_opt
.unwrap_or_else(|| Arc::new(DoubleCheckedCell::new())),
capabilities_client,
batch_api_size_limit,
})
}

Expand Down Expand Up @@ -203,6 +207,60 @@ impl ByteStore {
digest: Digest,
bytes: ByteSource,
) -> Result<(), ByteStoreError>
where
ByteSource: Fn(Range<usize>) -> Bytes + Send + Sync + 'static,
{
let len = digest.size_bytes;

let max_batch_total_size_bytes = {
let capabilities = self.get_capabilities().await?;
tdyas marked this conversation as resolved.
Show resolved Hide resolved

capabilities
.cache_capabilities
.as_ref()
.map(|c| c.max_batch_total_size_bytes as usize)
.unwrap_or_default()
};

let batch_api_allowed_by_local_config = len <= self.batch_api_size_limit;
let batch_api_allowed_by_server_config =
max_batch_total_size_bytes == 0 || len < max_batch_total_size_bytes;
if batch_api_allowed_by_local_config && batch_api_allowed_by_server_config {
self.store_bytes_source_batch(digest, bytes).await
} else {
self.store_bytes_source_stream(digest, bytes).await
}
}

async fn store_bytes_source_batch<ByteSource>(
&self,
digest: Digest,
bytes: ByteSource,
) -> Result<(), ByteStoreError>
where
ByteSource: Fn(Range<usize>) -> Bytes + Send + Sync + 'static,
{
let request = BatchUpdateBlobsRequest {
instance_name: self.instance_name.clone().unwrap_or_default(),
requests: vec![remexec::batch_update_blobs_request::Request {
digest: Some(digest.into()),
data: bytes(0..digest.size_bytes),
}],
};

let mut client = self.cas_client.as_ref().clone();
client
.batch_update_blobs(request)
.await
.map_err(ByteStoreError::Grpc)?;
Ok(())
}

async fn store_bytes_source_stream<ByteSource>(
&self,
digest: Digest,
bytes: ByteSource,
) -> Result<(), ByteStoreError>
where
ByteSource: Fn(Range<usize>) -> Bytes + Send + Sync + 'static,
{
Expand Down Expand Up @@ -464,8 +522,7 @@ impl ByteStore {
}
}

#[allow(dead_code)]
async fn get_capabilities(&self) -> Result<&remexec::ServerCapabilities, String> {
async fn get_capabilities(&self) -> Result<&remexec::ServerCapabilities, ByteStoreError> {
let capabilities_fut = async {
let mut request = remexec::GetCapabilitiesRequest::default();
if let Some(s) = self.instance_name.as_ref() {
Expand All @@ -477,7 +534,7 @@ impl ByteStore {
.get_capabilities(request)
.await
.map(|r| r.into_inner())
.map_err(status_to_str)
.map_err(ByteStoreError::Grpc)
};

self
Expand Down
3 changes: 3 additions & 0 deletions src/rust/engine/fs/store/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ async fn write_file_multiple_chunks() {
1,
256,
None,
0, // disable batch API, force streaming API
)
.unwrap();

Expand Down Expand Up @@ -230,6 +231,7 @@ async fn write_connection_error() {
1,
256,
None,
super::tests::STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();
let error = store
Expand Down Expand Up @@ -307,6 +309,7 @@ fn new_byte_store(cas: &StubCAS) -> ByteStore {
1,
256,
None,
super::tests::STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap()
}
Expand Down
7 changes: 7 additions & 0 deletions src/rust/engine/fs/store/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::{
MEGABYTES,
};

pub(crate) const STORE_BATCH_API_SIZE_LIMIT: usize = 4 * 1024 * 1024;

impl LoadMetadata {
fn is_remote(&self) -> bool {
match self {
Expand Down Expand Up @@ -105,6 +107,7 @@ fn new_store<P: AsRef<Path>>(dir: P, cas_address: &str) -> Store {
1,
256,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap()
}
Expand Down Expand Up @@ -847,6 +850,7 @@ async fn instance_name_upload() {
1,
256,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -876,6 +880,7 @@ async fn instance_name_download() {
1,
256,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -927,6 +932,7 @@ async fn auth_upload() {
1,
256,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -958,6 +964,7 @@ async fn auth_download() {
1,
256,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl StoreSetup {
1,
256,
None,
4 * 1024 * 1024,
)
.unwrap();
StoreSetup {
Expand Down
6 changes: 6 additions & 0 deletions src/rust/engine/process_execution/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use tonic::{Code, Status};
const OVERALL_DEADLINE_SECS: Duration = Duration::from_secs(10 * 60);
const RETRY_INTERVAL: Duration = Duration::from_micros(0);
const STORE_CONCURRENCY_LIMIT: usize = 256;
const STORE_BATCH_API_SIZE_LIMIT: usize = 4 * 1024 * 1024;
const EXEC_CONCURRENCY_LIMIT: usize = 256;
const CACHE_CONCURRENCY_LIMIT: usize = 256;

Expand Down Expand Up @@ -871,6 +872,7 @@ async fn sends_headers() {
1,
STORE_CONCURRENCY_LIMIT,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -1071,6 +1073,7 @@ async fn ensure_inline_stdio_is_stored() {
1,
STORE_CONCURRENCY_LIMIT,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -1452,6 +1455,7 @@ async fn execute_missing_file_uploads_if_known() {
1,
STORE_CONCURRENCY_LIMIT,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();
store
Expand Down Expand Up @@ -1532,6 +1536,7 @@ async fn execute_missing_file_errors_if_unknown() {
1,
STORE_CONCURRENCY_LIMIT,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -2278,6 +2283,7 @@ pub(crate) fn make_store(
1,
STORE_CONCURRENCY_LIMIT,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap()
}
Expand Down
5 changes: 5 additions & 0 deletions src/rust/engine/process_executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ struct Opt {
#[structopt(long, default_value = "128")]
store_rpc_concurrency: usize,

/// Total size of blobs allowed to be sent in a single API call.
#[structopt(long, default_value = "4194304")]
store_batch_api_size_limit: usize,

/// Number of concurrent requests to the execution service.
#[structopt(long, default_value = "128")]
execution_rpc_concurrency: usize,
Expand Down Expand Up @@ -251,6 +255,7 @@ async fn main() {
args.store_rpc_retries,
args.store_rpc_concurrency,
None,
args.store_batch_api_size_limit,
)
}
(None, None) => Ok(local_only_store),
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub struct RemotingOptions {
pub store_chunk_upload_timeout: Duration,
pub store_rpc_retries: usize,
pub store_rpc_concurrency: usize,
pub store_batch_api_size_limit: usize,
pub cache_warnings_behavior: RemoteCacheWarningsBehavior,
pub cache_eager_fetch: bool,
pub cache_rpc_concurrency: usize,
Expand Down Expand Up @@ -151,6 +152,7 @@ impl Core {
remoting_opts.store_rpc_retries,
remoting_opts.store_rpc_concurrency,
capabilities_cell_opt,
remoting_opts.store_batch_api_size_limit,
)
} else {
Ok(local_only)
Expand Down
Loading