Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
test: Added more tests for ordersync (#890)
Browse files Browse the repository at this point in the history
* Added a test for orderfilter json encoding

* Added ordersync test for nodes with different filters

* Added ordersync test for receiving first request from old peer

* Fixed ordersync tests with distinct orderfilters

* Added another test case

* Cleaned up in anticipation of review

* Refactored orderfilter tests

* Addressed some review feedback from @albrow

* Apply suggestions from @albrow

Co-authored-by: Alex Browne <stephenalexbrowne@gmail.com>

* Fixed issues caused by suggestions

* Fixed naming

Co-authored-by: Alex Browne <stephenalexbrowne@gmail.com>
  • Loading branch information
jalextowle and albrow committed Jul 31, 2020
1 parent ff9e214 commit d7f70fc
Show file tree
Hide file tree
Showing 6 changed files with 477 additions and 138 deletions.
242 changes: 162 additions & 80 deletions core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package core
import (
"context"
"flag"
"fmt"
"math/big"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -92,11 +94,14 @@ func TestConfigChainIDAndRPCMatchDetection(t *testing.T) {
wg.Wait()
}

func newTestApp(t *testing.T) *App {
return newTestAppWithPrivateConfig(t, defaultPrivateConfig())
func newTestApp(t *testing.T, ctx context.Context) *App {
return newTestAppWithPrivateConfig(t, ctx, defaultOrderFilter, defaultPrivateConfig())
}

func newTestAppWithPrivateConfig(t *testing.T, pConfig privateConfig) *App {
func newTestAppWithPrivateConfig(t *testing.T, ctx context.Context, customOrderFilter string, pConfig privateConfig) *App {
if customOrderFilter == "" {
customOrderFilter = defaultOrderFilter
}
dataDir := "/tmp/test_node/" + uuid.New().String()
config := Config{
Verbosity: 2,
Expand All @@ -113,7 +118,7 @@ func newTestAppWithPrivateConfig(t *testing.T, pConfig privateConfig) *App {
EthereumRPCMaxRequestsPer24HrUTC: 99999999999999,
EthereumRPCMaxRequestsPerSecond: 99999999999999,
MaxOrdersInStorage: 100000,
CustomOrderFilter: "{}",
CustomOrderFilter: customOrderFilter,
}
app, err := newWithPrivateConfig(config, pConfig)
require.NoError(t, err)
Expand Down Expand Up @@ -177,100 +182,177 @@ func TestOrderSync(t *testing.T) {
t.Skip("Serial tests (tests which cannot run in parallel) are disabled. You can enable them with the --serial flag")
}

teardownSubTest := setupSubTest(t)
defer teardownSubTest(t)
testCases := []ordersyncTestCase{
{
name: "FilteredPaginationSubprotocol version 0",
pConfig: privateConfig{
paginationSubprotocolPerPage: 10,
},
},
{
name: "FilteredPaginationSubprotocol version 1",
pConfig: privateConfig{
paginationSubprotocolPerPage: 10,
},
},
{
name: "FilteredPaginationSubprotocol version 1 and version 0",
pConfig: privateConfig{
paginationSubprotocolPerPage: 10,
},
},
{
name: "makerAssetAmount orderfilter - match all orders",
customOrderFilter: `{"properties":{"makerAssetAmount":{"pattern":"^1$","type":"string"}}}`,
orderOptionsForIndex: func(_ int) []orderopts.Option {
return []orderopts.Option{orderopts.MakerAssetAmount(big.NewInt(1))}
},
pConfig: privateConfig{
paginationSubprotocolPerPage: 10,
},
},
{
name: "makerAssetAmount OrderFilter - matches one order",
customOrderFilter: `{"properties":{"makerAssetAmount":{"pattern":"^1$","type":"string"}}}`,
orderOptionsForIndex: func(i int) []orderopts.Option {
if i == 0 {
return []orderopts.Option{orderopts.MakerAssetAmount(big.NewInt(1))}
}
return []orderopts.Option{}
},
pConfig: privateConfig{
paginationSubprotocolPerPage: 10,
},
},
}
for i, testCase := range testCases {
testCaseName := fmt.Sprintf("%s (test case %d)", testCase.name, i)
t.Run(testCaseName, runOrdersyncTestCase(t, testCase))
}
}

// Set up two Mesh nodes. originalNode starts with some orders. newNode enters
// the network without any orders.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
wg := &sync.WaitGroup{}
type ordersyncTestCase struct {
name string
customOrderFilter string
orderOptionsForIndex func(int) []orderopts.Option
pConfig privateConfig
}

perPage := 10
pConfig := privateConfig{
paginationSubprotocolPerPage: perPage,
}
originalNode := newTestAppWithPrivateConfig(t, pConfig)
wg.Add(1)
go func() {
defer wg.Done()
if err := originalNode.Start(ctx); err != nil && err != context.Canceled {
// context.Canceled is expected. For any other error, fail the test.
require.NoError(t, err)
}
}()
const defaultOrderFilter = "{}"

// Manually add some orders to originalNode.
orderOptions := scenario.OptionsForAll(orderopts.SetupMakerState(true))
originalOrders := scenario.NewSignedTestOrdersBatch(t, perPage*3+1, orderOptions)
func runOrdersyncTestCase(t *testing.T, testCase ordersyncTestCase) func(t *testing.T) {
return func(t *testing.T) {
teardownSubTest := setupSubTest(t)
defer teardownSubTest(t)

// Set up two Mesh nodes. originalNode starts with some orders. newNode enters
// the network without any orders.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
wg := &sync.WaitGroup{}
originalNode := newTestAppWithPrivateConfig(t, ctx, defaultOrderFilter, testCase.pConfig)
wg.Add(1)
go func() {
defer wg.Done()
if err := originalNode.Start(ctx); err != nil && err != context.Canceled {
// context.Canceled is expected. For any other error, fail the test.
panic(fmt.Sprintf("%s %s", testCase.name, err))
}
}()

// We have to wait for latest block to be processed by the Mesh node.
time.Sleep(blockProcessingWaitTime)
// Manually add some orders to originalNode.
orderOptionsForIndex := func(i int) []orderopts.Option {
orderOptions := []orderopts.Option{orderopts.SetupMakerState(true)}
if testCase.orderOptionsForIndex != nil {
return append(testCase.orderOptionsForIndex(i), orderOptions...)
}
return orderOptions
}
numOrders := testCase.pConfig.paginationSubprotocolPerPage*3 + 1
originalOrders := scenario.NewSignedTestOrdersBatch(t, numOrders, orderOptionsForIndex)

results, err := originalNode.orderWatcher.ValidateAndStoreValidOrders(ctx, originalOrders, true, constants.TestChainID)
require.NoError(t, err)
require.Empty(t, results.Rejected, "tried to add orders but some were invalid: \n%s\n", spew.Sdump(results))
// We have to wait for latest block to be processed by the Mesh node.
time.Sleep(blockProcessingWaitTime)

newNode := newTestApp(t)
wg.Add(1)
go func() {
defer wg.Done()
if err := newNode.Start(ctx); err != nil && err != context.Canceled {
// context.Canceled is expected. For any other error, fail the test.
results, err := originalNode.orderWatcher.ValidateAndStoreValidOrders(ctx, originalOrders, true, constants.TestChainID)
require.NoError(t, err)
require.Empty(t, results.Rejected, "tried to add orders but some were invalid: \n%s\n", spew.Sdump(results))

newNode := newTestAppWithPrivateConfig(t, ctx, testCase.customOrderFilter, defaultPrivateConfig())
wg.Add(1)
go func() {
defer wg.Done()
if err := newNode.Start(ctx); err != nil && err != context.Canceled {
// context.Canceled is expected. For any other error, fail the test.
panic(fmt.Sprintf("%s %s", testCase.name, err))
}
}()
<-newNode.started

orderEventsChan := make(chan []*zeroex.OrderEvent)
orderEventsSub := newNode.SubscribeToOrderEvents(orderEventsChan)
defer orderEventsSub.Unsubscribe()

// Connect the two nodes *after* adding orders to one of them. This should
// trigger the ordersync protocol.
err = originalNode.AddPeer(peer.AddrInfo{
ID: newNode.node.ID(),
Addrs: newNode.node.Multiaddrs(),
})
require.NoError(t, err)

// Only the orders that satisfy the new node's orderfilter should
// be received during ordersync.
filteredOrders := []*zeroex.SignedOrder{}
for _, order := range originalOrders {
matches, err := newNode.orderFilter.MatchOrder(order)
require.NoError(t, err)
if matches {
filteredOrders = append(filteredOrders, order)
}
}
}()
<-newNode.started

orderEventsChan := make(chan []*zeroex.OrderEvent)
orderEventsSub := newNode.SubscribeToOrderEvents(orderEventsChan)
defer orderEventsSub.Unsubscribe()

// Connect the two nodes *after* adding orders to one of them. This should
// trigger the ordersync protocol.
err = originalNode.AddPeer(peer.AddrInfo{
ID: newNode.node.ID(),
Addrs: newNode.node.Multiaddrs(),
})
require.NoError(t, err)

// Wait for newNode to get the orders via ordersync.
receivedAddedEvents := []*zeroex.OrderEvent{}
OrderEventLoop:
for {
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for %d order added events (received %d so far)", len(originalOrders), len(receivedAddedEvents))
case orderEvents := <-orderEventsChan:
for _, orderEvent := range orderEvents {
if orderEvent.EndState == zeroex.ESOrderAdded {
receivedAddedEvents = append(receivedAddedEvents, orderEvent)
// Wait for newNode to get the orders via ordersync.
receivedAddedEvents := []*zeroex.OrderEvent{}
OrderEventLoop:
for {
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for %d order added events (received %d so far)", len(originalOrders), len(receivedAddedEvents))
case orderEvents := <-orderEventsChan:
for _, orderEvent := range orderEvents {
if orderEvent.EndState == zeroex.ESOrderAdded {
receivedAddedEvents = append(receivedAddedEvents, orderEvent)
}
}
if len(receivedAddedEvents) >= len(filteredOrders) {
break OrderEventLoop
}
}
if len(receivedAddedEvents) >= len(originalOrders) {
break OrderEventLoop
}
}
}

// Test that the orders are actually in the database and are returned by
// GetOrders.
newNodeOrdersResp, err := newNode.GetOrders(0, len(originalOrders), "")
require.NoError(t, err)
assert.Len(t, newNodeOrdersResp.OrdersInfos, len(originalOrders), "new node should have %d orders", len(originalOrders))
for _, expectedOrder := range originalOrders {
orderHash, err := expectedOrder.ComputeOrderHash()
// Test that the orders are actually in the database and are returned by
// GetOrders.
newNodeOrdersResp, err := newNode.GetOrders(0, len(filteredOrders), "")
require.NoError(t, err)
expectedOrder.ResetHash()
var dbOrder meshdb.Order
require.NoError(t, newNode.db.Orders.FindByID(orderHash.Bytes(), &dbOrder))
actualOrder := dbOrder.SignedOrder
assert.Equal(t, expectedOrder, actualOrder, "correct order was not stored in new node database")
}
assert.Len(t, newNodeOrdersResp.OrdersInfos, len(filteredOrders), "new node should have %d orders", len(originalOrders))
for _, expectedOrder := range filteredOrders {
orderHash, err := expectedOrder.ComputeOrderHash()
require.NoError(t, err)
expectedOrder.ResetHash()
var dbOrder meshdb.Order
require.NoError(t, newNode.db.Orders.FindByID(orderHash.Bytes(), &dbOrder))
actualOrder := dbOrder.SignedOrder
assert.Equal(t, expectedOrder, actualOrder, "correct order was not stored in new node database")
}

// Wait for nodes to exit without error.
cancel()
wg.Wait()
// Wait for nodes to exit without error.
cancel()
wg.Wait()
}
}

func setupSubTest(t *testing.T) func(t *testing.T) {
Expand Down
7 changes: 0 additions & 7 deletions core/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ import (
// Ensure that App implements p2p.MessageHandler.
var _ p2p.MessageHandler = &App{}

func min(a int, b int) int {
if a < b {
return a
}
return b
}

func (app *App) HandleMessages(ctx context.Context, messages []*p2p.Message) error {
// First we validate the messages and decode them into orders.
orders := []*zeroex.SignedOrder{}
Expand Down
Loading

0 comments on commit d7f70fc

Please sign in to comment.