-
Notifications
You must be signed in to change notification settings - Fork 5
proper synchronized mining #143
base: master
Are you sure you want to change the base?
Changes from all commits
8695873
193d78f
4b0929d
d3a593f
072e022
1f2903e
486f936
89c07dc
0dc62c4
c467b32
2e3de49
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
[metadata] | ||
name = "lotus-soup" | ||
author = "raulk" | ||
|
||
[global] | ||
plan = "lotus-soup" | ||
case = "sync-mining" | ||
total_instances = 4 | ||
builder = "exec:go" | ||
runner = "local:exec" | ||
|
||
build = { selectors = ["testground"] } | ||
build_config = { enable_go_build_cache = true } | ||
run_config = { exposed_ports = ["6060", "1234", "2345"] } | ||
|
||
[global.run.test_params] | ||
clients = "1" | ||
miners = "2" | ||
genesis_timestamp_offset = "0" | ||
balance = "2000000000" | ||
sectors = "10" | ||
random_beacon_type = "mock" | ||
mining_mode = "synchronized" | ||
|
||
[[groups]] | ||
id = "bootstrapper" | ||
instances = { count = 1 } | ||
[groups.run.test_params] | ||
role = "bootstrapper" | ||
|
||
[[groups]] | ||
id = "miners" | ||
instances = { count = 2 } | ||
[groups.run.test_params] | ||
role = "miner" | ||
|
||
[[groups]] | ||
id = "clients" | ||
instances = { count = 1 } | ||
[groups.run.test_params] | ||
role = "client" |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
package main | ||
|
||
import ( | ||
"github.com/filecoin-project/oni/lotus-soup/testkit" | ||
) | ||
|
||
func syncMining(t *testkit.TestEnvironment) error { | ||
return testkit.HandleDefaultRole(t) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
package testkit | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/filecoin-project/lotus/build" | ||
"github.com/filecoin-project/specs-actors/actors/abi" | ||
"github.com/raulk/clock" | ||
"github.com/testground/sdk-go/sync" | ||
) | ||
|
||
// ClockSyncTopic is the topic where clock synchronization events are exchanged. | ||
var ClockSyncTopic = sync.NewTopic("clock_sync", &ClockSyncMsg{}) | ||
|
||
// ClockSyncMsg is a clock synchronization message. | ||
type ClockSyncMsg struct { | ||
ID string // ${groupname}_${groupseq} | ||
Epoch abi.ChainEpoch | ||
} | ||
|
||
// SynchronizeClock synchronizes this instance's clock with the clocks of all | ||
// miner and client nodes participating in this test scenario. | ||
// | ||
// This function returns two channels. | ||
// | ||
// The first channel (localAdvance) is where the user requests local advancement | ||
// to the next epoch. Such requests likely originate from some local event that | ||
// implies readiness to advance, such as "mining round completed" events from | ||
// synchronized mining. Upon receiving a local advance signal, the control | ||
// goroutine forwards the local clock by build.BlockDelaySecs, thus progressing | ||
// one chain epoch. It then signals this fact to all other nodes participating | ||
// in the test via the Testground synchronization service. | ||
// | ||
// The second channel is where the global clock synchronizer signals every time | ||
// the entirety of test participants have progressed to the next epoch, thus | ||
// indicating that all nodes are eady to commence that chain epoch of | ||
// computation. | ||
// | ||
// Essentially this behaviour can be assimilated to a distributed sempaphore. | ||
// | ||
// This method will return immediately, and will spawn a background goroutine | ||
// that performs the clock synchronisation work. | ||
func (n *LotusNode) SynchronizeClock(ctx context.Context) (localAdvance chan<- abi.ChainEpoch, globalEpoch <-chan abi.ChainEpoch, err error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. global vs local: i have trouble grappling with the topology implied by this locality
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The idea behind this is that when the local node, when enlisted predicates are satisfied, decides that it's ready to advance the clock, it would send a signal to Right now we ACTUALLY advance the local clock and advertise that fact to the world. In the future, we could use some form of 3PC (three phase commit), where we PREPARE [phase 1] the advancement, nodes opt in or opt out of that round via the sync service, and once all nodes have PREPARED, we actually COMMIT [phase 2] the local clock advancement by manipulating the mock clock, and CERTIFY [phase 3] that we are at the new epoch. Once all nodes have passed the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I think I was having the 3PC model you're describing in my mind. I'm fine if this is not implemented right now. |
||
n.MockClock = clock.NewMock() | ||
build.Clock = n.MockClock | ||
|
||
gen, err := n.FullApi.ChainGetGenesis(context.Background()) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("failed to get genesis: %w", err) | ||
} | ||
|
||
// start at genesis. | ||
genesisTime := time.Unix(int64(gen.MinTimestamp()), 0) | ||
n.MockClock.Set(genesisTime) | ||
|
||
var ( | ||
localEpochAdvanceCh = make(chan abi.ChainEpoch, 128) | ||
globalEpochStartCh = make(chan abi.ChainEpoch, 128) | ||
) | ||
|
||
// jumpstart the clock! | ||
localEpochAdvanceCh <- abi.ChainEpoch(1) | ||
|
||
var ( | ||
id = fmt.Sprintf("%s_%d", n.t.TestGroupID, n.t.GroupSeq) | ||
|
||
minerCnt = n.t.IntParam("miners") | ||
clientCnt = n.t.IntParam("clients") | ||
totalCnt = minerCnt + clientCnt | ||
|
||
epoch = abi.ChainEpoch(0) // current epoch: genesis | ||
epochInterval = time.Duration(build.BlockDelaySecs) * time.Second | ||
|
||
// ready tracks which nodes have advanced to the next round. | ||
// when all nodes have advanced, we tick the global clock and reset | ||
// this slice. | ||
ready []string | ||
raulk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
signalCh = make(chan *ClockSyncMsg, 128) | ||
sub = n.t.SyncClient.MustSubscribe(ctx, ClockSyncTopic, signalCh) | ||
) | ||
|
||
go func() { | ||
defer close(globalEpochStartCh) | ||
|
||
for { | ||
select { | ||
case <-localEpochAdvanceCh: | ||
// move the local clock forward, and announce that fact to all other instances. | ||
n.MockClock.Add(epochInterval) | ||
schomatis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
epoch++ | ||
n.t.RecordMessage("advanced local clock: %v [epoch=%d]", n.MockClock.Now(), epoch) | ||
n.t.SyncClient.MustPublish(ctx, ClockSyncTopic, &ClockSyncMsg{ | ||
ID: id, | ||
Epoch: epoch, | ||
}) | ||
|
||
case s := <-signalCh: | ||
// this is a clock event from another instance (or ourselves). | ||
ready = append(ready, s.ID) | ||
if len(ready) != totalCnt { | ||
continue | ||
} | ||
// release the new epoch and reset the state. | ||
globalEpochStartCh <- epoch | ||
ready = ready[0:0] | ||
|
||
case <-ctx.Done(): | ||
return | ||
|
||
case <-sub.Done(): | ||
panic("global clock synchronization subscription died") | ||
} | ||
} | ||
}() | ||
|
||
return localEpochAdvanceCh, globalEpochStartCh, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to sort this out. Synchronised mining doesn't need to reduce the block delay and propagation delay.