diff --git a/Makefile b/Makefile index 8479c7e4e..0b192a114 100644 --- a/Makefile +++ b/Makefile @@ -73,7 +73,7 @@ lint: lint-go lint-ts lint-prettier .PHONY: lint-go lint-go: - golangci-lint run + golangci-lint run --timeout 2m .PHONY: lint-ts diff --git a/core/ordersync/ordersync.go b/core/ordersync/ordersync.go index 391169c14..d9c799c9e 100644 --- a/core/ordersync/ordersync.go +++ b/core/ordersync/ordersync.go @@ -289,6 +289,7 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { default: } + m := &sync.Mutex{} wg := &sync.WaitGroup{} semaphore := make(chan struct{}, maxRequestPeersInParallel) currentNeighbors := s.node.Neighbors() @@ -296,10 +297,16 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { innerCtx, cancel := context.WithCancel(ctx) defer cancel() for _, peerID := range currentNeighbors { - if len(successfullySyncedPeers) >= minPeers { + m.Lock() + successfullySyncedPeerLength := len(successfullySyncedPeers) + m.Unlock() + if successfullySyncedPeerLength >= minPeers { return nil } - if successfullySyncedPeers.Contains(peerID.Pretty()) { + m.Lock() + successfullySynced := successfullySyncedPeers.Contains(peerID.Pretty()) + m.Unlock() + if successfullySynced { continue } @@ -309,17 +316,14 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { wg.Add(1) go func(id peer.ID) { + defer wg.Done() select { case <-innerCtx.Done(): // NOTE(jalextowle): In this case, we haven't written to the semaphore // so we shouldn't read from it. - wg.Done() return case semaphore <- struct{}{}: - defer func() { - wg.Done() - <-semaphore - }() + defer func() { <-semaphore }() } if err := s.getOrdersFromPeer(innerCtx, id); err != nil { log.WithFields(log.Fields{ @@ -334,24 +338,32 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error { log.WithFields(log.Fields{ "provider": id.Pretty(), }).Trace("succesfully got orders from peer via ordersync") + m.Lock() successfullySyncedPeers.Add(id.Pretty()) - if len(successfullySyncedPeers) >= minPeers { + successfullySyncedPeerLength := len(successfullySyncedPeers) + m.Unlock() + if successfullySyncedPeerLength >= minPeers { cancel() } } }(peerID) } + // Wait for all goroutines to exit. If the inner context has been + // cancelled, then we have successfully completed ordersync. + wg.Wait() if innerCtx.Err() == context.Canceled { return nil } - wg.Wait() delayBeforeNextRetry := retryBackoff.Duration() + m.Lock() + successfullySyncedPeerLength := len(successfullySyncedPeers) + m.Unlock() log.WithFields(log.Fields{ "delayBeforeNextRetry": delayBeforeNextRetry.String(), "minPeers": minPeers, - "successfullySyncedPeers": len(successfullySyncedPeers), + "successfullySyncedPeers": successfullySyncedPeerLength, }).Debug("ordersync could not get orders from enough peers (trying again soon)") select { case <-ctx.Done():