Skip to content

Commit

Permalink
PRT - DR flag now takes effecwt in sendDataReliabilityRelayIfApplicable
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Dec 15, 2024
1 parent c07dbc6 commit eba1c43
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 26 deletions.
19 changes: 9 additions & 10 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,15 @@ const (

// helper struct to propagate flags deeper into the code in an organized manner
type ConsumerCmdFlags struct {
HeadersFlag string // comma separated list of headers, or * for all, default simple cors specification headers
CredentialsFlag string // access-control-allow-credentials, defaults to "true"
OriginFlag string // comma separated list of origins, or * for all, default enabled completely
MethodsFlag string // whether to allow access control headers *, most proxies have their own access control so its not required
CDNCacheDuration string // how long to cache the preflight response defaults 24 hours (in seconds) "86400"
RelaysHealthEnableFlag bool // enables relay health check
RelaysHealthIntervalFlag time.Duration // interval for relay health check
DebugRelays bool // enables debug mode for relays
DisableConflictTransactions bool // disable conflict transactions
StaticSpecPath string // path to the spec file, works only when bootstrapping a single chain.
HeadersFlag string // comma separated list of headers, or * for all, default simple cors specification headers
CredentialsFlag string // access-control-allow-credentials, defaults to "true"
OriginFlag string // comma separated list of origins, or * for all, default enabled completely
MethodsFlag string // whether to allow access control headers *, most proxies have their own access control so its not required
CDNCacheDuration string // how long to cache the preflight response defaults 24 hours (in seconds) "86400"
RelaysHealthEnableFlag bool // enables relay health check
RelaysHealthIntervalFlag time.Duration // interval for relay health check
DebugRelays bool // enables debug mode for relays
StaticSpecPath string // path to the spec file, works only when bootstrapping a single chain.
}

// default rolling logs behavior (if enabled) will store 3 files each 100MB for up to 1 day every time.
Expand Down
23 changes: 11 additions & 12 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt

// spawn up ConsumerStateTracker
lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx)
consumerStateTracker, err := statetracker.NewConsumerStateTracker(ctx, options.txFactory, options.clientCtx, lavaChainFetcher, consumerMetricsManager, options.cmdFlags.DisableConflictTransactions)
consumerStateTracker, err := statetracker.NewConsumerStateTracker(ctx, options.txFactory, options.clientCtx, lavaChainFetcher, consumerMetricsManager)
if err != nil {
utils.LavaFormatFatal("failed to create a NewConsumerStateTracker", err)
}
Expand Down Expand Up @@ -649,16 +649,15 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77

maxConcurrentProviders := viper.GetUint(common.MaximumConcurrentProvidersFlagName)
consumerPropagatedFlags := common.ConsumerCmdFlags{
HeadersFlag: viper.GetString(common.CorsHeadersFlag),
CredentialsFlag: viper.GetString(common.CorsCredentialsFlag),
OriginFlag: viper.GetString(common.CorsOriginFlag),
MethodsFlag: viper.GetString(common.CorsMethodsFlag),
CDNCacheDuration: viper.GetString(common.CDNCacheDurationFlag),
RelaysHealthEnableFlag: viper.GetBool(common.RelaysHealthEnableFlag),
RelaysHealthIntervalFlag: viper.GetDuration(common.RelayHealthIntervalFlag),
DebugRelays: viper.GetBool(DebugRelaysFlagName),
DisableConflictTransactions: viper.GetBool(common.DisableConflictTransactionsFlag),
StaticSpecPath: viper.GetString(common.UseStaticSpecFlag),
HeadersFlag: viper.GetString(common.CorsHeadersFlag),
CredentialsFlag: viper.GetString(common.CorsCredentialsFlag),
OriginFlag: viper.GetString(common.CorsOriginFlag),
MethodsFlag: viper.GetString(common.CorsMethodsFlag),
CDNCacheDuration: viper.GetString(common.CDNCacheDurationFlag),
RelaysHealthEnableFlag: viper.GetBool(common.RelaysHealthEnableFlag),
RelaysHealthIntervalFlag: viper.GetDuration(common.RelayHealthIntervalFlag),
DebugRelays: viper.GetBool(DebugRelaysFlagName),
StaticSpecPath: viper.GetString(common.UseStaticSpecFlag),
}

// validate user is does not provide multi chain setup when using the offline spec feature.
Expand Down Expand Up @@ -754,7 +753,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
cmdRPCConsumer.Flags().String(refererMarkerFlagName, "lava-referer-", "the string marker to identify referer")
cmdRPCConsumer.Flags().String(reportsSendBEAddress, "", "address to send reports to")
cmdRPCConsumer.Flags().BoolVar(&lavasession.DebugProbes, DebugProbesFlagName, false, "adding information to probes")
cmdRPCConsumer.Flags().Bool(common.DisableConflictTransactionsFlag, false, "disabling conflict transactions, this flag should not be used as it harms the network's data reliability and therefore the service.")
cmdRPCConsumer.Flags().BoolVar(&statetracker.DisableDR, common.DisableConflictTransactionsFlag, statetracker.DisableDR, "disabling conflict transactions, this flag should not be used as it harms the network's data reliability and therefore the service.")
cmdRPCConsumer.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks")
cmdRPCConsumer.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain")
cmdRPCConsumer.Flags().IntVar(&relayCountOnNodeError, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors")
Expand Down
4 changes: 4 additions & 0 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/lavanet/lava/v4/protocol/lavasession"
"github.com/lavanet/lava/v4/protocol/metrics"
"github.com/lavanet/lava/v4/protocol/performance"
"github.com/lavanet/lava/v4/protocol/statetracker"
"github.com/lavanet/lava/v4/protocol/upgrade"
"github.com/lavanet/lava/v4/utils"
"github.com/lavanet/lava/v4/utils/protocopy"
Expand Down Expand Up @@ -1254,6 +1255,9 @@ func (rpccs *RPCConsumerServer) getFirstSubscriptionReply(ctx context.Context, h
}

func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context.Context, protocolMessage chainlib.ProtocolMessage, dataReliabilityThreshold uint32, relayProcessor *RelayProcessor) error {
if statetracker.DisableDR {
return nil
}
processingTimeout, expectedRelayTimeout := rpccs.getProcessingTimeout(protocolMessage)
// Wait another relayTimeout duration to maybe get additional relay results
if relayProcessor.usedProviders.CurrentlyUsed() > 0 {
Expand Down
8 changes: 4 additions & 4 deletions protocol/statetracker/consumer_state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
protocoltypes "github.com/lavanet/lava/v4/x/protocol/types"
)

var DisableDR = false

type ConsumerTxSenderInf interface {
TxSenderConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict) error
}
Expand All @@ -29,10 +31,9 @@ type ConsumerStateTracker struct {
ConsumerTxSenderInf
*StateTracker
ConsumerEmergencyTrackerInf
disableConflictTransactions bool
}

func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher, metrics *metrics.ConsumerMetricsManager, disableConflictTransactions bool) (ret *ConsumerStateTracker, err error) {
func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher, metrics *metrics.ConsumerMetricsManager) (ret *ConsumerStateTracker, err error) {
emergencyTracker, blockNotFoundCallback := NewEmergencyTracker(metrics)
stateQuery := updaters.NewConsumerStateQuery(ctx, clientCtx)
stateTrackerBase, err := NewStateTracker(ctx, txFactory, stateQuery.StateQuery, chainFetcher, blockNotFoundCallback)
Expand All @@ -48,7 +49,6 @@ func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCt
StateQuery: stateQuery,
ConsumerTxSenderInf: txSender,
ConsumerEmergencyTrackerInf: emergencyTracker,
disableConflictTransactions: disableConflictTransactions,
}

err = cst.RegisterForDowntimeParamsUpdates(ctx, emergencyTracker)
Expand Down Expand Up @@ -105,7 +105,7 @@ func (cst *ConsumerStateTracker) RegisterFinalizationConsensusForUpdates(ctx con
}

func (cst *ConsumerStateTracker) TxConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, conflictHandler common.ConflictHandlerInterface) error {
if cst.disableConflictTransactions {
if DisableDR {
utils.LavaFormatInfo("found Conflict, but transactions are disabled, returning")
return nil
}
Expand Down

0 comments on commit eba1c43

Please sign in to comment.