-
Notifications
You must be signed in to change notification settings - Fork 581
/
Copy pathtoydb.rs
137 lines (125 loc) · 4.93 KB
/
toydb.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
//! The toyDB server. Takes configuration from a config file (default
//! config/toydb.yaml) or corresponding TOYDB_ environment variables. Listens
//! for SQL clients (default port 9605) and Raft connections from other toyDB
//! peers (default port 9705). The Raft log and SQL database are stored at
//! data/raft and data/sql by default.
//!
//! Use the toysql command-line client to connect to the server.
#![warn(clippy::all)]
use toydb::errinput;
use toydb::error::Result;
use toydb::raft;
use toydb::sql;
use toydb::storage;
use toydb::Server;
use clap::Parser as _;
use serde::Deserialize;
use std::collections::HashMap;
/// Process entry point: parses the command line and runs the server.
///
/// If the server fails, the error is printed to stderr and the process exits
/// with status 1 so that shells, scripts and service supervisors can detect
/// the failure (returning normally from `main` would report success).
fn main() {
    if let Err(error) = Command::parse().run() {
        eprintln!("Error: {error}");
        // `run` has already returned at this point, so its locals have been
        // dropped and exiting here does not skip any of their destructors.
        std::process::exit(1);
    }
}
/// The toyDB server configuration. Can be provided via config file (default
/// config/toydb.yaml) or TOYDB_ environment variables.
///
/// The struct is populated through serde deserialization, so the field names
/// below are also the configuration keys. Defaults mentioned on the fields are
/// the ones registered in `Config::load`.
#[derive(Debug, Deserialize)]
struct Config {
/// The node ID. Must be unique in the cluster. Defaults to 1.
id: raft::NodeID,
/// The other nodes in the cluster, and their Raft TCP addresses, keyed by
/// node ID. `Config::load` registers no default for this key.
peers: HashMap<raft::NodeID, String>,
/// The Raft listen address. Defaults to localhost:9705.
listen_raft: String,
/// The SQL listen address. Defaults to localhost:9605.
listen_sql: String,
/// The log level, as a string that is parsed when the server starts.
/// Defaults to "info".
log_level: String,
/// The path to this node's data directory. The Raft log is stored in
/// the file "raft", and the SQL state machine in "sql". Defaults to "data".
data_dir: String,
/// The Raft storage engine: bitcask or memory. Defaults to bitcask; an
/// empty string is also treated as bitcask by `Command::run`.
storage_raft: String,
/// The SQL storage engine: bitcask or memory. Defaults to bitcask; an
/// empty string is also treated as bitcask by `Command::run`.
storage_sql: String,
/// If false, don't fsync Raft log writes to disk. Disabling this
/// will yield much better write performance, but may lose data on
/// host crashes which compromises Raft safety guarantees.
/// Defaults to true.
fsync: bool,
/// The garbage fraction threshold at which to trigger compaction.
/// Passed to the BitCask engines when they are opened. Defaults to 0.2.
compact_threshold: f64,
/// The minimum bytes of garbage before triggering compaction.
/// Passed to the BitCask engines when they are opened. Defaults to
/// 1,000,000.
compact_min_bytes: u64,
}
impl Config {
    /// Builds the server configuration.
    ///
    /// Three layers are registered with the `config` builder, in this order:
    /// built-in defaults, the config file named by `file`, and environment
    /// variables carrying the `TOYDB` prefix. The combined settings are then
    /// deserialized into a `Config`.
    ///
    /// Returns an error if a default cannot be registered, if the sources
    /// cannot be built, or if the result does not deserialize into `Config`.
    fn load(file: &str) -> Result<Self> {
        // Layer 1: built-in defaults for every key except `peers`.
        let defaults = config::Config::builder()
            .set_default("id", "1")?
            .set_default("listen_sql", "localhost:9605")?
            .set_default("listen_raft", "localhost:9705")?
            .set_default("log_level", "info")?
            .set_default("data_dir", "data")?
            .set_default("storage_raft", "bitcask")?
            .set_default("storage_sql", "bitcask")?
            .set_default("fsync", true)?
            .set_default("compact_threshold", 0.2)?
            .set_default("compact_min_bytes", 1_000_000)?;

        // Layers 2 and 3: the config file first, then the environment.
        let file_source = config::File::with_name(file);
        let env_source = config::Environment::with_prefix("TOYDB");
        let settings = defaults.add_source(file_source).add_source(env_source).build()?;

        let cfg: Self = settings.try_deserialize()?;
        Ok(cfg)
    }
}
/// The toyDB server command.
// NOTE(review): clap's derive can surface `///` doc comments as CLI help
// text, so the doc comments on this struct and its field are treated as
// user-facing strings and left untouched. Maintainer notes go in `//`
// comments like this one.
#[derive(clap::Parser)]
#[command(about = "Starts a toyDB server.", version, propagate_version = true)]
struct Command {
/// The configuration file path.
// Short flag is -c; the long flag name is derived by clap from the field
// name (presumably --config). Falls back to config/toydb.yaml when the
// flag is not given. The value is handed to `Config::load` by `run`.
#[arg(short = 'c', long, default_value = "config/toydb.yaml")]
config: String,
}
impl Command {
    /// Starts the toyDB server and serves until it stops.
    ///
    /// Steps, in order: load the configuration from `self.config`, initialize
    /// logging, open the Raft log storage, open the SQL state machine storage,
    /// then build the `Server` and serve on the configured Raft and SQL
    /// listen addresses.
    ///
    /// Returns an error if any step fails. An unrecognized storage engine
    /// name is reported via `errinput!`.
    fn run(self) -> Result<()> {
        let cfg = Config::load(&self.config)?;

        // Logging: at every level except Debug, an allow-filter for "toydb"
        // is added to the logger configuration before it is installed.
        let level: simplelog::LevelFilter = cfg.log_level.parse()?;
        let mut log_builder = simplelog::ConfigBuilder::new();
        if level != simplelog::LevelFilter::Debug {
            log_builder.add_filter_allow_str("toydb");
        }
        simplelog::SimpleLogger::init(level, log_builder.build())?;

        // Both on-disk engines live under the data directory and share the
        // same compaction settings; only the file name differs.
        let data_root = std::path::Path::new(&cfg.data_dir);
        let open_bitcask = |file: &str| {
            storage::BitCask::new_compact(
                data_root.join(file),
                cfg.compact_threshold,
                cfg.compact_min_bytes,
            )
        };

        // Raft log storage. An empty engine name selects bitcask.
        let mut raft_log = match cfg.storage_raft.as_str() {
            "memory" => raft::Log::new(Box::new(storage::Memory::new()))?,
            "bitcask" | "" => raft::Log::new(Box::new(open_bitcask("raft")?))?,
            name => return errinput!("invalid Raft storage engine {name}"),
        };
        raft_log.enable_fsync(cfg.fsync);

        // SQL state machine storage. An empty engine name selects bitcask.
        let raft_state: Box<dyn raft::State> = match cfg.storage_sql.as_str() {
            "memory" => Box::new(sql::engine::Raft::new_state(storage::Memory::new())?),
            "bitcask" | "" => Box::new(sql::engine::Raft::new_state(open_bitcask("sql")?)?),
            name => return errinput!("invalid SQL storage engine {name}"),
        };

        // Hand everything to the server and serve.
        Server::new(cfg.id, cfg.peers, raft_log, raft_state)?
            .serve(&cfg.listen_raft, &cfg.listen_sql)
    }
}