Skip to content

Commit

Permalink
fix: graceful blob service shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Jul 19, 2024
1 parent 2103f5f commit 6d2375c
Showing 1 changed file with 19 additions and 2 deletions.
21 changes: 19 additions & 2 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"fmt"
"sync"

"slices"

"github.com/cosmos/cosmos-sdk/types"
logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"slices"

"github.com/celestiaorg/celestia-app/pkg/shares"

Expand Down Expand Up @@ -53,6 +54,10 @@ type Service struct {
headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error)
// headerSub subscribes to new headers to supply to blob subscriptions.
headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error)

// activeSubscriptions tracks the number of active subscriptions
activeSubscriptions sync.WaitGroup
mu sync.Mutex
}

func NewService(
Expand All @@ -75,7 +80,13 @@ func (s *Service) Start(context.Context) error {
}

func (s *Service) Stop(context.Context) error {
s.cancel()
s.mu.Lock()
if s.cancel != nil {
s.cancel()
}
s.mu.Unlock()

s.activeSubscriptions.Wait()
return nil
}

Expand All @@ -85,17 +96,23 @@ type SubscriptionResponse struct {
}

func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *SubscriptionResponse, error) {
s.mu.Lock()
if s.ctx == nil {
s.mu.Unlock()
return nil, fmt.Errorf("service has not been started")
}
s.activeSubscriptions.Add(1)
s.mu.Unlock()

headerCh, err := s.headerSub(ctx)
if err != nil {
s.activeSubscriptions.Done()
return nil, err
}

blobCh := make(chan *SubscriptionResponse, 3)
go func() {
defer s.activeSubscriptions.Done()
defer close(blobCh)

for {
Expand Down

0 comments on commit 6d2375c

Please sign in to comment.