Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rpc): fraud.Proof (un)marshalling and subscription as chan #1307

Merged
merged 8 commits into from
Nov 10, 2022
3 changes: 2 additions & 1 deletion api/gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func (s *Server) Stop(ctx context.Context) error {
return nil
}

// RegisterMiddleware allows to register a custom middleware that will be called before http.Request will reach handler.
// RegisterMiddleware allows to register a custom middleware that will be called before
// http.Request will reach handler.
func (s *Server) RegisterMiddleware(middlewareFuncs ...mux.MiddlewareFunc) {
for _, m := range middlewareFuncs {
// `router.Use` appends new middleware to existing
Expand Down
4 changes: 2 additions & 2 deletions api/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func setupNodeWithModifiedRPC(t *testing.T) (*nodebuilder.Node, *mockAPI) {
dasMock.NewMockModule(ctrl),
}

// given the behavior of fx.Invoke, this invoke will be called last as it is added at the root level module. For
// further information, check the documentation on fx.Invoke.
// given the behavior of fx.Invoke, this invoke will be called last as it is added at the root
// level module. For further information, check the documentation on fx.Invoke.
invokeRPC := fx.Invoke(func(srv *rpc.Server) {
srv.RegisterService("state", mockAPI.State)
srv.RegisterService("share", mockAPI.Share)
Expand Down
4 changes: 2 additions & 2 deletions nodebuilder/das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"github.com/ipfs/go-datastore"

"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/share"
)

Expand All @@ -31,7 +31,7 @@ func NewDASer(
hsub header.Subscriber,
store header.Store,
batching datastore.Batching,
fraudService fraud.Module,
fraudService fraud.Service,
options ...das.Option,
) (*das.DASer, error) {
return das.NewDASer(da, hsub, store, batching, fraudService, options...)
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/das/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents,
fx.Provide(fx.Annotate(
NewDASer,
fx.OnStart(func(startCtx, ctx context.Context, fservice fraudServ.Module, das *das.DASer) error {
fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, das *das.DASer) error {
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
return fraudServ.Lifecycle(startCtx, ctx, fraud.BadEncoding, fservice,
das.Start, das.Stop)
}),
Expand Down
5 changes: 1 addition & 4 deletions nodebuilder/das/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@ import (
"github.com/celestiaorg/celestia-node/das"
)

//go:generate mockgen -destination=mocks/api.go -package=mocks . Module
type Module interface {
// SamplingStats returns the current statistics over the DA sampling process.
SamplingStats(ctx context.Context) (das.SamplingStats, error)
}

// API is a wrapper around Module for the RPC.
// TODO(@distractedm1nd): These structs need to be autogenerated.
//
// Module
//
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/api.go -package=mocks .
type API struct {
SamplingStats func(ctx context.Context) (das.SamplingStats, error)
}
39 changes: 39 additions & 0 deletions nodebuilder/fraud/constructors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package fraud

import (
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

func newFraudService(syncerEnabled bool) func(
fx.Lifecycle,
*pubsub.PubSub,
host.Host,
header.Store,
datastore.Batching,
p2p.Network,
) (Module, fraud.Service, error) {
return func(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
network p2p.Network,
) (Module, fraud.Service, error) {
pservice := fraud.NewProofService(sub, host, hstore.GetByHeight, ds, syncerEnabled, string(network))
lc.Append(fx.Hook{
OnStart: pservice.Start,
OnStop: pservice.Stop,
})
return &Service{
Service: pservice,
}, pservice, nil
}
}
88 changes: 14 additions & 74 deletions nodebuilder/fraud/fraud.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,83 +3,23 @@ package fraud
import (
"context"

"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

// NewModule constructs a fraud proof service with the syncer disabled.
func NewModule(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
network p2p.Network,
) (Module, error) {
return newFraudService(lc, sub, host, hstore, ds, false, string(network))
}

// ModuleWithSyncer constructs fraud proof service with enabled syncer.
func ModuleWithSyncer(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
network p2p.Network,
) (Module, error) {
return newFraudService(lc, sub, host, hstore, ds, true, string(network))
}

func newFraudService(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
isEnabled bool,
protocolSuffix string,
) (Module, error) {
pservice := fraud.NewProofService(sub, host, hstore.GetByHeight, ds, isEnabled, protocolSuffix)
lc.Append(fx.Hook{
OnStart: pservice.Start,
OnStop: pservice.Stop,
})
return pservice, nil
// Module encompasses the behavior necessary to subscribe and broadcast fraud proofs within the
// network. Any method signature changed here needs to also be changed in the API struct.
//
//go:generate mockgen -destination=mocks/api.go -package=mocks . Module
type Module interface {
// Get fetches fraud proofs from the disk by its type.
Get(context.Context, fraud.ProofType) ([]Proof, error)
// Subscribe allows to subscribe on a Proof pub sub topic by its type.
Subscribe(context.Context, fraud.ProofType) (<-chan Proof, error)
}

// Lifecycle controls the lifecycle of service depending on fraud proofs.
// It starts the service only if no fraud-proof exists and stops the service automatically
// if a proof arrives after the service was started.
func Lifecycle(
startCtx, lifecycleCtx context.Context,
p fraud.ProofType,
fraudModule Module,
start, stop func(context.Context) error,
) error {
proofs, err := fraudModule.Get(startCtx, p)
switch err {
default:
return err
case nil:
return &fraud.ErrFraudExists{Proof: proofs}
case datastore.ErrNotFound:
}
err = start(startCtx)
if err != nil {
return err
}
// handle incoming Fraud Proofs
go fraud.OnProof(lifecycleCtx, fraudModule, p, func(fraud.Proof) {
if err := stop(lifecycleCtx); err != nil {
log.Error(err)
}
})
return nil
// API is a wrapper around Module for the RPC.
// TODO(@distractedm1nd): These structs need to be autogenerated.
type API struct {
Subscribe func(context.Context, fraud.ProofType) (<-chan Proof, error)
Get func(context.Context, fraud.ProofType) ([]Proof, error)
}
42 changes: 42 additions & 0 deletions nodebuilder/fraud/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package fraud

import (
"context"
"time"

"github.com/ipfs/go-datastore"

"github.com/celestiaorg/celestia-node/fraud"
)

// Lifecycle controls the lifecycle of service depending on fraud proofs.
// It starts the service only if no fraud-proof exists and stops the service automatically
// if a proof arrives after the service was started.
func Lifecycle(
startCtx, lifecycleCtx context.Context,
p fraud.ProofType,
fraudServ fraud.Service,
start, stop func(context.Context) error,
) error {
proofs, err := fraudServ.Get(startCtx, p)
switch err {
default:
return err
case nil:
return &fraud.ErrFraudExists{Proof: proofs}
case datastore.ErrNotFound:
}
err = start(startCtx)
if err != nil {
return err
}
// handle incoming Fraud Proofs
go fraud.OnProof(lifecycleCtx, fraudServ, p, func(fraud.Proof) {
ctx, cancel := context.WithTimeout(lifecycleCtx, time.Minute)
defer cancel()
if err := stop(ctx); err != nil {
log.Error(err)
}
})
return nil
}
29 changes: 8 additions & 21 deletions nodebuilder/fraud/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions nodebuilder/fraud/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ import (
var log = logging.Logger("module/fraud")

func ConstructModule(tp node.Type) fx.Option {
baseComponent := fx.Provide(func(module Module) fraud.Getter {
return module
baseComponent := fx.Provide(func(serv fraud.Service) fraud.Getter {
renaynay marked this conversation as resolved.
Show resolved Hide resolved
return serv
})
switch tp {
case node.Light:
return fx.Module(
"fraud",
baseComponent,
fx.Provide(ModuleWithSyncer),
fx.Provide(newFraudService(true)),
)
case node.Full, node.Bridge:
return fx.Module(
"fraud",
baseComponent,
fx.Provide(NewModule),
fx.Provide(newFraudService(false)),
)
default:
panic("invalid node type")
Expand Down
Loading