Skip to content
This repository has been archived by the owner on Nov 26, 2020. It is now read-only.

proper synchronized mining #143

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions lotus-soup/_compositions/sync-mining.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[metadata]
name = "lotus-soup"
author = "raulk"

[global]
plan = "lotus-soup"
case = "sync-mining"
total_instances = 4
builder = "exec:go"
runner = "local:exec"

build = { selectors = ["testground"] }
build_config = { enable_go_build_cache = true }
run_config = { exposed_ports = ["6060", "1234", "2345"] }

[global.run.test_params]
clients = "1"
miners = "2"
genesis_timestamp_offset = "0"
balance = "2000000000"
sectors = "10"
random_beacon_type = "mock"
mining_mode = "synchronized"

[[groups]]
id = "bootstrapper"
instances = { count = 1 }
[groups.run.test_params]
role = "bootstrapper"

[[groups]]
id = "miners"
instances = { count = 2 }
[groups.run.test_params]
role = "miner"

[[groups]]
id = "clients"
instances = { count = 1 }
[groups.run.test_params]
role = "client"
16 changes: 11 additions & 5 deletions lotus-soup/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ go 1.14
require (
contrib.go.opencensus.io/exporter/prometheus v0.1.0
github.com/davecgh/go-spew v1.1.1
github.com/drand/drand v0.9.2-0.20200616080806-a94e9c1636a4
github.com/drand/drand v1.0.3-0.20200714175734-29705eaf09d4
github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef
github.com/filecoin-project/go-fil-markets v0.4.0
github.com/filecoin-project/go-fil-markets v0.4.1-0.20200715201050-c141144ea312
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/lotus v0.4.2-0.20200708212122-7d841dbfa81c
github.com/filecoin-project/specs-actors v0.7.1
github.com/filecoin-project/lotus v0.4.2-0.20200716093446-6092854ecee2
github.com/filecoin-project/specs-actors v0.7.2
github.com/gorilla/mux v1.7.4
github.com/hashicorp/go-multierror v1.1.0
github.com/influxdata/influxdb v1.8.0 // indirect
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-datastore v0.4.4
Expand All @@ -23,13 +24,18 @@ require (
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae
github.com/kpacha/opencensus-influxdb v0.0.0-20181102202715-663e2683a27c
github.com/lib/pq v1.7.0 // indirect
github.com/libp2p/go-libp2p v0.10.0
github.com/libp2p/go-libp2p-core v0.6.0
github.com/libp2p/go-libp2p-pubsub-tracer v0.0.0-20200626141350-e730b32bf1e6
github.com/multiformats/go-multiaddr v0.2.2
github.com/multiformats/go-multiaddr-net v0.1.5
github.com/testground/sdk-go v0.2.3-0.20200706132230-6a65ddac2d8c
github.com/prometheus/common v0.10.0
github.com/raulk/clock v1.1.0
github.com/testground/sdk-go v0.2.3-0.20200716112730-420be929ef43
github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 // indirect
go.opencensus.io v0.22.4
go.uber.org/fx v1.9.0
)

// This will work in all build modes: docker:go, exec:go, and local go build.
Expand Down
65 changes: 61 additions & 4 deletions lotus-soup/go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions lotus-soup/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
)

func init() {
build.BlockDelaySecs = 2
build.PropagationDelaySecs = 1
// build.BlockDelaySecs = 2
// build.PropagationDelaySecs = 1
Comment on lines +18 to +19
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to sort this out. Synchronised mining doesn't need to reduce the block delay and propagation delay.


_ = log.SetLogLevel("*", "WARN")
_ = log.SetLogLevel("dht/RtRefreshManager", "ERROR") // noisy
Expand Down
1 change: 1 addition & 0 deletions lotus-soup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var cases = map[string]interface{}{
"deals-stress": testkit.WrapTestEnvironment(dealsStress),
"drand-halting": testkit.WrapTestEnvironment(dealsE2E),
"paych-stress": testkit.WrapTestEnvironment(paych.Stress),
"sync-mining": testkit.WrapTestEnvironment(syncMining),
}

func main() {
Expand Down
29 changes: 29 additions & 0 deletions lotus-soup/manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,32 @@ instances = { min = 1, max = 100, default = 5 }
# ********** Test-case specific **********
increments = { type = "int", default = "100", desc = "increments in which to send payment vouchers" }
lane_count = { type = "int", default = "256", desc = "lanes to open; vouchers will be distributed across these lanes in round-robin fashion" }


[[testcases]]
name = "sync-mining"
instances = { min = 1, max = 100, default = 5 }

[testcases.params]
clients = { type = "int", default = 1 }
miners = { type = "int", default = 1 }
balance = { type = "float", default = 1 }
sectors = { type = "int", default = 1 }
role = { type = "string" }
genesis_timestamp_offset = { type = "int", default = 0 }

random_beacon_type = { type = "enum", default = "local-drand", options = ["mock", "local-drand", "external-drand"] }

# Params relevant to drand nodes. drand nodes should have role="drand", and must all be
# in the same composition group. There must be at least threshold drand nodes.
# To get lotus nodes to actually use the drand nodes, you must set random_beacon_type="local-drand"
# for the lotus node groups.
drand_period = { type = "duration", default="10s" }
drand_threshold = { type = "int", default = 2 }
drand_gossip_relay = { type = "bool", default = true }
drand_log_level = { type = "string", default="info" }
suspend_events = { type = "string", default="", desc = "a sequence of halt/resume/wait events separated by '->'" }

# Params relevant to pubsub tracing
enable_pubsub_tracer = { type = "bool", default = false }

# Mining Mode: synchronized -vs- natural time
mining_mode = { type = "enum", default = "synchronized", options = ["synchronized", "natural"] }
9 changes: 9 additions & 0 deletions lotus-soup/sync_mining.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package main

import (
"github.com/filecoin-project/oni/lotus-soup/testkit"
)

// syncMining is the handler registered for the "sync-mining" test case. It has
// no case-specific logic of its own: every instance simply runs whatever
// testkit.HandleDefaultRole does for it, and that result is returned as-is.
func syncMining(t *testkit.TestEnvironment) error {
	return testkit.HandleDefaultRole(t)
}
16 changes: 16 additions & 0 deletions lotus-soup/testkit/lotus_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package testkit
import (
"fmt"

"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/lp2p"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/specs-actors/actors/runtime"
"github.com/prometheus/common/log"
"go.uber.org/fx"

"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -65,3 +69,15 @@ func withApiEndpoint(addr string) node.Option {
return lr.SetAPIEndpoint(apima)
})
}

// withChainStore returns a node option that overrides construction of the
// node's *store.ChainStore. The replacement constructor builds a ChainStore
// from the injected blockstore, metadata datastore and syscalls, and publishes
// it through cs so that the caller keeps a handle on the instance the node
// ends up using.
//
// A failure to load existing chain state is only logged as a warning; the
// freshly created ChainStore is returned regardless.
func withChainStore(cs **store.ChainStore) node.Option {
	construct := func(_ fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls runtime.Syscalls) *store.ChainStore {
		chain := store.NewChainStore(bs, ds, syscalls)
		// Hand the instance to the caller before attempting to load.
		*cs = chain
		if err := chain.Load(); err != nil {
			log.Warnf("loading chain state from disk: %s", err)
		}
		return chain
	}
	return node.Override(new(*store.ChainStore), construct)
}
26 changes: 21 additions & 5 deletions lotus-soup/testkit/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ import (
"sort"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules/dtypes"
modtest "github.com/filecoin-project/lotus/node/modules/testing"
tstats "github.com/filecoin-project/lotus/tools/stats"
"github.com/raulk/clock"

"github.com/kpacha/opencensus-influxdb"
ma "github.com/multiformats/go-multiaddr"
Expand All @@ -26,12 +29,24 @@ import (

var PrepareNodeTimeout = time.Minute

// Indices into LotusNode.Addresses. The array has exactly four slots, one per
// constant below, so adding a constant here requires growing that array too.
const (
	// AddressWallet is the slot holding the node's wallet address; setWallet
	// fills it in from the wallet key.
	AddressWallet = iota
	// AddressMinerActor is presumably the slot for the miner actor address;
	// it is not populated in the code visible here — TODO confirm.
	AddressMinerActor
	// AddressMinerID is presumably the slot for the miner ID address; it is
	// not populated in the code visible here — TODO confirm.
	AddressMinerID
	// AddressMinerWorker is presumably the slot for the miner worker address;
	// it is not populated in the code visible here — TODO confirm.
	AddressMinerWorker
)

type LotusNode struct {
FullApi api.FullNode
MinerApi api.StorageMiner
StopFn node.StopFunc
Wallet *wallet.Key
MineOne func(context.Context, func(bool, error)) error
t *TestEnvironment

FullApi api.FullNode
MinerApi api.StorageMiner
Addresses [4]address.Address // indexed by Address* constants.
StopFn node.StopFunc
ChainStore *store.ChainStore
Wallet *wallet.Key
MockClock *clock.Mock
MineOne func(context.Context, func(bool, error)) error
}

func (n *LotusNode) setWallet(ctx context.Context, walletKey *wallet.Key) error {
Expand All @@ -46,6 +61,7 @@ func (n *LotusNode) setWallet(ctx context.Context, walletKey *wallet.Key) error
}

n.Wallet = walletKey
n.Addresses[AddressWallet] = walletKey.Address

return nil
}
Expand Down
120 changes: 120 additions & 0 deletions lotus-soup/testkit/node_time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package testkit

import (
"context"
"fmt"
"time"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/raulk/clock"
"github.com/testground/sdk-go/sync"
)

// ClockSyncTopic is the sync-service topic ("clock_sync") on which instances
// publish and receive ClockSyncMsg clock synchronization events.
var ClockSyncTopic = sync.NewTopic("clock_sync", new(ClockSyncMsg))

// ClockSyncMsg is the payload exchanged on ClockSyncTopic. An instance
// publishes one each time it moves its local clock forward by one epoch.
type ClockSyncMsg struct {
	// ID identifies the sending instance, formatted as
	// ${groupname}_${groupseq}.
	ID string
	// Epoch is the chain epoch the sender has just advanced to.
	Epoch abi.ChainEpoch
}

// SynchronizeClock synchronizes this instance's clock with the clocks of all
// miner and client nodes participating in this test scenario.
//
// This function returns two channels.
//
// The first channel (localAdvance) is where the user requests local advancement
// to the next epoch. Such requests likely originate from some local event that
// implies readiness to advance, such as "mining round completed" events from
// synchronized mining. Upon receiving a local advance signal, the control
// goroutine forwards the local clock by build.BlockDelaySecs, thus progressing
// one chain epoch. It then signals this fact to all other nodes participating
// in the test via the Testground synchronization service.
//
// The second channel is where the global clock synchronizer signals every time
// the entirety of test participants have progressed to the next epoch, thus
// indicating that all nodes are ready to commence that chain epoch of
// computation.
//
// Essentially this behaviour can be likened to a distributed semaphore.
//
// This method will return immediately, and will spawn a background goroutine
// that performs the clock synchronisation work.
func (n *LotusNode) SynchronizeClock(ctx context.Context) (localAdvance chan<- abi.ChainEpoch, globalEpoch <-chan abi.ChainEpoch, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

global vs local: i have trouble grappling with the topology implied by this locality

the user requests local advancement: it doesn't seem to me that the user (miner) requests advancement; it just says
"I've synced to this epoch". Yes, you can interpret that as "I'm done, let's move on", but local/global implies to me some equality that is differentiated only by how we order the group. Here we have miners and a central time/chain control, and the miners don't have any leverage over that control (in more advanced use cases, even if a miner says "I'm done with this round", we may still not advance until some other event happens). I don't see the relation that could classify them into local/global with a centralized control system.

Copy link
Member Author

@raulk raulk Jul 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SynchronizeClock is another actor (actor-oriented programming). It encapsulates all the control of the clock. No other process in the system (not even the SynchronizeMiner actor) is entitled to change the clock (although we don't enforce that prohibition).

The idea behind this is that when the local node decides — once its enlisted predicates are satisfied — that it's ready to advance the clock, it sends a signal to localAdvance.

Right now we ACTUALLY advance the local clock and advertise that fact to the world. In the future, we could use some form of 3PC (three phase commit), where we PREPARE [phase 1] the advancement, nodes opt in or opt out of that round via the sync service, and once all nodes have PREPARED, we actually COMMIT [phase 2] the local clock advancement by manipulating the mock clock, and CERTIFY [phase 3] that we are at the new epoch.

Once all nodes have passed the CERTIFY phase, that's when we release the signal on globalEpoch to trigger mining and/or other downstream processes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think I was having the 3PC model you're describing in my mind. I'm fine if this is not implemented right now.

n.MockClock = clock.NewMock()
build.Clock = n.MockClock

gen, err := n.FullApi.ChainGetGenesis(context.Background())
if err != nil {
return nil, nil, fmt.Errorf("failed to get genesis: %w", err)
}

// start at genesis.
genesisTime := time.Unix(int64(gen.MinTimestamp()), 0)
n.MockClock.Set(genesisTime)

var (
localEpochAdvanceCh = make(chan abi.ChainEpoch, 128)
globalEpochStartCh = make(chan abi.ChainEpoch, 128)
)

// jumpstart the clock!
localEpochAdvanceCh <- abi.ChainEpoch(1)

var (
id = fmt.Sprintf("%s_%d", n.t.TestGroupID, n.t.GroupSeq)

minerCnt = n.t.IntParam("miners")
clientCnt = n.t.IntParam("clients")
totalCnt = minerCnt + clientCnt

epoch = abi.ChainEpoch(0) // current epoch: genesis
epochInterval = time.Duration(build.BlockDelaySecs) * time.Second

// ready tracks which nodes have advanced to the next round.
// when all nodes have advanced, we tick the global clock and reset
// this slice.
ready []string
raulk marked this conversation as resolved.
Show resolved Hide resolved

signalCh = make(chan *ClockSyncMsg, 128)
sub = n.t.SyncClient.MustSubscribe(ctx, ClockSyncTopic, signalCh)
)

go func() {
defer close(globalEpochStartCh)

for {
select {
case <-localEpochAdvanceCh:
// move the local clock forward, and announce that fact to all other instances.
n.MockClock.Add(epochInterval)
schomatis marked this conversation as resolved.
Show resolved Hide resolved
epoch++
n.t.RecordMessage("advanced local clock: %v [epoch=%d]", n.MockClock.Now(), epoch)
n.t.SyncClient.MustPublish(ctx, ClockSyncTopic, &ClockSyncMsg{
ID: id,
Epoch: epoch,
})

case s := <-signalCh:
// this is a clock event from another instance (or ourselves).
ready = append(ready, s.ID)
if len(ready) != totalCnt {
continue
}
// release the new epoch and reset the state.
globalEpochStartCh <- epoch
ready = ready[0:0]

case <-ctx.Done():
return

case <-sub.Done():
panic("global clock synchronization subscription died")
}
}
}()

return localEpochAdvanceCh, globalEpochStartCh, nil
}
2 changes: 1 addition & 1 deletion lotus-soup/testkit/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

func RetrieveData(t *TestEnvironment, ctx context.Context, client api.FullNode, fcid cid.Cid, carExport bool, data []byte) {
t1 := time.Now()
offers, err := client.ClientFindData(ctx, fcid)
offers, err := client.ClientFindData(ctx, fcid, nil)
if err != nil {
panic(err)
}
Expand Down
Loading