diff --git a/examples/network/aliases.json b/examples/network/aliases.json new file mode 100644 index 0000000..9f20c3f --- /dev/null +++ b/examples/network/aliases.json @@ -0,0 +1 @@ +{"pingpong-listener":"12D3KooWK5UgRtP27Jdujm79Pq3QLJ5Gttg2PucDF8Hppd7txJiA","pingpong-dialer":"12D3KooWQ6DqK22D98Ro4WopLhsGPkUyb2d3EvckDrvSYta4E3Y6"} \ No newline at end of file diff --git a/examples/network/bug.zip b/examples/network/bug.zip deleted file mode 100644 index 6257603..0000000 Binary files a/examples/network/bug.zip and /dev/null differ diff --git a/examples/network/ids/pingpong-dialer.json b/examples/network/ids/pingpong-dialer.json new file mode 100644 index 0000000..b0c774e --- /dev/null +++ b/examples/network/ids/pingpong-dialer.json @@ -0,0 +1 @@ +{"id":"12D3KooWQ6DqK22D98Ro4WopLhsGPkUyb2d3EvckDrvSYta4E3Y6","privKey":"CAESQMLauuc9ZpSyyUT6M2aEVHOmpWKtu1iJ1RgoF9J27chG1BN8ReNF+MDNiTWrpC3T66mQdb/lo1Rq0PDXJR2KmWs=","pubKey":"CAESINQTfEXjRfjAzYk1q6Qt0+upkHW/5aNUatDw1yUdiplr"} \ No newline at end of file diff --git a/examples/network/ids/pingpong-listener.json b/examples/network/ids/pingpong-listener.json new file mode 100644 index 0000000..36eab90 --- /dev/null +++ b/examples/network/ids/pingpong-listener.json @@ -0,0 +1 @@ +{"id":"12D3KooWK5UgRtP27Jdujm79Pq3QLJ5Gttg2PucDF8Hppd7txJiA","privKey":"CAESQJrfEr15UL1tJWbldgZzBh9T9Y9RrkRSAzDT4lKeK9EqiZhB5ZFmAGv3Q8cidbFUpoizcWTqTBIZjYmLn3n4jL8=","pubKey":"CAESIImYQeWRZgBr90PHInWxVKaIs3Fk6kwSGY2Ji595+Iy/"} \ No newline at end of file diff --git a/examples/network/pingpong/aliases.json b/examples/network/pingpong/aliases.json index 8607337..8dbf6b0 100644 --- a/examples/network/pingpong/aliases.json +++ b/examples/network/pingpong/aliases.json @@ -1 +1 @@ -{"pingpong-listener":"QmNLstfWyBabHSoCATbjUKyiTkiBbGMRfXKuT2b8Bv62AW","pingpong-dialer":"QmVVBs2C1C9rP72jmHcuJkzvZRPE2WBWUyCPwa7tNavFh7"} \ No newline at end of file +{"pingpong-listener":"12D3KooWRreQ2wg6auezZE2FW5BYaRTW56HH7oPe5EPcwnTDACms","pingpong-dialer":"12D3KooW9yLx9GWMoh58Q3eLwnjSDgZ22pNcbULn1rL1B3g5dCCH"} \ No newline at end of file diff --git a/examples/network/pingpong/ids/pingpong-dialer.json b/examples/network/pingpong/ids/pingpong-dialer.json index d64c738..b32dc86 100644 --- a/examples/network/pingpong/ids/pingpong-dialer.json +++ b/examples/network/pingpong/ids/pingpong-dialer.json @@ -1 +1 @@ -{"id":"QmVVBs2C1C9rP72jmHcuJkzvZRPE2WBWUyCPwa7tNavFh7","privKey":"CAASpwkwggSjAgEAAoIBAQCmU3JZoXROh/HGZTxcQlg2Hun/U6zK0CE+jN7p63cEqfG1rYq6QmQ9oKdTTS7L4Vvtx9JCrcjsXIHQuKBTg1ZS8+hy3ZBo7TbgtYNvD9A8MQW2R6dqurNzUKn1qaxpZl13sQT7ol4cFF4viwMwAo128FeG2k1W4un+D9evg/8FfAbOBxsyTil97R1rWHHtyPobtr+zOV7a9mcbutaKzxWbWjZUQnJcd+BMyesFDYIAZhPYGC458jWkB7/1Sw6nWKIVuh1DDQiQ4AThzp7ks9JLbVKUNzumVn9zOy0CrVlmB3GVj7p77mzJkGuDZqh5CcsK9gs11i/xz5rfuZisqxGDAgMBAAECggEAd+pd3UVMZ3oX1GQUuqeSlaKALneTcr3P2hsSdDAxpQkpnUS7akKHpu729FYHUTLvZmXUsAI/hDnF1kfmP4/HYxM7GeWoQh4UnLoBQsdx6JOnfJ34lDh7PL6Bav6jsXH+HVdhMlMD6ta8eSaOa8TLXV82m6E0dVowPd4KMR7HdJmku/0G7czunZt2huuinYAZ24kLnVxSU47Z7ngQb4pvhaHqepFBEOlp9Gvku9q0nzVeWmLo1J4MPT4CsUybS+c//aFz9pw/6lBBV2SUQuzyZ00fYzLZSRaLKQEo87WUVkdcx81JgoSkFt2i8v7QzmSPDU1sPfC3RUnUluBIfVWz8QKBgQDsWbdRl2WRYDo/S3sD9kwt3WnE+Fdihc24uE1HgLuC9BhRDS9OOBc/XjlUvkCrxd0Glr1FTfP9ewNgxXQ5mnNTo/eMqblxlK/vUeqn4HXysgnBGhsgZmNC1vOb4Q9AnxS54uOBkqNjyyAdU02DHPVFvcfimmWohwBCTfgOywB1FQKBgQC0J2Nq1wnFjyamPcKzIGwEMQWQZ7pRHS9IeAb1c8xitCqFlR3vkKXgbn7EM2gisBLB/Wri5kylQr2Gy84jd7P4hWieGafm1Rc3r7zkafPVtyWomWcfDbcpJsiGsQop66EpBm869dnnexutbAAz3M3fhcjubqj9G1U5o7/1wXjCNwKBgQCTIe6rDlKeQ4c/K9/ywXr++l0Dz42muaEtox4Iqy0QAqC4pDqUuPpP6npKNP3RcSV9Go3M/RAs9k1OCt2llm7A3MwYdvgIqwUzOI2Z4HPMl+TWn0fPza1xSJryqRJzqhSe+42hdgXc8/CUEO2p93cA6XnrqS4r0Y7pt9v6aYlpWQKBgAo3AYgZUVCGYWajsdp+SCGktfAOMZ5PzVKKm7pnKnueQ5r3bY8b4IvtN/rf/1OYMDgXqmvbKxVjx2NRQwr3ypiY1+m/AqowAvUBXfCFoXHIxLXenN5B5NTMgipA95aQ6b5twvjQ394kONmIeip2pqW57D64v5Q6bIasJkJFChfZAoGAYyVkaNVHr8bdH9oxEPbVN40xhVD9J1SaJlk+JERmqtLn9TcXRqT3zvm27W+YQw7E4wEHITjT1P7l7sJ+8n1mzAUjsie8dTLoZ68TvwZC1ktIPU6tUkpefDya+IMbN3DU7X2jrk4y56hnofgK/AzKdBVK/cONoYXtWkotJ0EUZcw=","pubKey":"CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCmU3JZoXROh/HGZTxcQlg2Hun/U6zK0CE+jN7p63cEqfG1rYq6QmQ9oKdTTS7L4Vvtx9JCrcjsXIHQuKBTg1ZS8+hy3ZBo7TbgtYNvD9A8MQW2R6dqurNzUKn1qaxpZl13sQT7ol4cFF4viwMwAo128FeG2k1W4un+D9evg/8FfAbOBxsyTil97R1rWHHtyPobtr+zOV7a9mcbutaKzxWbWjZUQnJcd+BMyesFDYIAZhPYGC458jWkB7/1Sw6nWKIVuh1DDQiQ4AThzp7ks9JLbVKUNzumVn9zOy0CrVlmB3GVj7p77mzJkGuDZqh5CcsK9gs11i/xz5rfuZisqxGDAgMBAAE="} \ No newline at end of file +{"id":"12D3KooW9yLx9GWMoh58Q3eLwnjSDgZ22pNcbULn1rL1B3g5dCCH","privKey":"CAESQF6wIGIGQ5xtPa0aYpASt5nB02lIpyHoKZ/9B7RA3htpAkzZ1fgk6YZabzX9Cx9RMMU6++VHbbJr9eJ7mtgZd9o=","pubKey":"CAESIAJM2dX4JOmGWm81/QsfUTDFOvvlR22ya/Xie5rYGXfa"} \ No newline at end of file diff --git a/examples/network/pingpong/ids/pingpong-listener.json b/examples/network/pingpong/ids/pingpong-listener.json index 61931cf..f1157a8 100644 --- a/examples/network/pingpong/ids/pingpong-listener.json +++ b/examples/network/pingpong/ids/pingpong-listener.json @@ -1 +1 @@ -{"id":"QmNLstfWyBabHSoCATbjUKyiTkiBbGMRfXKuT2b8Bv62AW","privKey":"CAASqAkwggSkAgEAAoIBAQDfhqItLOHTl6cw9hxWkdHipCtlLk5f2NkE4Suebb6h4WpFR8qafNJ8qJLXGxMaYW50OBeMDdnnsefpYbbC2CNgKUx2AnzZ784AdzeDUTTZDwdePwJSuemx1tlSgbw4KADLP8pYNleuflKyUqzdojpc47Sn1gv6zcYPolz+1aSP0Xl76UoRgdm8Zymzxk+xKyxyBYxgCkYu7RNwkoE7hpOyl7eRGPNG31978NR9vu5XdXri27ez51lHpZq7KBgquONFtfok4eEEF26qIJbcb/4xl73I8eXH+FJGqHgyzSyAp0guHXM8Fg+xuenmY0sNtVBAgFbB6unSJWQMFmnxBNoLAgMBAAECggEBAJOSy5ePvjh4M0W79tGgzDUZthzDCbN18zGph6a9RdKShBrhXv3H0x/CG9Awa9hK4yWPstwgePDjH/2RKZxSHmjqWzS+R7eK/zKHgvsLrhxwM6khaGM9ovBqrGgwhxd8Man+n5TFq/XkKKzasI5TAL07CJaWVqprGIxR4ZvNaSwZIERJCLHafn08aC4GVukXeWMoH0ya+WViKev6sGIq39MUHevnA/itpNrj51Btlqs8FHPaMMHzPSxcjEzBIo5upUKqklo7fsx/XE7sp9FeE7h3AaItZMzv/aFJpvMe/m33z7HZHKTBxleU0RnxAKPsW1ZIRr2PNM4wAvfb1XgJcGECgYEA+MN1klZVcUvmniiP8oQ9YHfc0rpC6FKQKJQdizZKQkkrm1m5ulSHx+JuAl0Nk4PcO9g8ZKuI9HLmCeK1fM8MF+rKlmDipI5ajU6P1sryZkzQAFIJLRfjnRub5uuVJWkTedIualE/QADllWEbVOw5eAZIKUAKOd+/kGh9wIWepP8CgYEA5gc63XZJBMgZI1N2hKeN6MmQHABEDacv4mA6Ymhg9y8JmsK890w3wHHdf2NI64rCvELWlPdyfet7SL/xxVI/pYE2ejJXvhrSe6ZARKLEifZ3c3nllHDQ1z2DXyXonhoevfLNdghED5sVvX0ajc246JC4/PrxmEWs18gMvOXVDvUCgYEA2wUUbewvPBosiNGDs200sMu3k51ErVGL9P47aMc66FON3jBIcsJb7ePxIYmWG2v8KoB+48+XPEoxOUDus12D80bYaUASK/ndxg4GXIHAm8tDUxTnWVlwIHIfeFewsAhsilRAY4D3JD3l5PhjXQjCrGczf4YPutbBzb4CAdBjVjcCgYB+AyvuMmRh6DRNM+XTWe7VvcXicQrW5+XFf628Ry4He48pZtEaMHjCRh5vMLa7wjJX682docjozl2lRvFthVc0lYqAep+ylwMDldnTP8+nPIvHiNmJ7huaLiqPrza1ld2NdTu1E2YlnnHUcnpfgHlxfga5H8fGATVkqETCHq4PGQKBgBGUpKmxcVbUNWE8zeOf19nqr5y6dovn3Wm6T1NqBP7yOpEK3wuSOwjwhqy4vdcSHcudiFQAurUVP3cz5WXbdhZp3B8g68VFfq+61fcxMDlXeiIhWxJdqmD2LZPGv4jg+oxB40cJC1R6x5WGuqQQNo/ezrtqP9Q/5XyX80alUXaK","pubKey":"CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDfhqItLOHTl6cw9hxWkdHipCtlLk5f2NkE4Suebb6h4WpFR8qafNJ8qJLXGxMaYW50OBeMDdnnsefpYbbC2CNgKUx2AnzZ784AdzeDUTTZDwdePwJSuemx1tlSgbw4KADLP8pYNleuflKyUqzdojpc47Sn1gv6zcYPolz+1aSP0Xl76UoRgdm8Zymzxk+xKyxyBYxgCkYu7RNwkoE7hpOyl7eRGPNG31978NR9vu5XdXri27ez51lHpZq7KBgquONFtfok4eEEF26qIJbcb/4xl73I8eXH+FJGqHgyzSyAp0guHXM8Fg+xuenmY0sNtVBAgFbB6unSJWQMFmnxBNoLAgMBAAE="} \ No newline at end of file +{"id":"12D3KooWRreQ2wg6auezZE2FW5BYaRTW56HH7oPe5EPcwnTDACms","privKey":"CAESQF79dRdiG8tJ5zRBtAU3UMBSOGSPwRzdsgj5pNnOwHb/7lCAtk+OET71pdXNBhOSUQ64O4qdNCwp13M2Ul9VqF4=","pubKey":"CAESIO5QgLZPjhE+9aXVzQYTklEOuDuKnTQsKddzNlJfVahe"} \ No newline at end of file diff --git a/lib/lists.trp b/lib/lists.trp index 77d42bb..b402da8 100644 --- a/lib/lists.trp +++ b/lib/lists.trp @@ -56,13 +56,6 @@ let fun map f list = in partition_aux ([],[]) ls end - fun contains element list = - case list of - [] => false - | h :: t => - if h = element then true - else contains element t - fun filter f l = case l of [] => [] | h :: t => if f h then h :: (filter f t) else filter f t @@ -97,7 +90,6 @@ in , ("append", append) , ("partition", partition) , ("nth", nth) - , ("contains", contains) , ("filter", filter) , ("first", first) , ("slice", slice) diff --git a/lib/out/lists.exports b/lib/out/lists.exports index cf5a5c3..db2a6ba 100644 --- a/lib/out/lists.exports +++ b/lib/out/lists.exports @@ -9,7 +9,6 @@ length append partition nth -contains filter first slice diff --git a/lib/raft_troupe.trp b/lib/raft_troupe.trp deleted file mode 100644 index 81a16b1..0000000 --- a/lib/raft_troupe.trp +++ /dev/null @@ -1,1014 +0,0 @@ -import lists - -(* - Log = { - snapshot: Snapshot - log: Entry[], - lastApplied: int, - internalChanges: int, - commitIndex: int, - latestSerials: SerialKey[] - } - Snapshot = { - snapshot: Some state - lastIncludedIndex: int, - lastIncludedTerm: int - } - Entry = { - term: int, - command: message, - serial: string - } - SerialKey = { - id: clusterId[] | pid, - key: (logIndex, number) | nonce - } -*) - -(* - LeaderInfo = { - nextIndex = { - peer: p, - next: int - }[], - matchIndex = { - peer: p, - match: int - }[] - } -*) - -(* - StateMachine = { - set_hook : fn (x: string) => x - get_hook : fn (x: string, callback_pid: string) => x - get_snapshot_hook : fn(callback_pid: string) => x - get_changes_hook : fn (callback_pid: string) => x - snapshot_condition_hook : fn (log_summary: LogSummary, callback_pid: string) => x: bool - } - LogSummary = { - log_size: int, - entries_since_snap: int - } -*) - -(* - Node = { - all_nodes: string[], - id: string, - log: Log, - term: int, - voted_for: string, - leader: string, - leader_info: LeaderInfo, - snapshot_condition: fn logSummary => ... : boolean - state_machine: ([SIDE-EFFECTS], STATUS, STEP-FUNC) - total_nodes: int, - verbose: boolean - } -*) - -(* - RaftProcesses = { - type: Client | Cluster, - id: pid | Clusterid[] - } -*) - -datatype Atoms = - WAIT | SUS | DONE - | SEND_HEARTBEAT - | RAFT_UPDATE - | NOT_LEADER - | ACKNOWLEDGE - | REJECT - | ELECTION_TIMEOUT - | REQUEST_VOTE | YES_VOTE | NO_VOTE | VOTE_TIMEOUT - | APPEND_ENTRIES | SNAPSHOT - | ADD_NODES - | DIAL - - | DIALER_ACK | DIALER_SM_BUSY | DIALER_SM_DONE | DIALER_CLIENT_MSG - | DIALER_MESSAGE_TIMEOUT | DIALER_BUSY_TIMEOUT - - | SEND_TO_NTH | SEND_TO_ALL - - | DEBUG_PRINTLOG | DEBUG_PAUSE | DEBUG_CONTINUE | DEBUG_APPLYSNAPSHOT | DEBUG_SNAPSHOT_COND | DEBUG_TIMEOUT - - | FUNCTION_DONE - - | ERROR_TIMEOUT - | CLUSTER | CLIENT - -let - (* Constants *) - val LOCAL_ERROR_TIMEOUT = 4000 - val ELECTION_TIMEOUT_LOWER = 2000 - val ELECTION_TIMEOUT_UPPER = 4000 - val HEARTBEAT_INTERVAL = 500 - - val DIALER_NOLEADER_TIMEOUT = 500 - val DIALER_NOMSG_TIMEOUT = 2000 - val DIALER_SM_BUSY_TIMEOUT = 1000 - - fun not a = a = false - fun send_to_all processes msg sender = map (fn x => send(x, msg)) (filter (fn x => x <> sender) processes) - - fun send_to_nth processes msg n = send((nth (reverse processes) n), msg) - - fun max a b = if a < b then b else a - - fun min a b = if a > b then b else a - - (* #IMPORT libs/quickselect.trp *) - fun is_even i = i mod 2 = 0 - - (* Using QuickSelect, finds the kth element of a list. *) - fun quickselect list k = - case list of - [] => "ERROR: Empty list" - | h :: t => - let val (ys, zs) = partition (fn x => x > h) t - val l = length ys - in - if k < l then quickselect ys k - else if k > l then quickselect zs (k-l-1) - else h - end - - (* Returns the median of a list. *) - fun median list = - let val len = length list - val middle = if is_even len then len / 2 - 1 else (len - 1) / 2 - in quickselect list (middle) - end - (* END OF libs/quickselect.trp *) - - (* #IMPORT libs/log.trp *) - (* Creates a snapshot. *) - fun set_snapshot snapshot index term = { - snapshot = snapshot, - lastIncludedIndex = index, - lastIncludedTerm = term - } - - (* Creates a default, empty snapshot. *) - val empty_snapshot = { - snapshot = (), - lastIncludedIndex = 0, - lastIncludedTerm = 0 - } - - (* A default, empty log. *) - val empty_log = { - log = [], - snapshot = empty_snapshot, - lastApplied = 0, - commitIndex = 0, - lastMessageSerial = "" - } - - fun pretty_print_log id log = - (* Disabled for library *) - (* printString "\n========******========"; - print (length log.log); - printString ("ID: "^id); - printString "----------------------"; - printString "Entries (term, message):"; - map (fn x => print (x.term, x.command)) log.log; - printString "----------------------"; - printString "CommitIndex:"; - print log.commitIndex; - printString "LastApplied:"; - print log.lastApplied; - printString "----------------------"; - printString "Snapshot:"; - print log.snapshot; - printString "========******========\n";*) - () - - (* Appends a message to the log, and notes the message's serial number. *) - fun append_message log message callback term serial = - let val new_entry = { - term = term, - command = message, - callback = callback, - serial = serial - } - in { - log with - lastMessageSerial = serial, - log = new_entry :: log.log - } - end - - - (* Appends a list of message to the log. *) - fun add_entries_to_log log entries term = - case entries of - [] => log - | h :: t => - add_entries_to_log (append_message log h.command h.callback term h.serial) t - h.term - - (* Updates the lastApplied-index. *) - fun update_applied log = { - log with - lastApplied = log.lastApplied + 1 - } - - (* Commits a message in the log. *) - fun update_commit log new_index = { - log with - commitIndex = (max new_index log.commitIndex) - } - - (* Rolls the log back one entry. *) - fun rollback_log log = - let val loglog = log.log - in case loglog of - (_ :: prev_log) => { - log with - log = prev_log - } - | [] => {log with log = []} - end - - (* Get the entry of the latest log entry. *) - fun get_log_index log = (length log.log) + log.snapshot.lastIncludedIndex - - (*Determines whether or not all log changes have been committed. *) - fun log_is_committed log = (get_log_index log = log.commitIndex) - - (* Rolls the log back n time. *) - fun rollback_log_to log n = - if n < (get_log_index log) then - let val log = rollback_log log - in (rollback_log_to log n) - end - else log - - (* Get the term of the latest entry of the log, or, if empty, the last - included index of the snapshot. *) - fun get_latest_entry_term log = - case log.log of - [] => log.snapshot.lastIncludedTerm - | h :: _ => h.term - - (* Get the term of the latest log entry. *) - fun get_latest_log_term log = get_latest_entry_term log - - (* Get the message of the latest log entry. *) - fun get_latest_log_command log = - case log.log of - [] => 0 (* Should not be reachable. *) - | h :: _ => h.command - - fun get_nth_command log index = nth (reverse log.log) (index - log.snapshot.lastIncludedIndex) - - (* Returns a slice of all entries after log-index n. *) - fun get_commands_after_nth entries n last_included = - let val log_slice = slice (n - last_included) (length entries) (reverse entries) - in log_slice - end - - (* Get a snapshot of all committed entries. *) - fun get_snapshot state log = - if log.commitIndex > 0 andalso - (log.commitIndex - log.snapshot.lastIncludedIndex) <= length log.log then - let val lastCommitted = get_nth_command log log.commitIndex - in set_snapshot state log.commitIndex lastCommitted.term end - else empty_snapshot - - - (* Applies a snapshot to the log. *) - fun apply_snapshot snapshot log = - let val newCommitIndex = - if log.commitIndex < snapshot.lastIncludedIndex then snapshot.lastIncludedIndex - else log.commitIndex - val uncommitted_entries = get_commands_after_nth log.log newCommitIndex log.snapshot.lastIncludedIndex - val newLastApplied = - if log.lastApplied < snapshot.lastIncludedIndex then snapshot.lastIncludedIndex - else log.lastApplied - in { log with - log = uncommitted_entries, - commitIndex = newCommitIndex, - lastApplied = newLastApplied, - snapshot = snapshot } - end - - (* Asks the state-machine whether or not to snapshot. *) - fun evaluate_snapshot_cond state snapshot_cond log = - if (log.lastApplied - log.snapshot.lastIncludedIndex) > snapshot_cond then - apply_snapshot (get_snapshot state log) log - else log - (* END OF libs/log.trp *) - - (* #IMPORT libs/leader-info.trp *) - - (* Generates a default leader info, with the nextIndex of all followers - being the nextIndex of the new leader. This can be changed with followers - rejecting AppendEntries *) - fun new_leader all_nodes log = - let val nextIndex = get_log_index log - val index = map (fn id => {peer = id, next = nextIndex + 1}) all_nodes - val match_index = map (fn id => {peer = id, match = 0}) all_nodes - in { - nextIndex = index, - matchIndex = match_index - } end - - (* Get the nextIndex of a peer *) - fun get_next_index leader_info peer = first (filter (fn (x) => x.peer = peer) leader_info.nextIndex) - - (* Get the matchIndex of a peer *) - fun get_match_index leader_info peer = first (filter (fn (x) => x.peer = peer) leader_info.matchIndex) - - (* Updates a cluster member's next-index. This is done after an - acknowledgement or rejection. *) - fun update_next_index leader_info peer new = let - val prevIndex = get_next_index leader_info peer - val newIndex = {peer = peer, next = new} - val withoutPeer = filter (fn (x) => x.peer <> peer) leader_info.nextIndex - in { - leader_info with - nextIndex = newIndex :: withoutPeer - } end - - (* Updates a cluster member's match-index, denoting how much of their log - matches the leader holding the leader info. *) - fun update_match_index leader_info peer new = let - val prevIndex = get_match_index leader_info peer - val newIndex = {peer = peer, match = new} - val withoutPeer = filter (fn (x) => x.peer <> peer) leader_info.matchIndex - in { - leader_info with - matchIndex = newIndex :: withoutPeer - } end - - (* Get all follower's matchIndex*) - fun get_matches leader_info = map (fn x => x.match) leader_info.matchIndex - - (* Get the highest index of entries that a majority of followers have - appended to, by finding the median *) - fun calc_highest_commit matches = median matches - (* END OF libs/leader-info.trp *) - - (* Executes a function after a given timeout. *) - fun start_timeout func duration = - let fun timeout () = - let val time = duration - val _ = sleep time - in func () - end - val p_id = self() - in spawn timeout - end - - (* Send message after a delay. *) - fun send_delay (to, m) delay = - sleep delay; - send (to, m) - - (* Starts a random timeout with lower=2sec and upper=4sec *) - fun start_random_timeout func = start_timeout func (ELECTION_TIMEOUT_LOWER + ((random ()) * (ELECTION_TIMEOUT_UPPER - ELECTION_TIMEOUT_LOWER))) - - (* #IMPORT ./libs/dialer.trp *) - -(* Selects a random element from a list *) -fun random_element list = - let fun roundUp n m = - if n <= 0 then m else roundUp (n - 1) (m + 1) - val r_n = roundUp (random() * (length list - 1)) 0 - in nth list r_n -end - - -(* Given a list of serialkeys, and a serial key, check if it is valid, and -return a list containing the serial key if so, and a boolean denoting whether or -not it is valid. *) -fun apply_serialkey list key = - case list of - [] => (true, []) - | h :: t => - if h = key then - case (h.key, key.key) of - ((log_index, seq_numb), (new_log_index, new_seq_numb)) => - if new_log_index > log_index orelse - (log_index = new_log_index andalso new_seq_numb > seq_numb) then - (true, ({ h with key = key.key } :: t)) - else (false, h :: t) - | (nonce, new_nonce) => - if nonce <> new_nonce then - (true, ({ h with key = key.key } :: t)) - else (false, h :: t) - | _ => (true, ({ h with key = key.key } :: t)) - else - let val (cond, list) = apply_serialkey t key - in (cond, h :: list) - end - - -(* Used by the dialer to send message to a cluster. If the nodes are busy or if -no leader is present, this function re-sends the message until it is eventually -delivered and acknowledged by the leader of the cluster. If leader is unknown, -it can be defined as unit.*) -fun dialer_send_message p_id msg serial_n leader cluster = - let val nonce = mkuuid() - val msg_timeout = start_timeout (fn() => send(p_id, (DIALER_MESSAGE_TIMEOUT, nonce))) - val busy_timeout = start_timeout (fn() => send(p_id, (DIALER_BUSY_TIMEOUT, nonce))) - fun wait () = - receive [ - hn (NOT_LEADER, leader_id) => - dialer_send_message p_id msg serial_n leader_id cluster, - hn (DIALER_ACK, other_serial) when other_serial = serial_n => - leader, - hn (DIALER_SM_BUSY, other_serial) when other_serial = serial_n => - busy_timeout DIALER_SM_BUSY_TIMEOUT; - wait (), - hn (DIALER_SM_DONE, other_serial) when other_serial = serial_n => - leader, - hn (DIALER_MESSAGE_TIMEOUT, x) => - if x = nonce then dialer_send_message p_id msg serial_n (random_element cluster) cluster - else wait (), - hn (DIALER_BUSY_TIMEOUT, x) => - if x = nonce then dialer_send_message p_id msg serial_n leader cluster - else wait () - ] - in (case leader of - () => msg_timeout DIALER_NOLEADER_TIMEOUT - | x => - msg_timeout DIALER_NOMSG_TIMEOUT; - send(x, msg)); - wait () -end - -(* Facilitates client-side interaction to the Raft cluster. Allows the -programmer to send messages to the cluster in the format (RAFT_UPDATE, msg)*) -fun dialer cluster client_id = - let val p_id = self() - fun update_message x leader = let - val serial_n = mkuuid() - in dialer_send_message p_id ((RAFT_UPDATE, x), p_id, serial_n) serial_n leader cluster - end - val leader = random_element cluster - - fun loop leader sks = - receive [ - hn (RAFT_UPDATE, x) => - loop (update_message x leader) sks, - - hn (DIALER_CLIENT_MSG, msg, sk) => - let val (cond, sks) = apply_serialkey sks sk - in - (if cond then send(client_id, msg) - else ()); - loop leader sks - end, - - hn (SEND_TO_NTH, n, x) => - send_to_nth cluster x n; - loop leader sks, - - hn (SEND_TO_ALL, x) => - send_to_all cluster x (self()); - loop leader sks, - hn _ => loop leader sks ] - in loop leader [] -end - -(* Temporary dialer sends a list of messages to a cluster before terminating. *) -fun leader_dialer cluster msgs = - let val p_id = self() - val leader = random_element cluster - in - map (fn (msg, serial) => dialer_send_message p_id ((RAFT_UPDATE, msg), p_id, serial) serial leader cluster) msgs -end - -(* Send-function used for clusters to send a message to either the dialer or -client. *) -fun raft_send (process, msgs) = case process.type of -CLIENT => map (fn (msg, sk) => send(process.id, (DIALER_CLIENT_MSG, msg, sk))) msgs -| CLUSTER => spawn (fn () => leader_dialer process.id msgs) -(* END OF ./libs/dialer.trp *) - - (* Send the side-effect-messages to dialers or clusters *) - fun send_sides log sides = - (* Add message to key-value-store, sorting by the recipients. *) - let fun add_msg id msg sk dict = case dict of - [] => [(id, [(msg, sk)])] - | (other_id, msgs) :: t => - if id = other_id then - (id, (msg, sk) :: t) - else (other_id, msgs) :: add_msg id msg sk t - (* Generate key-value-store of all message, sorting by recipients. *) - val (sorted_msgs, _) = case sides of - [] => ([], 0) - | x => foldl (fn ((callback, msg), (acc, seq)) => - (add_msg callback msg ({ id = callback, key = (log.lastApplied, seq)}) acc, seq + 1) - ) ([], 1) x - (* Sends all messages. *) - in map (fn x => raft_send x) sorted_msgs - end - - (* Applies all log-entries that have been committed, but not applied *) - fun apply_log log state_machine is_leader = - (* If any non-applied, committed logs apply... *) - if log.lastApplied < log.commitIndex then - (* Get the latest non-applied committed entry *) - let val entry = get_nth_command log (log.lastApplied + 1) - val command = entry.command - (* Update log to apply entry and apply entry on state-machine*) - val log = update_applied log - val (sides, status, step) = state_machine - val (new_sides, new_status, new_step) = step command - (* If leader is applying, execute side-effects. *) - in (if is_leader then - entry.callback (); - send_sides log new_sides - else ()); - apply_log log (new_sides, new_status, new_step) is_leader end - else (log, state_machine) - - (* #IMPORT ./libs/nodes/leader.trp *) -fun leader_node node = - let val p_id = self() - (* Appends appends all entries from a follower's nextIndex to the leader's log index*) - fun append_entries node follower_pid = - let val nextIndex = get_next_index node.leader_info follower_pid - val logIndex = get_log_index node.log - in if logIndex + 1 >= nextIndex.next then - let - val latestLogIndex = nextIndex.next - 1 - in - (* Sends the snapshot if the followers nextIndex is before the Snapshot's lastIncludedIndex *) - if nextIndex.next <= node.log.snapshot.lastIncludedIndex - then send(follower_pid, (SNAPSHOT, node.log.snapshot, p_id, node.term)) - else - let val entries = get_commands_after_nth node.log.log latestLogIndex node.log.snapshot.lastIncludedIndex - val afterSnapshot = latestLogIndex - node.log.snapshot.lastIncludedIndex - val prevEntryTerm = - if afterSnapshot > 0 then (get_nth_command node.log latestLogIndex).term - else node.log.snapshot.lastIncludedTerm - in send(follower_pid, (APPEND_ENTRIES, entries, p_id, node.term, latestLogIndex, prevEntryTerm, node.log.commitIndex)) - end - end - (* A follower should never get more entries than the leader *) - else () - end - - (* Convert leader to follower *) - fun demote term leader voted_for node = - {node with - term = term, - leader = leader, - leader_info = (), - voted_for = voted_for} - - fun append_update node msg callback serial = - let val latestLogIndex = get_log_index node.log - val prevLogTerm = get_latest_log_term node.log - val log = append_message node.log msg callback node.term serial - val leader_info = update_match_index node.leader_info p_id (get_log_index log) - val leader_info = update_next_index leader_info p_id ((get_log_index log) + 1) - val node = {node with log = log, leader_info = leader_info} - in node - end - - (* Applies all committed log entries that have not already been applied *) - fun apply_committed node = - let val prev_commit = node.log.commitIndex - val highest_commit = calc_highest_commit (map (fn x => x.match) node.leader_info.matchIndex) - val node = { node with log = update_commit node.log highest_commit } - val (applied_log, new_sm) = apply_log node.log node.state_machine true - val snapshot_log = - if prev_commit < highest_commit then - evaluate_snapshot_cond new_sm node.snapshot_cond applied_log - else - applied_log - val (_, status, _) = new_sm - val node = { node with log = snapshot_log, state_machine = new_sm} - in - case status of - SUS => if log_is_committed node.log then append_update node () (fn () => ()) (mkuuid()) - else node - | _ => node - end - - val nonce = mkuuid () - - fun loop node = - receive [ - (* Halts the leader *) - hn DEBUG_PAUSE => - let fun pause () = receive [ - hn (DEBUG_CONTINUE) => loop node, - hn x => pause () - ] - in pause () end, - - hn (SEND_HEARTBEAT, x) when nonce = x => leader_node node, - - hn (SEND_HEARTBEAT, x) => - loop node, - - (* Message has not been appended before *) - hn ((RAFT_UPDATE, x), dialer_id, serial_n) => - let val (cond, sks) = apply_serialkey node.serialkeys serial_n - in if cond then let - val (_, stat, _) = node.state_machine - val node = case stat of - SUS => send(dialer_id, (DIALER_SM_BUSY, serial_n)); node - | DONE => send(dialer_id, (DIALER_SM_DONE, serial_n)); node - | WAIT => - if log_is_committed node.log then - let fun replication_cb () = send (dialer_id, (DIALER_ACK, serial_n)) - in append_update node x replication_cb serial_n end - else send(dialer_id, (DIALER_SM_BUSY, serial_n)); node - in leader_node node end - else send(dialer_id, (DIALER_ACK, serial_n)); - loop node - end, - - (* If append is successful on a follower*) - hn (ACKNOWLEDGE, (peer, logIndex)) => - let val prev_index = get_log_index node.log - val node = { node with leader_info = update_match_index node.leader_info peer logIndex } - val node = { node with leader_info = update_next_index node.leader_info peer (logIndex + 1) } - val node = apply_committed node - val next_index = get_next_index node.leader_info peer - in (if prev_index < get_log_index node.log then - map (fn x => append_entries node x) - (filter (fn x => - let val next_index = get_next_index node.leader_info x - in x <> p_id andalso next_index.next > logIndex end) node.all_nodes) - else if next_index.next <= get_log_index node.log then - append_entries node peer - else ()); - loop node - end, - - (* If append is unsuccessful *) - hn (REJECT, (peer, terminfo, logIndex)) => - if node.term >= terminfo.term then - let val node = { node with leader_info = update_next_index node.leader_info peer (logIndex + 1) } - in loop node - end - else follower (demote terminfo.term terminfo.leader () - node), - - (* If another node has been elected as a candidate, and - their term is in front of ours, convert to a follower *) - hn (REQUEST_VOTE, (c_term, c_id, c_log_index, c_log_term)) when c_term > node.term => - send(c_id, (YES_VOTE, node.id)); - follower (demote c_term () c_id node), - - hn (REQUEST_VOTE, (c_term, c_id, c_log_index, c_log_term)) => - send(c_id, (NO_VOTE, node.id)); - loop node, - - (* If we receive snapshot from a leader in a higher term, - convert to follower *) - hn (SNAPSHOT, snapshot, l_id, other_term) when other_term > node.term => follower (demote other_term l_id () node), - - (* If we receive AppendEntries from a leader in a higher term, - convert to follower *) - hn (APPEND_ENTRIES, x, l_id, other_term, prevIndex, prevTerm, commitIndex) when other_term > node.term => follower (demote other_term l_id () node), - - (* Prints log *) - hn DEBUG_PRINTLOG => - pretty_print_log node.id node.log; - loop node, - - (* Applies a snapshot *) - hn DEBUG_APPLYSNAPSHOT => - let - val snapshot = get_snapshot node.state_machine node.log - val node = case snapshot.snapshot of - () => node - | _ => {node with log = apply_snapshot snapshot node.log} - in loop node end, - hn _ => loop node - ] - in - (* Append entries for each follower *) - map (fn x => append_entries node x) (filter (fn x => x <> p_id) node.all_nodes); - start_timeout (fn () => send (p_id, (SEND_HEARTBEAT, nonce))) HEARTBEAT_INTERVAL; - loop node -end -(* END OF ./libs/nodes/leader.trp *) - (* #IMPORT ./libs/nodes/candidate.trp *) -and candidate node = - let val p_id = self() - - (* A candidate cannot vote for anyone and has no leader *) - val node = {node with voted_for = (), leader = ()} - val nonce = mkuuid() - - (* Sends a vote request to all followers *) - val latestLogIndex = get_log_index node.log - val prevLogTerm = get_latest_log_term node.log - - - (* Becoming a leader requires majority vote *) - val req_votes = ((length node.all_nodes) / 2) - - fun won_election () = - let val (sides, _, _) = node.state_machine - in - send_sides node.log sides; - leader_node ({ - node with leader_info = (new_leader node.all_nodes node.log), - leader = (p_id)}) - end - - fun wait_for_votes (follower_votes, vote_amount) = - let - fun loop () = receive [ - (* Received a vote from a follower we have not already - received a vote from *) - hn (YES_VOTE, follower_id) when (not (contains follower_id follower_votes)) => - wait_for_votes ((append follower_votes [follower_id]), vote_amount + 1), - - (*We received a NO_VOTE from a follower in a later term. - This can only happen if there is a leader/candidate in this - term, and as such, we convert to a follower *) - hn (NO_VOTE, other_term) when other_term > node.term => - follower node, - - (* Received vote request from candidate in later term *) - hn (REQUEST_VOTE, (c_term, other_c_id, c_log_index, c_log_term)) when c_term > node.term => - send(other_c_id, (YES_VOTE, node.id)); - follower ({ node with term = c_term, voted_for = other_c_id}), - - (* Received message from leader in a term at least as - up-to-date as ours. Because of this, we must have lost the - election *) - hn (APPEND_ENTRIES, x, l_id, other_term, prevIndex, prevTerm, commitIndex) when other_term >= node.term => - follower ({ node with leader = l_id}), - - (* Election timeout, send out another request vote *) - hn (VOTE_TIMEOUT, x) when x = nonce => candidate {node with term = node.term + 1}, - - (* Halts the candidate *) - hn (DEBUG_PAUSE) => - let fun loop () = receive [ - hn (DEBUG_CONTINUE) => (), - hn x => loop () - ] - in loop () end, - hn _ => loop () - ] - in if vote_amount >= req_votes then won_election () else loop () - end - in - send_to_all node.all_nodes (REQUEST_VOTE, (node.term, p_id, latestLogIndex, prevLogTerm)) (p_id); - start_random_timeout (fn () => send(p_id, (VOTE_TIMEOUT, nonce))); - wait_for_votes ([node.id], 1) -end -(* END OF ./libs/nodes/candidate.trp *) - (* #IMPORT ./libs/nodes/follower.trp *) -and follower node = - let val nonce = mkuuid() - val p_id = self() - val _ = start_random_timeout (fn () => send(p_id, (ELECTION_TIMEOUT, nonce))) - (* Sends a YES_VOTE to a candidate *) - fun vote_for c_id c_term node = - send(c_id, (YES_VOTE, node.id)); - { node with term = c_term, voted_for = c_id } - fun loop node start_time = let - fun start_election () = - candidate ({node with term = node.term + 1}) - val _ = receive [ - (* Starts an election *) - hn (ELECTION_TIMEOUT, x) when x = nonce => - if (getTime() - start_time >= ELECTION_TIMEOUT_LOWER) then start_election () - else start_random_timeout (fn () => send(p_id, (ELECTION_TIMEOUT, nonce))); loop node (getTime()), - - (* Sends a re-vote to a candidate we already voted for *) - hn (REQUEST_VOTE, (c_term, c_id, c_log_index, c_log_term)) when c_id = node.voted_for => - follower (vote_for c_id c_term node), - - (* If we receive a vote request, vote yes if: the log is a - up-to-date and the term of the candidate is later than our - current. Vote no otherwise *) - hn (REQUEST_VOTE, (c_term, c_id, c_log_index, c_log_term)) => - let val latestLogIndex = get_log_index node.log - val latestLogTerm = get_latest_log_term node.log - fun no_vote () = - send(c_id, NO_VOTE); - follower node - fun yes_vote () = - follower (vote_for c_id c_term ({node with term = c_term})) - in - if latestLogIndex > c_log_index - orelse latestLogTerm <> c_log_term - orelse c_term <= node.term then no_vote () - else yes_vote () - end, - - (* When receiving a snapshot from a leader in a later or - same term, acknowledge if it contains entries past our - current log index. Update leader and term accordingly. *) - hn (SNAPSHOT, x, l_id, leader_term) => - let val node = {node with leader = - if node.leader = () orelse node.term < leader_term then l_id - else node.leader} - - val {snapshot, lastIncludedIndex, lastIncludedTerm} = x - val log_term = get_latest_log_term node.log - val log_index = get_log_index node.log - - val accepting = - if leader_term < node.term then false - else if lastIncludedIndex <= log_index then false - else true - - val newlog = if accepting then apply_snapshot x node.log else node.log - val new_sm = if accepting then snapshot else node.state_machine - val reject = - fn () => send (l_id, (REJECT, (p_id, {term = node.term, leader = node.leader}, (get_log_index newlog)))) - val ack = - fn () => send (l_id, (ACKNOWLEDGE, (p_id, get_log_index newlog))) - - val node = { - node with term = (if node.term < leader_term then leader_term else node.term), - state_machine = new_sm, - log = newlog} - - in (if accepting then ack () - else reject ()); - loop node (getTime()) - end, - - (* When receiving entries from a leader in a later or - same term, acknowledge if it contains entries past our - current log index. And if the latest log index matches ours. - Update log accordingly.*) - hn (APPEND_ENTRIES, x, l_id, leader_term, latestLogIndex, prevLogTerm, leaderCommit) => - let val node = {node with leader = - if node.leader = () orelse node.term <= leader_term then l_id - else node.leader} - val accepting = - if leader_term < node.term then false - else if latestLogIndex > (get_log_index node.log) then false - else if (get_latest_log_term node.log) <> prevLogTerm andalso prevLogTerm > 0 then false - else true - val prev_commit = node.log.commitIndex - val newlog = - if accepting then - let val log = rollback_log_to node.log latestLogIndex - val log = add_entries_to_log log x leader_term - in update_commit log (min leaderCommit (get_log_index log)) - end - else node.log - val reject = - fn () => send (l_id, (REJECT, (p_id, {term = node.term, leader = node.leader}, (get_log_index newlog)))) - val ack = - fn () => send (l_id, (ACKNOWLEDGE, (p_id, get_log_index newlog))) - - val node = {node with term = (if node.term < leader_term then leader_term else node.term)} - val (applied_log, new_sm) = apply_log newlog node.state_machine false - val snapshot_log = - if prev_commit < applied_log.commitIndex then - evaluate_snapshot_cond new_sm node.snapshot_cond applied_log - else - applied_log - in - (if accepting then ack () - else reject ()); - loop {node with log = snapshot_log, state_machine = new_sm} (getTime()) - end, - - (* If client sends update, sends the leader's id *) - hn ((RAFT_UPDATE, x), dialer_id, _) => - send(dialer_id, (NOT_LEADER, node.leader)); - loop node start_time, - - (* Prints the log *) - hn (DEBUG_PRINTLOG) => - pretty_print_log node.id node.log; - loop node start_time, - - (* Halts the follower *) - hn (DEBUG_PAUSE) => - let fun paused () = receive [ - hn (DEBUG_CONTINUE) => (), - hn _ => paused () - ] - in - paused (); - loop node start_time - end, - - (* Start an election, electing this follower to a candidate *) - hn (DEBUG_TIMEOUT) => start_election (), - hn x => loop node start_time - ] - in () - end - in loop node (getTime ()) -end -(* END OF ./libs/nodes/follower.trp *) - - (* A node is dormant until it has received the references of all other - dormant_node ({node with all_nodes = append node.all_nodes x}) - ] - else follower node - - (* Defines a default node, being a follower in term 1 without a leader and - the state-machine in its beginning state *) - fun default_node id all_nodes node_amount state_machine snapshot_cond = - let val node = { - all_nodes = all_nodes, - id = id, - log = empty_log, - term = 1, - voted_for = (), - leader = (), - leader_info = (), - state_machine = case state_machine of - (_, _, _) => state_machine - | _ => ([], WAIT, fn x => x ()), - snapshot_cond = snapshot_cond, - node_amount = node_amount, - serialkeys = [] - } - in dormant_node node - end - - (* Spawn a state-machine on a seperate thread, creates a record*) - fun initiate_node state_machine snapshot_cond node_amount id = - spawn (fn () => default_node id [] node_amount state_machine snapshot_cond) - - (* Sends a list of all nodes to all nodes *) - fun add_refs nodes = - map (fn x => send(x, (ADD_NODES, nodes))) nodes - - (* Spawn n nodes*) - fun initiate_nodes n state_machine snapshot_cond = - let val part_init = initiate_node state_machine snapshot_cond n - fun spawn_nodes n acc_id = - case n of - 0 => [] - | x => append - (spawn_nodes (x - 1) (acc_id ^ "I")) - [(part_init acc_id)] - - val nodes = spawn_nodes n "I" - in - add_refs nodes; - nodes - end - - (* Spawn a state-machine on some alias *) - fun initiate_distributed_node state_machine snapshot_cond node_amount id alias = - spawn(alias, fn () => (default_node id [] node_amount state_machine snapshot_cond)) - - fun initiate_distributed_nodes aliases state_machine snapshot_cond = - let val part_init = initiate_distributed_node state_machine snapshot_cond (length(aliases)) - fun spawn_nodes acc acc_id = - case acc of - [] => [] - | h :: t => - append (spawn_nodes t (acc_id ^ "I")) [part_init acc_id h] - val nodes = spawn_nodes aliases "I" - in - add_refs nodes; - nodes - end - - (* Spawns a dialer, dialing into a cluster. *) - fun raft_dial (cluster, client_id) = - spawn(fn () => dialer cluster client_id) - - (* Spawns a distributed Raft network, which can be dialed into to - communicate with their state-machines *) - fun raft_spawn_alias (state_machine, aliases, snapshot_cond) = - initiate_distributed_nodes aliases state_machine snapshot_cond - | raft_spawn_alias (state_machine, aliases) = - raft_spawn_alias (state_machine, aliases, 50) - - (* Spawns a Raft network, which can be contacted to - communicate with their state-machines *) - fun raft_spawn (state_machine, n, snapshot_cond) = - initiate_nodes n state_machine snapshot_cond - | raft_spawn (state_machine, n) = raft_spawn (state_machine, n, 50) - -in - [ ("raft_dial", raft_dial) - , ("raft_spawn_alias", raft_spawn_alias) - , ("raft_spawn", raft_spawn) - , ("WAIT", WAIT) - , ("SUS", SUS) - , ("DONE", DONE) - , ("RAFT_UPDATE", RAFT_UPDATE) - , ("CLIENT", CLIENT) - , ("CLUSTER", CLUSTER) - ] -end