This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Parallelize requesting from peers in ordersync #848

Merged on Jul 1, 2020 · 8 commits
Changes from 6 commits
2 changes: 1 addition & 1 deletion Makefile
@@ -73,7 +73,7 @@ lint: lint-go lint-ts lint-prettier

.PHONY: lint-go
lint-go:
golangci-lint run
golangci-lint run --timeout 2m


.PHONY: lint-ts
85 changes: 59 additions & 26 deletions core/ordersync/ordersync.go
@@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"math/rand"
"sync"
"time"

"github.com/0xProject/0x-mesh/p2p"
@@ -44,6 +45,9 @@ const (
// approxDelay * (1 - jitter) <= actualDelay < approxDelay * (1 + jitter)
//
ordersyncJitterAmount = 0.1
// maxRequestPeersInParallel is the largest number of peers that `GetOrders`
// will try to pull orders from at once.
maxRequestPeersInParallel = 10
)

var (
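
The jitter comment above pins down the bound approxDelay * (1 - jitter) <= actualDelay < approxDelay * (1 + jitter). As a minimal sketch of how a delay satisfying that bound could be computed (applyJitter is a hypothetical helper, not the repository's retryBackoff implementation; it assumes the math/rand and time imports shown above):

// applyJitter is a hypothetical helper: it spreads an approximate delay
// uniformly over [approxDelay*(1-jitter), approxDelay*(1+jitter)).
func applyJitter(approxDelay time.Duration, jitter float64) time.Duration {
	// rand.Float64() is in [0, 1), so factor lands in [1-jitter, 1+jitter),
	// which matches the documented bound on actualDelay.
	factor := (1 - jitter) + 2*jitter*rand.Float64()
	return time.Duration(float64(approxDelay) * factor)
}
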
@@ -285,50 +289,79 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
default:
}

// TODO(albrow): As a performance optimization, do this for loop
// partly in parallel.
m := &sync.Mutex{}
wg := &sync.WaitGroup{}
semaphore := make(chan struct{}, maxRequestPeersInParallel)
currentNeighbors := s.node.Neighbors()
shufflePeers(currentNeighbors)
innerCtx, cancel := context.WithCancel(ctx)
defer cancel()
for _, peerID := range currentNeighbors {
if len(successfullySyncedPeers) >= minPeers {
m.Lock()
successfullySyncedPeerLength := len(successfullySyncedPeers)
successfullySynced := successfullySyncedPeers.Contains(peerID.Pretty())

jalextowle (Contributor, Author): @albrow This shouldn't need to be in the mutex unless we have duplicate peer IDs in the currentNeighbors array. My guess is that we don't, but I also wanted to protect us from bugs in libp2p that could cause this assumption to not be true. Let me know if you think it's okay to access this outside of a lock.

albrow (Contributor): It doesn't matter whether there are any duplicate peer IDs. Maps in Go are not goroutine safe. We need to protect reads and writes to the map with a sync.RWMutex.

jalextowle (Contributor, Author): Oh I see. I didn't realize that it could crash the program -- I assumed that it was merely a synchronization issue. I'll leave it in the mutex and change that to a RWMutex.
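
To illustrate albrow's point: unsynchronized concurrent access to a Go map is a fatal runtime error ("concurrent map read and map write"), not just a stale read, so every access to the set has to happen under a lock. A minimal sketch of an RWMutex-guarded set (peerSet is illustrative only and assumes import "sync"; it is not a claim about how the repository's set type is actually implemented):

// peerSet is an illustrative string set guarded by a sync.RWMutex so that
// concurrent goroutines can safely call Add, Contains, and Len.
type peerSet struct {
	mu  sync.RWMutex
	ids map[string]struct{}
}

func newPeerSet() *peerSet {
	return &peerSet{ids: make(map[string]struct{})}
}

// Add takes the write lock because it mutates the map.
func (s *peerSet) Add(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.ids[id] = struct{}{}
}

// Contains and Len only read, so the cheaper read lock is enough.
func (s *peerSet) Contains(id string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.ids[id]
	return ok
}

func (s *peerSet) Len() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.ids)
}
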

m.Unlock()
if successfullySyncedPeerLength >= minPeers {
return nil
}
if successfullySyncedPeers.Contains(peerID.Pretty()) {
if successfullySynced {
continue
}

log.WithFields(log.Fields{
"provider": peerID.Pretty(),
}).Trace("requesting orders from neighbor via ordersync")
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err := s.getOrdersFromPeer(ctx, peerID); err != nil {
log.WithFields(log.Fields{
"error": err.Error(),
"provider": peerID.Pretty(),
}).Warn("could not get orders from peer via ordersync")
continue
} else {
// TODO(albrow): Handle case where no orders were returned from this
// peer. This could be considered a valid response, depending on the implementation
// details of the subprotocol. We need to not try them again, but also not count
// them toward the number of peers we have successfully synced with.
log.WithFields(log.Fields{
"provider": peerID.Pretty(),
}).Trace("succesfully got orders from peer via ordersync")
successfullySyncedPeers.Add(peerID.Pretty())
}
wg.Add(1)
go func(id peer.ID) {
defer wg.Done()
select {
case <-innerCtx.Done():
// NOTE(jalextowle): In this case, we haven't written to the semaphore
// so we shouldn't read from it.
return
case semaphore <- struct{}{}:
defer func() { <-semaphore }()
}
if err := s.getOrdersFromPeer(innerCtx, id); err != nil {
log.WithFields(log.Fields{
"error": err.Error(),
"provider": id.Pretty(),
}).Warn("could not get orders from peer via ordersync")
} else {
// TODO(albrow): Handle case where no orders were returned from this
// peer. This could be considered a valid response, depending on the implementation
// details of the subprotocol. We need to not try them again, but also not count
// them toward the number of peers we have successfully synced with.
log.WithFields(log.Fields{
"provider": id.Pretty(),
}).Trace("succesfully got orders from peer via ordersync")
m.Lock()
successfullySyncedPeers.Add(id.Pretty())
successfullySyncedPeerLength := len(successfullySyncedPeers)

jalextowle (Contributor, Author): @albrow I'm not sure that wrapping this in a RLock is worth the extra overhead, but it seems more technically correct. Do you have any insight into this?

albrow (Contributor, Jul 1, 2020): My gut says just move successfullySyncedPeerLength := len(successfullySyncedPeers) before m.Unlock. Intuitively I would expect that assignment to be faster than acquiring a new mutex but I'm not 100% sure.

jalextowle (Contributor, Author): That's what I figured. I'll change it back.
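
The two alternatives being weighed, sketched as small helpers with assumed names (they presume a plain map guarded by a sync.RWMutex; neither fragment is the PR's final code):

// Alternative 1: release the write lock, then re-acquire a read lock just to
// take the length.
func addThenRelock(mu *sync.RWMutex, set map[string]struct{}, id string) int {
	mu.Lock()
	set[id] = struct{}{}
	mu.Unlock()
	mu.RLock()
	defer mu.RUnlock()
	return len(set)
}

// Alternative 2 (albrow's suggestion, used below): read the length while the
// write lock is still held, avoiding a second lock acquisition.
func addAndLen(mu *sync.RWMutex, set map[string]struct{}, id string) int {
	mu.Lock()
	defer mu.Unlock()
	set[id] = struct{}{}
	return len(set)
}
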

m.Unlock()
if successfullySyncedPeerLength >= minPeers {
cancel()
}
}
}(peerID)
}

// Wait for all goroutines to exit. If the inner context has been
// cancelled, then we have successfully completed ordersync.
wg.Wait()
if innerCtx.Err() == context.Canceled {
return nil
}

delayBeforeNextRetry := retryBackoff.Duration()
m.Lock()
successfullySyncedPeerLength := len(successfullySyncedPeers)
m.Unlock()
log.WithFields(log.Fields{
"delayBeforeNextRetry": delayBeforeNextRetry.String(),
"minPeers": minPeers,
"successfullySyncedPeers": len(successfullySyncedPeers),
"successfullySyncedPeers": successfullySyncedPeerLength,
}).Debug("ordersync could not get orders from enough peers (trying again soon)")
select {
case <-ctx.Done():
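
Putting the pieces together, here is a standalone sketch of the concurrency pattern this PR introduces, using assumed names (requestFromPeers, fetch) rather than the PR's exact code: a buffered channel caps how many peers are queried at once, a WaitGroup waits for every worker, and cancelling an inner context stops the remaining workers as soon as enough peers have synced successfully.

package main

import (
	"context"
	"fmt"
	"sync"
)

func requestFromPeers(ctx context.Context, peers []string, minPeers, maxParallel int,
	fetch func(context.Context, string) error) bool {

	var (
		mu        sync.Mutex
		succeeded int
	)
	sem := make(chan struct{}, maxParallel)
	innerCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wg sync.WaitGroup
	for _, peerID := range peers {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			select {
			case <-innerCtx.Done():
				// We never wrote to the semaphore, so we must not read from it.
				return
			case sem <- struct{}{}:
				defer func() { <-sem }()
			}
			if err := fetch(innerCtx, id); err != nil {
				// A failed peer simply doesn't count toward minPeers.
				return
			}
			mu.Lock()
			succeeded++
			enough := succeeded >= minPeers
			mu.Unlock()
			if enough {
				cancel() // enough peers synced; stop the remaining workers early
			}
		}(peerID)
	}
	wg.Wait()

	mu.Lock()
	defer mu.Unlock()
	return succeeded >= minPeers
}

func main() {
	peers := []string{"peerA", "peerB", "peerC", "peerD"}
	ok := requestFromPeers(context.Background(), peers, 2, 10,
		func(ctx context.Context, id string) error {
			// Stand-in for getOrdersFromPeer: pretend every request succeeds.
			return nil
		})
	fmt.Println("synced with enough peers:", ok)
}

The select around the semaphore send mirrors the note in the diff: if the inner context is cancelled before the goroutine writes to the channel, it must return without reading from it, otherwise the slot accounting would be thrown off.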