Skip to content

Commit

Permalink
Serverset: Lazily connect to a limited number of servers (#8165)
Browse files Browse the repository at this point in the history
This allows for a connection limit, and doesn't make any connections
until they're needed.

This may slightly slow down the first few requests, but means that we
won't proactively connect to a large number of servers.

We also now disconnect from a server when we see it fail (rather than
just ceasing to use it).

This makes the order of server connections slightly less predictable;
before we would strictly round-robin, whereas now we may skip some
servers or use one twice in a row when we connect to or disconnect
from a server.
  • Loading branch information
illicitonion authored and stuhood committed Aug 21, 2019
1 parent bb8eaab commit 70cb3c3
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 87 deletions.
1 change: 1 addition & 0 deletions src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,7 @@ def ti(type_obj):
self.context.utf8_buf(execution_options.remote_oauth_bearer_token_path or ""),
execution_options.remote_store_thread_count,
execution_options.remote_store_chunk_bytes,
execution_options.remote_store_connection_limit,
execution_options.remote_store_chunk_upload_timeout_seconds,
execution_options.remote_store_rpc_retries,
self.context.utf8_buf_buf(execution_options.remote_execution_extra_platform_properties),
Expand Down
7 changes: 7 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ExecutionOptions(datatype([
'remote_store_chunk_bytes',
'remote_store_chunk_upload_timeout_seconds',
'remote_store_rpc_retries',
'remote_store_connection_limit',
'process_execution_local_parallelism',
'process_execution_remote_parallelism',
'process_execution_cleanup_local_dirs',
Expand Down Expand Up @@ -60,6 +61,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_store_chunk_bytes=bootstrap_options.remote_store_chunk_bytes,
remote_store_chunk_upload_timeout_seconds=bootstrap_options.remote_store_chunk_upload_timeout_seconds,
remote_store_rpc_retries=bootstrap_options.remote_store_rpc_retries,
remote_store_connection_limit=bootstrap_options.remote_store_connection_limit,
process_execution_local_parallelism=bootstrap_options.process_execution_local_parallelism,
process_execution_remote_parallelism=bootstrap_options.process_execution_remote_parallelism,
process_execution_cleanup_local_dirs=bootstrap_options.process_execution_cleanup_local_dirs,
Expand All @@ -82,6 +84,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_store_chunk_bytes=1024*1024,
remote_store_chunk_upload_timeout_seconds=60,
remote_store_rpc_retries=2,
remote_store_connection_limit=5,
process_execution_local_parallelism=multiprocessing.cpu_count()*2,
process_execution_remote_parallelism=128,
process_execution_cleanup_local_dirs=True,
Expand Down Expand Up @@ -347,6 +350,7 @@ def register_bootstrap_options(cls, register):
help="Enables remote workers for increased parallelism. (Alpha)")
register('--remote-store-server', advanced=True, type=list, default=[],
help='host:port of grpc server to use as remote execution file store.')
# TODO: Infer this from remote-store-connection-limit.
register('--remote-store-thread-count', type=int, advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_store_thread_count,
help='Thread count to use for the pool that interacts with the remote file store.')
Expand All @@ -361,6 +365,9 @@ def register_bootstrap_options(cls, register):
register('--remote-store-rpc-retries', type=int, advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_store_rpc_retries,
help='Number of times to retry any RPC to the remote store before giving up.')
register('--remote-store-connection-limit', type=int, advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_store_connection_limit,
help='Number of remote stores to concurrently allow connections to.')
register('--remote-execution-process-cache-namespace', advanced=True,
help="The cache namespace for remote process execution. "
"Bump this to invalidate every artifact's remote execution. "
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/engine_cffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ pub extern "C" fn scheduler_create(
remote_oauth_bearer_token_path_buffer: Buffer,
remote_store_thread_count: u64,
remote_store_chunk_bytes: u64,
remote_store_connection_limit: u64,
remote_store_chunk_upload_timeout_seconds: u64,
remote_store_rpc_retries: u64,
remote_execution_extra_platform_properties_buf: BufferBuffer,
Expand Down Expand Up @@ -312,6 +313,7 @@ pub extern "C" fn scheduler_create(
remote_store_chunk_bytes as usize,
Duration::from_secs(remote_store_chunk_upload_timeout_seconds),
remote_store_rpc_retries as usize,
remote_store_connection_limit as usize,
remote_execution_extra_platform_properties_map,
process_execution_local_parallelism as usize,
process_execution_remote_parallelism as usize,
Expand Down
3 changes: 2 additions & 1 deletion src/rust/engine/fs/brfs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ fn main() {
&store_path,
vec![address.to_owned()],
args.value_of("remote-instance-name").map(str::to_owned),
&root_ca_certs,
root_ca_certs,
oauth_bearer_token,
1,
4 * 1024 * 1024,
Expand All @@ -719,6 +719,7 @@ fn main() {
)
.expect("Error making BackoffConfig"),
1,
1,
),
None => Store::local_only(runtime.clone(), &store_path),
}
Expand Down
12 changes: 11 additions & 1 deletion src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@ to this directory.",
.required(false)
.default_value("3")
)
.arg(
Arg::with_name("connection-limit")
.help("Number of concurrent servers to allow connections to.")
.takes_value(true)
.long("connection-limit")
.required(false)
.default_value("3")
)
.get_matches(),
) {
Ok(_) => {}
Expand Down Expand Up @@ -299,7 +307,7 @@ fn execute(top_match: &clap::ArgMatches<'_>) -> Result<(), ExitError> {
top_match
.value_of("remote-instance-name")
.map(str::to_owned),
&root_ca_certs,
root_ca_certs,
oauth_bearer_token,
value_t!(top_match.value_of("thread-count"), usize).expect("Invalid thread count"),
chunk_size,
Expand All @@ -319,6 +327,8 @@ fn execute(top_match: &clap::ArgMatches<'_>) -> Result<(), ExitError> {
std::time::Duration::from_secs(20),
)?,
value_t!(top_match.value_of("rpc-attempts"), usize).expect("Bad rpc-attempts flag"),
value_t!(top_match.value_of("connection-limit"), usize)
.expect("Bad connection-limit flag"),
),
true,
)
Expand Down
41 changes: 27 additions & 14 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,14 @@ impl Store {
path: P,
cas_addresses: Vec<String>,
instance_name: Option<String>,
root_ca_certs: &Option<Vec<u8>>,
root_ca_certs: Option<Vec<u8>>,
oauth_bearer_token: Option<String>,
thread_count: usize,
chunk_size_bytes: usize,
upload_timeout: Duration,
backoff_config: BackoffConfig,
rpc_retries: usize,
connection_limit: usize,
) -> Result<Store, String> {
Ok(Store {
local: local::ByteStore::new(executor, path)?,
Expand All @@ -214,6 +215,7 @@ impl Store {
upload_timeout,
backoff_config,
rpc_retries,
connection_limit,
)?),
})
}
Expand Down Expand Up @@ -1776,18 +1778,20 @@ mod remote {
pub fn new(
cas_addresses: Vec<String>,
instance_name: Option<String>,
root_ca_certs: &Option<Vec<u8>>,
root_ca_certs: Option<Vec<u8>>,
oauth_bearer_token: Option<String>,
thread_count: usize,
chunk_size_bytes: usize,
upload_timeout: Duration,
backoff_config: BackoffConfig,
rpc_retries: usize,
connection_limit: usize,
) -> Result<ByteStore, String> {
let env = Arc::new(grpcio::Environment::new(thread_count));
let env2 = env.clone();

let connect = |cas_address: &str| {
let builder = grpcio::ChannelBuilder::new(env.clone());
let connect = move |cas_address: &str| {
let builder = grpcio::ChannelBuilder::new(env2.clone());
if let Some(ref root_ca_certs) = root_ca_certs {
let creds = grpcio::ChannelCredentialsBuilder::new()
.root_cert(root_ca_certs.clone())
Expand All @@ -1798,7 +1802,7 @@ mod remote {
}
};

let serverset = Serverset::new(cas_addresses, connect, backoff_config)?;
let serverset = Serverset::new(cas_addresses, connect, connection_limit, backoff_config)?;

Ok(ByteStore {
instance_name,
Expand Down Expand Up @@ -2282,13 +2286,14 @@ mod remote {
let store = ByteStore::new(
vec![cas.address()],
None,
&None,
None,
None,
1,
10 * 1024,
Duration::from_secs(5),
BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.unwrap();

Expand Down Expand Up @@ -2358,13 +2363,14 @@ mod remote {
let store = ByteStore::new(
vec![String::from("doesnotexist.example")],
None,
&None,
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.unwrap();
let error = block_on(store.store_bytes(TestData::roland().bytes(), WorkUnitStore::new()))
Expand Down Expand Up @@ -2437,13 +2443,14 @@ mod remote {
let store = ByteStore::new(
vec![cas1.address(), cas2.address()],
None,
&None,
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
2,
)
.unwrap();

Expand All @@ -2465,13 +2472,14 @@ mod remote {
ByteStore::new(
vec![cas.address()],
None,
&None,
None,
None,
1,
10 * MEGABYTES,
Duration::from_secs(1),
BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.unwrap()
}
Expand Down Expand Up @@ -2603,13 +2611,14 @@ mod tests {
dir,
vec![cas_address],
None,
&None,
None,
None,
1,
10 * MEGABYTES,
Duration::from_secs(1),
BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.unwrap()
}
Expand Down Expand Up @@ -3319,13 +3328,14 @@ mod tests {
dir.path(),
vec![cas.address()],
Some("dark-tower".to_owned()),
&None,
None,
None,
1,
10 * MEGABYTES,
Duration::from_secs(1),
BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.unwrap();

Expand All @@ -3348,13 +3358,14 @@ mod tests {
dir.path(),
vec![cas.address()],
Some("dark-tower".to_owned()),
&None,
None,
None,
1,
10 * MEGABYTES,
Duration::from_secs(1),
BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.unwrap();

Expand Down Expand Up @@ -3393,13 +3404,14 @@ mod tests {
dir.path(),
vec![cas.address()],
None,
&None,
None,
Some("Armory.Key".to_owned()),
1,
10 * MEGABYTES,
Duration::from_secs(1),
BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.unwrap();

Expand All @@ -3422,13 +3434,14 @@ mod tests {
dir.path(),
vec![cas.address()],
None,
&None,
None,
Some("Armory.Key".to_owned()),
1,
10 * MEGABYTES,
Duration::from_secs(1),
BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.unwrap();

Expand Down
15 changes: 10 additions & 5 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1571,13 +1571,14 @@ pub mod tests {
&store_dir_path,
vec![cas.address()],
None,
&None,
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
store::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.expect("Failed to make store");

Expand Down Expand Up @@ -1945,13 +1946,14 @@ pub mod tests {
store_dir,
vec![cas.address()],
None,
&None,
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
store::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.expect("Failed to make store");
runtime
Expand Down Expand Up @@ -2038,13 +2040,14 @@ pub mod tests {
store_dir,
vec![cas.address()],
None,
&None,
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
store::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.expect("Failed to make store");
store
Expand Down Expand Up @@ -2106,13 +2109,14 @@ pub mod tests {
store_dir,
vec![cas.address()],
None,
&None,
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
store::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.expect("Failed to make store");

Expand Down Expand Up @@ -2860,13 +2864,14 @@ pub mod tests {
store_dir,
vec![cas.address()],
None,
&None,
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
store::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
1,
)
.expect("Failed to make store");

Expand Down
Loading

0 comments on commit 70cb3c3

Please sign in to comment.