Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Fixed bug in ordersync that caused nodes to use the wrong orderfilter #882

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ This changelog is a work in progress and may contain notes for versions which ha

- Upgraded to Go 1.14, which contains several WebAssembly performance improvements [#815](https://github.com/0xProject/0x-mesh/pull/815).

## v9.4.1

### Bug fixes 🐞

- Fixed a problem in the filtered pagination subprotocols of ordersync that caused the nodes to use the wrong orderfilter [#882](https://github.com/0xProject/0x-mesh/pull/882)

## v9.4.0

### Features ✅
Expand Down
59 changes: 50 additions & 9 deletions core/ordersync/ordersync.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ type Subprotocol interface {
// ParseResponseMetadata converts raw response metadata into a concrete type
// that the subprotocol expects.
ParseResponseMetadata(metadata json.RawMessage) (interface{}, error)
// GenerateFirstRequestMetadata generates the metadata for the first request
// that should be made with this subprotocol.
GenerateFirstRequestMetadata() (json.RawMessage, error)
}

// New creates and returns a new ordersync service, which is used for both
Expand All @@ -176,19 +179,19 @@ func New(ctx context.Context, node *p2p.Node, subprotocols []Subprotocol) *Servi

// GetMatchingSubprotocol returns the most preferred subprotocol to use
// based on the given request.
func (s *Service) GetMatchingSubprotocol(rawReq *rawRequest) (Subprotocol, error) {
for _, protoID := range rawReq.Subprotocols {
func (s *Service) GetMatchingSubprotocol(rawReq *rawRequest) (Subprotocol, int, error) {
for i, protoID := range rawReq.Subprotocols {
subprotocol, found := s.subprotocols[protoID]
if found {
return subprotocol, nil
return subprotocol, i, nil
}
}

err := NoMatchingSubprotocolsError{
Requested: rawReq.Subprotocols,
Supported: s.SupportedSubprotocols(),
}
return nil, err
return nil, 0, err
}

// HandleStream is a stream handler that is used to handle incoming ordersync requests.
Expand Down Expand Up @@ -229,12 +232,19 @@ func (s *Service) HandleStream(stream network.Stream) {
s.handlePeerScoreEvent(requesterID, psInvalidMessage)
return
}
subprotocol, err := s.GetMatchingSubprotocol(rawReq)
subprotocol, i, err := s.GetMatchingSubprotocol(rawReq)
if err != nil {
log.WithError(err).Warn("GetMatchingSubprotocol returned error")
s.handlePeerScoreEvent(requesterID, psSubprotocolNegotiationFailed)
return
}
if len(rawReq.Subprotocols) > 1 {
firstRequests := FirstRequestsForSubprotocols{}
err := json.Unmarshal(rawReq.Metadata, &firstRequests)
if err == nil {
rawReq.Metadata = firstRequests.MetadataForSubprotocol[i]
}
}
res, err := handleRequestWithSubprotocol(s.ctx, subprotocol, requesterID, rawReq)
if err != nil {
log.WithError(err).Warn("subprotocol returned error")
Expand Down Expand Up @@ -458,6 +468,38 @@ func parseResponseWithSubprotocol(subprotocol Subprotocol, providerID peer.ID, r
}, nil
}

type FirstRequestsForSubprotocols struct {
MetadataForSubprotocol []json.RawMessage `json:"metadata"`
}

// createFirstRequestForAllSubprotocols creates an initial ordersync request that
// contains metadata for all of the ordersync subprotocols.
func (s *Service) createFirstRequestForAllSubprotocols() (*rawRequest, error) {
jalextowle marked this conversation as resolved.
Show resolved Hide resolved
metadata := []json.RawMessage{}
for _, subprotocolString := range s.SupportedSubprotocols() {
subprotocol, found := s.subprotocols[subprotocolString]
if !found {
return nil, fmt.Errorf("cannot generate first request metadata for subprotocol %s", subprotocolString)
}
m, err := subprotocol.GenerateFirstRequestMetadata()
if err != nil {
return nil, err
}
metadata = append(metadata, m)
}
encodedMetadata, err := json.Marshal(FirstRequestsForSubprotocols{
MetadataForSubprotocol: metadata,
})
if err != nil {
return nil, err
}
return &rawRequest{
Type: TypeRequest,
Subprotocols: s.SupportedSubprotocols(),
Metadata: encodedMetadata,
}, nil
}

func (s *Service) getOrdersFromPeer(ctx context.Context, providerID peer.ID, firstRequest *rawRequest) (*rawRequest, error) {
stream, err := s.node.NewStream(ctx, providerID, ID)
if err != nil {
Expand All @@ -473,10 +515,9 @@ func (s *Service) getOrdersFromPeer(ctx context.Context, providerID peer.ID, fir
if firstRequest != nil {
nextReq = firstRequest
} else {
nextReq = &rawRequest{
Type: TypeRequest,
Subprotocols: s.SupportedSubprotocols(),
Metadata: nil,
nextReq, err = s.createFirstRequestForAllSubprotocols()
if err != nil {
return nil, err
}
}
var numValidOrders int
Expand Down
57 changes: 44 additions & 13 deletions core/ordersync_subprotocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func NewFilteredPaginationSubprotocolV0(app *App, perPage int) ordersync.Subprot
// FilteredPaginationSubProtocolV0. It keeps track of the current page and SnapshotID,
// which is expected to be an empty string on the first request.
type FilteredPaginationRequestMetadataV0 struct {
Page int `json:"page"`
SnapshotID string `json:"snapshotID"`
OrderFilter *orderfilter.Filter `json:"orderfilter"`
Page int `json:"page"`
SnapshotID string `json:"snapshotID"`
}

// FilteredPaginationResponseMetadataV0 is the response metadata for the
Expand Down Expand Up @@ -114,10 +115,16 @@ func (p *FilteredPaginationSubProtocolV0) HandleOrderSyncRequest(ctx context.Con
}
nextMinOrderHash = ordersResp.OrdersInfos[len(ordersResp.OrdersInfos)-1].OrderHash
// Filter the orders for this page.
for _, orderInfo := range ordersResp.OrdersInfos {
if matches, err := p.orderFilter.MatchOrder(orderInfo.SignedOrder); err != nil {
return nil, err
} else if matches {
if metadata.OrderFilter != nil {
for _, orderInfo := range ordersResp.OrdersInfos {
if matches, err := metadata.OrderFilter.MatchOrder(orderInfo.SignedOrder); err != nil {
return nil, err
} else if matches {
filteredOrders = append(filteredOrders, orderInfo.SignedOrder)
}
}
} else {
for _, orderInfo := range ordersResp.OrdersInfos {
filteredOrders = append(filteredOrders, orderInfo.SignedOrder)
}
}
Expand Down Expand Up @@ -185,8 +192,9 @@ func (p *FilteredPaginationSubProtocolV0) HandleOrderSyncResponse(ctx context.Co

return &ordersync.Request{
Metadata: &FilteredPaginationRequestMetadataV0{
Page: metadata.Page + 1,
SnapshotID: metadata.SnapshotID,
OrderFilter: p.orderFilter,
Page: metadata.Page + 1,
SnapshotID: metadata.SnapshotID,
},
}, len(filteredOrders), nil
}
Expand All @@ -207,6 +215,14 @@ func (p *FilteredPaginationSubProtocolV0) ParseResponseMetadata(metadata json.Ra
return &parsed, nil
}

func (p *FilteredPaginationSubProtocolV0) GenerateFirstRequestMetadata() (json.RawMessage, error) {
return json.Marshal(FilteredPaginationRequestMetadataV0{
OrderFilter: p.orderFilter,
Page: 0,
SnapshotID: "",
})
}

// Ensure that FilteredPaginationSubProtocolV1 implements the Subprotocol interface.
var _ ordersync.Subprotocol = (*FilteredPaginationSubProtocolV1)(nil)

Expand Down Expand Up @@ -235,7 +251,8 @@ func NewFilteredPaginationSubprotocolV1(app *App, perPage int) ordersync.Subprot
// FilteredPaginationSubProtocolV1. It keeps track of the current
// minOrderHash, which is expected to be an empty string on the first request.
type FilteredPaginationRequestMetadataV1 struct {
MinOrderHash common.Hash `json:"minOrderHash"`
OrderFilter *orderfilter.Filter `json:"orderfilter"`
MinOrderHash common.Hash `json:"minOrderHash"`
}

// FilteredPaginationResponseMetadataV1 is the response metadata for the
Expand Down Expand Up @@ -291,10 +308,16 @@ func (p *FilteredPaginationSubProtocolV1) HandleOrderSyncRequest(ctx context.Con
}
nextMinOrderHash = ordersResp.OrdersInfos[len(ordersResp.OrdersInfos)-1].OrderHash
// Filter the orders for this page.
for _, orderInfo := range ordersResp.OrdersInfos {
if matches, err := p.orderFilter.MatchOrder(orderInfo.SignedOrder); err != nil {
return nil, err
} else if matches {
if metadata.OrderFilter != nil {
for _, orderInfo := range ordersResp.OrdersInfos {
if matches, err := p.orderFilter.MatchOrder(orderInfo.SignedOrder); err != nil {
return nil, err
} else if matches {
filteredOrders = append(filteredOrders, orderInfo.SignedOrder)
}
}
} else {
for _, orderInfo := range ordersResp.OrdersInfos {
filteredOrders = append(filteredOrders, orderInfo.SignedOrder)
}
}
Expand Down Expand Up @@ -370,6 +393,7 @@ func (p *FilteredPaginationSubProtocolV1) HandleOrderSyncResponse(ctx context.Co
}
return &ordersync.Request{
Metadata: &FilteredPaginationRequestMetadataV1{
OrderFilter: p.orderFilter,
MinOrderHash: nextMinOrderHash,
},
}, len(filteredOrders), nil
Expand All @@ -391,6 +415,13 @@ func (p *FilteredPaginationSubProtocolV1) ParseResponseMetadata(metadata json.Ra
return &parsed, nil
}

func (p *FilteredPaginationSubProtocolV1) GenerateFirstRequestMetadata() (json.RawMessage, error) {
return json.Marshal(FilteredPaginationRequestMetadataV1{
OrderFilter: p.orderFilter,
MinOrderHash: common.Hash{},
})
}

// validateHexHash returns an error if s is not a valid hex hash. It supports
// encodings with or without the "0x" prefix.
// Note(albrow) This is based on unexported code in go-ethereum.
Expand Down
5 changes: 5 additions & 0 deletions orderfilter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/0xProject/0x-mesh/ethereum"
"github.com/ethereum/go-ethereum/common"
jsonschema "github.com/xeipuuv/gojsonschema"
)

Expand Down Expand Up @@ -37,8 +38,11 @@ type Filter struct {
rawCustomOrderSchema string
orderSchema *jsonschema.Schema
messageSchema *jsonschema.Schema
exchangeAddress common.Address
}

// TODO(jalextowle): We do not need `contractAddresses` since we only use `contractAddresses.Exchange`.
// In a future refactor, we should update this interface.
func New(chainID int, customOrderSchema string, contractAddresses ethereum.ContractAddresses) (*Filter, error) {
orderLoader, err := newLoader(chainID, customOrderSchema, contractAddresses)
if err != nil {
Expand All @@ -62,6 +66,7 @@ func New(chainID int, customOrderSchema string, contractAddresses ethereum.Contr
rawCustomOrderSchema: customOrderSchema,
orderSchema: compiledRootOrderSchema,
messageSchema: compiledRootOrderMessageSchema,
exchangeAddress: contractAddresses.Exchange,
}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions orderfilter/filter_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/0xProject/0x-mesh/ethereum"
"github.com/0xProject/0x-mesh/packages/mesh-browser/go/jsutil"
"github.com/ethereum/go-ethereum/common"
)

type Filter struct {
Expand All @@ -18,6 +19,7 @@ type Filter struct {
encodedSchema string
chainID int
rawCustomOrderSchema string
exchangeAddress common.Address
}

func New(chainID int, customOrderSchema string, contractAddresses ethereum.ContractAddresses) (*Filter, error) {
Expand Down Expand Up @@ -58,5 +60,6 @@ func New(chainID int, customOrderSchema string, contractAddresses ethereum.Contr
messageValidator: messageValidator,
chainID: chainID,
rawCustomOrderSchema: customOrderSchema,
exchangeAddress: contractAddresses.Exchange,
}, nil
}
35 changes: 34 additions & 1 deletion orderfilter/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package orderfilter
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"

"github.com/0xProject/0x-mesh/ethereum"
"github.com/0xProject/0x-mesh/zeroex"
"github.com/ethereum/go-ethereum/common"
canonicaljson "github.com/gibson042/canonicaljson-go"
peer "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand Down Expand Up @@ -109,7 +111,7 @@ func (f *Filter) ValidatePubSubMessage(ctx context.Context, sender peer.ID, msg
}

func (f *Filter) generateEncodedSchema() string {
// Note(albrow): We use canonicaljson to elminate any differences in spacing,
// Note(albrow): We use canonicaljson to eliminate any differences in spacing,
// formatting, and the order of field names. This ensures that two filters
// that are semantically the same JSON object always encode to exactly the
// same canonical topic string.
Expand All @@ -133,3 +135,34 @@ func (f *Filter) generateEncodedSchema() string {
canonicalOrderSchemaJSON, _ := canonicaljson.Marshal(holder)
return base64.URLEncoding.EncodeToString(canonicalOrderSchemaJSON)
}

// NOTE(jalextowle): Due to the discrepancy between orderfilters used in browser
// nodes and those used in standalone nodes, we cannot simply encode orderfilter.Filter.
// Instead, we marshal a minimal representation of the filter, and then we recreate
// the filter in the node that unmarshals the filter. This ensures that any node
// that unmarshals the orderfilter will be capable of using the filter.
type jsonMarshallerForFilter struct {
CustomOrderSchema string `json:"customOrderSchema"`
ChainID int `json:"chainID"`
ExchangeAddress common.Address `json:"exchangeAddress"`
}

func (f *Filter) MarshalJSON() ([]byte, error) {
j := jsonMarshallerForFilter{
CustomOrderSchema: f.rawCustomOrderSchema,
ChainID: f.chainID,
ExchangeAddress: f.exchangeAddress,
}
return json.Marshal(j)
}

func (f *Filter) UnmarshalJSON(data []byte) error {
j := jsonMarshallerForFilter{}
err := json.Unmarshal(data, &j)
filter, err := New(j.ChainID, j.CustomOrderSchema, ethereum.ContractAddresses{Exchange: j.ExchangeAddress})
if err != nil {
return err
}
*f = *filter
return nil
}