Skip to content

Commit

Permalink
Merge pull request #3 from josehu07/kv-framework
Browse files Browse the repository at this point in the history
Saving current progress on KV framework
  • Loading branch information
josehu07 authored Apr 12, 2023
2 parents 3cb152e + 30c4497 commit 057b52d
Show file tree
Hide file tree
Showing 13 changed files with 433 additions and 53 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,11 @@ jobs:
- name: Get protobuf
run: sudo apt install -y protobuf-compiler libprotobuf-dev
- name: Build
run: cargo build --all --verbose
run: cargo build --workspace --verbose
- name: Add clippy component
run: rustup component add clippy
- name: Clippy check
uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --workspace
4 changes: 3 additions & 1 deletion .github/workflows/format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ jobs:
- uses: actions/checkout@v3
- name: Add rustfmt component
run: rustup component add rustfmt
- name: Format check
- name: Format check (Rust)
run: cargo fmt --all -- --check
- name: Format check (Python)
uses: psf/black@stable
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ jobs:
- name: Get protobuf
run: sudo apt install -y protobuf-compiler libprotobuf-dev
- name: Run tests
run: cargo test --all --verbose
run: cargo test --workspace --verbose
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
.vscode/
8 changes: 0 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,3 @@ log = "0.4"

[build-dependencies]
tonic-build = "0.8"

[features]
default = ["all_protocols", "all_transport"]
all_protocols = ["protocol_do_nothing", "protocol_simple_push"]
protocol_do_nothing = []
protocol_simple_push = []
all_transport = ["transport_rpc"]
transport_rpc = []
34 changes: 18 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,48 +13,46 @@ This codebase comprises the following pieces:

* `proto/`: protobuf definitions for various APIs and replication protocols
* `src/`: the core Summerset library, linked by both `_server` and `_client`
* `summerset_server`: the Summerset server standalone executable
* `summerset_client`: the Summerset client-side library, linked by all client executables
* `summerset_server`: the server-side standalone executable
* `summerset_client`: the client-side library, linked by all client executables
* `summerset_bench`: a client executable for benchmarking purposes

## Build

Build in debug mode:
Build everything in debug or release (`-r`) mode:

```bash
cargo build --all
cargo build [-r] --workspace
```

Run all unit tests:

```bash
cargo test --all
```

Build in release mode:

```bash
cargo build --all --release
cargo test [-r] --workspace
```

## Usage

### Run Servers

Run a server executable:

```bash
cargo run -p summerset_server -- -h
cargo run [-r] -p summerset_server -- -h
```

Run the benchmarking client:
The default logging level is set as >= `info`. To display debugging or even tracing logs, set the `RUST_LOG` environment variable to `debug` or `trace`, e.g.:

```bash
cargo run -p summerset_bench -- -h
RUST_LOG=debug cargo run ...
```

The default logging level is set as >= `info`. To display debugging or even tracing logs, set the `RUST_LOG` environment variable to `debug` or `trace`:
### Run Clients

Run the benchmarking client:

```bash
RUST_LOG=debug cargo run ...
cargo run [-r] -p summerset_bench -- -h
```

## TODO List
Expand All @@ -67,3 +65,7 @@ RUST_LOG=debug cargo run ...
* [ ] more protocols, comprehensive tests & CI
* [ ] true benchmarking client
* [ ] better usage README

---

**Lore**: [Summerset](https://en.uesp.net/wiki/Online:Summerset) Isles is the name of an elvish archipelagic province in the Elder Scrolls series.
103 changes: 103 additions & 0 deletions cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import sys
import os
import argparse
import subprocess
import atexit

SPAWNED_PROCS = []


def kill_spawned_procs():
for proc in SPAWNED_PROCS:
proc.kill()


def run_process(cmd):
print("Run:", " ".join(cmd))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
return proc


def launch_servers(protocol, num_replicas):
api_ports = list(range(50700, 50700 + num_replicas))
comm_ports = list(range(50800, 50800 + num_replicas))
assert len(comm_ports) == len(api_ports)

for api_port, comm_port in zip(api_ports, comm_ports):
peers = []
for peer_port in comm_ports:
if peer_port != comm_port:
peers += ["-n", f"localhost:{peer_port}"]

cmd = [
"cargo",
"run",
"-p",
"summerset_server",
"--",
"-p",
protocol,
"-a",
str(api_port),
"-s",
str(comm_port),
]
cmd += peers
SPAWNED_PROCS.append(run_process(cmd))

atexit.register(kill_spawned_procs)


def parse_ports_list(s):
s = s.strip()
if len(s) == 0:
return []

ports = []
l = s.split(",")

for port_str in l:
port = None
try:
port = int(port_str)
except:
raise Exception(f"{port_str} is not a valid integer")

if port <= 1024 or port >= 65536:
raise Exception(f"{port} is not in the range of valid ports")

ports.append(port)

return ports


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-p", "--protocol", type=str, required=True, help="protocol name"
)
parser.add_argument(
"-r", "--num_replicas", type=int, required=True, help="number of replicas"
)
args = parser.parse_args()

# api_ports = parse_ports_list(args.api_ports)
# comm_ports = parse_ports_list(args.comm_ports)
# if len(comm_ports) != len(api_ports):
# raise ValueError("length of `comm_ports` does not match `api_ports`")
if args.num_replicas <= 0 or args.num_replicas > 9:
raise ValueError(f"invalid number of replicas {args.num_replicas}")

# kill all existing server processes
print("NOTE: Killing all existing server processes...")
os.system("pkill summerset_server")

print("NOTE: Type 'exit' to terminate all servers...")
launch_servers(args.protocol, args.num_replicas)

while True:
word = input()
if word == "exit":
break

sys.exit() # triggers atexit handler
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Public interface to the Summerset core library, linked by both server
//! executable and client library.
#![allow(clippy::uninlined_format_args)]

mod smr_client;
mod smr_server;
mod statemach;
Expand Down
25 changes: 0 additions & 25 deletions src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,10 @@ use crate::replicator::{
};
use crate::utils::InitError;

#[cfg(feature = "protocol_do_nothing")]
mod do_nothing;
#[cfg(feature = "protocol_do_nothing")]
use do_nothing::{DoNothingServerNode, DoNothingCommService, DoNothingClientStub};

#[cfg(feature = "protocol_simple_push")]
mod simple_push;
#[cfg(feature = "protocol_simple_push")]
use simple_push::{
SimplePushServerNode, SimplePushCommService, SimplePushClientStub,
};
Expand Down Expand Up @@ -48,68 +44,47 @@ impl SMRProtocol {
}

/// Create a server replicator module instance of this protocol on heap.
#[allow(unused_variables, unreachable_patterns)]
pub fn new_server_node(
&self,
peers: Vec<String>,
) -> Result<Box<dyn ReplicatorServerNode>, InitError> {
match self {
#[cfg(feature = "protocol_do_nothing")]
Self::DoNothing => {
box_if_ok!(DoNothingServerNode::new(peers))
}
#[cfg(feature = "protocol_simple_push")]
Self::SimplePush => {
box_if_ok!(SimplePushServerNode::new(peers))
}
_ => Err(InitError(format!(
"protocol {} is not enabled in cargo features",
self
))),
}
}

/// Create a server internal communication tonic service holder struct.
#[allow(unused_variables, unreachable_patterns)]
pub fn new_comm_service(
&self,
node: Arc<SummersetServerNode>,
) -> Result<Box<dyn ReplicatorCommService>, InitError> {
match self {
#[cfg(feature = "protocol_do_nothing")]
Self::DoNothing => {
box_if_ok!(DoNothingCommService::new(node))
}
#[cfg(feature = "protocol_simple_push")]
Self::SimplePush => {
box_if_ok!(SimplePushCommService::new(node))
}
_ => Err(InitError(format!(
"protocol {} is not enabled in cargo features",
self
))),
}
}

/// Create a client replicator stub instance of this protocol on heap.
#[allow(unused_variables, unreachable_patterns)]
pub fn new_client_stub(
&self,
servers: Vec<String>,
) -> Result<Box<dyn ReplicatorClientStub>, InitError> {
match self {
#[cfg(feature = "protocol_do_nothing")]
Self::DoNothing => {
box_if_ok!(DoNothingClientStub::new(servers))
}
#[cfg(feature = "protocol_simple_push")]
Self::SimplePush => {
box_if_ok!(SimplePushClientStub::new(servers))
}
_ => Err(InitError(format!(
"protocol {} is not enabled in cargo features",
self
))),
}
}
}
Expand Down
Loading

0 comments on commit 057b52d

Please sign in to comment.