Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Jul 30, 2024
1 parent 79c29c8 commit 6ec4279
Showing 1 changed file with 19 additions and 11 deletions.
30 changes: 19 additions & 11 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,18 @@ func (s *Service) Stop(context.Context) error {
return nil
}

// SubscriptionResponse is the response type for the Subscribe method.
// It contains the blobs and the height at which they were included.
// If the Blobs slice is empty, it means that no blobs were included at the given height.
type SubscriptionResponse struct {
Blobs []*Blob
Height uint64
}

// Subscribe returns a channel that will receive SubscriptionResponse objects.
// The channel will be closed when the context is canceled or the service is stopped.
// Please note that the Subscribe method will not return any errors, but rather retry underlying operations until successful.
// Additionally, not reading from the returned channel will cause the stream to close after 16 messages.
func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *SubscriptionResponse, error) {
if s.ctx == nil {
return nil, fmt.Errorf("service has not been started")
Expand All @@ -109,44 +116,45 @@ func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *Su
select {
case header, ok := <-headerCh:
if ctx.Err() != nil {
log.Debug("blobsub: canceling subscription due to user context closing")
log.Debugw("blobsub: canceling subscription due to user context closing", "namespace", ns.ID())
return
}
if !ok {
log.Errorw("header channel closed for subscription", "namespace", ns.ID())
return
}
// close subscription before buffer overflows
if len(blobCh) == cap(blobCh)-1 {
if len(blobCh) == cap(blobCh) {
log.Debugw("blobsub: canceling subscription due to buffer overflow from slow reader", "namespace", ns.ID())
return
}

var blobs []*Blob
var err error
for {
if ctx.Err() != nil {
// context canceled, continuing would lead to unexpected missed heights for the client
log.Debugw("blobsub: canceling subscription due to user context closing", "namespace", ns.ID())
return
}
blobs, err = s.getAll(ctx, header, []share.Namespace{ns})
if err == nil || ctx.Err() != nil {
if err == nil {
// operation successful, break the loop
break
}
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// context canceled, continuing would lead to unexpected missed heights for the client
log.Debug("blobsub: canceling subscription due to user context closing")
return
}

select {
case <-ctx.Done():
log.Debug("blobsub: canceling subscription with pending response due to user context closing")
log.Debugw("blobsub: canceling subscription with pending response due to user context closing", "namespace", ns.ID())
return
case blobCh <- &SubscriptionResponse{Blobs: blobs, Height: header.Height()}:
}
case <-ctx.Done():
log.Debug("blobsub: canceling subscription due to user context closing")
log.Debugw("blobsub: canceling subscription due to user context closing", "namespace", ns.ID())
return
case <-s.ctx.Done():
log.Debug("blobsub: canceling subscription due to service context closing")
log.Debugw("blobsub: canceling subscription due to service context closing", "namespace", ns.ID())
return
}
}
Expand Down

0 comments on commit 6ec4279

Please sign in to comment.