Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc): Add client wrapper #1195

Merged
merged 6 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
415 changes: 415 additions & 0 deletions api/mocks/api.go

Large diffs are not rendered by default.

79 changes: 79 additions & 0 deletions api/rpc/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package client

import (
"context"

"github.com/filecoin-project/go-jsonrpc"

"github.com/celestiaorg/celestia-node/nodebuilder/das"
"github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
)

//go:generate go run github.com/golang/mock/mockgen -destination=../../mocks/api.go -package=mocks . API
type API interface {
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
fraud.Module
header.Module
state.Module
share.Module
das.Module
}

type Client struct {
Fraud fraud.API
Header header.API
State state.API
Share share.API
DAS das.API
}

// MultiClientCloser is a wrapper struct to close clients across multiple namespaces.
type MultiClientCloser struct {
closers map[string]jsonrpc.ClientCloser
}

// Register adds a new closer to the MultiClientCloser under the given namespace.
func (m *MultiClientCloser) Register(namespace string, closer jsonrpc.ClientCloser) {
if m.closers == nil {
m.closers = make(map[string]jsonrpc.ClientCloser)
}
m.closers[namespace] = closer
}

// CloseNamespace closes the client for the given namespace.
func (m *MultiClientCloser) CloseNamespace(namespace string) {
m.closers[namespace]()
}

// CloseAll closes all saved clients.
func (m *MultiClientCloser) CloseAll() {
for _, closer := range m.closers {
closer()
}
}

// NewClient creates a new Client with one connection per namespace.
func NewClient(ctx context.Context, addr string) (*Client, *MultiClientCloser, error) {
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
var client Client
var multiCloser MultiClientCloser

// TODO: this duplication of strings many times across the codebase can be avoided with issue #1176
var modules = map[string]interface{}{
"share": &client.Share,
"state": &client.State,
"header": &client.Header,
"fraud": &client.Fraud,
"das": &client.DAS,
}
for name, module := range modules {
closer, err := jsonrpc.NewClient(ctx, addr, name, module, nil)
if err != nil {
return nil, &multiCloser, err
}
multiCloser.Register(name, closer)
}

return &client, &multiCloser, nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/filecoin-project/go-jsonrpc v0.1.8
github.com/gammazero/workerpool v1.1.3
github.com/gogo/protobuf v1.3.3
github.com/golang/mock v1.6.0
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-retryablehttp v0.7.1-0.20211018174820-ff6d014e72d9
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
Expand Down
4 changes: 4 additions & 0 deletions nodebuilder/das/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
return das.Stop(ctx)
}),
)),
// Module is needed for the RPC handler
fx.Provide(func(das *das.DASer) Module {
return das
}),
)
case node.Bridge:
return fx.Options()
Expand Down
17 changes: 17 additions & 0 deletions nodebuilder/das/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package das

import (
"context"

"github.com/celestiaorg/celestia-node/das"
)

// API is a wrapper around Module for the RPC.
// TODO(@distractedm1nd): These structs need to be autogenerated.
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
type API struct {
SamplingStats func(ctx context.Context) (das.SamplingStats, error)
}

type Module interface {
SamplingStats(ctx context.Context) (das.SamplingStats, error)
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
}
17 changes: 14 additions & 3 deletions nodebuilder/fraud/service.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
package fraud

import "github.com/celestiaorg/celestia-node/fraud"
import (
"context"

// Module encompasses the behavior necessary to subscribe and broadcast
// fraud proofs within the network.
"github.com/celestiaorg/celestia-node/fraud"
)

// API is a wrapper around Module for the RPC.
// TODO(@distractedm1nd): These structs need to be autogenerated.
type API struct {
Subscribe func(fraud.ProofType) (fraud.Subscription, error)
Get func(context.Context, fraud.ProofType) ([]fraud.Proof, error)
}

// Module encompasses the behavior necessary to subscribe and broadcast fraud proofs within the network.
// Any method signature changed here needs to also be changed in the API struct.
type Module interface {
fraud.Service
}
14 changes: 14 additions & 0 deletions nodebuilder/header/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@ import (
"github.com/celestiaorg/celestia-node/header/sync"
)

// API is a wrapper around Module for the RPC.
// TODO(@distractedm1nd): These structs need to be autogenerated.
type API struct {
// GetByHeight returns the ExtendedHeader at the given height, blocking
// until header has been processed by the store or context deadline is exceeded.
GetByHeight func(context.Context, uint64) (*header.ExtendedHeader, error)
// Head returns the ExtendedHeader of the chain head.
Head func(context.Context) (*header.ExtendedHeader, error)
// IsSyncing returns the status of sync
IsSyncing func() bool
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
}
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved

// Module exposes the functionality needed for querying headers from the network.
// Any method signature changed here needs to also be changed in the API struct.
type Module interface {
// GetByHeight returns the ExtendedHeader at the given height, blocking
// until header has been processed by the store or context deadline is exceeded.
Expand Down
4 changes: 3 additions & 1 deletion nodebuilder/rpc/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/api/rpc"
fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
headerServ "github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
shareServ "github.com/celestiaorg/celestia-node/nodebuilder/share"
Expand Down Expand Up @@ -44,10 +45,11 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Invoke(func(
state stateServ.Module,
share shareServ.Module,
fraud fraudServ.Module,
header headerServ.Module,
rpcSrv *rpc.Server,
) {
RegisterEndpoints(state, share, header, rpcSrv, nil)
RegisterEndpoints(state, share, fraud, header, nil, rpcSrv)
}),
)
default:
Expand Down
11 changes: 8 additions & 3 deletions nodebuilder/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package rpc

import (
"github.com/celestiaorg/celestia-node/api/rpc"
"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/nodebuilder/das"
"github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
Expand All @@ -12,14 +13,18 @@ import (
func RegisterEndpoints(
state state.Module,
share share.Module,
fraud fraud.Module,
header header.Module,
daser das.Module,
serv *rpc.Server,
daser *das.DASer,
) {
serv.RegisterService("state", state)
serv.RegisterService("share", share)
serv.RegisterService("fraud", fraud)
serv.RegisterService("header", header)
serv.RegisterService("daser", daser)
if daser != nil {
serv.RegisterService("das", daser)
}
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
}

func Server(cfg *Config) *rpc.Server {
Expand Down
121 changes: 121 additions & 0 deletions nodebuilder/rpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package nodebuilder
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"encoding/json"
"reflect"
"testing"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/api/mocks"
"github.com/celestiaorg/celestia-node/api/rpc"
"github.com/celestiaorg/celestia-node/api/rpc/client"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/state"
)

func TestRPCCallsUnderlyingNode(t *testing.T) {
nd, server := setupNodeWithModifiedRPC(t)
url := nd.RPCServer.ListenAddr()
client, closer, err := client.NewClient(context.Background(), "http://"+url)
require.NoError(t, err)
defer closer.CloseAll()
ctx := context.Background()

expectedBalance := &state.Balance{
Amount: sdk.NewInt(100),
Denom: "utia",
}

server.EXPECT().Balance(gomock.Any()).Return(expectedBalance, nil).Times(1)
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved

balance, err := client.State.Balance(ctx)
require.NoError(t, err)
require.Equal(t, expectedBalance, balance)
}

func TestModulesImplementFullAPI(t *testing.T) {
api := reflect.TypeOf(new(client.API)).Elem()
client := reflect.TypeOf(new(client.Client)).Elem()
for i := 0; i < client.NumField(); i++ {
module := client.Field(i)
for j := 0; j < module.Type.NumField(); j++ {
impl := module.Type.Field(j)
method, _ := api.MethodByName(impl.Name)
require.Equal(t, method.Type, impl.Type)
}
}
}

// TODO(@distractedm1nd): Blocked by issues #1208 and #1207
func TestAllReturnValuesAreMarshalable(t *testing.T) {
t.Skip()
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
ra := reflect.TypeOf(new(client.API)).Elem()
for i := 0; i < ra.NumMethod(); i++ {
m := ra.Method(i)
for j := 0; j < m.Type.NumOut(); j++ {
implementsMarshaler(t, m.Type.Out(j))
}
}
}

func implementsMarshaler(t *testing.T, typ reflect.Type) { //nolint:unused
switch typ.Kind() {
case reflect.Struct:
for i := 0; i < typ.NumField(); i++ {
implementsMarshaler(t, typ.Field(i).Type)
}
return
case reflect.Map:
implementsMarshaler(t, typ.Elem())
implementsMarshaler(t, typ.Key())
case reflect.Ptr:
fallthrough
case reflect.Array:
fallthrough
case reflect.Slice:
fallthrough
case reflect.Chan:
implementsMarshaler(t, typ.Elem())
case reflect.Interface:
if typ != reflect.TypeOf(new(interface{})).Elem() && typ != reflect.TypeOf(new(error)).Elem() {
require.True(
t,
typ.Implements(reflect.TypeOf(new(json.Marshaler)).Elem()),
"type %s does not implement json.Marshaler", typ.String(),
)
}
default:
return
}

}

func setupNodeWithModifiedRPC(t *testing.T) (*Node, *mocks.MockAPI) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

ctrl := gomock.NewController(t)
server := mocks.NewMockAPI(ctrl)

overrideRPCHandler := fx.Invoke(func(srv *rpc.Server) {
srv.RegisterService("state", server)
srv.RegisterService("share", server)
srv.RegisterService("fraud", server)
srv.RegisterService("header", server)
srv.RegisterService("das", server)
})
nd := TestNode(t, node.Full, overrideRPCHandler)
// start node
err := nd.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
err = nd.Stop(ctx)
require.NoError(t, err)
})
return nd, server
}
12 changes: 12 additions & 0 deletions nodebuilder/share/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ import (
"github.com/celestiaorg/nmt/namespace"
)

// API is a wrapper around Module for the RPC.
// TODO(@distractedm1nd): These structs need to be autogenerated.
type API struct {
SharesAvailable func(context.Context, *share.Root) error
ProbabilityOfAvailability func() float64
GetShare func(ctx context.Context, dah *share.Root, row, col int) (share.Share, error)
GetShares func(ctx context.Context, root *share.Root) ([][]share.Share, error)
GetSharesByNamespace func(ctx context.Context, root *share.Root, namespace namespace.ID) ([]share.Share, error)
}

// Module provides access to any data square or block share on the network.
//
// All Get methods provided on Module follow the following flow:
Expand All @@ -24,6 +34,8 @@ import (
// * Fetch the Share from the provider
// * Store the Share
// * Return
//
// Any method signature changed here needs to also be changed in the API struct.
type Module interface {
share.Availability
GetShare(ctx context.Context, dah *share.Root, row, col int) (share.Share, error)
Expand Down
Loading