Skip to content

Commit

Permalink
feat: PRT - offline spec support provider side with inheritance fix. (#…
Browse files Browse the repository at this point in the history
…1640)

* offline spec support provider side with inheritance fix.

* comment fixed.

---------

Co-authored-by: Omer <100387053+omerlavanet@users.noreply.github.com>
Co-authored-by: Nim Rod <nimrod.teich@gmail.com>
  • Loading branch information
3 people committed Aug 28, 2024
1 parent 437eccd commit 6ae8fce
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 20 deletions.
4 changes: 2 additions & 2 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
// Disable relay retries when we get node errors.
// This feature is suppose to help with successful relays in some chains that return node errors on rare race conditions on the serviced chains.
DisableRetryOnNodeErrorsFlag = "disable-retry-on-node-error"
UseOfflineSpecFlag = "use-offline-spec" // allows the user to manually load a spec providing a path, this is useful to test spec changes before they hit the blockchain
UseStaticSpecFlag = "use-static-spec" // allows the user to manually load a spec providing a path, this is useful to test spec changes before they hit the blockchain
)

const (
Expand All @@ -56,7 +56,7 @@ type ConsumerCmdFlags struct {
DebugRelays bool // enables debug mode for relays
DisableConflictTransactions bool // disable conflict transactions
DisableRetryOnNodeErrors bool // disable retries on node errors
OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain.
StaticSpecPath string // path to the spec file, works only when bootstrapping a single chain.
}

// default rolling logs behavior (if enabled) will store 3 files each 100MB for up to 1 day every time.
Expand Down
20 changes: 4 additions & 16 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/lavanet/lava/v2/protocol/statetracker/updaters"
"github.com/lavanet/lava/v2/protocol/upgrade"
"github.com/lavanet/lava/v2/utils"
specutils "github.com/lavanet/lava/v2/utils/keeper"
"github.com/lavanet/lava/v2/utils/rand"
"github.com/lavanet/lava/v2/utils/sigs"
conflicttypes "github.com/lavanet/lava/v2/x/conflict/types"
Expand Down Expand Up @@ -216,18 +215,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
policyUpdaters.Store(rpcEndpoint.ChainID, updaters.NewPolicyUpdater(chainID, consumerStateTracker, consumerAddr.String(), chainParser, *rpcEndpoint))
}

if options.cmdFlags.OfflineSpecPath != "" {
// offline spec mode.
parsedOfflineSpec, loadError := specutils.GetSpecFromPath(options.cmdFlags.OfflineSpecPath, rpcEndpoint.ChainID, nil, nil)
if loadError != nil {
err = utils.LavaFormatError("failed loading offline spec", err, utils.LogAttr("spec_path", options.cmdFlags.OfflineSpecPath), utils.LogAttr("spec_id", rpcEndpoint.ChainID))
}
utils.LavaFormatInfo("Loaded offline spec successfully", utils.LogAttr("spec_path", options.cmdFlags.OfflineSpecPath), utils.LogAttr("chain_id", parsedOfflineSpec.Index))
chainParser.SetSpec(parsedOfflineSpec)
} else {
// register for spec updates
err = rpcc.consumerStateTracker.RegisterForSpecUpdates(ctx, chainParser, *rpcEndpoint)
}
err = statetracker.RegisterForSpecUpdatesOrSetStaticSpec(ctx, chainParser, options.cmdFlags.StaticSpecPath, *rpcEndpoint, rpcc.consumerStateTracker)
if err != nil {
err = utils.LavaFormatError("failed registering for spec updates", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
errCh <- err
Expand Down Expand Up @@ -584,11 +572,11 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
DebugRelays: viper.GetBool(DebugRelaysFlagName),
DisableConflictTransactions: viper.GetBool(common.DisableConflictTransactionsFlag),
DisableRetryOnNodeErrors: viper.GetBool(common.DisableRetryOnNodeErrorsFlag),
OfflineSpecPath: viper.GetString(common.UseOfflineSpecFlag),
StaticSpecPath: viper.GetString(common.UseStaticSpecFlag),
}

// validate user is does not provide multi chain setup when using the offline spec feature.
if consumerPropagatedFlags.OfflineSpecPath != "" && len(rpcEndpoints) > 1 {
if consumerPropagatedFlags.StaticSpecPath != "" && len(rpcEndpoints) > 1 {
utils.LavaFormatFatal("offline spec modifications are supported only in single chain bootstrapping", nil, utils.LogAttr("len(rpcEndpoints)", len(rpcEndpoints)), utils.LogAttr("rpcEndpoints", rpcEndpoints))
}

Expand Down Expand Up @@ -632,7 +620,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
cmdRPCConsumer.Flags().Bool(common.DisableConflictTransactionsFlag, false, "disabling conflict transactions, this flag should not be used as it harms the network's data reliability and therefore the service.")
cmdRPCConsumer.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks")
cmdRPCConsumer.Flags().Bool(common.DisableRetryOnNodeErrorsFlag, false, "Disable relay retries on node errors, prevent the rpcconsumer trying a different provider")
cmdRPCConsumer.Flags().String(common.UseOfflineSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain")
cmdRPCConsumer.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain. example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json")

common.AddRollingLogConfig(cmdRPCConsumer)
return cmdRPCConsumer
Expand Down
17 changes: 15 additions & 2 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type rpcProviderStartOptions struct {
rewardsSnapshotTimeoutSec uint
healthCheckMetricsOptions *rpcProviderHealthCheckMetricsOptions
staticProvider bool
staticSpecPath string
}

type rpcProviderHealthCheckMetricsOptions struct {
Expand Down Expand Up @@ -139,6 +140,7 @@ type RPCProvider struct {
grpcHealthCheckEndpoint string
providerUniqueId string
staticProvider bool
staticSpecPath string
}

func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
Expand All @@ -162,6 +164,8 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
rpcp.relaysMonitorAggregator = metrics.NewRelaysMonitorAggregator(rpcp.relaysHealthCheckInterval, rpcp.providerMetricsManager)
rpcp.grpcHealthCheckEndpoint = options.healthCheckMetricsOptions.grpcHealthCheckEndpoint
rpcp.staticProvider = options.staticProvider
rpcp.staticSpecPath = options.staticSpecPath

// single state tracker
lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx)
providerStateTracker, err := statetracker.NewProviderStateTracker(ctx, options.txFactory, options.clientCtx, lavaChainFetcher, rpcp.providerMetricsManager)
Expand Down Expand Up @@ -222,6 +226,10 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
}
}

if rpcp.staticSpecPath != "" && len(rpcp.chainMutexes) > 1 {
utils.LavaFormatFatal("Provider set static spec with more than one chain. static spec configuration supports only a single chain id", nil, utils.LogAttr("Chains", rpcp.chainMutexes), utils.LogAttr("static_spec", rpcp.staticSpecPath))
}

specValidator := NewSpecValidator()
utils.LavaFormatTrace("Running setup for RPCProvider endpoints", utils.LogAttr("endpoints", options.rpcProviderEndpoints))
disabledEndpointsList := rpcp.SetupProviderEndpoints(options.rpcProviderEndpoints, specValidator, true)
Expand Down Expand Up @@ -298,7 +306,10 @@ func (rpcp *RPCProvider) SetupProviderEndpoints(rpcProviderEndpoints []*lavasess
parallelJobs := len(rpcProviderEndpoints)
wg.Add(parallelJobs)
disabledEndpoints := make(chan *lavasession.RPCProviderEndpoint, parallelJobs)
// validate static spec configuration is used only on a single chain setup.
chainIds := make(map[string]struct{})
for _, rpcProviderEndpoint := range rpcProviderEndpoints {
chainIds[rpcProviderEndpoint.ChainID] = struct{}{}
setupEndpoint := func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator) {
defer wg.Done()
err := rpcp.SetupEndpoint(context.Background(), rpcProviderEndpoint, specValidator)
Expand Down Expand Up @@ -345,7 +356,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
}

rpcEndpoint := lavasession.RPCEndpoint{ChainID: chainID, ApiInterface: apiInterface}
err = rpcp.providerStateTracker.RegisterForSpecUpdates(ctx, chainParser, rpcEndpoint)
err = statetracker.RegisterForSpecUpdatesOrSetStaticSpec(ctx, chainParser, rpcp.staticSpecPath, rpcEndpoint, rpcp.providerStateTracker)
if err != nil {
return utils.LavaFormatError("[PANIC] failed to RegisterForSpecUpdates, panic severity critical error, aborting support for chain api due to invalid chain parser, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()})
}
Expand Down Expand Up @@ -700,7 +711,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
relaysHealthInterval := viper.GetDuration(common.RelayHealthIntervalFlag)
healthCheckURLPath := viper.GetString(HealthCheckURLPathFlagName)
staticProvider := viper.GetBool(common.StaticProvidersConfigName)

offlineSpecPath := viper.GetString(common.UseStaticSpecFlag)
if staticProvider {
utils.LavaFormatWarning("Running in static provider mode, skipping rewards and allowing requests from anyone", nil)
}
Expand All @@ -726,6 +737,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
rewardsSnapshotTimeoutSec,
&rpcProviderHealthCheckMetricsOptions,
staticProvider,
offlineSpecPath,
}

rpcProvider := RPCProvider{}
Expand Down Expand Up @@ -760,6 +772,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
cmdRPCProvider.Flags().String(HealthCheckURLPathFlagName, HealthCheckURLPathFlagDefault, "the url path for the provider's grpc health check")
cmdRPCProvider.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks")
cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec")
cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json")

common.AddRollingLogConfig(cmdRPCProvider)
return cmdRPCProvider
Expand Down
24 changes: 24 additions & 0 deletions protocol/statetracker/state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/tx"
"github.com/lavanet/lava/v2/protocol/chainlib"
"github.com/lavanet/lava/v2/protocol/chaintracker"
"github.com/lavanet/lava/v2/protocol/lavasession"
updaters "github.com/lavanet/lava/v2/protocol/statetracker/updaters"
"github.com/lavanet/lava/v2/utils"
specutils "github.com/lavanet/lava/v2/utils/keeper"
spectypes "github.com/lavanet/lava/v2/x/spec/types"
)

Expand Down Expand Up @@ -40,6 +43,27 @@ type Updater interface {
UpdaterKey() string
}

type SpecUpdaterInf interface {
RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error
}

// Either register for spec updates or set spec for offline spec, used in both consumer and provider process
func RegisterForSpecUpdatesOrSetStaticSpec(ctx context.Context, chainParser chainlib.ChainParser, specPath string, rpcEndpoint lavasession.RPCEndpoint, specUpdaterInf SpecUpdaterInf) (err error) {
if specPath != "" {
// offline spec mode.
parsedOfflineSpec, loadError := specutils.GetSpecsFromPath(specPath, rpcEndpoint.ChainID, nil, nil)
if loadError != nil {
err = utils.LavaFormatError("failed loading offline spec", err, utils.LogAttr("spec_path", specPath), utils.LogAttr("spec_id", rpcEndpoint.ChainID))
}
utils.LavaFormatInfo("Loaded offline spec successfully", utils.LogAttr("spec_path", specPath), utils.LogAttr("chain_id", parsedOfflineSpec.Index))
chainParser.SetSpec(parsedOfflineSpec)
} else {
// register for spec updates
err = specUpdaterInf.RegisterForSpecUpdates(ctx, chainParser, rpcEndpoint)
}
return
}

func GetLavaSpecWithRetry(ctx context.Context, specQueryClient spectypes.QueryClient) (*spectypes.QueryGetSpecResponse, error) {
var specResponse *spectypes.QueryGetSpecResponse
var err error
Expand Down
25 changes: 25 additions & 0 deletions utils/keeper/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"os"
"strings"
"testing"

tmdb "github.com/cometbft/cometbft-db"
Expand Down Expand Up @@ -79,6 +80,30 @@ func decodeProposal(path string) (utils.SpecAddProposalJSON, error) {
return proposal, err
}

func GetSpecsFromPath(path string, specIndex string, ctxArg *sdk.Context, keeper *keeper.Keeper) (specRet spectypes.Spec, err error) {
var ctx sdk.Context
if keeper == nil || ctxArg == nil {
keeper, ctx, err = specKeeper()
if err != nil {
return spectypes.Spec{}, err
}
} else {
ctx = *ctxArg
}

// Split the string by "," if we have a spec with dependencies we need to first load the dependencies.. for example:
// ibc.json, cosmossdk.json, lava.json.
files := strings.Split(path, ",")
for _, fileName := range files {
trimmedFileName := strings.TrimSpace(fileName)
spec, err := GetSpecFromPath(trimmedFileName, specIndex, &ctx, keeper)
if err == nil {
return spec, nil
}
}
return spectypes.Spec{}, fmt.Errorf("spec not found %s", specIndex)
}

func GetSpecFromPath(path string, specIndex string, ctxArg *sdk.Context, keeper *keeper.Keeper) (specRet spectypes.Spec, err error) {
var ctx sdk.Context
if keeper == nil || ctxArg == nil {
Expand Down

0 comments on commit 6ae8fce

Please sign in to comment.