# coordinator.py
from dataclasses import dataclass, field
from multiprocessing import Process, Queue, Value
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR, IPPROTO_TCP, TCP_NODELAY
from typing import Any
from enum import Enum
import logging
import secrets
import sys
import time
from shamir import split_secret
from model import ActionType, CoordinatorModel
from roast import share_val, verify
from transport import send_obj, recv_obj
import fastec
@dataclass(order=True)
class PriorityAction:
    """Queue entry that is ordered and compared by ``priority`` alone.

    The ``action`` payload is excluded from comparisons, so two entries with
    the same priority compare equal no matter what they carry, and payloads
    never need to be orderable themselves.
    """

    # Sort key; the only field that takes part in ==, <, <=, >, >=.
    priority: int
    # Arbitrary payload travelling with the priority.
    action: Any = field(compare=False)
class AttackerLevel(Enum):
    """Adversary models selectable for a benchmark run."""

    # The set of malicious participants is fixed once, at the beginning.
    STATIC = 0

    # Same fixed set as STATIC, but at most one of its members behaves
    # maliciously within any single session.
    STATIC_COORDINATION = 1

    # Exactly one malicious participant in the first m sessions.
    ADAPTIVE = 2
def random_sample(items, k):
    """Return the first ``k`` entries of a securely shuffled copy of ``items``.

    ``items`` may be any iterable; it is copied into a list first, so the
    caller's collection is never reordered. Because the result is a plain
    slice, asking for more entries than exist returns all of them rather
    than raising, and a bool ``k`` behaves as 0 or 1.
    """
    shuffled = [*items]
    rng = secrets.SystemRandom()
    rng.shuffle(shuffled)
    return shuffled[:k]
class AttackerStrategy:
    """Picks which participants behave maliciously in each session."""

    def __init__(self, level, n, m):
        """Store the configuration and draw the static attacker set.

        level: the AttackerLevel to simulate.
        n: number of participants; they are numbered 1..n.
        m: number of attackers drawn for the static set; also the session
           bound used by the ADAPTIVE level.
        """
        self.level = level
        self.n = n
        self.m = m
        # Drawn once here; the two STATIC levels read it on every session.
        self.static_attackers = random_sample(range(1, n + 1), m)

    def choose_malicious(self, T, sid_ctr):
        """Return the list of malicious participants for one session.

        T: the participants taking part in the session.
        sid_ctr: the session counter of that session.

        Raises ValueError when ``self.level`` is not a known AttackerLevel.
        """
        level = self.level

        if level == AttackerLevel.STATIC:
            # Hand out a copy so callers cannot mutate the stored set.
            return list(self.static_attackers)

        if level == AttackerLevel.STATIC_COORDINATION:
            # At most one static attacker that is actually in this session.
            present = set(T).intersection(self.static_attackers)
            return random_sample(present, 1)

        if level == AttackerLevel.ADAPTIVE:
            # One participant while sid_ctr <= m, nobody afterwards.
            how_many = int(sid_ctr <= self.m)
            return random_sample(T, how_many)

        raise ValueError('Unexpected AttackerLevel:', self.level)
class Coordinator:
    """Runs signing sessions against remote participants over TCP.

    Work is split across processes that communicate through queues:

    * ``setup`` starts one sender process that drains ``outgoing`` and
      writes to the participant sockets;
    * ``setup`` starts one receiver process per participant that reads from
      its socket and pushes INCOMING actions onto ``actions``;
    * ``run`` consumes ``actions`` and feeds the coordinator model.

    ``run_id`` is a shared integer so the sender process can recognise and
    drop messages that belong to an earlier run.
    """

    def __init__(self, actions, outgoing, i_to_cached_ctx):
        """Store the shared queues; no sockets are opened here.

        actions: queue of PriorityAction items consumed by ``run``.
        outgoing: queue of ``(i, run_id, data)`` tuples to be sent.
        i_to_cached_ctx: per-participant queue of ``(run_id, ctx)`` pairs
            used by the receiver processes to validate shares.
        """
        self.actions = actions
        self.outgoing = outgoing
        self.i_to_cached_ctx = i_to_cached_ctx
        # Participant index -> connected socket, filled in by setup().
        self.connections = {}
        # Shared between processes; incremented at the start of every run.
        self.run_id = Value('i', 0)

    def queue_action(self, action_type, data):
        """Enqueue ``(action_type, data)`` with ``action_type.value`` as priority."""
        self.actions.put(PriorityAction(action_type.value, (action_type, data)))

    def queue_incoming_loop(self, sock, cached_ctx_queue):
        """Receive messages from one participant and queue them as INCOMING.

        Loops until ``recv_obj`` returns a falsy value. When a message
        carries a share (``s_i is not None``), the share is validated
        against the cached context of the same run before it is queued.
        """
        while True:
            obj = recv_obj(sock)
            if not obj:
                break
            run_id, (i, s_i, pre_i) = obj

            share_is_valid = False
            if s_i is not None:
                ctx_run_id, ctx = cached_ctx_queue.get()
                # Discard queue items from previous runs
                while ctx_run_id != run_id:
                    ctx_run_id, ctx = cached_ctx_queue.get()
                share_is_valid = share_val(ctx, i, s_i)

            data = run_id, i, s_i, pre_i, share_is_valid
            self.queue_action(ActionType.INCOMING, data)

    def send_outgoing_loop(self):
        """Forever take ``(i, run_id, data)`` off ``outgoing`` and send it.

        Messages whose ``run_id`` is older than the shared run id are
        dropped instead of sent.
        """
        while True:
            i, run_id, data = self.outgoing.get()
            assert i in self.connections
            with self.run_id.get_lock():
                if run_id < self.run_id.value:
                    # The main thread is at least in run self.run_id.value,
                    # so it's safe to drop messages from earlier runs.
                    logging.debug(f'Ignoring outgoing message from previous run (message run_id = {run_id}, my run_id = {self.run_id.value})')
                    continue
            # NOTE(review): the send happens after the lock is released, so
            # run() is not blocked on network I/O when it bumps run_id.
            send_obj(self.connections[i], (run_id, data))

    def setup(self, i_to_addr):
        """Connect to every participant and start the worker processes.

        i_to_addr: mapping from participant index to the address passed to
            ``socket.connect``.
        """
        for i, addr_i in i_to_addr.items():
            conn = socket(AF_INET, SOCK_STREAM)
            self.connections[i] = conn
            conn.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
            conn.setsockopt(IPPROTO_TCP, TCP_NODELAY, True)
            conn.connect(addr_i)
            logging.debug(f'Established connection to participant {i} at {addr_i}')

        # Workers are started only after every connection exists.
        Process(target=self.send_outgoing_loop, daemon=True).start()
        for i, conn in self.connections.items():
            Process(target=self.queue_incoming_loop, args=[conn, self.i_to_cached_ctx[i]], daemon=True).start()

    def run(self, i_to_sk, model, attacker_strategy):
        """Execute one run until a session succeeds.

        i_to_sk: mapping from participant index to the value sent to that
            participant in the initial message.
        model: the coordinator model; ``handle_incoming``, ``sid_ctr``,
            ``sid_to_T``, ``i_to_sid`` and ``X`` are used.
        attacker_strategy: object whose ``choose_malicious(T, sid_ctr)``
            names the malicious participants of a new session.

        Returns ``(elapsed_seconds, send_count, recv_count, sid)``.
        ``recv_count`` counts every INCOMING action taken off the queue,
        including ones that are then ignored for belonging to another run.

        Raises AssertionError if the final signature does not verify, and
        Exception for an unknown action type.
        """
        with self.run_id.get_lock():
            self.run_id.value += 1
            run_id = self.run_id.value

        # One initial message goes out to every participant.
        send_count = len(i_to_sk)
        recv_count = 0
        for i, sk_i in i_to_sk.items():
            self.outgoing.put((i, run_id, (model.X, i, sk_i)))

        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by system clock adjustments during the run.
        start = time.perf_counter()
        while True:
            action_type, data = self.actions.get().action

            if action_type == ActionType.NO_OP:
                pass

            elif action_type == ActionType.INCOMING:
                recv_count += 1
                msg_run_id, i, s_i, pre_i, share_is_valid = data
                # Ignore incoming messages from wrong run_id
                if msg_run_id < run_id:
                    logging.debug(f'Ignoring incoming message from previous run (message run_id = {msg_run_id}, my run_id = {run_id})')
                    continue
                if msg_run_id > run_id:
                    logging.info(f'Ignoring incoming message from future run; this should not happen (message run_id = {msg_run_id}, my run_id = {run_id})')
                    continue

                if s_i is None:
                    logging.debug(f'Initial incoming message from participant {i}')
                else:
                    logging.debug(f'Incoming message from participant {i} in session {model.i_to_sid[i]}')

                # The model decides the follow-up action, which is queued
                # rather than handled inline.
                action_type, data = model.handle_incoming(i, s_i, pre_i, share_is_valid)
                self.queue_action(action_type, data)

            elif action_type == ActionType.SESSION_START:
                send_count += len(data)
                sid_ctr = model.sid_ctr
                logging.debug(f'Enough participants are ready, starting new session with sid {sid_ctr}')
                T = model.sid_to_T[sid_ctr]
                session_malicious = attacker_strategy.choose_malicious(T, sid_ctr)
                for ctx, i in data:
                    # Cache the context so the receiver process can validate
                    # the share this participant sends back.
                    self.i_to_cached_ctx[i].put((run_id, ctx))
                    self.outgoing.put((i, run_id, (ctx.msg, ctx.T, ctx.pre, i in session_malicious)))

            elif action_type == ActionType.SESSION_SUCCESS:
                ctx, sig, sid = data
                # Stop the clock before verifying so verification time is
                # not part of the measurement.
                end = time.perf_counter()
                # Explicit check instead of `assert`, which is removed under
                # `python -O` and would skip verification entirely.
                if not verify(ctx, sig):
                    raise AssertionError('Signature verification failed')
                return end - start, send_count, recv_count, sid

            else:
                raise Exception('Unknown ActionType', action_type)
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    # Exactly seven positional arguments are required after the script name.
    if len(sys.argv) != 8:
        print(f'usage: {sys.argv[0]} <host> <start_port> <threshold> <total> <malicious> <attacker_level> <runs>')
        sys.exit(1)

    host = sys.argv[1]
    start_port, t, n, m = (int(arg) for arg in sys.argv[2:6])
    attacker_level = AttackerLevel(int(sys.argv[6]))
    runs = int(sys.argv[7])

    # Every run signs the same (empty) message.
    msg = b""

    # Participant i (1-based) is reached at port start_port + i - 1.
    i_to_addr = {i: (host, start_port + i - 1) for i in range(1, n + 1)}

    # This is insecure; in practice we'd use DKG, but since
    # key generation is not the focus of the ROAST protocol, we will
    # keep the implementation simple by having the coordinator
    # act as a centralized dealer.
    sk = 1 + secrets.randbelow(fastec.n - 1)
    i_to_sk = split_secret(sk, t, n)
    X = sk * fastec.G
    i_to_X = {i: sk_i * fastec.G for i, sk_i in i_to_sk.items()}

    # One context queue per participant, plus the two shared work queues.
    i_to_cached_ctx = {i: Queue() for i in range(1, n + 1)}
    actions = Queue()
    outgoing = Queue()

    coordinator = Coordinator(actions, outgoing, i_to_cached_ctx)
    coordinator.setup(i_to_addr)

    # Each run gets a fresh model and attacker strategy and prints one
    # comma-separated result line.
    for _ in range(runs):
        model = CoordinatorModel(X, i_to_X, t, n, msg)
        attacker_strategy = AttackerStrategy(attacker_level, n, m)
        elapsed, send_count, recv_count, sid = coordinator.run(i_to_sk, model, attacker_strategy)
        print(t, n, m, attacker_level, elapsed, send_count, recv_count, sid, sep=',')