Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

feat: add proxy blockstore for testing against non-saturn backend #50

Merged
merged 5 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,43 @@ Local build:

```console
$ go build
$ ./bifrost-gateway --help
$ ./bifrost-gateway
```

Prebuild Docker image:
### Docker

```console
$ docker pull ipfs/bifrost-gateway:main-latest
$ docker run --rm -it --net=host ipfs/bifrost-gateway:main-latest --help
$ docker run --rm -it --net=host ipfs/bifrost-gateway:main-latest
```

When using Docker, make sure to pass necessary config via [`./docs/environment-variables.md`](./docs/environment-variables.md).

List of available revisions: https://hub.docker.com/r/ipfs/bifrost-gateway/tags

### How to run with Saturn CDN backend

[Saturn](https://strn.network) is an open-source, community-run Content Delivery Network (CDN).
`bifrost-gateway` supports it via the [Caboose](https://github.com/filecoin-saturn/caboose) backend,
which takes care of discovering and evaluating Saturn CDN peers.

See [_Saturn Backend_ in `./docs/environment-variables.md`](./docs/environment-variables.md#saturn-backend)

### How to run with local gateway

Saturn is implementation detail specific to ipfs.io infrastructure.
One can run `bifrost-gateway` without it. All you need is endpoint that supports
[verifiable response types](https://docs.ipfs.tech/reference/http/gateway/#trustless-verifiable-retrieval).

To run without Saturn and use Gateway provided by a local IPFS node like [Kubo](https://github.com/ipfs/kubo):

```console
$ PROXY_GATEWAY_URL="http://127.0.0.1:8080" ./bifrost-gateway
```

See [_Proxy Backend_ in `./docs/environment-variables.md`](./docs/environment-variables.md#proxy-backend)


### How to debug?

See [`GOLOG_LOG_LEVEL`](./docs/environment-variables.md#golog_log_level).
Expand Down
42 changes: 34 additions & 8 deletions blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@ package main

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/filecoin-saturn/caboose"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
blocks "github.com/ipfs/go-libipfs/blocks"
"github.com/ipfs/go-libipfs/gateway"
"go.uber.org/zap/zapcore"
)

var errNotImplemented = errors.New("not implemented")

const GetBlockTimeout = time.Second * 60

func newExchange(orchestrator, loggingEndpoint string, cdns *cachedDNS) (exchange.Interface, error) {
b, err := newCabooseBlockStore(orchestrator, loggingEndpoint, cdns)
if err != nil {
return nil, err
}
return &exchangeBsWrapper{bstore: b}, nil
func newExchange(bs blockstore.Blockstore) (exchange.Interface, error) {
return &exchangeBsWrapper{bstore: bs}, nil
}

type exchangeBsWrapper struct {
Expand All @@ -30,10 +33,14 @@ func (e *exchangeBsWrapper) GetBlock(ctx context.Context, c cid.Cid) (blocks.Blo
defer cancel()

if goLog.Level().Enabled(zapcore.DebugLevel) {
goLog.Debugw("block requested from strn", "cid", c.String())
goLog.Debugw("block requested from remote blockstore", "cid", c.String())
}

return e.bstore.Get(ctx, c)
blk, err := e.bstore.Get(ctx, c)
if err != nil {
return nil, gatewayError(err)
}
return blk, nil
}

func (e *exchangeBsWrapper) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) {
Expand All @@ -60,4 +67,23 @@ func (e *exchangeBsWrapper) Close() error {
return nil
}

// gatewayError translates underlying blockstore error into one that gateway code will return as HTTP 502 or 504
func gatewayError(err error) error {
if errors.Is(err, gateway.ErrGatewayTimeout) || errors.Is(err, gateway.ErrBadGateway) {
// already correct error
return err
}

// All timeouts should produce 504 Gateway Timeout
if errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, caboose.ErrSaturnTimeout) ||
// Unfortunately this is not an exported type so we have to check for the content.
strings.Contains(err.Error(), "Client.Timeout exceeded") {
return fmt.Errorf("%w: %s", gateway.ErrGatewayTimeout, err.Error())
}

// everything else returns 502 Bad Gateway
return fmt.Errorf("%w: %s", gateway.ErrBadGateway, err.Error())
}

var _ exchange.Interface = (*exchangeBsWrapper)(nil)
2 changes: 2 additions & 0 deletions blockstore_caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

const DefaultSaturnLogger = "http://set-STRN_LOGGER_URL"

func newCabooseBlockStore(orchestrator, loggingEndpoint string, cdns *cachedDNS) (blockstore.Blockstore, error) {
var (
orchURL *url.URL
Expand Down
3 changes: 1 addition & 2 deletions blockstore_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"errors"

"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
Expand Down Expand Up @@ -127,7 +126,7 @@ func (l *cacheBlockStore) PutMany(ctx context.Context, blks []blocks.Block) erro
}

func (l *cacheBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("not implemented")
return nil, errNotImplemented
}

func (l *cacheBlockStore) HashOnRead(enabled bool) {
Expand Down
154 changes: 154 additions & 0 deletions blockstore_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package main

import (
"context"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"net/url"
"time"

"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-libipfs/blocks"
)

// Blockstore backed by a verifiable gateway. This is vendor-agnostic proxy interface,
// one can use Gateway provided by Kubo, or any other implementation that follows
// the spec for verifiable responses:
// https://docs.ipfs.tech/reference/http/gateway/#trustless-verifiable-retrieval
// https://github.com/ipfs/specs/blob/main/http-gateways/TRUSTLESS_GATEWAY.md

const (
DefaultProxyGateway = "http://127.0.0.1:8080"
DefaultKuboPRC = "http://127.0.0.1:5001"
)

type proxyBlockStore struct {
httpClient *http.Client
gatewayURL []string
validate bool
rand *rand.Rand
}

func newProxyBlockStore(gatewayURL []string, cdns *cachedDNS) blockstore.Blockstore {
s := rand.NewSource(time.Now().Unix())
rand := rand.New(s)

if len(gatewayURL) == 0 {
log.Fatal("Missing PROXY_GATEWAY_URL. See https://github.com/ipfs/bifrost-gateway/blob/main/docs/environment-variables.md")
}

return &proxyBlockStore{
gatewayURL: gatewayURL,
httpClient: &http.Client{
Timeout: GetBlockTimeout,
Transport: &withUserAgent{
// Roundtripper with increased defaults than http.Transport such that retrieving
// multiple blocks from a single gateway concurrently is fast.
RoundTripper: &http.Transport{
MaxIdleConns: 1000,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
DialContext: cdns.dialWithCachedDNS,
},
},
},
// Enables block validation by default. Important since we are
// proxying block requests to an untrusted gateway.
validate: true,
rand: rand,
}
}

func (ps *proxyBlockStore) fetch(ctx context.Context, c cid.Cid) (blocks.Block, error) {
u, err := url.Parse(fmt.Sprintf("%s/ipfs/%s?format=raw", ps.getRandomGatewayURL(), c))
if err != nil {
return nil, err
}
resp, err := ps.httpClient.Do(&http.Request{
Method: http.MethodGet,
URL: u,
Header: http.Header{
"Accept": []string{"application/vnd.ipld.raw"},
},
})
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http error from block gateway: %s", resp.Status)
}

rb, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if ps.validate {
nc, err := c.Prefix().Sum(rb)
if err != nil {
return nil, blocks.ErrWrongHash
}
if !nc.Equals(c) {
return nil, blocks.ErrWrongHash
}
}

return blocks.NewBlockWithCid(rb, c)
}

func (ps *proxyBlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) {
blk, err := ps.fetch(ctx, c)
if err != nil {
return false, err
}
return blk != nil, nil
}

func (ps *proxyBlockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
blk, err := ps.fetch(ctx, c)
if err != nil {
return nil, err
}
return blk, nil
}

func (ps *proxyBlockStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
blk, err := ps.fetch(ctx, c)
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

func (ps *proxyBlockStore) HashOnRead(enabled bool) {
ps.validate = enabled
}

func (c *proxyBlockStore) Put(context.Context, blocks.Block) error {
return errNotImplemented
}

func (c *proxyBlockStore) PutMany(context.Context, []blocks.Block) error {
return errNotImplemented
}

func (c *proxyBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errNotImplemented
}

func (c *proxyBlockStore) DeleteBlock(context.Context, cid.Cid) error {
return errNotImplemented
}

func (ps *proxyBlockStore) getRandomGatewayURL() string {
return ps.gatewayURL[ps.rand.Intn(len(ps.gatewayURL))]
}

var _ blockstore.Blockstore = (*proxyBlockStore)(nil)
8 changes: 6 additions & 2 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [`KUBO_RPC_URL`](#kubo_rpc_url)
- [`BLOCK_CACHE_SIZE`](#block_cache_size)
- [Proxy Backend](#proxy-backend)
- [`PROXY_GATEWAY_URL`](#proxy_gateway_url)
- [Saturn Backend](#saturn-backend)
- [`STRN_ORCHESTRATOR_URL`](#strn_orchestrator_url)
- [`STRN_LOGGER_URL`](#strn_logger_url)
Expand All @@ -32,8 +33,11 @@ The size of in-memory [2Q cache](https://pkg.go.dev/github.com/hashicorp/golang-

## Proxy Backend

TODO: this will be the default backend used when `STRN_ORCHESTRATOR_URL` is not set.
We will have env variable that allows customizing URL of HTTP Gateway that supports Block/CAR responses.
### `PROXY_GATEWAY_URL`

Single URL or a comma separated list of Gateway endpoints that support `?format=block|car`
responses. This is used by default with `http://127.0.0.1:8080` unless `STRN_ORCHESTRATOR_URL`
is set.

## Saturn Backend

Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/ipfs/bifrost-gateway
go 1.19

require (
github.com/filecoin-saturn/caboose v0.0.0-20230224143717-684691e6737d
github.com/filecoin-saturn/caboose v0.0.0-20230224175506-a1f20de5ba9e
github.com/gogo/protobuf v1.3.2
github.com/hashicorp/golang-lru/v2 v2.0.1
github.com/ipfs/go-blockservice v0.5.0
Expand All @@ -13,11 +13,11 @@ require (
github.com/ipfs/go-ipfs-exchange-interface v0.2.0
github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-ipns v0.3.0
github.com/ipfs/go-libipfs v0.6.1-0.20230224134131-7ba1df55d53b
github.com/ipfs/go-libipfs v0.6.1-0.20230224152609-00e024995173
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-merkledag v0.9.0
github.com/ipfs/go-namesys v0.7.0
github.com/ipfs/go-path v0.3.0
github.com/ipfs/go-path v0.3.1
github.com/ipfs/go-unixfs v0.3.1
github.com/ipfs/go-unixfsnode v1.5.1
github.com/ipfs/interface-go-ipfs-core v0.11.1
Expand Down Expand Up @@ -92,6 +92,7 @@ require (
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/opencontainers/runtime-spec v1.0.3-0.20211123151946-c2389c3cb60a // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
Expand Down
Loading