Skip to content

Commit

Permalink
impl chain-ro
Browse files Browse the repository at this point in the history
  • Loading branch information
dtynn committed Jun 3, 2021
1 parent 7c40067 commit 7c43183
Show file tree
Hide file tree
Showing 16 changed files with 1,009 additions and 126 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/build-dep/.*
/build
/tmp-clone
/bin
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,12 @@ dist-clean:

gen-proxy:
go run scripts/proxy-gen.go

build-ro: $(BUILD_DEPS)
mkdir -p ./bin
rm -f ./bin/chain-ro
go build $(GOFLAGS) -o ./bin/chain-ro ./chain-ro/cmd
go run github.com/GeertJohan/go.rice/rice append --exec ./bin/chain-ro -i ./build

.PHONY: lotus
BINS+=lotus
8 changes: 7 additions & 1 deletion api/proxy.go → api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes"
)

var _ Proxy = api.FullNode(nil)
var _ api.FullNode = combined(nil)

type combined interface {
Proxy
Local
UnSupport
}

// Proxy is a subset of api.FullNode.
// Requests involved will be proxied to the choosen remote node
Expand Down
64 changes: 64 additions & 0 deletions chain-ro/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"os"

logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"go.opencensus.io/trace"

"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/lib/tracing"
)

const cliName = "chain-ro"

var log = logging.Logger(cliName)

func main() {
lotuslog.SetupLogLevels()

local := []*cli.Command{}

jaeger := tracing.SetupJaegerTracing(cliName)
defer func() {
if jaeger != nil {
jaeger.Flush()
}
}()

for _, cmd := range local {
cmd := cmd
originBefore := cmd.Before
cmd.Before = func(cctx *cli.Context) error {
trace.UnregisterExporter(jaeger)
jaeger = tracing.SetupJaegerTracing(cliName + "/" + cmd.Name)

if originBefore != nil {
return originBefore(cctx)
}
return nil
}
}

ctx, span := trace.StartSpan(context.Background(), "/cli")
defer span.End()

app := &cli.App{
Name: cliName,
Usage: "read-only chain node for filecoin",
EnableBashCompletion: true,
Flags: []cli.Flag{},

Commands: local,
}

app.Setup()
app.Metadata["traceContext"] = ctx

if err := app.Run(os.Args); err != nil {
log.Errorf("CLI error: %s", err)
os.Exit(1)
}
}
65 changes: 65 additions & 0 deletions chain-ro/cmd/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package main

import (
"context"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"

"github.com/dtynn/dix"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api"
)

func serveRPC(ctx context.Context, listen string, full api.FullNode, stop dix.StopFunc, maxRequestSize int64) error {
rpcOpts := []jsonrpc.ServerOption{}
if maxRequestSize > 0 {
rpcOpts = append(rpcOpts, jsonrpc.WithMaxRequestSize(maxRequestSize))
}

rpcServer := jsonrpc.NewServer(rpcOpts...)
rpcServer.Register("Filecoin", full)

http.Handle("/rpc/v0", rpcServer)

server := http.Server{
Addr: listen,
Handler: http.DefaultServeMux,
BaseContext: func(net.Listener) context.Context {
return ctx
},
}

sigCh := make(chan os.Signal, 2)

go func() {
select {
case <-ctx.Done():

case sig := <-sigCh:
log.Infof("signal %s captured", sig)
}

if err := server.Shutdown(context.Background()); err != nil {
log.Warnf("shutdown http server: %s", err)
}

if err := stop(context.Background()); err != nil {
log.Warnf("call app stop func: %s", err)
}

log.Sync()
}()

signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)

if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return err
}

log.Info("gracefull down")
return nil
}
65 changes: 65 additions & 0 deletions chain-ro/cmd/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package main

import (
"context"

"github.com/filecoin-project/lotus/api"
"github.com/urfave/cli/v2"

"github.com/dtynn/chain-co/chain-ro/service"
"github.com/dtynn/chain-co/dep"
)

var runCmd = &cli.Command{
Name: "run",
Usage: "start the chain-ro daemon",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "listen",
Usage: "listen address for the service",
Value: ":1234",
},
&cli.Int64Flag{
Name: "max-req-size",
Usage: "max request size",
Value: 10 << 20,
},
&cli.StringSliceFlag{
Name: "node",
Usage: "node info",
},
},
Action: func(cctx *cli.Context) error {
appCtx, appCancel := context.WithCancel(cctx.Context)
defer appCancel()

var full api.FullNode

stop, err := service.Build(
appCtx,

dep.MetricsCtxOption(appCtx, cliName),

service.ParseNodeInfoList(cctx.StringSlice("node")),
service.FullNode(&full),
)

if err != nil {
return nil
}

defer stop(context.Background())

return serveRPC(
appCtx,
cctx.String("listen"),
full,
func(ctx context.Context) error {
appCancel()
stop(ctx)
return nil
},
cctx.Int64("max-req-size"),
)
},
}
166 changes: 166 additions & 0 deletions chain-ro/service/dep.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package service

import (
"context"
"fmt"
"time"

"github.com/dtynn/dix"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"go.uber.org/fx"

"github.com/dtynn/chain-co/co"
"github.com/dtynn/chain-co/proxy"
)

const ExtractFullNodeAPIKey dix.Invoke = 1

func Build(ctx context.Context, overrides ...dix.Option) (dix.StopFunc, error) {
opts := []dix.Option{
dix.Override(new(co.NodeOption), co.DefaultNodeOption),
dix.Override(new(*co.Ctx), co.NewCtx),
dix.Override(new(*co.Connector), co.NewConnector),
dix.Override(new(*co.Coordinator), buildCoordinator),
dix.Override(new(*co.Selector), co.NewSelector),
dix.Override(new(*proxy.Proxy), buildProxyAPI),
dix.Override(new(*proxy.Local), buildLocalAPI),
dix.Override(new(*proxy.UnSupport), buildUnSupportAPI),
}
opts = append(opts, overrides...)
return dix.New(ctx, opts...)
}

func FullNode(full *api.FullNode) dix.Option {
return dix.Override(ExtractFullNodeAPIKey, func(srv Service) error {
*full = &srv
return nil
})
}

func ParseNodeInfoList(raws []string) dix.Option {
return dix.Override(new(co.NodeInfoList), func() (co.NodeInfoList, error) {
list := make(co.NodeInfoList, 0, len(raws))
for _, str := range raws {
info := co.ParseNodeInfo(str)
if _, err := info.DialArgs(); err != nil {
return nil, fmt.Errorf("invalid node info: %s", str)
}

list = append(list, info)
}

return list, nil
})
}

func buildCoordinator(lc fx.Lifecycle, ctx *co.Ctx, connector *co.Connector, infos co.NodeInfoList, sel *co.Selector) (*co.Coordinator, error) {
nodes := make([]*co.Node, 0, len(infos))
allDone := false
defer func() {
if !allDone {
for i := range nodes {
nodes[i].Stop()
}
}
}()

var head *types.TipSet
weight := types.NewInt(0)

for i := range infos {
info := infos[i]
nlog := log.With("host", info.Host)

node, err := connector.Connect(info)
if err != nil {
nlog.Errorf("connect failed: %s", err)
continue
}

full := node.FullNode()
h, w, err := getHeadCandidate(full)
if err != nil {
node.Stop()
nlog.Errorf("failed to get head: %s", err)
continue
}

if head == nil || w.GreaterThan(weight) {
head = h
weight = w
}

nodes = append(nodes, node)
}

if len(nodes) == 0 {
return nil, fmt.Errorf("no available node")
}

coordinator, err := co.NewCoordinator(ctx, head, weight, sel)
if err != nil {
return nil, err
}

lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go coordinator.Start()
sel.ReplaceNodes(nodes, nil, false)
return nil
},
OnStop: func(context.Context) error {
coordinator.Stop()
return nil
},
})

allDone = true
return coordinator, nil
}

func getHeadCandidate(full api.FullNode) (*types.TipSet, types.BigInt, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

head, err := full.ChainHead(ctx)
if err != nil {
return nil, types.BigInt{}, err
}

weight, err := full.ChainTipSetWeight(ctx, head.Key())
if err != nil {
return nil, types.BigInt{}, err
}

return head, weight, nil
}

func buildProxyAPI(sel *co.Selector) *proxy.Proxy {
return &proxy.Proxy{
Select: func() (proxy.ProxyAPI, error) {
node, err := sel.Select()
if err != nil {
return nil, err
}

return node.FullNode(), nil
},
}
}

func buildLocalAPI(lsrv LocalChainService) *proxy.Local {
return &proxy.Local{
Select: func() (proxy.LocalAPI, error) {
return &lsrv, nil
},
}
}

func buildUnSupportAPI() *proxy.UnSupport {
return &proxy.UnSupport{
Select: func() (proxy.UnSupportAPI, error) {
return nil, fmt.Errorf("api not supported")
},
}
}
Loading

0 comments on commit 7c43183

Please sign in to comment.