Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
feat!: add proxy blockstore by default for testing against non-saturn
Browse files Browse the repository at this point in the history
backend
  • Loading branch information
hacdias committed Feb 22, 2023
1 parent c0c8fa3 commit 7b86715
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 22 deletions.
13 changes: 6 additions & 7 deletions blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"time"

"github.com/ipfs/go-cid"
Expand All @@ -11,14 +12,12 @@ import (
"go.uber.org/zap/zapcore"
)

var errNotImplemented = errors.New("not implemented")

const GetBlockTimeout = time.Second * 60

func newExchange(orchestrator, loggingEndpoint string, cdns *cachedDNS) (exchange.Interface, error) {
b, err := newCabooseBlockStore(orchestrator, loggingEndpoint, cdns)
if err != nil {
return nil, err
}
return &exchangeBsWrapper{bstore: b}, nil
func newExchange(bs blockstore.Blockstore) (exchange.Interface, error) {
return &exchangeBsWrapper{bstore: bs}, nil
}

type exchangeBsWrapper struct {
Expand All @@ -30,7 +29,7 @@ func (e *exchangeBsWrapper) GetBlock(ctx context.Context, c cid.Cid) (blocks.Blo
defer cancel()

if goLog.Level().Enabled(zapcore.DebugLevel) {
goLog.Debugw("block requested from strn", "cid", c.String())
goLog.Debugw("block requested from remote blockstore", "cid", c.String())
}

return e.bstore.Get(ctx, c)
Expand Down
3 changes: 1 addition & 2 deletions blockstore_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"errors"

"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
Expand Down Expand Up @@ -127,7 +126,7 @@ func (l *cacheBlockStore) PutMany(ctx context.Context, blks []blocks.Block) erro
}

func (l *cacheBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("not implemented")
return nil, errNotImplemented
}

func (l *cacheBlockStore) HashOnRead(enabled bool) {
Expand Down
138 changes: 138 additions & 0 deletions blockstore_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package main

import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"time"

"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-libipfs/blocks"
)

type proxyBlockStore struct {
httpClient *http.Client
gatewayURL []string
validate bool
rand *rand.Rand
}

func newProxyBlockStore(gatewayURL []string, cdns *cachedDNS) blockstore.Blockstore {
s := rand.NewSource(time.Now().Unix())
rand := rand.New(s)

return &proxyBlockStore{
gatewayURL: gatewayURL,
httpClient: &http.Client{
Timeout: GetBlockTimeout,
Transport: &withUserAgent{
// Roundtripper with increased defaults than http.Transport such that retrieving
// multiple blocks from a single gateway concurrently is fast.
RoundTripper: &http.Transport{
MaxIdleConns: 1000,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
DialContext: cdns.dialWithCachedDNS,
},
},
},
// Enables block validation by default. Important since we are
// proxying block requests to an untrusted gateway.
validate: true,
rand: rand,
}
}

func (ps *proxyBlockStore) fetch(ctx context.Context, c cid.Cid) (blocks.Block, error) {
u, err := url.Parse(fmt.Sprintf("%s/ipfs/%s?format=raw", ps.getRandomGatewayURL(), c))
if err != nil {
return nil, err
}
resp, err := ps.httpClient.Do(&http.Request{
Method: http.MethodGet,
URL: u,
Header: http.Header{
"Accept": []string{"application/vnd.ipld.raw"},
},
})
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http error from block gateway: %s", resp.Status)
}

rb, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if ps.validate {
nc, err := c.Prefix().Sum(rb)
if err != nil {
return nil, blocks.ErrWrongHash
}
if !nc.Equals(c) {
return nil, blocks.ErrWrongHash
}
}

return blocks.NewBlockWithCid(rb, c)
}

func (ps *proxyBlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) {
blk, err := ps.fetch(ctx, c)
if err != nil {
return false, err
}
return blk != nil, nil
}

func (ps *proxyBlockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
blk, err := ps.fetch(ctx, c)
if err != nil {
return nil, err
}
return blk, nil
}

func (ps *proxyBlockStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
blk, err := ps.fetch(ctx, c)
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

func (ps *proxyBlockStore) HashOnRead(enabled bool) {
ps.validate = enabled
}

func (c *proxyBlockStore) Put(context.Context, blocks.Block) error {
return errNotImplemented
}

func (c *proxyBlockStore) PutMany(context.Context, []blocks.Block) error {
return errNotImplemented
}

func (c *proxyBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errNotImplemented
}

func (c *proxyBlockStore) DeleteBlock(context.Context, cid.Cid) error {
return errNotImplemented
}

func (ps *proxyBlockStore) getRandomGatewayURL() string {
return ps.gatewayURL[ps.rand.Intn(len(ps.gatewayURL))]
}

var _ blockstore.Blockstore = (*proxyBlockStore)(nil)
6 changes: 3 additions & 3 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func withRequestLogger(next http.Handler) http.Handler {
})
}

func makeGatewayHandler(saturnOrchestrator, saturnLogger string, kuboRPC []string, port int, blockCacheSize int, cdns *cachedDNS) (*http.Server, error) {
// Sets up an exchange based on using Saturn as block storage
exch, err := newExchange(saturnOrchestrator, saturnLogger, cdns)
func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockCacheSize int) (*http.Server, error) {
// Sets up an exchange based on the given Block Store
exch, err := newExchange(bs)
if err != nil {
return nil, err
}
Expand Down
41 changes: 35 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"
"sync"

blockstore "github.com/ipfs/go-ipfs-blockstore"
golog "github.com/ipfs/go-log/v2"
"github.com/spf13/cobra"
)
Expand All @@ -26,13 +27,15 @@ func main() {
}

const (
DefaultSaturnLogger = "https://logs.strn.network"
DefaultSaturnOrchestrator = "https://orchestrator.strn.pl/nodes/nearby"
DefaultSaturnLogger = "https://logs.strn.network"
DefaultProxyGateway = "http://127.0.0.1:8080"
DefaultKuboPRC = "http://127.0.0.1:8080"

EnvSaturnLogger = "STRN_LOGGER_URL"
EnvSaturnOrchestrator = "STRN_ORCHESTRATOR_URL"
EnvBlockCacheSize = "BLOCK_CACHE_SIZE"
EnvProxyGateway = "PROXY_GATEWAY_URL"
EnvKuboRPC = "KUBO_RPC_URL"
EnvBlockCacheSize = "BLOCK_CACHE_SIZE"
)

func init() {
Expand All @@ -57,9 +60,11 @@ See documentation at: https://github.com/ipfs/bifrost-gateway/#readme`,
metricsPort, _ := cmd.Flags().GetInt("metrics-port")

// Get env variables.
saturnOrchestrator := getEnv(EnvSaturnOrchestrator, DefaultSaturnOrchestrator)
saturnOrchestrator := getEnv(EnvSaturnOrchestrator, "")
saturnLogger := getEnv(EnvSaturnLogger, DefaultSaturnLogger)
kuboRPC := strings.Split(os.Getenv(EnvKuboRPC), ",")
proxyGateway := getEnvs(EnvProxyGateway, DefaultProxyGateway)
kuboRPC := getEnvs(EnvKuboRPC, DefaultKuboPRC)

blockCacheSize, err := getEnvInt(EnvBlockCacheSize, DefaultCacheBlockStoreSize)
if err != nil {
return err
Expand All @@ -70,7 +75,22 @@ See documentation at: https://github.com/ipfs/bifrost-gateway/#readme`,
cdns := newCachedDNS(dnsCacheRefreshInterval)
defer cdns.Close()

gatewaySrv, err := makeGatewayHandler(saturnOrchestrator, saturnLogger, kuboRPC, gatewayPort, blockCacheSize, cdns)
var bs blockstore.Blockstore

if saturnOrchestrator != "" {
log.Printf("Proxying gateway block requests to Saturn at %s", saturnOrchestrator)
bs, err = newCabooseBlockStore(saturnOrchestrator, saturnLogger, cdns)
if err != nil {
return err
}
} else if len(proxyGateway) != 0 {
log.Printf("Proxying gateway block requests to %s", strings.Join(proxyGateway, " "))
bs = newProxyBlockStore(proxyGateway, cdns)
} else {
return errors.New("neither saturn orchestrator or proxy gateway are configured")
}

gatewaySrv, err := makeGatewayHandler(bs, kuboRPC, gatewayPort, blockCacheSize)
if err != nil {
return err
}
Expand Down Expand Up @@ -128,6 +148,15 @@ func getEnv(key, defaultValue string) string {
return value
}

func getEnvs(key, defaultValue string) []string {
value := os.Getenv(key)
if value == "" {
value = defaultValue
}
value = strings.TrimSpace(value)
return strings.Split(value, ",")
}

func getEnvInt(key string, defaultValue int) (int, error) {
value := os.Getenv(key)
if value == "" {
Expand Down
9 changes: 5 additions & 4 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ func (ps *proxyRouting) fetch(ctx context.Context, key string) (rb []byte, err e

key = "/ipns/" + peer.ToCid(id).String()

// Naively choose one of the Kubo RPC clients.
endpoint := ps.kuboRPC[rand.Intn(len(ps.kuboRPC))]

u, err := url.Parse(fmt.Sprintf("%s/api/v0/dht/get?arg=%s", endpoint, key))
u, err := url.Parse(fmt.Sprintf("%s/api/v0/dht/get?arg=%s", ps.getRandomKuboURL(), key))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -163,3 +160,7 @@ func (ps *proxyRouting) fetch(ctx context.Context, key string) (rb []byte, err e

return rb, nil
}

func (ps *proxyRouting) getRandomKuboURL() string {
return ps.kuboRPC[ps.rand.Intn(len(ps.kuboRPC))]
}

0 comments on commit 7b86715

Please sign in to comment.