Skip to content

Commit

Permalink
Merge branch 'main' into CNS-137-implement
Browse files Browse the repository at this point in the history
  • Loading branch information
oren-lava committed Feb 5, 2023
2 parents 27690c0 + c94cc89 commit 884b1fa
Show file tree
Hide file tree
Showing 35 changed files with 191 additions and 104 deletions.
35 changes: 17 additions & 18 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
### Run relayer unitests
######################################################
- name: Run Relayer unit Tests
run: go test ./relayer/lavasession/ ./protocol/chaintracker/ ./relayer/chainproxy/ -v
run: go test ./protocol/lavasession/ ./protocol/chaintracker/ ./relayer/chainproxy/ -v
- name: Run Relayer Metrics Unit Tests
run: go test ./relayer/metrics/ -v

Expand Down Expand Up @@ -117,14 +117,14 @@ jobs:
continue-on-error: true
run: grep "" testutil/e2e/logs/03_jsonProvider* --include="*errors*"

- name: JSON Gateway All Logs
- name: JSON Consumer All Logs
if: always()
run: cat testutil/e2e/logs/04_jsonGateway.log
run: cat testutil/e2e/logs/04_jsonConsumer.log

- name: JSON Gateway Error Only Logs
- name: JSON Consumer Error Only Logs
if: always()
continue-on-error: true
run: cat testutil/e2e/logs/04_jsonGateway_errors.log
run: cat testutil/e2e/logs/04_jsonConsumer_errors.log

- name: Tendermint Provider All Logs
if: always()
Expand All @@ -135,14 +135,14 @@ jobs:
continue-on-error: true
run: grep "" testutil/e2e/logs/05_tendermintProvider* --include="*errors*"

- name: Tendermint Gateway All Logs
- name: Tendermint Consumer All Logs
if: always()
run: cat testutil/e2e/logs/06_tendermintGateway.log
run: cat testutil/e2e/logs/06_tendermintConsumer.log

- name: Tendermint Gateway Error Only Logs
- name: Tendermint Consumer Error Only Logs
if: always()
continue-on-error: true
run: cat testutil/e2e/logs/06_tendermintGateway_errors.log
run: cat testutil/e2e/logs/06_tendermintConsumer_errors.log

- name: Lava over Lava All Logs
if: always()
Expand All @@ -162,14 +162,14 @@ jobs:
continue-on-error: true
run: grep "" testutil/e2e/logs/08_restProvider* --include="*errors*"

- name: Rest Gateway All Logs
- name: Rest Consumer All Logs
if: always()
run: cat testutil/e2e/logs/09_restGateway.log
run: cat testutil/e2e/logs/09_restConsumer.log

- name: Rest Gateway Error Only Logs
- name: Rest Consumer Error Only Logs
if: always()
continue-on-error: true
run: cat testutil/e2e/logs/09_restGateway_errors.log
run: cat testutil/e2e/logs/09_restConsumer_errors.log

- name: GRPC Provider All Logs
if: always()
Expand All @@ -180,12 +180,11 @@ jobs:
continue-on-error: true
run: grep "" testutil/e2e/logs/10_grpcProvider* --include="*errors*"

- name: GRPC Gateway All Logs
- name: GRPC Consumer All Logs
if: always()
run: cat testutil/e2e/logs/11_grpcGateway.log
run: cat testutil/e2e/logs/11_grpcConsumer.log

- name: GRPC Gateway Error Only Logs
- name: GRPC Consumer Error Only Logs
if: always()
continue-on-error: true

run: cat testutil/e2e/logs/11_grpcGateway_errors.log
run: cat testutil/e2e/logs/11_grpcConsumer_errors.log
2 changes: 1 addition & 1 deletion cmd/lavad/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"github.com/cosmos/cosmos-sdk/version"
"github.com/ignite-hq/cli/ignite/pkg/cosmoscmd"
"github.com/lavanet/lava/app"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/protocol/rpcconsumer"
"github.com/lavanet/lava/protocol/rpcprovider"
"github.com/lavanet/lava/relayer"
"github.com/lavanet/lava/relayer/chainproxy"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/relayer/performance"
"github.com/lavanet/lava/relayer/sentry"
"github.com/lavanet/lava/utils"
Expand Down
2 changes: 1 addition & 1 deletion protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/lavanet/lava/protocol/chainlib/chainproxy/rpcclient"
"github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/relayer/metrics"
"github.com/lavanet/lava/relayer/parser"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
Expand Down
6 changes: 6 additions & 0 deletions protocol/chainlib/chainproxy/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package chainproxy

const (
LavaErrorCode = 555
InternalErrorString = "Internal Error"
)
6 changes: 6 additions & 0 deletions protocol/chainlib/chainproxy/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func NewConnector(ctx context.Context, nConns uint, addr string) *Connector {
}
connector.freeClients = append(connector.freeClients, rpcClient)
}
if len(connector.freeClients) == 0 {
utils.LavaFormatFatal("Could not create any connections to the node check address", nil, &map[string]string{"address": addr})
}
utils.LavaFormatInfo("Number of parallel connections created: "+strconv.Itoa(len(connector.freeClients)), nil)
go connector.connectorLoop(ctx)
return connector
Expand Down Expand Up @@ -226,6 +229,9 @@ func NewGRPCConnector(ctx context.Context, nConns uint, addr string) *GRPCConnec
}
connector.freeClients = append(connector.freeClients, grpcClient)
}
if len(connector.freeClients) == 0 {
utils.LavaFormatFatal("Could not create any connections to the node check address", nil, &map[string]string{"address": addr})
}
go connector.connectorLoop(ctx)
return connector
}
Expand Down
34 changes: 29 additions & 5 deletions protocol/chainlib/chainproxy/tendermintRPCMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ func GetTendermintRPCError(jsonError *rpcclient.JsonError) (*tenderminttypes.RPC
return rpcError, nil
}

func ConvertErrorToRPCError(err error) *tenderminttypes.RPCError {
func ConvertErrorToRPCError(errString string, code int) *tenderminttypes.RPCError {
var rpcError *tenderminttypes.RPCError
unmarshalError := json.Unmarshal([]byte(err.Error()), &rpcError)
unmarshalError := json.Unmarshal([]byte(errString), &rpcError)
if unmarshalError != nil {
utils.LavaFormatWarning("Failed unmarshalling error tendermintrpc", unmarshalError, &map[string]string{"err": err.Error()})
utils.LavaFormatWarning("Failed unmarshalling error tendermintrpc", unmarshalError, &map[string]string{"err": errString})
rpcError = &tenderminttypes.RPCError{
Code: -1, // TODO get code from error
Code: code,
Message: "Rpc Error",
Data: err.Error(),
Data: errString,
}
}
return rpcError
Expand Down Expand Up @@ -123,3 +123,27 @@ func ConvertTendermintMsg(rpcMsg *rpcclient.JsonrpcMessage) (*RPCResponse, error

return msg, nil
}

func ConvertToTendermintError(errString string, inputInfo []byte) string {
var msg JsonrpcMessage
err := json.Unmarshal(inputInfo, &msg)
if err == nil {
id, errId := IdFromRawMessage(msg.ID)
if errId != nil {
utils.LavaFormatError("error idFromRawMessage", errId, nil)
return InternalErrorString
}
res, merr := json.Marshal(&RPCResponse{
JSONRPC: msg.Version,
ID: id,
Error: ConvertErrorToRPCError(errString, LavaErrorCode),
})
if merr != nil {
utils.LavaFormatError("convertToTendermintError json.Marshal", merr, nil)
return InternalErrorString
}
return string(res)
}
utils.LavaFormatError("error convertToTendermintError", err, nil)
return InternalErrorString
}
2 changes: 1 addition & 1 deletion protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/lavanet/lava/protocol/chainlib/chainproxy/rpcclient"
"github.com/lavanet/lava/protocol/chainlib/chainproxy/thirdparty"
"github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/relayer/metrics"
"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
Expand Down
2 changes: 1 addition & 1 deletion protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/gofiber/fiber/v2/middleware/favicon"
"github.com/gofiber/websocket/v2"
"github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/relayer/metrics"
"github.com/lavanet/lava/relayer/parser"

Expand Down
2 changes: 1 addition & 1 deletion protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/lavanet/lava/protocol/chainlib/chainproxy"
"github.com/lavanet/lava/protocol/chainlib/chainproxy/rpcclient"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/utils"

pairingtypes "github.com/lavanet/lava/x/pairing/types"
Expand Down
6 changes: 3 additions & 3 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/lavanet/lava/protocol/chainlib/chainproxy"
"github.com/lavanet/lava/protocol/chainlib/chainproxy/rpcclient"
"github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/relayer/metrics"
"github.com/lavanet/lava/relayer/parser"
"github.com/lavanet/lava/utils"
Expand Down Expand Up @@ -337,7 +337,7 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
c.Status(fiber.StatusInternalServerError)

// Construct json response
response := convertToJsonError(errMasking)
response := chainproxy.ConvertToTendermintError(errMasking, c.Body())

// Return error json response
return c.SendString(response)
Expand Down Expand Up @@ -515,7 +515,7 @@ func (cp *tendermintRpcChainProxy) SendRPC(ctx context.Context, nodeMessage *cha
replyMsg = &chainproxy.RPCResponse{
JSONRPC: nodeMessage.Version,
ID: id,
Error: chainproxy.ConvertErrorToRPCError(err),
Error: chainproxy.ConvertErrorToRPCError(err.Error(), -1), // TODO: extract code from error status / message
}
} else {
replyMessage, err = chainproxy.ConvertTendermintMsg(rpcMessage)
Expand Down
11 changes: 6 additions & 5 deletions protocol/lavaprotocol/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/btcsuite/btcd/btcec"
"github.com/lavanet/lava/protocol/chainlib"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/relayer/sigs"
"github.com/lavanet/lava/utils"
conflicttypes "github.com/lavanet/lava/x/conflict/types"
Expand All @@ -18,10 +18,11 @@ import (
)

const (
TimePerCU = uint64(100 * time.Millisecond)
MinimumTimePerRelayDelay = time.Second
AverageWorldLatency = 200 * time.Millisecond
SupportedNumberOfVRFs = 2
TimePerCU = uint64(100 * time.Millisecond)
MinimumTimePerRelayDelay = time.Second
AverageWorldLatency = 200 * time.Millisecond
DataReliabilityTimeoutIncrease = 5 * time.Second
SupportedNumberOfVRFs = 2
)

type RelayRequestCommonData struct {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,8 @@ func (cs *SingleConsumerSession) CalculateQoS(cu uint64, latency time.Duration,
})
}
}

// validate if this is a data reliability session
func (scs *SingleConsumerSession) IsDataReliabilitySession() bool {
return scs.SessionId <= DataReliabilitySessionId
}
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/protocol/chainlib"
"github.com/lavanet/lava/protocol/lavaprotocol"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/protocol/statetracker"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/relayer/performance"
"github.com/lavanet/lava/relayer/sigs"
"github.com/lavanet/lava/utils"
Expand Down
13 changes: 10 additions & 3 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/lavanet/lava/protocol/chainlib"
"github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/protocol/lavaprotocol"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/relayer/metrics"
"github.com/lavanet/lava/relayer/performance"
"github.com/lavanet/lava/utils"
Expand Down Expand Up @@ -129,7 +129,10 @@ func (rpccs *RPCConsumerServer) SendRelay(
enabled, dataReliabilityThreshold := rpccs.chainParser.DataReliabilityParams()
if enabled {
for _, relayResult := range relayResults {
go rpccs.sendDataReliabilityRelayIfApplicable(ctx, relayResult, chainMessage, dataReliabilityThreshold, &relayRequestCommonData) // runs asynchronously
// new context is needed for data reliability as some clients cancel the context they provide when the relay returns
// as data reliability happens in a go routine it will continue while the response returns.
dataReliabilityContext := context.Background()
go rpccs.sendDataReliabilityRelayIfApplicable(dataReliabilityContext, relayResult, chainMessage, dataReliabilityThreshold, &relayRequestCommonData) // runs asynchronously
}
}

Expand Down Expand Up @@ -230,7 +233,11 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe
existingSessionLatestBlock := singleConsumerSession.LatestBlock // we read it now because singleConsumerSession is locked, and later it's not
endpointClient := *singleConsumerSession.Endpoint.Client
relaySentTime := time.Now()
connectCtx, cancel := context.WithTimeout(ctx, lavaprotocol.GetTimePerCu(singleConsumerSession.LatestRelayCu)+lavaprotocol.AverageWorldLatency)
extraTimeForContext := time.Duration(0)
if singleConsumerSession.IsDataReliabilitySession() { // for data reliability session we add more time to timeout
extraTimeForContext += lavaprotocol.DataReliabilityTimeoutIncrease
}
connectCtx, cancel := context.WithTimeout(ctx, lavaprotocol.GetTimePerCu(singleConsumerSession.LatestRelayCu)+lavaprotocol.AverageWorldLatency+extraTimeForContext)
defer cancel()
relayRequest := relayResult.Request
providerPublicAddress := relayResult.ProviderAddress
Expand Down
2 changes: 1 addition & 1 deletion protocol/rpcprovider/rewardserver/reward_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package rewardserver
import (
"context"

"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/protocol/lavasession"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
)

Expand Down
2 changes: 1 addition & 1 deletion protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/protocol/chainlib"
"github.com/lavanet/lava/protocol/chaintracker"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/protocol/rpcprovider/reliabilitymanager"
"github.com/lavanet/lava/protocol/rpcprovider/rewardserver"
"github.com/lavanet/lava/protocol/statetracker"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/relayer/performance"
"github.com/lavanet/lava/relayer/sigs"
"github.com/lavanet/lava/utils"
Expand Down
2 changes: 1 addition & 1 deletion protocol/rpcprovider/rpcprovider_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/lavanet/lava/protocol/chainlib"
"github.com/lavanet/lava/protocol/chaintracker"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/relayer/performance"
)

Expand Down
2 changes: 1 addition & 1 deletion protocol/statetracker/consumer_state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/lavanet/lava/protocol/chainlib"
"github.com/lavanet/lava/protocol/chaintracker"
"github.com/lavanet/lava/protocol/lavaprotocol"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/utils"
conflicttypes "github.com/lavanet/lava/x/conflict/types"
spectypes "github.com/lavanet/lava/x/spec/types"
Expand Down
2 changes: 1 addition & 1 deletion protocol/statetracker/pairing_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package statetracker

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/relayer/lavasession"
"github.com/lavanet/lava/protocol/lavasession"
epochstoragetypes "github.com/lavanet/lava/x/epochstorage/types"
)

Expand Down
Loading

0 comments on commit 884b1fa

Please sign in to comment.