Skip to content

Commit

Permalink
unify the clients/cores for the nr sharded with the rackscale version
Browse files Browse the repository at this point in the history
Signed-off-by: Reto Achermann <achreto@gmail.com>
  • Loading branch information
achreto committed Nov 22, 2023
1 parent 39e3642 commit 9098842
Showing 1 changed file with 128 additions and 108 deletions.
236 changes: 128 additions & 108 deletions kernel/tests/s11_rackscale_benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,134 +937,154 @@ fn s11_linux_memcached_sharded_benchmark() {
let r = csv_file.write(RACKSCALE_MEMCACHED_CSV_COLUMNS.as_bytes());
assert!(r.is_ok());

let max_threads_per_node = if is_smoke {
1
} else {
machine.max_cores() / machine.max_numa_nodes()
};
println!(
"Nodes: {}, max_threads_per_node: {max_threads_per_node}",
machine.max_numa_nodes()
);
for num_nodes in 1..=machine.max_numa_nodes() {
config.num_servers = num_nodes;
let machine = Machine::determine();
let max_cores = if is_smoke { 2 } else { machine.max_cores() };
let max_numa = machine.max_numa_nodes();
let total_cores_per_node = core::cmp::max(1, max_cores / max_numa);

// Do initial network configuration
let mut num_clients = 1; // num_clients == num_replicas, for baseline
let mut total_cores = 1;
while total_cores < max_cores {
// Round up to get the number of clients
let new_num_clients = (total_cores + (total_cores_per_node - 1)) / total_cores_per_node;

// Do network setup if number of clients has changed.
if num_clients != new_num_clients {
num_clients = new_num_clients;

// ensure total cores is divisible by num clients
total_cores = total_cores - (total_cores % num_clients);
}
let cores_per_client = total_cores / num_clients;

// Break if not enough total cores for the controller, or if we would have to split controller across nodes to make it fit
// We want controller to have it's own socket, so if it's not a 1 socket machine, break when there's equal number of clients
// to numa nodes.
if total_cores + num_clients + 1 > machine.max_cores()
|| num_clients == machine.max_numa_nodes()
&& cores_per_client + num_clients + 1 > total_cores_per_node
|| num_clients == max_numa && max_numa > 1
{
break;
}

for num_threads in 1..=max_threads_per_node {
if (num_threads != 1 || num_threads != max_threads_per_node) && (num_threads % 8 != 0) {
continue;
}
eprintln!(
"\n\nRunning Sharded Memcached test with {:?} total core(s), {:?} (client|replica)(s) (cores_per_(client|replica)={:?})",
total_cores, num_clients, cores_per_client
);

println!("");
// terminate any previous memcached
let _ = Command::new("killall")
.args(&["memcached", "-s", "SIGKILL"])
.output();

config.num_threads = num_threads;
// run the internal configuration
config.num_threads = total_cores;

let _ = Command::new("killall")
.args(&["memcached", "-s", "SIGKILL"])
.status();
let mut pty = run_benchmark_internal(&config, timeout_ms);
let mut output = String::new();
let res =
parse_memcached_output(&mut pty, &mut output).expect("could not parse output!");
let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
assert!(r.is_ok());
println!("Memcached Internal: {total_cores} cores");

//git_rev,benchmark,os,protocol,npieces,nthreads,mem,queries,time,thpt
let out = format!(
"memcached_sharded,linux,{},{},{},{},{},{},{}\n",
"internal", 1, res.b_threads, res.b_mem, res.b_queries, res.b_time, res.b_thpt,
);
let r = csv_file.write(out.as_bytes());
assert!(r.is_ok());
let mut pty = run_benchmark_internal(&config, timeout_ms);
let mut output = String::new();
let res = parse_memcached_output(&mut pty, &mut output).expect("could not parse output!");
let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
assert!(r.is_ok());
let out = format!(
"memcached_sharded,linux,{},{},{},{},{},{}\n",
res.b_threads, "internal", res.b_mem, res.b_queries, res.b_time, res.b_thpt,
);
let r = csv_file.write(out.as_bytes());
assert!(r.is_ok());

let _r = pty.process.kill(SIGKILL);
let r = pty
.process
.kill(SIGKILL)
.expect("unable to terminate memcached");

// single node
for protocol in &["tcp", "unix"] {
config.protocol = protocol;
for protocol in &["tcp", "unix"] {
config.protocol = protocol;
config.num_servers = num_clients;
config.num_threads = cores_per_client;

println!("");
println!("Memcached Sharded: {cores_per_client}x{num_clients} with {protocol}");

println!("Memcached Sharded: {num_threads}x{num_nodes} with {protocol}");
// terminate the memcached instance
let _ = Command::new("killall")
.args(&["memcached", "-s", "SIGKILL"])
.status();

// terminate the memcached instance
let _ = Command::new("killall")
.args(&["memcached", "-s", "SIGKILL"])
.status();
// give some time so memcached can be cleaned up
std::thread::sleep(Duration::from_secs(5));

// give some time so memcached can be cleaned up
std::thread::sleep(Duration::from_secs(5));
let mut memcached_ctrls = Vec::new();
for i in 0..num_clients {
memcached_ctrls.push(
linux_spawn_memcached(i, &config, timeout_ms).expect("could not spawn memcached"),
);
}

let mut memcached_ctrls = Vec::new();
for i in 0..num_nodes {
memcached_ctrls.push(
linux_spawn_memcached(i, &config, timeout_ms)
.expect("could not spawn memcached"),
);
}
config.num_threads = total_cores;

let mut pty = testutils::memcached::spawn_loadbalancer(&config, timeout_ms)
.expect("failed to spawn load balancer");
let mut output = String::new();
use rexpect::errors::ErrorKind::Timeout;
match parse_memcached_output(&mut pty, &mut output) {
Ok(res) => {
let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
assert!(r.is_ok());
let out = format!(
"memcached_sharded,linux,{},{},{},{},{},{},{}\n",
protocol,
config.num_servers,
res.b_threads,
res.b_mem,
res.b_queries,
res.b_time,
res.b_thpt,
);
let r = csv_file.write(out.as_bytes());
assert!(r.is_ok());

println!("{:?}", res);
}
let mut pty =
testutils::memcached::spawn_loadbalancer(&config, timeout_ms).expect("failed to spawn load balancer");
let mut output = String::new();
use rexpect::errors::ErrorKind::Timeout;
match parse_memcached_output(&mut pty, &mut output) {
Ok(res) => {
let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
assert!(r.is_ok());
let out = format!(
"memcached_sharded,linux,{},{},{},{},{},{}\n",
res.b_threads, protocol, res.b_mem, res.b_queries, res.b_time, res.b_thpt,
);
let r = csv_file.write(out.as_bytes());
assert!(r.is_ok());

Err(e) => {
if let Timeout(expected, got, timeout) = e.0 {
println!("Timeout while waiting for {} ms\n", timeout.as_millis());
println!("Expected: `{expected}`\n");
println!("Got:",);
for l in got.lines().take(20) {
println!(" > {l}");
}
} else {
panic!("error: {}", e);
println!("{:?}", res);
}
Err(e) => {
if let Timeout(expected, got, timeout) = e.0 {
println!("Timeout while waiting for {} ms\n", timeout.as_millis());
println!("Expected: `{expected}`\n");
println!("Got:",);
for l in got.lines().take(5) {
println!(" > {l}");
}
} else {
println!("error: {}", e);
}

let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
assert!(r.is_ok());
let out = format!(
"memcached_sharded,linux,{},{},{},{},{},failure,failure\n",
protocol,
config.num_servers,
config.num_threads * config.num_servers,
config.mem_size,
config.num_queries,
);
let r = csv_file.write(out.as_bytes());
assert!(r.is_ok());

for mc in memcached_ctrls.iter_mut() {
mc.process
.kill(rexpect::process::signal::Signal::SIGKILL)
.expect("couldn't terminate memcached");
while let Ok(l) = mc.read_line() {
println!("MEMCACHED-OUTPUT: {}", l);
}
let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
assert!(r.is_ok());
let out = format!(
"memcached_sharded,linux,{},{},failure,failure,failure,failure\n",
config.num_servers, protocol,
);
let r = csv_file.write(out.as_bytes());
assert!(r.is_ok());

for mc in memcached_ctrls.iter_mut() {
mc.process
.kill(rexpect::process::signal::Signal::SIGKILL)
.expect("couldn't terminate memcached");
while let Ok(l) = mc.read_line() {
println!("MEMCACHED-OUTPUT: {}", l);
}
let _ = Command::new("killall").args(&["memcached"]).status();
}
};
}
};

let _ = pty.process.kill(rexpect::process::signal::Signal::SIGKILL);
if total_cores == 1 {
total_cores = 0;
}

if num_clients == 3 {
total_cores += 3;
} else {
total_cores += 4;
}

let _ = pty.process.kill(rexpect::process::signal::Signal::SIGKILL);
}
}

Expand Down

0 comments on commit 9098842

Please sign in to comment.