Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARC-1-new-consumer-design #228

Merged
merged 36 commits into from
Jan 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
b9557b2
added chainTracker class
omerlavanet Dec 29, 2022
284fc51
api change to add specific block
omerlavanet Dec 29, 2022
df6a96c
finished implementing chainTracker
omerlavanet Dec 29, 2022
7d06284
better performance, fixed some bugs, added protections for polling to…
omerlavanet Jan 2, 2023
5386fcc
fixed all issues, added unitests for fork callbacks
omerlavanet Jan 2, 2023
004d3f3
added latest block as arg to callbacks
omerlavanet Jan 2, 2023
d19730d
fixed fork identification bug
omerlavanet Jan 2, 2023
08f87e0
allows cancelling the ticker in chainTracker
omerlavanet Jan 2, 2023
e852d89
lint spaces
omerlavanet Jan 2, 2023
391fd32
lint space
omerlavanet Jan 2, 2023
ff8aab4
lint
omerlavanet Jan 2, 2023
f3056cf
better fetch performance
omerlavanet Jan 2, 2023
a20329a
improved readability of the fetcher code
omerlavanet Jan 2, 2023
9a46f82
added a unit test verifying stateTracker doesn't need to read previou…
omerlavanet Jan 2, 2023
18ea195
modified parameters to make the unit-test more interesting
omerlavanet Jan 2, 2023
38f76c2
lint
omerlavanet Jan 3, 2023
e477a64
Merge branch 'PRT-227-refactor-chain-sentry-class' into ARC-1-new-con…
omerlavanet Jan 8, 2023
81cebd1
WIP defining the new classes hierarchy
omerlavanet Jan 8, 2023
5600279
added apiListener, modified apiParser to apiLib to combine all api sp…
omerlavanet Jan 8, 2023
41d8aa0
add arguments to the listener
omerlavanet Jan 9, 2023
ae333fb
continue design, need to finish setting up pairing parameters and con…
omerlavanet Jan 11, 2023
46ba223
Merge branch 'main' into ARC-1-new-consumer-design
omerlavanet Jan 11, 2023
f1bbeab
merged
omerlavanet Jan 11, 2023
ba1365e
move apilib outside of consumerServer
omerlavanet Jan 11, 2023
9d3f32d
added pairing updater
omerlavanet Jan 11, 2023
a493bd6
added finalizationConsensus, continued sendRelay code, added request …
omerlavanet Jan 16, 2023
f84ffeb
continued send relay, solved some bugs
omerlavanet Jan 16, 2023
91f70e2
rename stuff according to terminology meeting
omerlavanet Jan 16, 2023
143201d
added data reliability sending code
omerlavanet Jan 17, 2023
31eb27a
finished RPCConsumerServer added data reliability
omerlavanet Jan 17, 2023
2f8e5d3
Merge branch 'main' into ARC-1-new-consumer-design
omerlavanet Jan 17, 2023
68b62dd
lint
omerlavanet Jan 17, 2023
d161bcd
fixed unitest paths
omerlavanet Jan 17, 2023
201407e
chain tracker tests are async and in a very slow machine can fail, mo…
omerlavanet Jan 17, 2023
9f62476
ARC-1 update naming and todos
ranlavanet Jan 22, 2023
bd05ff8
fixed QoS bug
omerlavanet Jan 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,4 @@ jobs:
### Run relayer unitests
######################################################
- name: Run Relayer unit Tests
run: go test ./relayer/lavasession/ ./relayer/chainTracker/ -v
run: go test ./relayer/lavasession/ ./protocol/chaintracker/ -v
53 changes: 53 additions & 0 deletions cmd/lavad/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,25 @@ package main
import (
"context"
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"time"

_ "net/http/pprof"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/client/tx"
svrcmd "github.com/cosmos/cosmos-sdk/server/cmd"
"github.com/cosmos/cosmos-sdk/version"
"github.com/ignite-hq/cli/ignite/pkg/cosmoscmd"
"github.com/lavanet/lava/app"
"github.com/lavanet/lava/protocol/rpcconsumer"
"github.com/lavanet/lava/relayer"
"github.com/lavanet/lava/relayer/chainproxy"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/relayer/performance"
"github.com/lavanet/lava/relayer/sentry"
"github.com/lavanet/lava/utils"
Expand Down Expand Up @@ -178,6 +183,43 @@ func main() {
},
}

cmdRPCClient := &cobra.Command{
Use: "rpcconsumer [listen-ip]:[listen-port],[spec-chain-id],[api-interface] repeat...",
Short: "rpcconsumer sets up a server to perform api requests and sends them through the lava protocol to data providers",
Long: `rpcconsumer sets up a server to perform api requests and sends them through the lava protocol to data providers`,
Example: `rpcclient 127.0.0.1:3333,COS3,tendermintrpc 127.0.0.1:3334,COS3,rest`,
Args: cobra.ExactArgs(4),
RunE: func(cmd *cobra.Command, args []string) error {
utils.LavaFormatInfo("Gateway Proxy process started", &map[string]string{"args": strings.Join(args, ",")})
clientCtx, err := client.GetClientTxContext(cmd)
if err != nil {
return err
}

// decide on cli input method
// write good documentation for usage
// parse requested inputs into RPCEndpoint list
// handle flags, pass necessary fields
ctx := context.Background()
networkChainId, err := cmd.Flags().GetString(flags.FlagChainID)
if err != nil {
return err
}
txFactory := tx.NewFactoryCLI(clientCtx, cmd.Flags()).WithChainID(networkChainId)
rpcConsumer := rpcconsumer.RPCConsumer{}
rpcEndpoints := []*lavasession.RPCEndpoint{}
requiredResponses := 1 // TODO: handle secure flag, for a majority between providers
utils.LavaFormatInfo("lavad Binary Version: "+version.Version, nil)
rand.Seed(time.Now().UnixNano())
vrf_sk, _, err := utils.GetOrCreateVRFKey(clientCtx)
if err != nil {
utils.LavaFormatFatal("failed getting or creating a VRF key", err, nil)
}
rpcConsumer.Start(ctx, txFactory, clientCtx, rpcEndpoints, requiredResponses, vrf_sk)
return nil
},
}

flags.AddTxFlagsToCmd(cmdServer)
cmdServer.MarkFlagRequired(flags.FlagFrom)
flags.AddTxFlagsToCmd(cmdPortalServer)
Expand Down Expand Up @@ -205,6 +247,17 @@ func main() {
rootCmd.AddCommand(cmdPortalServer)
rootCmd.AddCommand(cmdTestClient)

// RPCConsumer command flags
flags.AddTxFlagsToCmd(cmdRPCClient)
cmdRPCClient.MarkFlagRequired(flags.FlagFrom)
cmdRPCClient.Flags().String(flags.FlagChainID, app.Name, "network chain id")
cmdRPCClient.Flags().Uint64(sentry.GeolocationFlag, 0, "geolocation to run from")
cmdRPCClient.MarkFlagRequired(sentry.GeolocationFlag)
cmdRPCClient.Flags().Bool("secure", false, "secure sends reliability on every message")
cmdRPCClient.Flags().String(performance.PprofAddressFlagName, "", "pprof server address, used for code profiling")
cmdRPCClient.Flags().String(performance.CacheFlagName, "", "address for a cache server to improve performance")
// rootCmd.AddCommand(cmdRPCClient) // Remove this when ready

if err := svrcmd.Execute(rootCmd, app.DefaultNodeHome); err != nil {
os.Exit(1)
}
Expand Down
68 changes: 68 additions & 0 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package chainlib

import (
"context"
"fmt"
"time"

"github.com/lavanet/lava/relayer/lavasession"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
spectypes "github.com/lavanet/lava/x/spec/types"
)

func NewChainParser(apiInterface string) (chainParser ChainParser, err error) {
switch apiInterface {
case spectypes.APIInterfaceJsonRPC:
return NewJrpcChainParser()
case spectypes.APIInterfaceTendermintRPC:
return NewTendermintRpcChainParser()
case spectypes.APIInterfaceRest:
return NewRestChainParser()
case spectypes.APIInterfaceGrpc:
return NewGrpcChainParser()
}
return nil, fmt.Errorf("chainParser for apiInterface (%s) not found", apiInterface)
}

func NewChainListener(ctx context.Context, listenEndpoint *lavasession.RPCEndpoint, relaySender RelaySender) (ChainListener, error) {
switch listenEndpoint.ApiInterface {
case spectypes.APIInterfaceJsonRPC:
return NewJrpcChainListener(ctx, listenEndpoint, relaySender), nil
case spectypes.APIInterfaceTendermintRPC:
return NewTendermintRpcChainListener(ctx, listenEndpoint, relaySender), nil
case spectypes.APIInterfaceRest:
return NewRestChainListener(ctx, listenEndpoint, relaySender), nil
case spectypes.APIInterfaceGrpc:
return NewGrpcChainListener(ctx, listenEndpoint, relaySender), nil
}
return nil, fmt.Errorf("chainListener for apiInterface (%s) not found", listenEndpoint.ApiInterface)
}

// this is an interface for parsing and generating messages of the supported APIType
// it checks for the existence of the method in the spec, and formats the message
type ChainParser interface {
ParseMsg(url string, data []byte, connectionType string) (ChainMessage, error) // has to be thread safe
SetSpec(spec spectypes.Spec) // has to be thread safe
DataReliabilityParams() (enabled bool, dataReliabilityThreshold uint32)
ChainBlockStats() (allowedBlockLagForQosSync int64, averageBlockTime time.Duration, blockDistanceForFinalizedData uint32)
}

type ChainMessage interface {
GetServiceApi() *spectypes.ServiceApi
GetInterface() *spectypes.ApiInterface
RequestedBlock() int64
}

type RelaySender interface {
SendRelay(
ctx context.Context,
url string,
req string,
connectionType string,
dappID string,
) (*pairingtypes.RelayReply, *pairingtypes.Relayer_RelaySubscribeClient, error)
}

type ChainListener interface {
Serve()
}
41 changes: 41 additions & 0 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package chainlib

import (
"context"
"fmt"
"time"

"github.com/lavanet/lava/relayer/lavasession"
spectypes "github.com/lavanet/lava/x/spec/types"
)

type GrpcChainParser struct{}

func (apip *GrpcChainParser) ParseMsg(url string, data []byte, connectionType string) (ChainMessage, error) {
return nil, fmt.Errorf("not implemented")
}

func (apip *GrpcChainParser) SetSpec(spec spectypes.Spec) {}

func (apip *GrpcChainParser) DataReliabilityParams() (enabled bool, dataReliabilityThreshold uint32) {
// TODO
return false, 0
}

func (apip *GrpcChainParser) ChainBlockStats() (allowedBlockLagForQosSync int64, averageBlockTime time.Duration, blockDistanceForFinalizedData uint32) {
return 0, 0, 0
}

func NewGrpcChainParser() (chainParser *GrpcChainParser, err error) {
return nil, fmt.Errorf("not implemented")
}

type GrpcChainListener struct{}

func (apil *GrpcChainListener) Serve() {}

func NewGrpcChainListener(ctx context.Context, listenEndpoint *lavasession.RPCEndpoint, relaySender RelaySender) (chainListener *GrpcChainListener) {
// open up server for grpc implementing the api requested (currently implemented in serve_portal in chainproxy, endpoint at listenEndpoint
// when receiving the data such as url, rpc data, headers (connectionType), use relaySender to wrap verify and send that data
return nil
}
42 changes: 42 additions & 0 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package chainlib

import (
"context"
"fmt"
"time"

"github.com/lavanet/lava/relayer/lavasession"
spectypes "github.com/lavanet/lava/x/spec/types"
)

type JsonRPCChainParser struct{}

func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType string) (ChainMessage, error) {
return nil, fmt.Errorf("not implemented")
}

func (apip *JsonRPCChainParser) SetSpec(spec spectypes.Spec) {}

func (apip *JsonRPCChainParser) DataReliabilityParams() (enabled bool, dataReliabilityThreshold uint32) {
// TODO
return false, 0
}

func (apip *JsonRPCChainParser) ChainBlockStats() (allowedBlockLagForQosSync int64, averageBlockTime time.Duration, blockDistanceForFinalizedData uint32) {
// TODO:
return 0, 0, 0
}

func NewJrpcChainParser() (chainParser *JsonRPCChainParser, err error) {
return nil, fmt.Errorf("not implemented")
}

type JsonRPCChainListener struct{}

func (apil *JsonRPCChainListener) Serve() {}

func NewJrpcChainListener(ctx context.Context, listenEndpoint *lavasession.RPCEndpoint, relaySender RelaySender) (chainListener *JsonRPCChainListener) {
// open up server for http implementing the api requested (currently implemented in serve_portal in chainproxy, endpoint at listenEndpoint
// when receiving the data such as url, rpc data, headers (connectionType), use relaySender to wrap verify and send that data
return nil
}
42 changes: 42 additions & 0 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package chainlib

import (
"context"
"fmt"
"time"

"github.com/lavanet/lava/relayer/lavasession"
spectypes "github.com/lavanet/lava/x/spec/types"
)

type RestChainParser struct{}

func (apip *RestChainParser) ParseMsg(url string, data []byte, connectionType string) (ChainMessage, error) {
return nil, fmt.Errorf("not implemented")
}

func (apip *RestChainParser) SetSpec(spec spectypes.Spec) {}

func (apip *RestChainParser) DataReliabilityParams() (enabled bool, dataReliabilityThreshold uint32) {
// TODO
return false, 0
}

func (apip *RestChainParser) ChainBlockStats() (allowedBlockLagForQosSync int64, averageBlockTime time.Duration, blockDistanceForFinalizedData uint32) {
// TODO:
return 0, 0, 0
}

func NewRestChainParser() (chainParser *RestChainParser, err error) {
return nil, fmt.Errorf("not implemented")
}

type RestChainListener struct{}

func (apil *RestChainListener) Serve() {}

func NewRestChainListener(ctx context.Context, listenEndpoint *lavasession.RPCEndpoint, relaySender RelaySender) (chainListener *RestChainListener) {
// open up server for http implementing the api requested (currently implemented in serve_portal in chainproxy, endpoint at listenEndpoint
// when receiving the data such as url, rpc data, headers (connectionType), use relaySender to wrap verify and send that data
return nil
}
42 changes: 42 additions & 0 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package chainlib

import (
"context"
"fmt"
"time"

"github.com/lavanet/lava/relayer/lavasession"
spectypes "github.com/lavanet/lava/x/spec/types"
)

type TendermintChainParser struct{}

func (apip *TendermintChainParser) ParseMsg(url string, data []byte, connectionType string) (ChainMessage, error) {
return nil, fmt.Errorf("not implemented")
}

func (apip *TendermintChainParser) SetSpec(spec spectypes.Spec) {}

func (apip *TendermintChainParser) DataReliabilityParams() (enabled bool, dataReliabilityThreshold uint32) {
// TODO
return false, 0
}

func (apip *TendermintChainParser) ChainBlockStats() (allowedBlockLagForQosSync int64, averageBlockTime time.Duration, blockDistanceForFinalizedData uint32) {
// TODO:
return 0, 0, 0
}

func NewTendermintRpcChainParser() (chainParser *TendermintChainParser, err error) {
return nil, fmt.Errorf("not implemented")
}

type TendermintRpcChainListener struct{}

func (apil *TendermintRpcChainListener) Serve() {}

func NewTendermintRpcChainListener(ctx context.Context, listenEndpoint *lavasession.RPCEndpoint, relaySender RelaySender) (chainListener *TendermintRpcChainListener) {
// open up server for http implementing the api requested (currently implemented in serve_portal in chainproxy, endpoint at listenEndpoint
// when receiving the data such as url, rpc data, headers (connectionType), use relaySender to wrap verify and send that data
return nil
}
Loading