From 94dd3947e589d908ab697a3b8a4f3ffa3659bf9a Mon Sep 17 00:00:00 2001 From: Alex Towle Date: Fri, 19 Jun 2020 19:00:56 -0500 Subject: [PATCH 1/8] Parallelized part of `GetOrders` --- core/ordersync/ordersync.go | 66 +++++++++++++++++++++++++------------ 1 file changed, 45 insertions(+), 21 deletions(-) diff --git a/core/ordersync/ordersync.go b/core/ordersync/ordersync.go index 4ea8a7273..9ca381a41 100644 --- a/core/ordersync/ordersync.go +++ b/core/ordersync/ordersync.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "math/rand" + "sync" "time" "github.com/0xProject/0x-mesh/p2p" @@ -44,6 +45,9 @@ const ( // approxDelay * (1 - jitter) <= actualDelay < approxDelay * (1 + jitter) // ordersyncJitterAmount = 0.1 + // maxRequestPeersInParallel is the largest number of peers that `GetOrders` + // will try to pull orders from at once. + maxRequestPeersInParallel = 10 ) var ( @@ -285,10 +289,11 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { default: } - // TODO(albrow): As a performance optimization, do this for loop - // partly in parallel. 
currentNeighbors := s.node.Neighbors() shufflePeers(currentNeighbors) + i := 0 + wg := &sync.WaitGroup{} + waitChan := make(chan struct{}, 1) for _, peerID := range currentNeighbors { if len(successfullySyncedPeers) >= minPeers { return nil @@ -300,30 +305,49 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { log.WithFields(log.Fields{ "provider": peerID.Pretty(), }).Trace("requesting orders from neighbor via ordersync") - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - if err := s.getOrdersFromPeer(ctx, peerID); err != nil { - log.WithFields(log.Fields{ - "error": err.Error(), - "provider": peerID.Pretty(), - }).Warn("could not get orders from peer via ordersync") - continue + if i == maxRequestPeersInParallel { + i = 0 + go func() { + wg.Wait() + waitChan <- struct{}{} + }() + select { + case <-waitChan: + case <-ctx.Done(): + return ctx.Err() + } } else { - // TODO(albrow): Handle case where no orders were returned from this - // peer. This could be considered a valid response, depending on the implementation - // details of the subprotocol. We need to not try them again, but also not count - // them toward the number of peers we have successfully synced with. - log.WithFields(log.Fields{ - "provider": peerID.Pretty(), - }).Trace("succesfully got orders from peer via ordersync") - successfullySyncedPeers.Add(peerID.Pretty()) + select { + case <-ctx.Done(): + return ctx.Err() + default: + } } + + i = i + 1 + wg.Add(1) + go func(id peer.ID) { + defer wg.Done() + if err := s.getOrdersFromPeer(ctx, id); err != nil { + log.WithFields(log.Fields{ + "error": err.Error(), + "provider": id.Pretty(), + }).Warn("could not get orders from peer via ordersync") + } else { + // TODO(albrow): Handle case where no orders were returned from this + // peer. This could be considered a valid response, depending on the implementation + // details of the subprotocol. 
We need to not try them again, but also not count + // them toward the number of peers we have successfully synced with. + log.WithFields(log.Fields{ + "provider": id.Pretty(), + }).Trace("succesfully got orders from peer via ordersync") + successfullySyncedPeers.Add(id.Pretty()) + } + }(peerID) } + wg.Wait() delayBeforeNextRetry := retryBackoff.Duration() log.WithFields(log.Fields{ "delayBeforeNextRetry": delayBeforeNextRetry.String(), From e2d5ba79c1a7c13ac70529a326c2d40131a8059d Mon Sep 17 00:00:00 2001 From: Alex Towle Date: Fri, 19 Jun 2020 19:12:59 -0500 Subject: [PATCH 2/8] Added an inner context to cut `getOrdersFromPeer` short --- core/ordersync/ordersync.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/ordersync/ordersync.go b/core/ordersync/ordersync.go index 9ca381a41..e1d3d6699 100644 --- a/core/ordersync/ordersync.go +++ b/core/ordersync/ordersync.go @@ -294,6 +294,8 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { i := 0 wg := &sync.WaitGroup{} waitChan := make(chan struct{}, 1) + innerCtx, cancel := context.WithCancel(ctx) + defer cancel() for _, peerID := range currentNeighbors { if len(successfullySyncedPeers) >= minPeers { return nil @@ -329,7 +331,7 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { wg.Add(1) go func(id peer.ID) { defer wg.Done() - if err := s.getOrdersFromPeer(ctx, id); err != nil { + if err := s.getOrdersFromPeer(innerCtx, id); err != nil { log.WithFields(log.Fields{ "error": err.Error(), "provider": id.Pretty(), From 407aeae4df115b1566fd62518fba69b9163f58a1 Mon Sep 17 00:00:00 2001 From: Alex Towle Date: Tue, 30 Jun 2020 16:24:25 -0500 Subject: [PATCH 3/8] Refactored ordersync to request from a new peer as soon as possible --- core/ordersync/ordersync.go | 40 ++++++++++++++----------------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/core/ordersync/ordersync.go b/core/ordersync/ordersync.go index e1d3d6699..64885c104 100644 
--- a/core/ordersync/ordersync.go +++ b/core/ordersync/ordersync.go @@ -47,7 +47,7 @@ const ( ordersyncJitterAmount = 0.1 // maxRequestPeersInParallel is the largest number of peers that `GetOrders` // will try to pull orders from at once. - maxRequestPeersInParallel = 10 + maxRequestPeersInParallel = 15 ) var ( @@ -289,11 +289,10 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { default: } + wg := &sync.WaitGroup{} + semaphore := make(chan struct{}, maxRequestPeersInParallel) currentNeighbors := s.node.Neighbors() shufflePeers(currentNeighbors) - i := 0 - wg := &sync.WaitGroup{} - waitChan := make(chan struct{}, 1) innerCtx, cancel := context.WithCancel(ctx) defer cancel() for _, peerID := range currentNeighbors { @@ -308,29 +307,20 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { "provider": peerID.Pretty(), }).Trace("requesting orders from neighbor via ordersync") - if i == maxRequestPeersInParallel { - i = 0 - go func() { - wg.Wait() - waitChan <- struct{}{} - }() - select { - case <-waitChan: - case <-ctx.Done(): - return ctx.Err() - } - } else { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - } - - i = i + 1 wg.Add(1) go func(id peer.ID) { - defer wg.Done() + select { + case <-innerCtx.Done(): + // NOTE(jalextowle): In this case, we haven't written to the semaphore + // so we shouldn't read from it. 
+ wg.Done() + return + case semaphore <- struct{}{}: + defer func() { + wg.Done() + <-semaphore + }() + } if err := s.getOrdersFromPeer(innerCtx, id); err != nil { log.WithFields(log.Fields{ "error": err.Error(), From df422f4dae5cb01f1cc2b7827138c8029fa24095 Mon Sep 17 00:00:00 2001 From: Alex Towle Date: Tue, 30 Jun 2020 16:33:22 -0500 Subject: [PATCH 4/8] Improved ordersync `GetOrders` slightly --- core/ordersync/ordersync.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/ordersync/ordersync.go b/core/ordersync/ordersync.go index 64885c104..28824fb62 100644 --- a/core/ordersync/ordersync.go +++ b/core/ordersync/ordersync.go @@ -335,10 +335,17 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { "provider": id.Pretty(), }).Trace("succesfully got orders from peer via ordersync") successfullySyncedPeers.Add(id.Pretty()) + if len(successfullySyncedPeers) >= minPeers { + cancel() + } } }(peerID) } + if innerCtx.Err() == context.Canceled { + return nil + } + wg.Wait() delayBeforeNextRetry := retryBackoff.Duration() log.WithFields(log.Fields{ From d47e48b9630ebf5b2c91fb740352aa96df7e3d45 Mon Sep 17 00:00:00 2001 From: Alex Towle Date: Tue, 30 Jun 2020 16:44:44 -0500 Subject: [PATCH 5/8] Adjusted `maxRequestPeersInParallel` --- core/ordersync/ordersync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/ordersync/ordersync.go b/core/ordersync/ordersync.go index 28824fb62..391169c14 100644 --- a/core/ordersync/ordersync.go +++ b/core/ordersync/ordersync.go @@ -47,7 +47,7 @@ const ( ordersyncJitterAmount = 0.1 // maxRequestPeersInParallel is the largest number of peers that `GetOrders` // will try to pull orders from at once.
- maxRequestPeersInParallel = 15 + maxRequestPeersInParallel = 10 ) var ( From 5df8739bea100941e33e45b495801e3c5d48edfb Mon Sep 17 00:00:00 2001 From: Alex Towle Date: Wed, 1 Jul 2020 15:30:32 -0500 Subject: [PATCH 6/8] Addressed review feedback from @albrow --- Makefile | 2 +- core/ordersync/ordersync.go | 30 ++++++++++++++++++++---------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 8479c7e4e..0b192a114 100644 --- a/Makefile +++ b/Makefile @@ -73,7 +73,7 @@ lint: lint-go lint-ts lint-prettier .PHONY: lint-go lint-go: - golangci-lint run + golangci-lint run --timeout 2m .PHONY: lint-ts diff --git a/core/ordersync/ordersync.go b/core/ordersync/ordersync.go index 391169c14..84c861098 100644 --- a/core/ordersync/ordersync.go +++ b/core/ordersync/ordersync.go @@ -289,6 +289,7 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { default: } + m := &sync.Mutex{} wg := &sync.WaitGroup{} semaphore := make(chan struct{}, maxRequestPeersInParallel) currentNeighbors := s.node.Neighbors() @@ -296,10 +297,14 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { innerCtx, cancel := context.WithCancel(ctx) defer cancel() for _, peerID := range currentNeighbors { - if len(successfullySyncedPeers) >= minPeers { + m.Lock() + successfullySyncedPeerLength := len(successfullySyncedPeers) + successfullySynced := successfullySyncedPeers.Contains(peerID.Pretty()) + m.Unlock() + if successfullySyncedPeerLength >= minPeers { return nil } - if successfullySyncedPeers.Contains(peerID.Pretty()) { + if successfullySynced { continue } @@ -309,17 +314,14 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { wg.Add(1) go func(id peer.ID) { + defer wg.Done() select { case <-innerCtx.Done(): // NOTE(jalextowle): In this case, we haven't written to the semaphore // so we shouldn't read from it. 
- wg.Done() return case semaphore <- struct{}{}: - defer func() { - wg.Done() - <-semaphore - }() + defer func() { <-semaphore }() } if err := s.getOrdersFromPeer(innerCtx, id); err != nil { log.WithFields(log.Fields{ @@ -334,24 +336,32 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { log.WithFields(log.Fields{ "provider": id.Pretty(), }).Trace("succesfully got orders from peer via ordersync") + m.Lock() successfullySyncedPeers.Add(id.Pretty()) - if len(successfullySyncedPeers) >= minPeers { + successfullySyncedPeerLength := len(successfullySyncedPeers) + m.Unlock() + if successfullySyncedPeerLength >= minPeers { cancel() } } }(peerID) } + // Wait for all goroutines to exit. If the inner context has been + // cancelled, then we have successfully completed ordersync. + wg.Wait() if innerCtx.Err() == context.Canceled { return nil } - wg.Wait() delayBeforeNextRetry := retryBackoff.Duration() + m.Lock() + successfullySyncedPeerLength := len(successfullySyncedPeers) + m.Unlock() log.WithFields(log.Fields{ "delayBeforeNextRetry": delayBeforeNextRetry.String(), "minPeers": minPeers, - "successfullySyncedPeers": len(successfullySyncedPeers), + "successfullySyncedPeers": successfullySyncedPeerLength, }).Debug("ordersync could not get orders from enough peers (trying again soon)") select { case <-ctx.Done(): From 3554a10c8cfa33c5e70b01dd4f90c1965fb4ef37 Mon Sep 17 00:00:00 2001 From: Alex Towle Date: Wed, 1 Jul 2020 15:55:46 -0500 Subject: [PATCH 7/8] Switched to `sync.RWMutex` --- core/ordersync/ordersync.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/core/ordersync/ordersync.go b/core/ordersync/ordersync.go index 84c861098..e789ee6a2 100644 --- a/core/ordersync/ordersync.go +++ b/core/ordersync/ordersync.go @@ -289,7 +289,7 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { default: } - m := &sync.Mutex{} + m := &sync.RWMutex{} wg := &sync.WaitGroup{} semaphore := make(chan struct{}, 
maxRequestPeersInParallel) currentNeighbors := s.node.Neighbors() @@ -297,10 +297,10 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { innerCtx, cancel := context.WithCancel(ctx) defer cancel() for _, peerID := range currentNeighbors { - m.Lock() + m.RLock() successfullySyncedPeerLength := len(successfullySyncedPeers) successfullySynced := successfullySyncedPeers.Contains(peerID.Pretty()) - m.Unlock() + m.RUnlock() if successfullySyncedPeerLength >= minPeers { return nil } @@ -338,8 +338,10 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { }).Trace("succesfully got orders from peer via ordersync") m.Lock() successfullySyncedPeers.Add(id.Pretty()) - successfullySyncedPeerLength := len(successfullySyncedPeers) m.Unlock() + m.RLock() + successfullySyncedPeerLength := len(successfullySyncedPeers) + m.RUnlock() if successfullySyncedPeerLength >= minPeers { cancel() } @@ -355,9 +357,9 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { } delayBeforeNextRetry := retryBackoff.Duration() - m.Lock() + m.RLock() successfullySyncedPeerLength := len(successfullySyncedPeers) - m.Unlock() + m.RUnlock() log.WithFields(log.Fields{ "delayBeforeNextRetry": delayBeforeNextRetry.String(), "minPeers": minPeers, From 5f5d7890a5e4dcbf97701e3d11f512785f26ed8f Mon Sep 17 00:00:00 2001 From: Alex Towle Date: Wed, 1 Jul 2020 17:07:03 -0500 Subject: [PATCH 8/8] Addressed lingering review feedback from @albrow --- core/ordersync/ordersync.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/ordersync/ordersync.go b/core/ordersync/ordersync.go index e789ee6a2..c4b83857e 100644 --- a/core/ordersync/ordersync.go +++ b/core/ordersync/ordersync.go @@ -338,10 +338,8 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { }).Trace("succesfully got orders from peer via ordersync") m.Lock() successfullySyncedPeers.Add(id.Pretty()) - m.Unlock() - m.RLock() successfullySyncedPeerLength := 
len(successfullySyncedPeers) - m.RUnlock() + m.Unlock() if successfullySyncedPeerLength >= minPeers { cancel() }