Skip to content

Commit

Permalink
feat: PRT - Check the lava chain id in provider (#1751)
Browse files Browse the repository at this point in the history
* Changed logs

* Add the lava chain id to the probe

* Check the lava chain id in the "rpcprovider test"

* Check the lava chain id on provider startup

* CR Fix + Bug fix

* Fix the Ctrl + C not working bug

* CR Fix: Remove duplicated check
  • Loading branch information
shleikes authored Oct 29, 2024
1 parent b987fd6 commit 479a906
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 32 deletions.
1 change: 1 addition & 0 deletions protocol/common/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
MaximumConcurrentProvidersFlagName = "concurrent-providers"
StatusCodeMetadataKey = "status-code"
VersionMetadataKey = "lavap-version"
LavaChainIdMetadataKey = "lavap-chain-id"
TimeOutForFetchingLavaBlocksFlag = "timeout-for-fetching-lava-blocks"
)

Expand Down
1 change: 1 addition & 0 deletions protocol/rpcprovider/rpcprovider_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,7 @@ func (rpcps *RPCProviderServer) Probe(ctx context.Context, probeReq *pairingtype
}
trailer := metadata.Pairs(common.VersionMetadataKey, upgrade.GetCurrentVersion().ProviderVersion)
trailer.Append(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId)
trailer.Append(common.LavaChainIdMetadataKey, rpcps.lavaChainID)
grpc.SetTrailer(ctx, trailer) // we ignore this error here since this code can be triggered not from grpc
return probeReply, nil
}
Expand Down
149 changes: 117 additions & 32 deletions protocol/rpcprovider/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
"crypto/tls"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"time"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/config"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/version"
"github.com/gogo/status"
"github.com/lavanet/lava/v4/app"
lvutil "github.com/lavanet/lava/v4/ecosystem/lavavisor/pkg/util"
"github.com/lavanet/lava/v4/protocol/chainlib/chainproxy"
"github.com/lavanet/lava/v4/protocol/common"
Expand Down Expand Up @@ -49,7 +49,7 @@ func validatePortNumber(ipPort string) string {
}

func PerformCORSCheck(endpoint epochstoragetypes.Endpoint) error {
utils.LavaFormatDebug("Checking CORS", utils.Attribute{Key: "endpoint", Value: endpoint})
utils.LavaFormatDebug("Checking CORS", utils.LogAttr("endpoint", endpoint))
// Construct the URL for the RPC endpoint
endpointURL := "https://" + endpoint.IPPORT // Providers must have HTTPS support

Expand Down Expand Up @@ -86,7 +86,10 @@ func validateCORSHeaders(resp *http.Response) error {
// Check for the presence of "Access-Control-Allow-Origin" header
corsOrigin := resp.Header.Get("Access-Control-Allow-Origin")
if corsOrigin != "*" {
return utils.LavaFormatError("CORS check failed. Expected 'Access-Control-Allow-Origin: *' but not found.", nil, utils.Attribute{Key: "returned code", Value: resp.StatusCode}, utils.Attribute{Key: "corsOrigin", Value: corsOrigin})
return utils.LavaFormatError("CORS check failed. Expected 'Access-Control-Allow-Origin: *' but not found.", nil,
utils.LogAttr("returned code", resp.StatusCode),
utils.LogAttr("corsOrigin", corsOrigin),
)
}

// Headers that must be present in "Access-Control-Allow-Headers"
Expand All @@ -95,21 +98,17 @@ func validateCORSHeaders(resp *http.Response) error {
corsHeaders := strings.ToLower(resp.Header.Get("Access-Control-Allow-Headers"))
for _, requiredHeader := range requiredHeaders {
if !strings.Contains(corsHeaders, strings.ToLower(requiredHeader)) {
return utils.LavaFormatError("CORS check failed. Expected 'Access-Control-Allow-Headers' are not present.", nil, utils.Attribute{Key: "corsHeaders", Value: corsHeaders}, utils.Attribute{Key: "requiredHeader", Value: requiredHeader})
return utils.LavaFormatError("CORS check failed. Expected 'Access-Control-Allow-Headers' are not present.", nil,
utils.LogAttr("corsHeaders", corsHeaders),
utils.LogAttr("requiredHeader", requiredHeader),
)
}
}

return nil
}

func startTesting(ctx context.Context, clientCtx client.Context, providerEntries []epochstoragetypes.StakeEntry, plainTextConnection bool) error {
ctx, cancel := context.WithCancel(ctx)
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
defer func() {
signal.Stop(signalChan)
cancel()
}()
func startTesting(ctx context.Context, clientCtx client.Context, lavaNetworkChainId string, providerEntries []epochstoragetypes.StakeEntry, plainTextConnection bool) error {
goodChains := []string{}
badChains := []string{}
portValidation := []string{}
Expand All @@ -121,7 +120,10 @@ func startTesting(ctx context.Context, clientCtx client.Context, providerEntries
lavaVersion := param.GetParams().Version
targetVersion := lvutil.ParseToSemanticVersion(lavaVersion.ProviderTarget)
for _, providerEntry := range providerEntries {
utils.LavaFormatInfo("checking provider entry", utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "endpoints", Value: providerEntry.Endpoints})
utils.LavaFormatInfo("checking provider entry",
utils.LogAttr("chainID", providerEntry.Chain),
utils.LogAttr("endpoints", providerEntry.Endpoints),
)

for _, endpoint := range providerEntry.Endpoints {
checkOneProvider := func(apiInterface string, addon string) (time.Duration, string, int64, error) {
Expand All @@ -137,7 +139,12 @@ func startTesting(ctx context.Context, clientCtx client.Context, providerEntries
utils.LavaFormatWarning("You are using plain text connection (disabled tls), no consumer can connect to it as all consumers use tls. this should be used for testing purposes only", nil)
conn, err = grpc.DialContext(ctx, endpoint.IPPORT, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(chainproxy.MaxCallRecvMsgSize)))
if err != nil {
return 0, "", 0, utils.LavaFormatError("failed connecting to provider endpoint", err, utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT})
return 0, "", 0, utils.LavaFormatError("failed connecting to provider endpoint", err,
utils.LogAttr("apiInterface", apiInterface),
utils.LogAttr("addon", addon),
utils.LogAttr("chainID", providerEntry.Chain),
utils.LogAttr("network address", endpoint.IPPORT),
)
}
relayerClient = pairingtypes.NewRelayerClient(conn)
} else {
Expand All @@ -149,10 +156,20 @@ func startTesting(ctx context.Context, clientCtx client.Context, providerEntries
_, _, err := cswp.ConnectRawClientWithTimeout(ctx, endpoint.IPPORT)
lavasession.AllowInsecureConnectionToProviders = false
if err == nil {
return 0, "", 0, utils.LavaFormatError("provider endpoint is insecure when it should be secure", err, utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT})
return 0, "", 0, utils.LavaFormatError("provider endpoint is insecure when it should be secure", err,
utils.LogAttr("apiInterface", apiInterface),
utils.LogAttr("addon", addon),
utils.LogAttr("chainID", providerEntry.Chain),
utils.LogAttr("network address", endpoint.IPPORT),
)
}
}
return 0, "", 0, utils.LavaFormatError("failed connecting to provider endpoint", err, utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT})
return 0, "", 0, utils.LavaFormatError("failed connecting to provider endpoint", err,
utils.LogAttr("apiInterface", apiInterface),
utils.LogAttr("addon", addon),
utils.LogAttr("chainID", providerEntry.Chain),
utils.LogAttr("network address", endpoint.IPPORT),
)
}
}

Expand All @@ -167,17 +184,52 @@ func startTesting(ctx context.Context, clientCtx client.Context, providerEntries
var trailer metadata.MD
probeResp, err := relayerClient.Probe(ctx, probeReq, grpc.Trailer(&trailer))
if err != nil {
return 0, "", 0, utils.LavaFormatError("failed probing provider endpoint", err, utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT})
return 0, "", 0, utils.LavaFormatError("failed probing provider endpoint", err,
utils.LogAttr("apiInterface", apiInterface),
utils.LogAttr("addon", addon),
utils.LogAttr("chainID", providerEntry.Chain),
utils.LogAttr("network address", endpoint.IPPORT),
)
}
versions := strings.Join(trailer.Get(common.VersionMetadataKey), ",")
relayLatency := time.Since(relaySentTime)
if guid != probeResp.GetGuid() {
return 0, versions, 0, utils.LavaFormatError("probe returned invalid value", err, utils.Attribute{Key: "returnedGuid", Value: probeResp.GetGuid()}, utils.Attribute{Key: "guid", Value: guid}, utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT})
return 0, versions, 0, utils.LavaFormatError("probe returned invalid value", err,
utils.LogAttr("returnedGuid", probeResp.GetGuid()),
utils.LogAttr("guid", guid),
utils.LogAttr("apiInterface", apiInterface),
utils.LogAttr("addon", addon),
utils.LogAttr("chainID", providerEntry.Chain),
utils.LogAttr("network address", endpoint.IPPORT),
)
}

// chain id check - lava node chain id should be the same as the one we are probing
lavaChainIdFromProbeMD := trailer.Get(common.LavaChainIdMetadataKey)
if len(lavaChainIdFromProbeMD) > 0 {
lavaChainIdFromProbe := lavaChainIdFromProbeMD[0]
if lavaChainIdFromProbe != lavaNetworkChainId {
return 0, versions, 0, utils.LavaFormatError("lava chain id from probe does not match the configured network chain id", nil,
utils.LogAttr("returnedGuid", probeResp.GetGuid()),
utils.LogAttr("guid", guid),
utils.LogAttr("apiInterface", apiInterface),
utils.LogAttr("addon", addon),
utils.LogAttr("lavaChainIdFromProbe", lavaChainIdFromProbe),
utils.LogAttr("networkChainId", lavaNetworkChainId),
)
}
}

// CORS check
if err := PerformCORSCheck(endpoint); err != nil {
return 0, versions, 0, utils.LavaFormatError("invalid CORS check", err, utils.Attribute{Key: "returnedGuid", Value: probeResp.GetGuid()}, utils.Attribute{Key: "guid", Value: guid}, utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT})
return 0, versions, 0, utils.LavaFormatError("invalid CORS check", err,
utils.LogAttr("returnedGuid", probeResp.GetGuid()),
utils.LogAttr("guid", guid),
utils.LogAttr("apiInterface", apiInterface),
utils.LogAttr("addon", addon),
utils.LogAttr("chainID", providerEntry.Chain),
utils.LogAttr("network address", endpoint.IPPORT),
)
}

relayRequest := &pairingtypes.RelayRequest{
Expand All @@ -186,17 +238,27 @@ func startTesting(ctx context.Context, clientCtx client.Context, providerEntries
}
_, err = relayerClient.Relay(ctx, relayRequest)
if err == nil {
return 0, "", 0, utils.LavaFormatError("relay Without signature did not error, unexpected", nil, utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT})
return 0, "", 0, utils.LavaFormatError("relay Without signature did not error, unexpected", nil,
utils.LogAttr("apiInterface", apiInterface),
utils.LogAttr("addon", addon),
utils.LogAttr("chainID", providerEntry.Chain),
utils.LogAttr("network address", endpoint.IPPORT),
)
}
code := status.Code(err)
if code != codes.Code(lavasession.EpochMismatchError.ABCICode()) {
return 0, versions, 0, utils.LavaFormatError("relay returned unexpected error", err, utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT})
return 0, versions, 0, utils.LavaFormatError("relay returned unexpected error", err,
utils.LogAttr("apiInterface", apiInterface),
utils.LogAttr("addon", addon),
utils.LogAttr("chainID", providerEntry.Chain),
utils.LogAttr("network address", endpoint.IPPORT),
)
}
return relayLatency, versions, probeResp.GetLatestBlock(), nil
}
endpointServices := endpoint.GetSupportedServices()
if len(endpointServices) == 0 {
utils.LavaFormatWarning("endpoint has no supported services", nil, utils.Attribute{Key: "endpoint", Value: endpoint})
utils.LavaFormatWarning("endpoint has no supported services", nil, utils.LogAttr("endpoint", endpoint))
}
for _, endpointService := range endpointServices {
probeLatency, version, latestBlockFromProbe, err := checkOneProvider(endpointService.ApiInterface, endpointService.Addon)
Expand All @@ -209,7 +271,12 @@ func startTesting(ctx context.Context, clientCtx client.Context, providerEntries
badChains = append(badChains, providerEntry.Chain+" "+endpointService.String()+" Version:"+version+" should be: "+lavaVersion.ProviderTarget)
continue
}
utils.LavaFormatInfo("successfully verified provider endpoint", utils.LogAttr("version", version), utils.Attribute{Key: "enspointService", Value: endpointService}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT}, utils.Attribute{Key: "probe latency", Value: probeLatency})
utils.LavaFormatInfo("successfully verified provider endpoint", utils.LogAttr("version", version),
utils.LogAttr("enspointService", endpointService),
utils.LogAttr("chainID", providerEntry.Chain),
utils.LogAttr("network address", endpoint.IPPORT),
utils.LogAttr("probe latency", probeLatency),
)
goodChains = append(goodChains, providerEntry.Chain+"-"+endpointService.String()+" version: "+version+" latest block: 0x"+strconv.FormatInt(latestBlockFromProbe, 16))
}
}
Expand Down Expand Up @@ -254,14 +321,24 @@ rpcprovider --from providerWallet --endpoints "provider-public-grpc:port,jsonrpc
if err != nil {
return err
}

if networkChainId == app.Name {
clientTomlConfig, err := config.ReadFromClientConfig(clientCtx)
if err == nil {
if clientTomlConfig.ChainID != "" {
networkChainId = clientTomlConfig.ChainID
}
}
}

logLevel, err := cmd.Flags().GetString(flags.FlagLogLevel)
if err != nil {
utils.LavaFormatFatal("failed to read log level flag", err)
}
// setting the insecure option on provider dial, this should be used in development only!
lavasession.AllowInsecureConnectionToProviders = viper.GetBool(lavasession.AllowInsecureConnectionToProvidersFlag)
if lavasession.AllowInsecureConnectionToProviders {
utils.LavaFormatWarning("AllowInsecureConnectionToProviders is set to true, this should be used only in development", nil, utils.Attribute{Key: lavasession.AllowInsecureConnectionToProvidersFlag, Value: lavasession.AllowInsecureConnectionToProviders})
utils.LavaFormatWarning("AllowInsecureConnectionToProviders is set to true, this should be used only in development", nil, utils.LogAttr(lavasession.AllowInsecureConnectionToProvidersFlag, lavasession.AllowInsecureConnectionToProviders))
}

var address string
Expand All @@ -282,7 +359,7 @@ rpcprovider --from providerWallet --endpoints "provider-public-grpc:port,jsonrpc
} else {
address = args[0]
}
utils.LavaFormatInfo("RPCProvider Test started", utils.Attribute{Key: "address", Value: address})
utils.LavaFormatInfo("RPCProvider Test started", utils.LogAttr("address", address))
utils.SetGlobalLoggingLevel(logLevel)
clientCtx = clientCtx.WithChainID(networkChainId)

Expand All @@ -292,6 +369,14 @@ rpcprovider --from providerWallet --endpoints "provider-public-grpc:port,jsonrpc
if err != nil {
return err
}

if resultStatus.NodeInfo.Network != networkChainId {
return utils.LavaFormatError("network chain id does not match the one in the node", nil,
utils.LogAttr("networkChainId", networkChainId),
utils.LogAttr("nodeNetwork", resultStatus.NodeInfo.Network),
)
}

currentBlock := resultStatus.SyncInfo.LatestBlockHeight
// get all chains provider is serving and their endpoints
specQuerier := spectypes.NewQueryClient(clientCtx)
Expand Down Expand Up @@ -330,11 +415,11 @@ rpcprovider --from providerWallet --endpoints "provider-public-grpc:port,jsonrpc
ChainName: chainID,
})
if err != nil {
return utils.LavaFormatError("failed reading on chain data in order to resolve endpoint", err, utils.Attribute{Key: "endpoint", Value: endpoints[0]})
return utils.LavaFormatError("failed reading on chain data in order to resolve endpoint", err, utils.LogAttr("endpoint", endpoints[0]))
}
endpoints[0].ApiInterfaces = chainInfoResponse.Interfaces
}
utils.LavaFormatDebug("endpoints to check", utils.Attribute{Key: "endpoints", Value: endpoints})
utils.LavaFormatDebug("endpoints to check", utils.LogAttr("endpoints", endpoints))
providerEntry := epochstoragetypes.StakeEntry{
Endpoints: endpoints,
Chain: chainID,
Expand All @@ -353,7 +438,7 @@ rpcprovider --from providerWallet --endpoints "provider-public-grpc:port,jsonrpc
for _, provider := range response.StakeEntry {
if provider.Address == address {
if provider.StakeAppliedBlock > uint64(currentBlock+1) {
utils.LavaFormatWarning("provider is Frozen", nil, utils.Attribute{Key: "chainID", Value: provider.Chain})
utils.LavaFormatWarning("provider is Frozen", nil, utils.LogAttr("chainID", provider.Chain))
}
stakedProviderChains = append(stakedProviderChains, provider)
break
Expand All @@ -363,10 +448,10 @@ rpcprovider --from providerWallet --endpoints "provider-public-grpc:port,jsonrpc
}
}
if len(stakedProviderChains) == 0 {
utils.LavaFormatError("no active chains for provider", nil, utils.Attribute{Key: "address", Value: address})
utils.LavaFormatError("no active chains for provider", nil, utils.LogAttr("address", address))
}
utils.LavaFormatDebug("checking chain entries", utils.Attribute{Key: "stakedProviderChains", Value: stakedProviderChains})
return startTesting(ctx, clientCtx, stakedProviderChains, viper.GetBool(common.PlainTextConnection))
utils.LavaFormatDebug("checking chain entries", utils.LogAttr("stakedProviderChains", stakedProviderChains))
return startTesting(ctx, clientCtx, networkChainId, stakedProviderChains, viper.GetBool(common.PlainTextConnection))
},
}

Expand Down

0 comments on commit 479a906

Please sign in to comment.