Skip to content

Commit

Permalink
refactor: simplify GetCar signature and use .CloseWithError
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed May 25, 2023
1 parent ccee892 commit a44790d
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 31 deletions.
22 changes: 9 additions & 13 deletions gateway/blocks_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
gopath "path"
"strings"

"go.uber.org/multierr"

"github.com/ipfs/boxo/blockservice"
blockstore "github.com/ipfs/boxo/blockstore"
nsopts "github.com/ipfs/boxo/coreiface/options/namesys"
Expand Down Expand Up @@ -231,25 +229,23 @@ func (api *BlocksGateway) Head(ctx context.Context, path ImmutablePath) (Content
return md, fileNode, nil
}

func (api *BlocksGateway) GetCAR(ctx context.Context, p ImmutablePath, params CarParams) (io.ReadCloser, <-chan error, error) {
func (api *BlocksGateway) GetCAR(ctx context.Context, p ImmutablePath, params CarParams) (io.ReadCloser, error) {
contentPathStr := p.String()
if !strings.HasPrefix(contentPathStr, "/ipfs/") {
return nil, nil, fmt.Errorf("path does not have /ipfs/ prefix")
return nil, fmt.Errorf("path does not have /ipfs/ prefix")
}
firstSegment, _, _ := strings.Cut(contentPathStr[6:], "/")
rootCid, err := cid.Decode(firstSegment)
if err != nil {
return nil, nil, err
return nil, err
}

r, w := io.Pipe()
errCh := make(chan error, 1)
go func() {
cw, err := storage.NewWritable(w, []cid.Cid{rootCid}, car.WriteAsCarV1(true))
if err != nil {
pipeCloseErr := w.Close()
errCh <- multierr.Combine(err, pipeCloseErr)
close(errCh)
// io.PipeWriter.CloseWithError always returns nil.
_ = w.CloseWithError(err)
return
}

Expand All @@ -270,12 +266,12 @@ func (api *BlocksGateway) GetCAR(ctx context.Context, p ImmutablePath, params Ca
// TODO: support selectors passed as request param: https://github.com/ipfs/kubo/issues/8769
// TODO: this is very slow if blocks are remote due to linear traversal. Do we need deterministic traversals here?
carWriteErr := walkGatewaySimpleSelector(ctx, ipfspath.Path(contentPathStr), params, &lsys, pathResolver)
pipeCloseErr := w.Close()
errCh <- multierr.Combine(carWriteErr, pipeCloseErr)
close(errCh)

// io.PipeWriter.CloseWithError always returns nil.
_ = w.CloseWithError(carWriteErr)
}()

return r, errCh, nil
return r, nil
}

// walkGatewaySimpleSelector walks the subgraph described by the path and terminal element parameters
Expand Down
9 changes: 3 additions & 6 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,9 @@ type IPFSBackend interface {
// NewErrorResponse(fmt.Errorf("no link named %q under %s", name, cid), http.StatusNotFound)
ResolvePath(context.Context, ImmutablePath) (ContentPathMetadata, error)

// GetCAR returns a CAR file for the given immutable path
// Returns an initial error if there was an issue before the CAR streaming begins as well as a channel with a single
// that may contain a single error for if any errors occur during the streaming. If there was an initial error the
// error channel is nil
// TODO: Make this function signature better
GetCAR(context.Context, ImmutablePath, CarParams) (io.ReadCloser, <-chan error, error)
// GetCAR returns a CAR file for the given immutable path. It returns an error
// if there was an issue before the CAR streaming begins.
GetCAR(context.Context, ImmutablePath, CarParams) (io.ReadCloser, error)

// IsCached returns whether or not the path exists locally.
IsCached(context.Context, path.Path) bool
Expand Down
2 changes: 1 addition & 1 deletion gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (api *mockAPI) Head(ctx context.Context, immutablePath ImmutablePath) (Cont
return api.gw.Head(ctx, immutablePath)
}

func (api *mockAPI) GetCAR(ctx context.Context, immutablePath ImmutablePath, params CarParams) (io.ReadCloser, <-chan error, error) {
func (api *mockAPI) GetCAR(ctx context.Context, immutablePath ImmutablePath, params CarParams) (io.ReadCloser, error) {
return api.gw.GetCAR(ctx, immutablePath, params)
}

Expand Down
12 changes: 7 additions & 5 deletions gateway/handler_car.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"go.uber.org/multierr"
)

const carRangeBytesKey = "entity-bytes"
const carTerminalElementTypeKey = "dag-scope"
const (
carRangeBytesKey = "entity-bytes"
carTerminalElementTypeKey = "dag-scope"
)

// serveCAR returns a CAR stream for specific DAG+selector
func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.Request, imPath ImmutablePath, contentPath ipath.Path, carVersion string, begin time.Time) bool {
Expand Down Expand Up @@ -56,15 +58,15 @@ func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.R
case dagScopeEntity, dagScopeAll, dagScopeBlock:
params.Scope = s
default:
err := fmt.Errorf("unsupported dag-scope %s", scopeStr)
err := fmt.Errorf("unsupported dag-scope %q", scopeStr)
webError(w, err, http.StatusBadRequest)
return false
}
} else {
params.Scope = dagScopeAll
}

carFile, errCh, err := i.api.GetCAR(ctx, imPath, params)
carFile, err := i.api.GetCAR(ctx, imPath, params)
if !i.handleRequestErrors(w, contentPath, err) {
return false
}
Expand Down Expand Up @@ -120,7 +122,7 @@ func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.R
w.Header().Set("X-Content-Type-Options", "nosniff") // no funny business in the browsers :^)

_, copyErr := io.Copy(w, carFile)
carErr := <-errCh
carErr := carFile.Close()
streamErr := multierr.Combine(carErr, copyErr)
if streamErr != nil {
// Update fail metric
Expand Down
6 changes: 3 additions & 3 deletions gateway/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (api *errorMockAPI) Head(ctx context.Context, path ImmutablePath) (ContentP
return ContentPathMetadata{}, nil, api.err
}

func (api *errorMockAPI) GetCAR(ctx context.Context, path ImmutablePath, params CarParams) (io.ReadCloser, <-chan error, error) {
return nil, nil, api.err
func (api *errorMockAPI) GetCAR(ctx context.Context, path ImmutablePath, params CarParams) (io.ReadCloser, error) {
return nil, api.err
}

func (api *errorMockAPI) ResolveMutable(ctx context.Context, path ipath.Path) (ImmutablePath, error) {
Expand Down Expand Up @@ -173,7 +173,7 @@ func (api *panicMockAPI) Head(ctx context.Context, immutablePath ImmutablePath)
panic("i am panicking")
}

func (api *panicMockAPI) GetCAR(ctx context.Context, immutablePath ImmutablePath, params CarParams) (io.ReadCloser, <-chan error, error) {
func (api *panicMockAPI) GetCAR(ctx context.Context, immutablePath ImmutablePath, params CarParams) (io.ReadCloser, error) {
panic("i am panicking")
}

Expand Down
6 changes: 3 additions & 3 deletions gateway/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,17 @@ func (b *ipfsBackendWithMetrics) ResolvePath(ctx context.Context, path Immutable
return md, err
}

func (b *ipfsBackendWithMetrics) GetCAR(ctx context.Context, path ImmutablePath, params CarParams) (io.ReadCloser, <-chan error, error) {
func (b *ipfsBackendWithMetrics) GetCAR(ctx context.Context, path ImmutablePath, params CarParams) (io.ReadCloser, error) {
begin := time.Now()
name := "IPFSBackend.GetCAR"
ctx, span := spanTrace(ctx, name, trace.WithAttributes(attribute.String("path", path.String())))
defer span.End()

rc, errCh, err := b.api.GetCAR(ctx, path, params)
rc, err := b.api.GetCAR(ctx, path, params)

// TODO: handle errCh
b.updateApiCallMetric(name, err, begin)
return rc, errCh, err
return rc, err
}

func (b *ipfsBackendWithMetrics) IsCached(ctx context.Context, path path.Path) bool {
Expand Down

0 comments on commit a44790d

Please sign in to comment.