-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.zig
188 lines (151 loc) · 5.71 KB
/
main.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
const std = @import("std");
const zaft = @import("zaft");
const server = @import("server.zig");
const Ticker = @import("ticker.zig").Ticker;
const Storage = @import("storage.zig").Storage;
const ClientServer = server.ClientServer;
const RaftServer = server.RaftServer;
/// Payload of an `add` log entry: associates `value` with `key`.
///
/// Both fields are borrowed slices; this type does not own or free them.
pub const Add = struct {
    /// Key under which the value is stored.
    key: []const u8,
    /// Value stored for `key`.
    value: []const u8,
};
/// Payload of a `remove` log entry: names the key to drop.
///
/// The slice is borrowed; this type does not own or free it.
pub const Remove = struct {
    /// Key to remove.
    key: []const u8,
};
/// Command type used as the entry payload of `Raft` and handled by `applyEntry`.
pub const Entry = union(enum) {
    /// Put a key/value pair into the store.
    add: Add,
    /// Remove a key from the store.
    remove: Remove,
};
/// Context handed to every Raft callback in this file.
///
/// All pointer fields are borrowed; `main` owns the objects they refer to.
const UserData = struct {
    /// Allocator used by the callbacks (HTTP client, JSON encoding, storage calls).
    allocator: std.mem.Allocator,
    /// Peer URLs; `makeRPC` indexes this slice by the target server id.
    addresses: []const []const u8,
    /// Key/value map that `applyEntry` mutates.
    store: *std.StringHashMap([]const u8),
    /// Signalled by `applyEntry` after each applied entry.
    cond: *std.Thread.Condition,
    /// Backend used by the log/term/vote callbacks.
    storage: *Storage,
};
/// The `zaft.Raft` type instantiated for this program: callbacks receive a
/// `*UserData`, and applied entries are of type `Entry`.
pub const Raft = zaft.Raft(UserData, Entry);
// TODO: make these configurable

/// `host:port` of each server's Raft endpoint, indexed by server id.
const raft_addresses = [_][]const u8{
    "127.0.0.1:10000",
    "127.0.0.1:10001",
    "127.0.0.1:10002",
};

/// `host:port` of each server's client-facing endpoint, indexed by server id.
const client_addresses = [_][]const u8{
    "127.0.0.1:20000",
    "127.0.0.1:20001",
    "127.0.0.1:20002",
};

/// Prefix of the per-server cache directory name; `main` appends `-<id>`.
const cache_name = "zaft-cache";
/// Formats every `host:port` entry of `raw_addresses` as an `http://host:port/` URL.
///
/// Caller owns the returned slice and every string in it; release both with
/// `freeHttpAddresses`. If an allocation fails part-way through, everything
/// built so far is freed before the error is returned.
fn buildHttpAddresses(allocator: std.mem.Allocator, raw_addresses: []const []const u8) ![][]u8 {
    const urls = try allocator.alloc([]u8, raw_addresses.len);
    var built: usize = 0;
    errdefer {
        for (urls[0..built]) |url| allocator.free(url);
        allocator.free(urls);
    }
    for (urls, raw_addresses) |*url, raw_address| {
        url.* = try std.fmt.allocPrint(allocator, "http://{s}/", .{raw_address});
        built += 1;
    }
    return urls;
}

/// Releases a slice produced by `buildHttpAddresses`, including each URL string.
fn freeHttpAddresses(allocator: std.mem.Allocator, urls: [][]u8) void {
    for (urls) |url| allocator.free(url);
    allocator.free(urls);
}

/// Program entry point.
///
/// Expects the server id as the first command-line argument; the id is an index
/// into `raft_addresses` / `client_addresses`. Restores persisted state from the
/// per-server cache directory, creates the Raft instance, spawns the ticker and
/// Raft-server threads (both detached), and then runs the client server on the
/// calling thread.
///
/// Returns `error.MissingServerId` when no id argument is given and
/// `error.InvalidServerId` when the id does not index the address tables; any
/// error from the called setup functions is propagated.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var args = try std.process.argsWithAllocator(allocator);
    defer args.deinit();
    _ = args.next(); // skip the program name

    // The id indexes both address tables, so it must be valid for both.
    const server_count: usize = @min(raft_addresses.len, client_addresses.len);
    const id_text = args.next() orelse {
        std.log.err("missing server id argument (expected 0..{d})", .{server_count - 1});
        return error.MissingServerId;
    };
    const self_id = try std.fmt.parseInt(usize, id_text, 10);
    if (self_id >= server_count) {
        std.log.err("server id {d} is out of range (expected 0..{d})", .{ self_id, server_count - 1 });
        return error.InvalidServerId;
    }

    const raft_http_addresses = try buildHttpAddresses(allocator, &raft_addresses);
    defer freeHttpAddresses(allocator, raft_http_addresses);
    const client_http_addresses = try buildHttpAddresses(allocator, &client_addresses);
    defer freeHttpAddresses(allocator, client_http_addresses);

    // One mutex is shared by the ticker, the Raft server and the client server.
    var mutex = std.Thread.Mutex{};
    var cond = std.Thread.Condition{};

    var store = std.StringHashMap([]const u8).init(allocator);
    defer store.deinit();

    // Each server gets its own cache directory: "<cache_name>-<id>".
    var buffer: [128]u8 = undefined;
    const cache_dir_name = try std.fmt.bufPrint(&buffer, "{s}-{}", .{ cache_name, self_id });
    var storage = try Storage.init(cache_dir_name);
    defer storage.deinit();

    const persisted_log = try storage.readLog(allocator);
    defer allocator.free(persisted_log);

    const config = Raft.Config{ .id = @intCast(self_id), .server_no = @intCast(raft_addresses.len) };
    const initial_state = Raft.InitialState{
        .current_term = try storage.readCurrentTerm(),
        .voted_for = try storage.readVotedFor(),
        .log = persisted_log,
    };

    var user_data = UserData{
        .allocator = allocator,
        .addresses = raft_http_addresses,
        .store = &store,
        .cond = &cond,
        .storage = &storage,
    };
    const callbacks = Raft.Callbacks{
        .user_data = &user_data,
        .makeRPC = makeRPC,
        .applyEntry = applyEntry,
        .logAppend = logAppend,
        .logPop = logPop,
        .persistCurrentTerm = persistCurrentTerm,
        .persistVotedFor = persistVotedFor,
    };
    var raft = try Raft.init(config, initial_state, callbacks, allocator);

    var ticker = Ticker{ .raft = &raft, .mutex = &mutex };
    const ticker_thread = try std.Thread.spawn(.{}, Ticker.run, .{&ticker});
    ticker_thread.detach();

    const raft_address = try parseAddress(raft_addresses[self_id]);
    var raft_server = RaftServer{
        .raft = &raft,
        .mutex = &mutex,
        .address = raft_address,
        .allocator = allocator,
    };
    const raft_thread = try std.Thread.spawn(.{}, RaftServer.run, .{&raft_server});
    raft_thread.detach();

    const client_address = try parseAddress(client_addresses[self_id]);
    var client_server = ClientServer{
        .raft = &raft,
        .cond = &cond,
        .mutex = &mutex,
        .store = &store,
        .address = client_address,
        .addresses = client_http_addresses,
        .allocator = allocator,
    };
    // Runs on the calling thread; `main` only returns when this call does.
    try client_server.run();
}
/// Raft `makeRPC` callback: encodes `rpc` as JSON and POSTs it to the peer whose
/// URL is `user_data.addresses[id]`.
///
/// A new HTTP client is created for each call and torn down before returning.
/// The request body is sent with chunked transfer encoding. Any error from URL
/// parsing, connecting, encoding or writing is returned to the caller.
fn makeRPC(user_data: *UserData, id: u32, rpc: Raft.RPC) !void {
    const allocator = user_data.allocator;
    const peer_uri = try std.Uri.parse(user_data.addresses[id]);

    var http_client = std.http.Client{ .allocator = allocator };
    defer http_client.deinit();

    var header_buffer: [1024]u8 = undefined;
    var post = try http_client.open(.POST, peer_uri, .{ .server_header_buffer = &header_buffer });
    defer post.deinit();

    const payload = try std.json.stringifyAlloc(allocator, rpc, .{});
    defer allocator.free(payload);

    post.transfer_encoding = .chunked;
    try post.send();
    try post.writeAll(payload);
    try post.finish();
    // The response is intentionally not awaited: this function has to be non-blocking.
}
/// Raft `applyEntry` callback: applies one entry to the key/value store and then
/// signals `user_data.cond`.
///
/// `.add` stores `value` under `key`; `.remove` drops `key` (a missing key is
/// ignored). An error from the map insertion is returned before the condition
/// is signalled.
fn applyEntry(user_data: *UserData, entry: Entry) !void {
    const store = user_data.store;
    switch (entry) {
        .add => |addition| try store.put(addition.key, addition.value),
        .remove => |removal| {
            // The key/value memory is not freed here because the entry in the
            // log still uses it.
            _ = store.remove(removal.key);
        },
    }
    user_data.cond.signal();
}
/// Raft `logAppend` callback: forwards `log_entry` to `Storage.appendLog`,
/// passing the allocator from `ud`.
fn logAppend(ud: *UserData, log_entry: Raft.LogEntry) !void {
    return ud.storage.appendLog(log_entry, ud.allocator);
}
/// Raft `logPop` callback: returns whatever `Storage.popLog` yields, passing the
/// allocator from `ud`.
fn logPop(ud: *UserData) !Raft.LogEntry {
    const storage = ud.storage;
    const allocator = ud.allocator;
    return storage.popLog(allocator);
}
/// Raft `persistVotedFor` callback: hands `voted_for` to `Storage.writeVotedFor`.
fn persistVotedFor(ud: *UserData, voted_for: ?u32) !void {
    return ud.storage.writeVotedFor(voted_for);
}
/// Raft `persistCurrentTerm` callback: hands `current_term` to
/// `Storage.writeCurrentTerm`.
fn persistCurrentTerm(ud: *UserData, current_term: u32) !void {
    return ud.storage.writeCurrentTerm(current_term);
}
/// Parses an IPv4 `host:port` string into a `std.net.Address`.
///
/// The string is split at the first ':'; everything after it must be a decimal
/// `u16` port. The port is parsed before the host, so a missing or malformed
/// port is reported (as a `std.fmt.parseInt` error) before any IPv4 parse error.
fn parseAddress(address: []const u8) !std.net.Address {
    const colon = std.mem.indexOfScalar(u8, address, ':') orelse address.len;
    const host = address[0..colon];
    // Skip the ':' when there is one; with no ':' the port text is empty.
    const port_text = address[@min(colon + 1, address.len)..];
    const port = try std.fmt.parseInt(u16, port_text, 10);
    return std.net.Address.parseIp4(host, port);
}