Skip to content

Commit

Permalink
refactor: use IPLD DAG instead whole CoreAPI
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed May 26, 2021
1 parent 66428c8 commit 2b648fe
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 279 deletions.
4 changes: 2 additions & 2 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -77,7 +78,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool)
cs.SetLogger(cs.Logger)
// set private validator
pv := privVals[i]
Expand All @@ -91,7 +92,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

cs.SetTimeoutTicker(tickerFunc())
cs.SetLogger(logger)
cs.SetIPFSApi(ipfsTestAPI)
css[i] = cs
}

Expand Down
25 changes: 2 additions & 23 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path"
Expand All @@ -15,7 +14,7 @@ import (
"time"

"github.com/go-kit/kit/log/term"
iface "github.com/ipfs/interface-go-ipfs-core"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/require"

abcicli "github.com/lazyledger/lazyledger-core/abci/client"
Expand All @@ -24,7 +23,6 @@ import (
abci "github.com/lazyledger/lazyledger-core/abci/types"
cfg "github.com/lazyledger/lazyledger-core/config"
cstypes "github.com/lazyledger/lazyledger-core/consensus/types"
"github.com/lazyledger/lazyledger-core/ipfs"
tmbytes "github.com/lazyledger/lazyledger-core/libs/bytes"
dbm "github.com/lazyledger/lazyledger-core/libs/db"
"github.com/lazyledger/lazyledger-core/libs/db/memdb"
Expand Down Expand Up @@ -55,9 +53,6 @@ var (
config *cfg.Config // NOTE: must be reset for each _test.go file
consensusReplayConfig *cfg.Config
ensureTimeout = 2 * time.Second

ipfsTestAPI iface.CoreAPI
ipfsCloser io.Closer
)

func ensureDir(dir string, mode os.FileMode) {
Expand All @@ -66,19 +61,6 @@ func ensureDir(dir string, mode os.FileMode) {
}
}

func setTestIpfsAPI() (err error) {
mockIPFSProvider := ipfs.Mock()
if ipfsTestAPI, ipfsCloser, err = mockIPFSProvider(); err != nil {
return
}
return
}

func teardownTestIpfsAPI() (err error) {
err = ipfsCloser.Close()
return
}

func ResetConfig(name string) *cfg.Config {
return cfg.ResetTestRoot(name)
}
Expand Down Expand Up @@ -417,7 +399,7 @@ func newStateWithConfigAndBlockStore(
}

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

Expand Down Expand Up @@ -450,7 +432,6 @@ func randState(nValidators int) (*State, []*validatorStub) {
vss := make([]*validatorStub, nValidators)

cs := newState(state, privVals[0], counter.NewApplication(true))
cs.SetIPFSApi(ipfsTestAPI)

for i := 0; i < nValidators; i++ {
vss[i] = newValidatorStub(privVals[i], int32(i))
Expand Down Expand Up @@ -726,7 +707,6 @@ func randConsensusNet(
css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB)
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
css[i].SetIPFSApi(ipfsTestAPI)
}
return css, func() {
for _, dir := range configRootDirs {
Expand Down Expand Up @@ -790,7 +770,6 @@ func randConsensusNetWithPeers(
css[i] = newStateWithConfig(thisConfig, state, privVal, app)
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
css[i].SetIPFSApi(ipfsTestAPI)
}
return css, genDoc, peer0Config, func() {
for _, dir := range configRootDirs {
Expand Down
5 changes: 0 additions & 5 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs.SetIPFSApi(ipfsTestAPI)
assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
Expand All @@ -51,7 +50,6 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
config.Consensus.CreateEmptyBlocksInterval = ensureTimeout
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs.SetIPFSApi(ipfsTestAPI)

assertMempool(cs.txNotifier).EnableTxsAvailable()

Expand All @@ -70,7 +68,6 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs.SetIPFSApi(ipfsTestAPI)

assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
Expand Down Expand Up @@ -121,7 +118,6 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
stateStore := sm.NewStore(blockDB)

cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB)
cs.SetIPFSApi(ipfsTestAPI)
err := stateStore.Save(state)
require.NoError(t, err)
newBlockHeaderCh := subscribe(cs.eventBus, types.EventQueryNewBlockHeader)
Expand All @@ -148,7 +144,6 @@ func TestMempoolRmBadTx(t *testing.T) {

stateStore := sm.NewStore(blockDB)
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB)
cs.SetIPFSApi(ipfsTestAPI)
err := stateStore.Save(state)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -182,10 +183,9 @@ func TestReactorWithEvidence(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool2)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)
cs.SetIPFSApi(ipfsTestAPI)

eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
Expand Down
6 changes: 4 additions & 2 deletions consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strconv"
"strings"

mdutils "github.com/ipfs/go-merkledag/test"

cfg "github.com/lazyledger/lazyledger-core/config"
"github.com/lazyledger/lazyledger-core/libs/db/badgerdb"
"github.com/lazyledger/lazyledger-core/libs/log"
Expand Down Expand Up @@ -129,7 +131,7 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error
pb.cs.Wait()

newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
pb.cs.blockStore, pb.cs.txNotifier, mdutils.Mock(), pb.cs.evpool)
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()

Expand Down Expand Up @@ -329,7 +331,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)

consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
blockStore, mempool, mdutils.Mock(), evpool)

consensusState.SetEventBus(eventBus)
return consensusState
Expand Down
7 changes: 2 additions & 5 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/gogo/protobuf/proto"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -37,7 +38,6 @@ import (

func TestMain(m *testing.M) {
config = ResetConfig("consensus_reactor_test")
_ = setTestIpfsAPI()
consensusReplayConfig = ResetConfig("consensus_replay_test")
configStateTest := ResetConfig("consensus_state_test")
configMempoolTest := ResetConfig("consensus_mempool_test")
Expand All @@ -48,7 +48,6 @@ func TestMain(m *testing.M) {
os.RemoveAll(configStateTest.RootDir)
os.RemoveAll(configMempoolTest.RootDir)
os.RemoveAll(configByzantineTest.RootDir)
_ = teardownTestIpfsAPI()
os.Exit(code)
}

Expand Down Expand Up @@ -81,7 +80,6 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
blockDB,
)
cs.SetLogger(logger)
cs.SetIPFSApi(ipfsTestAPI)

bytes, _ := ioutil.ReadFile(cs.config.WalFile())
t.Logf("====== WAL: \n\r%X\n", bytes)
Expand Down Expand Up @@ -133,12 +131,11 @@ func TestWALCrash(t *testing.T) {
}{
{"empty block",
func(stateDB dbm.DB, cs *State, ctx context.Context) {
cs.SetIPFSApi(ipfsTestAPI)
cs.dag = mdutils.Mock()
},
1},
{"many non-empty blocks",
func(stateDB dbm.DB, cs *State, ctx context.Context) {
cs.SetIPFSApi(ipfsTestAPI)
go sendTxs(ctx, cs)
},
3},
Expand Down
15 changes: 5 additions & 10 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"time"

"github.com/gogo/protobuf/proto"
ipface "github.com/ipfs/interface-go-ipfs-core"
format "github.com/ipfs/go-ipld-format"
cfg "github.com/lazyledger/lazyledger-core/config"
cstypes "github.com/lazyledger/lazyledger-core/consensus/types"
"github.com/lazyledger/lazyledger-core/crypto"
Expand Down Expand Up @@ -94,7 +94,7 @@ type State struct {
// store blocks and commits
blockStore sm.BlockStore

ipfs ipface.CoreAPI
dag format.DAGService

// create and execute blocks
blockExec *sm.BlockExecutor
Expand Down Expand Up @@ -163,13 +163,15 @@ func NewState(
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
txNotifier txNotifier,
dag format.DAGService,
evpool evidencePool,
options ...StateOption,
) *State {
cs := &State{
config: config,
blockExec: blockExec,
blockStore: blockStore,
dag: dag,
txNotifier: txNotifier,
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
Expand Down Expand Up @@ -207,13 +209,6 @@ func NewState(
//----------------------------------------
// Public interface

// SetIPFSApi sets the IPFSAPI
func (cs *State) SetIPFSApi(api ipface.CoreAPI) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.ipfs = api
}

// SetLogger implements Service.
func (cs *State) SetLogger(l log.Logger) {
cs.BaseService.Logger = l
Expand Down Expand Up @@ -1126,7 +1121,7 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
defer cancel()
cs.Logger.Info("Putting Block to ipfs", "height", block.Height)
// TODO: post data to IPFS in a goroutine
err = ipld.PutBlock(ctx, cs.ipfs.Dag(), block)
err = ipld.PutBlock(ctx, cs.dag, block)
if err != nil {
// If PutBlock fails we will be the only node that has the data
// this means something is seriously wrong and we can not recover
Expand Down
3 changes: 0 additions & 3 deletions consensus/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,6 @@ func TestStateLockPOLRelock(t *testing.T) {

// before we timeout to the new round set the new proposal
cs2 := newState(cs1.state, vs2, counter.NewApplication(true))
cs2.SetIPFSApi(ipfsTestAPI)
prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1)
if prop == nil || propBlock == nil {
t.Fatal("Failed to create proposal block with vs2")
Expand Down Expand Up @@ -827,7 +826,6 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {

// before we timeout to the new round set the new proposal
cs2 := newState(cs1.state, vs2, counter.NewApplication(true))
cs2.SetIPFSApi(ipfsTestAPI)
prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1)
if prop == nil || propBlock == nil {
t.Fatal("Failed to create proposal block with vs2")
Expand Down Expand Up @@ -872,7 +870,6 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {

// before we timeout to the new round set the new proposal
cs3 := newState(cs1.state, vs3, counter.NewApplication(true))
cs3.SetIPFSApi(ipfsTestAPI)
prop, propBlock = decideProposal(cs3, vs3, vs3.Height, vs3.Round+1)
if prop == nil || propBlock == nil {
t.Fatal("Failed to create proposal block with vs2")
Expand Down
4 changes: 2 additions & 2 deletions consensus/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/lazyledger/lazyledger-core/abci/example/kvstore"
cfg "github.com/lazyledger/lazyledger-core/config"
"github.com/lazyledger/lazyledger-core/libs/db/memdb"
Expand Down Expand Up @@ -338,8 +339,7 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
require.NoError(t, err)
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetIPFSApi(ipfsTestAPI)
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, mdutils.Mock(), evpool)
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
if privValidator != nil && privValidator != (*privval.FilePV)(nil) {
Expand Down
7 changes: 4 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"
"time"

format "github.com/ipfs/go-ipld-format"
ipface "github.com/ipfs/interface-go-ipfs-core"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -395,7 +396,7 @@ func createConsensusReactor(config *cfg.Config,
csMetrics *cs.Metrics,
waitSync bool,
eventBus *types.EventBus,
ipfs ipface.CoreAPI,
dag format.DAGService,
consensusLogger log.Logger) (*cs.Reactor, *cs.State) {

consensusState := cs.NewState(
Expand All @@ -404,10 +405,10 @@ func createConsensusReactor(config *cfg.Config,
blockExec,
blockStore,
mempool,
dag,
evidencePool,
cs.StateMetrics(csMetrics),
)
consensusState.SetIPFSApi(ipfs)
consensusState.SetLogger(consensusLogger)
if privValidator != nil {
consensusState.SetPrivValidator(privValidator)
Expand Down Expand Up @@ -757,7 +758,7 @@ func NewNode(config *cfg.Config,
}
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, stateSync || fastSync, eventBus, ipfs, consensusLogger,
privValidator, csMetrics, stateSync || fastSync, eventBus, ipfs.Dag(), consensusLogger,
)

// Set up state sync reactor, and schedule a sync if requested.
Expand Down
Loading

0 comments on commit 2b648fe

Please sign in to comment.