Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Addressed review feedback from @albrow
Browse files Browse the repository at this point in the history
  • Loading branch information
jalextowle committed Jul 1, 2020
1 parent d47e48b commit fae5fd0
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ lint: lint-go lint-ts lint-prettier

.PHONY: lint-go
lint-go:
golangci-lint run
golangci-lint run --timeout 2m


.PHONY: lint-ts
Expand Down
32 changes: 22 additions & 10 deletions core/ordersync/ordersync.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,17 +289,24 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
default:
}

m := &sync.Mutex{}
wg := &sync.WaitGroup{}
semaphore := make(chan struct{}, maxRequestPeersInParallel)
currentNeighbors := s.node.Neighbors()
shufflePeers(currentNeighbors)
innerCtx, cancel := context.WithCancel(ctx)
defer cancel()
for _, peerID := range currentNeighbors {
if len(successfullySyncedPeers) >= minPeers {
m.Lock()
successfullySyncedPeerLength := len(successfullySyncedPeers)
m.Unlock()
if successfullySyncedPeerLength >= minPeers {
return nil
}
if successfullySyncedPeers.Contains(peerID.Pretty()) {
m.Lock()
successfullySynced := successfullySyncedPeers.Contains(peerID.Pretty())
m.Unlock()
if successfullySynced {
continue
}

Expand All @@ -309,17 +316,14 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {

wg.Add(1)
go func(id peer.ID) {
defer wg.Done()
select {
case <-innerCtx.Done():
// NOTE(jalextowle): In this case, we haven't written to the semaphore
// so we shouldn't read from it.
wg.Done()
return
case semaphore <- struct{}{}:
defer func() {
wg.Done()
<-semaphore
}()
defer func() { <-semaphore }()
}
if err := s.getOrdersFromPeer(innerCtx, id); err != nil {
log.WithFields(log.Fields{
Expand All @@ -334,24 +338,32 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
log.WithFields(log.Fields{
"provider": id.Pretty(),
}).Trace("succesfully got orders from peer via ordersync")
m.Lock()
successfullySyncedPeers.Add(id.Pretty())
if len(successfullySyncedPeers) >= minPeers {
successfullySyncedPeerLength := len(successfullySyncedPeers)
m.Unlock()
if successfullySyncedPeerLength >= minPeers {
cancel()
}
}
}(peerID)
}

// Wait for all goroutines to exit. If the inner context has been
// cancelled, then we have successfully completed ordersync.
wg.Wait()
if innerCtx.Err() == context.Canceled {
return nil
}

wg.Wait()
delayBeforeNextRetry := retryBackoff.Duration()
m.Lock()
successfullySyncedPeerLength := len(successfullySyncedPeers)
m.Unlock()
log.WithFields(log.Fields{
"delayBeforeNextRetry": delayBeforeNextRetry.String(),
"minPeers": minPeers,
"successfullySyncedPeers": len(successfullySyncedPeers),
"successfullySyncedPeers": successfullySyncedPeerLength,
}).Debug("ordersync could not get orders from enough peers (trying again soon)")
select {
case <-ctx.Done():
Expand Down

0 comments on commit fae5fd0

Please sign in to comment.