Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Fixed bug in ordersync that caused nodes to use the wrong orderfilter #882

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func (app *App) Start() error {
for _, subprotocolFactory := range app.privateConfig.paginationSubprotocols {
ordersyncSubprotocols = append(ordersyncSubprotocols, subprotocolFactory(app, app.privateConfig.paginationSubprotocolPerPage))
}
app.ordersyncService = ordersync.New(innerCtx, app.node, ordersyncSubprotocols)
app.ordersyncService = ordersync.New(innerCtx, app.orderFilter, app.node, ordersyncSubprotocols)
orderSyncErrChan := make(chan error, 1)
wg.Add(1)
go func() {
Expand Down
38 changes: 33 additions & 5 deletions core/ordersync/ordersync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"sync"
"time"

"github.com/0xProject/0x-mesh/orderfilter"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/zeroex"
"github.com/albrow/stringset"
"github.com/ethereum/go-ethereum/common"
"github.com/jpillora/backoff"
network "github.com/libp2p/go-libp2p-core/network"
protocol "github.com/libp2p/go-libp2p-core/protocol"
Expand Down Expand Up @@ -112,6 +114,7 @@ type rawResponse struct {
// responding to and sending ordersync requests.
type Service struct {
ctx context.Context
orderFilter *orderfilter.Filter
node *p2p.Node
subprotocols map[string]Subprotocol
// requestRateLimiter is a rate limiter for incoming ordersync requests. It's
Expand Down Expand Up @@ -159,13 +162,14 @@ type Subprotocol interface {
// them. New expects an array of subprotocols which the service will support, in the
// order of preference. The service will automatically pick the most preferred protocol
// that is supported by both peers for each request/response.
func New(ctx context.Context, node *p2p.Node, subprotocols []Subprotocol) *Service {
func New(ctx context.Context, orderfilter *orderfilter.Filter, node *p2p.Node, subprotocols []Subprotocol) *Service {
jalextowle marked this conversation as resolved.
Show resolved Hide resolved
supportedSubprotocols := map[string]Subprotocol{}
for _, subp := range subprotocols {
supportedSubprotocols[subp.Name()] = subp
}
s := &Service{
ctx: ctx,
orderFilter: orderfilter,
node: node,
subprotocols: supportedSubprotocols,
requestRateLimiter: rate.NewLimiter(maxRequestsPerSecond, requestsBurst),
Expand Down Expand Up @@ -458,6 +462,31 @@ func parseResponseWithSubprotocol(subprotocol Subprotocol, providerID peer.ID, r
}, nil
}

type requestMetadataForAllSubprotocols struct {
OrderFilter *orderfilter.Filter `json:"orderfilter"`
Page int `json:"page"`
SnapshotID string `json:"snapshotID"`
MinOrderHash common.Hash `json:"minOrderHash"`
}

func (s *Service) createFirstRequestForAllSubprotocols() (*rawRequest, error) {
jalextowle marked this conversation as resolved.
Show resolved Hide resolved
metadata := requestMetadataForAllSubprotocols{
OrderFilter: s.orderFilter,
Page: 0,
SnapshotID: "",
MinOrderHash: common.Hash{},
}
encodedMetadata, err := json.Marshal(metadata)
if err != nil {
return nil, err
}
return &rawRequest{
Type: TypeRequest,
Subprotocols: s.SupportedSubprotocols(),
Metadata: encodedMetadata,
}, nil
}

func (s *Service) getOrdersFromPeer(ctx context.Context, providerID peer.ID, firstRequest *rawRequest) (*rawRequest, error) {
stream, err := s.node.NewStream(ctx, providerID, ID)
if err != nil {
Expand All @@ -473,10 +502,9 @@ func (s *Service) getOrdersFromPeer(ctx context.Context, providerID peer.ID, fir
if firstRequest != nil {
nextReq = firstRequest
} else {
nextReq = &rawRequest{
Type: TypeRequest,
Subprotocols: s.SupportedSubprotocols(),
Metadata: nil,
nextReq, err = s.createFirstRequestForAllSubprotocols()
if err != nil {
return nil, err
}
}
var numValidOrders int
Expand Down
42 changes: 29 additions & 13 deletions core/ordersync_subprotocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func NewFilteredPaginationSubprotocolV0(app *App, perPage int) ordersync.Subprot
// FilteredPaginationSubProtocolV0. It keeps track of the current page and SnapshotID,
// which is expected to be an empty string on the first request.
type FilteredPaginationRequestMetadataV0 struct {
Page int `json:"page"`
SnapshotID string `json:"snapshotID"`
OrderFilter *orderfilter.Filter `json:"orderfilter"`
Page int `json:"page"`
SnapshotID string `json:"snapshotID"`
}

// FilteredPaginationResponseMetadataV0 is the response metadata for the
Expand Down Expand Up @@ -114,10 +115,16 @@ func (p *FilteredPaginationSubProtocolV0) HandleOrderSyncRequest(ctx context.Con
}
nextMinOrderHash = ordersResp.OrdersInfos[len(ordersResp.OrdersInfos)-1].OrderHash
// Filter the orders for this page.
for _, orderInfo := range ordersResp.OrdersInfos {
if matches, err := p.orderFilter.MatchOrder(orderInfo.SignedOrder); err != nil {
return nil, err
} else if matches {
if metadata.OrderFilter != nil {
for _, orderInfo := range ordersResp.OrdersInfos {
if matches, err := metadata.OrderFilter.MatchOrder(orderInfo.SignedOrder); err != nil {
return nil, err
} else if matches {
filteredOrders = append(filteredOrders, orderInfo.SignedOrder)
}
}
} else {
for _, orderInfo := range ordersResp.OrdersInfos {
filteredOrders = append(filteredOrders, orderInfo.SignedOrder)
}
}
Expand Down Expand Up @@ -185,8 +192,9 @@ func (p *FilteredPaginationSubProtocolV0) HandleOrderSyncResponse(ctx context.Co

return &ordersync.Request{
Metadata: &FilteredPaginationRequestMetadataV0{
Page: metadata.Page + 1,
SnapshotID: metadata.SnapshotID,
OrderFilter: p.app.orderFilter,
Page: metadata.Page + 1,
SnapshotID: metadata.SnapshotID,
},
}, len(filteredOrders), nil
}
Expand Down Expand Up @@ -235,7 +243,8 @@ func NewFilteredPaginationSubprotocolV1(app *App, perPage int) ordersync.Subprot
// FilteredPaginationSubProtocolV1. It keeps track of the current
// minOrderHash, which is expected to be an empty string on the first request.
type FilteredPaginationRequestMetadataV1 struct {
MinOrderHash common.Hash `json:"minOrderHash"`
OrderFilter *orderfilter.Filter `json:"orderfilter"`
MinOrderHash common.Hash `json:"minOrderHash"`
}

// FilteredPaginationResponseMetadataV1 is the response metadata for the
Expand Down Expand Up @@ -291,10 +300,16 @@ func (p *FilteredPaginationSubProtocolV1) HandleOrderSyncRequest(ctx context.Con
}
nextMinOrderHash = ordersResp.OrdersInfos[len(ordersResp.OrdersInfos)-1].OrderHash
// Filter the orders for this page.
for _, orderInfo := range ordersResp.OrdersInfos {
if matches, err := p.orderFilter.MatchOrder(orderInfo.SignedOrder); err != nil {
return nil, err
} else if matches {
if metadata.OrderFilter != nil {
for _, orderInfo := range ordersResp.OrdersInfos {
if matches, err := p.orderFilter.MatchOrder(orderInfo.SignedOrder); err != nil {
return nil, err
} else if matches {
filteredOrders = append(filteredOrders, orderInfo.SignedOrder)
}
}
} else {
for _, orderInfo := range ordersResp.OrdersInfos {
filteredOrders = append(filteredOrders, orderInfo.SignedOrder)
}
}
Expand Down Expand Up @@ -370,6 +385,7 @@ func (p *FilteredPaginationSubProtocolV1) HandleOrderSyncResponse(ctx context.Co
}
return &ordersync.Request{
Metadata: &FilteredPaginationRequestMetadataV1{
OrderFilter: p.app.orderFilter,
MinOrderHash: nextMinOrderHash,
},
}, len(filteredOrders), nil
Expand Down
5 changes: 5 additions & 0 deletions orderfilter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/0xProject/0x-mesh/ethereum"
"github.com/ethereum/go-ethereum/common"
jsonschema "github.com/xeipuuv/gojsonschema"
)

Expand Down Expand Up @@ -37,8 +38,11 @@ type Filter struct {
rawCustomOrderSchema string
orderSchema *jsonschema.Schema
messageSchema *jsonschema.Schema
exchangeAddress common.Address
}

// TODO(jalextowle): We do not need `contractAddresses` since we only use `contractAddresses.Exchange`.
// In a future refactor, we should update this interface.
func New(chainID int, customOrderSchema string, contractAddresses ethereum.ContractAddresses) (*Filter, error) {
orderLoader, err := newLoader(chainID, customOrderSchema, contractAddresses)
if err != nil {
Expand All @@ -62,6 +66,7 @@ func New(chainID int, customOrderSchema string, contractAddresses ethereum.Contr
rawCustomOrderSchema: customOrderSchema,
orderSchema: compiledRootOrderSchema,
messageSchema: compiledRootOrderMessageSchema,
exchangeAddress: contractAddresses.Exchange,
}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions orderfilter/filter_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/0xProject/0x-mesh/ethereum"
"github.com/0xProject/0x-mesh/packages/mesh-browser/go/jsutil"
"github.com/ethereum/go-ethereum/common"
)

type Filter struct {
Expand All @@ -18,6 +19,7 @@ type Filter struct {
encodedSchema string
chainID int
rawCustomOrderSchema string
exchangeAddress common.Address
}

func New(chainID int, customOrderSchema string, contractAddresses ethereum.ContractAddresses) (*Filter, error) {
Expand Down Expand Up @@ -58,5 +60,6 @@ func New(chainID int, customOrderSchema string, contractAddresses ethereum.Contr
messageValidator: messageValidator,
chainID: chainID,
rawCustomOrderSchema: customOrderSchema,
exchangeAddress: contractAddresses.Exchange,
}, nil
}
35 changes: 34 additions & 1 deletion orderfilter/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package orderfilter
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"

"github.com/0xProject/0x-mesh/ethereum"
"github.com/0xProject/0x-mesh/zeroex"
"github.com/ethereum/go-ethereum/common"
canonicaljson "github.com/gibson042/canonicaljson-go"
peer "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand Down Expand Up @@ -109,7 +111,7 @@ func (f *Filter) ValidatePubSubMessage(ctx context.Context, sender peer.ID, msg
}

func (f *Filter) generateEncodedSchema() string {
// Note(albrow): We use canonicaljson to elminate any differences in spacing,
// Note(albrow): We use canonicaljson to eliminate any differences in spacing,
// formatting, and the order of field names. This ensures that two filters
// that are semantically the same JSON object always encode to exactly the
// same canonical topic string.
Expand All @@ -133,3 +135,34 @@ func (f *Filter) generateEncodedSchema() string {
canonicalOrderSchemaJSON, _ := canonicaljson.Marshal(holder)
return base64.URLEncoding.EncodeToString(canonicalOrderSchemaJSON)
}

// NOTE(jalextowle): Due to the discrepancy between orderfilters used in browser
// nodes and those used in standalone nodes, we cannot simply encode orderfilter.Filter.
// Instead, we marshal a minimal representation of the filter, and then we recreate
// the filter in the node that unmarshals the filter. This ensures that any node
// that unmarshals the orderfilter will be capable of using the filter.
type jsonMarshallerForFilter struct {
CustomOrderSchema string `json:"customOrderSchema"`
ChainID int `json:"chainID"`
ExchangeAddress common.Address `json:"exchangeAddress"`
}

func (f *Filter) MarshalJSON() ([]byte, error) {
j := jsonMarshallerForFilter{
CustomOrderSchema: f.rawCustomOrderSchema,
ChainID: f.chainID,
ExchangeAddress: f.exchangeAddress,
}
return json.Marshal(j)
}

func (f *Filter) UnmarshalJSON(data []byte) error {
j := jsonMarshallerForFilter{}
err := json.Unmarshal(data, &j)
filter, err := New(j.ChainID, j.CustomOrderSchema, ethereum.ContractAddresses{Exchange: j.ExchangeAddress})
if err != nil {
return err
}
*f = *filter
return nil
}