Skip to content

Commit

Permalink
feat: add lava over lava secondary transport (#1769)
Browse files Browse the repository at this point in the history
* refactor state query access

* remove direct usage of client.Context to allow the rewiring of lava over lava

* refactor rpcconsumer, allow creating a server with a function

* lint

* added custom lava transport

* add lava over lava secondary transport

* lint

* added initialization condition

* relaysMonitor is dependent on metrics, so put the functionality in rpcconsumer server

* added metrics

* add vote test script

* fix lint

* added support for e2e

* oops brackets

* added secondary transport startup

---------

Co-authored-by: Ran Mishael <ran@lavanet.xyz>
  • Loading branch information
omerlavanet and ranlavanet authored Dec 2, 2024
1 parent 8b16323 commit b4bd381
Show file tree
Hide file tree
Showing 11 changed files with 295 additions and 13 deletions.
51 changes: 51 additions & 0 deletions protocol/chainlib/base_chain_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package chainlib
import (
"errors"
"fmt"
"io"
"net/http"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -356,6 +358,55 @@ func (apip *BaseChainParser) isValidInternalPath(path string) bool {
return ok
}

// take an http request and direct it through the consumer
func (apip *BaseChainParser) ExtractDataFromRequest(request *http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error) {
// Extract relative URL path
url = request.URL.Path
// Extract connection type
connectionType = request.Method

// Extract metadata
for key, values := range request.Header {
for _, value := range values {
metadata = append(metadata, pairingtypes.Metadata{
Name: key,
Value: value,
})
}
}

// Extract data
if request.Body != nil {
bodyBytes, err := io.ReadAll(request.Body)
if err != nil {
return "", "", "", nil, err
}
data = string(bodyBytes)
}

return url, data, connectionType, metadata, nil
}

func (apip *BaseChainParser) SetResponseFromRelayResult(relayResult *common.RelayResult) (*http.Response, error) {
if relayResult == nil {
return nil, errors.New("relayResult is nil")
}
response := &http.Response{
StatusCode: relayResult.StatusCode,
Header: make(http.Header),
}

for _, values := range relayResult.Reply.Metadata {
response.Header.Add(values.Name, values.Value)
}

if relayResult.Reply != nil && relayResult.Reply.Data != nil {
response.Body = io.NopCloser(strings.NewReader(string(relayResult.Reply.Data)))
}

return response, nil
}

// getSupportedApi fetches service api from spec by name
func (apip *BaseChainParser) getApiCollection(connectionType, internalPath, addon string) (*spectypes.ApiCollection, error) {
// Guard that the GrpcChainParser instance exists
Expand Down
26 changes: 26 additions & 0 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chainlib
import (
"context"
"fmt"
"net/http"
"time"

"github.com/lavanet/lava/v4/protocol/chainlib/chainproxy/rpcInterfaceMessages"
Expand All @@ -11,10 +12,15 @@ import (
"github.com/lavanet/lava/v4/protocol/common"
"github.com/lavanet/lava/v4/protocol/lavasession"
"github.com/lavanet/lava/v4/protocol/metrics"
"github.com/lavanet/lava/v4/utils"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
spectypes "github.com/lavanet/lava/v4/x/spec/types"
)

const (
INTERNAL_ADDRESS = "internal-addr"
)

var (
IgnoreSubscriptionNotConfiguredError = true
IgnoreSubscriptionNotConfiguredErrorFlag = "ignore-subscription-not-configured-error"
Expand Down Expand Up @@ -44,6 +50,10 @@ func NewChainListener(
refererData *RefererData,
consumerWsSubscriptionManager *ConsumerWSSubscriptionManager,
) (ChainListener, error) {
if listenEndpoint.NetworkAddress == INTERNAL_ADDRESS {
utils.LavaFormatDebug("skipping chain listener for internal address")
return NewEmptyChainListener(), nil
}
switch listenEndpoint.ApiInterface {
case spectypes.APIInterfaceJsonRPC:
return NewJrpcChainListener(ctx, listenEndpoint, relaySender, healthReporter, rpcConsumerLogs, refererData, consumerWsSubscriptionManager), nil
Expand Down Expand Up @@ -76,6 +86,8 @@ type ChainParser interface {
UpdateBlockTime(newBlockTime time.Duration)
GetUniqueName() string
ExtensionsParser() *extensionslib.ExtensionParser
ExtractDataFromRequest(*http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error)
SetResponseFromRelayResult(*common.RelayResult) (*http.Response, error)
}

type ChainMessage interface {
Expand Down Expand Up @@ -173,3 +185,17 @@ func GetChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint *lavas
}
return newChainRouter(ctx, nConns, *rpcProviderEndpoint, chainParser, proxyConstructor)
}

type EmptyChainListener struct{}

func NewEmptyChainListener() ChainListener {
return &EmptyChainListener{}
}

func (*EmptyChainListener) Serve(ctx context.Context, cmdFlags common.ConsumerCmdFlags) {
// do nothing
}

func (*EmptyChainListener) GetListeningAddress() string {
return ""
}
6 changes: 6 additions & 0 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,12 @@ func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, re
return nodeMsg, err
}

// overwritten because tendermintrpc doesnt use POST but an empty connecionType
func (apip *TendermintChainParser) ExtractDataFromRequest(request *http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error) {
url, data, _, metadata, err = apip.BaseChainParser.ExtractDataFromRequest(request)
return url, data, "", metadata, err
}

func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedHashes []string, msg *rpcInterfaceMessages.TendermintrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) *baseChainMessageContainer {
nodeMsg := &baseChainMessageContainer{
api: serviceApi,
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/provider_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (endpoint *RPCProviderEndpoint) AddonsString() string {
}

func (endpoint *RPCProviderEndpoint) String() string {
return endpoint.ChainID + ":" + endpoint.ApiInterface + " Network Address:" + endpoint.NetworkAddress.Address + " Node: " + endpoint.UrlsString() + " Geolocation:" + strconv.FormatUint(endpoint.Geolocation, 10) + " Addons:" + endpoint.AddonsString()
return endpoint.ChainID + ":" + endpoint.ApiInterface + " Network Address:" + endpoint.NetworkAddress.Address + " Node:" + endpoint.UrlsString() + " Geolocation:" + strconv.FormatUint(endpoint.Geolocation, 10) + " Addons:" + endpoint.AddonsString()
}

func (endpoint *RPCProviderEndpoint) Validate() error {
Expand Down
27 changes: 27 additions & 0 deletions protocol/metrics/consumer_metrics_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type ConsumerMetricsManager struct {
totalFailedWsSubscriptionRequestsMetric *prometheus.CounterVec
totalWsSubscriptionDissconnectMetric *prometheus.CounterVec
totalDuplicatedWsSubscriptionRequestsMetric *prometheus.CounterVec
totalLoLSuccessMetric prometheus.Counter
totalLoLErrorsMetric prometheus.Counter
totalWebSocketConnectionsActive *prometheus.GaugeVec
blockMetric *prometheus.GaugeVec
latencyMetric *prometheus.GaugeVec
Expand Down Expand Up @@ -118,6 +120,16 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
Help: "The total number of duplicated webscket subscription requests over time per chain id per api interface.",
}, []string{"spec", "apiInterface"})

totalLoLSuccessMetric := prometheus.NewCounter(prometheus.CounterOpts{
Name: "lava_consumer_total_lol_successes",
Help: "The total number of requests sent to lava over lava successfully",
})

totalLoLErrorsMetric := prometheus.NewCounter(prometheus.CounterOpts{
Name: "lava_consumer_total_lol_errors",
Help: "The total number of requests sent to lava over lava and failed",
})

totalWebSocketConnectionsActive := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_total_websocket_connections_active",
Help: "The total number of currently active websocket connections with users",
Expand Down Expand Up @@ -241,6 +253,8 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
prometheus.MustRegister(totalFailedWsSubscriptionRequestsMetric)
prometheus.MustRegister(totalDuplicatedWsSubscriptionRequestsMetric)
prometheus.MustRegister(totalWsSubscriptionDissconnectMetric)
prometheus.MustRegister(totalLoLSuccessMetric)
prometheus.MustRegister(totalLoLErrorsMetric)

consumerMetricsManager := &ConsumerMetricsManager{
totalCURequestedMetric: totalCURequestedMetric,
Expand Down Expand Up @@ -274,6 +288,8 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
relayProcessingLatencyBeforeProvider: relayProcessingLatencyBeforeProvider,
relayProcessingLatencyAfterProvider: relayProcessingLatencyAfterProvider,
averageProcessingLatency: map[string]*LatencyTracker{},
totalLoLSuccessMetric: totalLoLSuccessMetric,
totalLoLErrorsMetric: totalLoLErrorsMetric,
consumerOptimizerQoSClient: options.ConsumerOptimizerQoSClient,
}

Expand Down Expand Up @@ -565,6 +581,17 @@ func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chain
pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc()
}

func (pme *ConsumerMetricsManager) SetLoLResponse(success bool) {
if pme == nil {
return
}
if success {
pme.totalLoLSuccessMetric.Inc()
} else {
pme.totalLoLErrorsMetric.Inc()
}
}

func (pme *ConsumerMetricsManager) handleOptimizerQoS(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
Expand Down
7 changes: 7 additions & 0 deletions protocol/metrics/rpcconsumer_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ func NewRPCConsumerLogs(consumerMetricsManager *ConsumerMetricsManager, consumer
return rpcConsumerLogs, err
}

func (rpccl *RPCConsumerLogs) SetLoLResponse(success bool) {
if rpccl == nil {
return
}
rpccl.consumerMetricsManager.SetLoLResponse(success)
}

func (rpccl *RPCConsumerLogs) SetWebSocketConnectionActive(chainId string, apiInterface string, add bool) {
rpccl.consumerMetricsManager.SetWebSocketConnectionActive(chainId, apiInterface, add)
}
Expand Down
47 changes: 41 additions & 6 deletions protocol/rpcconsumer/custom_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,57 @@ package rpcconsumer

import (
"net/http"
"sync"
"sync/atomic"

"github.com/lavanet/lava/v4/utils"
)

type CustomLavaTransport struct {
transport http.RoundTripper
transport http.RoundTripper
lock sync.RWMutex
secondaryTransport http.RoundTripper
consecutiveFails atomic.Uint64 // TODO: export to metrics
}

func NewCustomLavaTransport(httpTransport http.RoundTripper, secondaryTransport http.RoundTripper) *CustomLavaTransport {
return &CustomLavaTransport{transport: httpTransport, secondaryTransport: secondaryTransport}
}

func NewCustomLavaTransport(httpTransport http.RoundTripper) *CustomLavaTransport {
return &CustomLavaTransport{transport: httpTransport}
func (c *CustomLavaTransport) SetSecondaryTransport(secondaryTransport http.RoundTripper) {
c.lock.Lock()
defer c.lock.Unlock()
utils.LavaFormatDebug("Setting secondary transport for CustomLavaTransport")
c.secondaryTransport = secondaryTransport
}

// used to switch the primary and secondary transports, in case the primary one fails too much
func (c *CustomLavaTransport) TogglePrimarySecondaryTransport() {
c.lock.Lock()
defer c.lock.Unlock()
primaryTransport := c.transport
secondaryTransport := c.secondaryTransport
c.secondaryTransport = primaryTransport
c.transport = secondaryTransport
}

func (c *CustomLavaTransport) RoundTrip(req *http.Request) (*http.Response, error) {
// Custom logic before the request

c.lock.RLock()
primaryTransport := c.transport
secondaryTransport := c.secondaryTransport
c.lock.RUnlock()
// Delegate to the underlying RoundTripper (usually http.Transport)
resp, err := c.transport.RoundTrip(req)

resp, err := primaryTransport.RoundTrip(req)
// Custom logic after the request
if err != nil {
c.consecutiveFails.Add(1)
// If the primary transport fails, use the secondary transport
if secondaryTransport != nil {
resp, err = secondaryTransport.RoundTrip(req)
}
} else {
c.consecutiveFails.Store(0)
}
return resp, err
}
60 changes: 58 additions & 2 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
refererBackendAddressFlagName = "referer-be-address"
refererMarkerFlagName = "referer-marker"
reportsSendBEAddress = "reports-be-address"
LavaOverLavaBackupFlagName = "use-lava-over-lava-backup"
)

var (
Expand Down Expand Up @@ -156,9 +157,11 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
}

consumerMetricsManager.SetVersion(upgrade.GetCurrentVersion().ConsumerVersion)
var customLavaTransport *CustomLavaTransport
httpClient, err := jsonrpcclient.DefaultHTTPClient(options.clientCtx.NodeURI)
if err == nil {
httpClient.Transport = NewCustomLavaTransport(httpClient.Transport)
customLavaTransport = NewCustomLavaTransport(httpClient.Transport, nil)
httpClient.Transport = customLavaTransport
client, err := rpchttp.NewWithClient(options.clientCtx.NodeURI, "/websocket", httpClient)
if err == nil {
options.clientCtx = options.clientCtx.WithClient(client)
Expand Down Expand Up @@ -227,10 +230,25 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
for _, rpcEndpoint := range options.rpcEndpoints {
go func(rpcEndpoint *lavasession.RPCEndpoint) error {
defer wg.Done()
_, err := rpcc.CreateConsumerEndpoint(ctx, rpcEndpoint, errCh, consumerAddr, consumerStateTracker,
rpcConsumerServer, err := rpcc.CreateConsumerEndpoint(ctx, rpcEndpoint, errCh, consumerAddr, consumerStateTracker,
policyUpdaters, optimizers, consumerConsistencies, finalizationConsensuses, chainMutexes,
options, privKey, lavaChainID, rpcConsumerMetrics, consumerReportsManager, consumerOptimizerQoSClient,
consumerMetricsManager, relaysMonitorAggregator)
if err == nil {
if customLavaTransport != nil && statetracker.IsLavaNativeSpec(rpcEndpoint.ChainID) && rpcEndpoint.ApiInterface == spectypes.APIInterfaceTendermintRPC {
// we can add lava over lava to the custom transport as a secondary source
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
if rpcConsumerServer.IsInitialized() {
customLavaTransport.SetSecondaryTransport(rpcConsumerServer)
return
}
}
}()
}
}
return err
}(rpcEndpoint)
}
Expand Down Expand Up @@ -634,6 +652,43 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
utils.LavaFormatFatal("offline spec modifications are supported only in single chain bootstrapping", nil, utils.LogAttr("len(rpcEndpoints)", len(rpcEndpoints)), utils.LogAttr("rpcEndpoints", rpcEndpoints))
}

if viper.GetBool(LavaOverLavaBackupFlagName) {
additionalEndpoint := func() *lavasession.RPCEndpoint {
for _, endpoint := range rpcEndpoints {
if statetracker.IsLavaNativeSpec(endpoint.ChainID) {
// native spec already exists, no need to add
return nil
}
}
// need to add an endpoint for the native lava chain
if strings.Contains(networkChainId, "mainnet") {
return &lavasession.RPCEndpoint{
NetworkAddress: chainlib.INTERNAL_ADDRESS,
ChainID: statetracker.MAINNET_SPEC,
ApiInterface: spectypes.APIInterfaceTendermintRPC,
}
} else if strings.Contains(networkChainId, "testnet") {
return &lavasession.RPCEndpoint{
NetworkAddress: chainlib.INTERNAL_ADDRESS,
ChainID: statetracker.TESTNET_SPEC,
ApiInterface: spectypes.APIInterfaceTendermintRPC,
}
} else if strings.Contains(networkChainId, "testnet") || networkChainId == "lava" {
return &lavasession.RPCEndpoint{
NetworkAddress: chainlib.INTERNAL_ADDRESS,
ChainID: statetracker.TESTNET_SPEC,
ApiInterface: spectypes.APIInterfaceTendermintRPC,
}
}
utils.LavaFormatError("could not find a native lava chain for the current network", nil, utils.LogAttr("networkChainId", networkChainId))
return nil
}()
if additionalEndpoint != nil {
utils.LavaFormatInfo("Lava over Lava backup is enabled", utils.Attribute{Key: "additionalEndpoint", Value: additionalEndpoint.ChainID})
rpcEndpoints = append(rpcEndpoints, additionalEndpoint)
}
}

rpcConsumerSharedState := viper.GetBool(common.SharedStateFlag)
err = rpcConsumer.Start(ctx, &rpcConsumerStartOptions{
txFactory,
Expand Down Expand Up @@ -699,6 +754,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerSamplingInterval, common.OptimizerQosServerSamplingIntervalFlag, time.Second*1, "interval to sample optimizer qos reports")
cmdRPCConsumer.Flags().IntVar(&chainlib.WebSocketRateLimit, common.RateLimitWebSocketFlag, chainlib.WebSocketRateLimit, "rate limit (per second) websocket requests per user connection, default is unlimited")
cmdRPCConsumer.Flags().DurationVar(&chainlib.WebSocketBanDuration, common.BanDurationForWebsocketRateLimitExceededFlag, chainlib.WebSocketBanDuration, "once websocket rate limit is reached, user will be banned Xfor a duration, default no ban")
cmdRPCConsumer.Flags().Bool(LavaOverLavaBackupFlagName, true, "enable lava over lava backup to regular rpc calls")
common.AddRollingLogConfig(cmdRPCConsumer)
return cmdRPCConsumer
}
Expand Down
Loading

0 comments on commit b4bd381

Please sign in to comment.