diff --git a/blob/service.go b/blob/service.go index 4b944636ff..81ae340efa 100644 --- a/blob/service.go +++ b/blob/service.go @@ -86,11 +86,18 @@ func (s *Service) Stop(context.Context) error { return nil } +// SubscriptionResponse is the response type for the Subscribe method. +// It contains the blobs and the height at which they were included. +// If the Blobs slice is empty, it means that no blobs were included at the given height. type SubscriptionResponse struct { Blobs []*Blob Height uint64 } +// Subscribe returns a channel that will receive SubscriptionResponse objects. +// The channel will be closed when the context is canceled or the service is stopped. +// Please note that the Subscribe method will not return any errors, but rather retry underlying operations until successful. +// Additionally, not reading from the returned channel will cause the stream to close after 16 messages. func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *SubscriptionResponse, error) { if s.ctx == nil { return nil, fmt.Errorf("service has not been started") @@ -109,7 +116,7 @@ func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *Su select { case header, ok := <-headerCh: if ctx.Err() != nil { - log.Debug("blobsub: canceling subscription due to user context closing") + log.Debugw("blobsub: canceling subscription due to user context closing", "namespace", ns.ID()) return } if !ok { @@ -117,36 +124,37 @@ func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *Su return } // close subscription before buffer overflows - if len(blobCh) == cap(blobCh)-1 { + if len(blobCh) == cap(blobCh) { + log.Debugw("blobsub: canceling subscription due to buffer overflow from slow reader", "namespace", ns.ID()) return } var blobs []*Blob var err error for { + if ctx.Err() != nil { + // context canceled, continuing would lead to unexpected missed heights for the client + log.Debugw("blobsub: canceling subscription due to user context closing", "namespace", ns.ID()) + return + } blobs, err = s.getAll(ctx, header, []share.Namespace{ns}) - if err == nil || ctx.Err() != nil { + if err == nil { // operation successful, break the loop break } } - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - // context canceled, continuing would lead to unexpected missed heights for the client - log.Debug("blobsub: canceling subscription due to user context closing") - return - } select { case <-ctx.Done(): - log.Debug("blobsub: canceling subscription with pending response due to user context closing") + log.Debugw("blobsub: canceling subscription with pending response due to user context closing", "namespace", ns.ID()) return case blobCh <- &SubscriptionResponse{Blobs: blobs, Height: header.Height()}: } case <-ctx.Done(): - log.Debug("blobsub: canceling subscription due to user context closing") + log.Debugw("blobsub: canceling subscription due to user context closing", "namespace", ns.ID()) return case <-s.ctx.Done(): - log.Debug("blobsub: canceling subscription due to service context closing") + log.Debugw("blobsub: canceling subscription due to service context closing", "namespace", ns.ID()) return } }