diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 43b4c00bdf..10d81a95d6 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -76,7 +76,7 @@ jobs: ### Run relayer unitests ###################################################### - name: Run Relayer unit Tests - run: go test ./relayer/lavasession/ ./protocol/chaintracker/ ./relayer/chainproxy/ -v + run: go test ./protocol/lavasession/ ./protocol/chaintracker/ ./relayer/chainproxy/ -v - name: Run Relayer Metrics Unit Tests run: go test ./relayer/metrics/ -v @@ -117,14 +117,14 @@ jobs: continue-on-error: true run: grep "" testutil/e2e/logs/03_jsonProvider* --include="*errors*" - - name: JSON Gateway All Logs + - name: JSON Consumer All Logs if: always() - run: cat testutil/e2e/logs/04_jsonGateway.log + run: cat testutil/e2e/logs/04_jsonConsumer.log - - name: JSON Gateway Error Only Logs + - name: JSON Consumer Error Only Logs if: always() continue-on-error: true - run: cat testutil/e2e/logs/04_jsonGateway_errors.log + run: cat testutil/e2e/logs/04_jsonConsumer_errors.log - name: Tendermint Provider All Logs if: always() @@ -135,14 +135,14 @@ jobs: continue-on-error: true run: grep "" testutil/e2e/logs/05_tendermintProvider* --include="*errors*" - - name: Tendermint Gateway All Logs + - name: Tendermint Consumer All Logs if: always() - run: cat testutil/e2e/logs/06_tendermintGateway.log + run: cat testutil/e2e/logs/06_tendermintConsumer.log - - name: Tendermint Gateway Error Only Logs + - name: Tendermint Consumer Error Only Logs if: always() continue-on-error: true - run: cat testutil/e2e/logs/06_tendermintGateway_errors.log + run: cat testutil/e2e/logs/06_tendermintConsumer_errors.log - name: Lava over Lava All Logs if: always() @@ -162,14 +162,14 @@ jobs: continue-on-error: true run: grep "" testutil/e2e/logs/08_restProvider* --include="*errors*" - - name: Rest Gateway All Logs + - name: Rest Consumer All Logs if: always() - run: cat testutil/e2e/logs/09_restGateway.log + run: cat testutil/e2e/logs/09_restConsumer.log - - name: Rest Gateway Error Only Logs + - name: Rest Consumer Error Only Logs if: always() continue-on-error: true - run: cat testutil/e2e/logs/09_restGateway_errors.log + run: cat testutil/e2e/logs/09_restConsumer_errors.log - name: GRPC Provider All Logs if: always() @@ -180,12 +180,11 @@ jobs: continue-on-error: true run: grep "" testutil/e2e/logs/10_grpcProvider* --include="*errors*" - - name: GRPC Gateway All Logs + - name: GRPC Consumer All Logs if: always() - run: cat testutil/e2e/logs/11_grpcGateway.log + run: cat testutil/e2e/logs/11_grpcConsumer.log - - name: GRPC Gateway Error Only Logs + - name: GRPC Consumer Error Only Logs if: always() continue-on-error: true - - run: cat testutil/e2e/logs/11_grpcGateway_errors.log \ No newline at end of file + run: cat testutil/e2e/logs/11_grpcConsumer_errors.log diff --git a/cmd/lavad/main.go b/cmd/lavad/main.go index 62be91fc03..d1f88ed9dc 100644 --- a/cmd/lavad/main.go +++ b/cmd/lavad/main.go @@ -18,11 +18,11 @@ import ( "github.com/cosmos/cosmos-sdk/version" "github.com/ignite-hq/cli/ignite/pkg/cosmoscmd" "github.com/lavanet/lava/app" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/protocol/rpcconsumer" "github.com/lavanet/lava/protocol/rpcprovider" "github.com/lavanet/lava/relayer" "github.com/lavanet/lava/relayer/chainproxy" - "github.com/lavanet/lava/relayer/lavasession" "github.com/lavanet/lava/relayer/performance" "github.com/lavanet/lava/relayer/sentry" "github.com/lavanet/lava/utils" diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 2b7fe3af6d..a6a507c9bc 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -7,7 +7,7 @@ import ( "github.com/lavanet/lava/protocol/chainlib/chainproxy/rpcclient" "github.com/lavanet/lava/protocol/common" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/metrics" "github.com/lavanet/lava/relayer/parser" pairingtypes "github.com/lavanet/lava/x/pairing/types" diff --git a/protocol/chainlib/chainproxy/common.go b/protocol/chainlib/chainproxy/common.go new file mode 100644 index 0000000000..51f3e12411 --- /dev/null +++ b/protocol/chainlib/chainproxy/common.go @@ -0,0 +1,6 @@ +package chainproxy + +const ( + LavaErrorCode = 555 + InternalErrorString = "Internal Error" +) diff --git a/protocol/chainlib/chainproxy/connector.go b/protocol/chainlib/chainproxy/connector.go index 79fccaee39..b9ecabe13b 100644 --- a/protocol/chainlib/chainproxy/connector.go +++ b/protocol/chainlib/chainproxy/connector.go @@ -77,6 +77,9 @@ func NewConnector(ctx context.Context, nConns uint, addr string) *Connector { } connector.freeClients = append(connector.freeClients, rpcClient) } + if len(connector.freeClients) == 0 { + utils.LavaFormatFatal("Could not create any connections to the node check address", nil, &map[string]string{"address": addr}) + } utils.LavaFormatInfo("Number of parallel connections created: "+strconv.Itoa(len(connector.freeClients)), nil) go connector.connectorLoop(ctx) return connector @@ -226,6 +229,9 @@ func NewGRPCConnector(ctx context.Context, nConns uint, addr string) *GRPCConnec } connector.freeClients = append(connector.freeClients, grpcClient) } + if len(connector.freeClients) == 0 { + utils.LavaFormatFatal("Could not create any connections to the node check address", nil, &map[string]string{"address": addr}) + } go connector.connectorLoop(ctx) return connector } diff --git a/protocol/chainlib/chainproxy/tendermintRPCMessage.go b/protocol/chainlib/chainproxy/tendermintRPCMessage.go index a51c7d85f9..0fd5169f4f 100644 --- a/protocol/chainlib/chainproxy/tendermintRPCMessage.go +++ b/protocol/chainlib/chainproxy/tendermintRPCMessage.go @@ -44,15 +44,15 @@ func GetTendermintRPCError(jsonError *rpcclient.JsonError) (*tenderminttypes.RPC return rpcError, nil } -func ConvertErrorToRPCError(err error) *tenderminttypes.RPCError { +func ConvertErrorToRPCError(errString string, code int) *tenderminttypes.RPCError { var rpcError *tenderminttypes.RPCError - unmarshalError := json.Unmarshal([]byte(err.Error()), &rpcError) + unmarshalError := json.Unmarshal([]byte(errString), &rpcError) if unmarshalError != nil { - utils.LavaFormatWarning("Failed unmarshalling error tendermintrpc", unmarshalError, &map[string]string{"err": err.Error()}) + utils.LavaFormatWarning("Failed unmarshalling error tendermintrpc", unmarshalError, &map[string]string{"err": errString}) rpcError = &tenderminttypes.RPCError{ - Code: -1, // TODO get code from error + Code: code, Message: "Rpc Error", - Data: err.Error(), + Data: errString, } } return rpcError @@ -123,3 +123,27 @@ func ConvertTendermintMsg(rpcMsg *rpcclient.JsonrpcMessage) (*RPCResponse, error return msg, nil } + +func ConvertToTendermintError(errString string, inputInfo []byte) string { + var msg JsonrpcMessage + err := json.Unmarshal(inputInfo, &msg) + if err == nil { + id, errId := IdFromRawMessage(msg.ID) + if errId != nil { + utils.LavaFormatError("error idFromRawMessage", errId, nil) + return InternalErrorString + } + res, merr := json.Marshal(&RPCResponse{ + JSONRPC: msg.Version, + ID: id, + Error: ConvertErrorToRPCError(errString, LavaErrorCode), + }) + if merr != nil { + utils.LavaFormatError("convertToTendermintError json.Marshal", merr, nil) + return InternalErrorString + } + return string(res) + } + utils.LavaFormatError("error convertToTendermintError", err, nil) + return InternalErrorString +} diff --git a/protocol/chainlib/grpc.go b/protocol/chainlib/grpc.go index 029281b442..585273aa33 100644 --- a/protocol/chainlib/grpc.go +++ b/protocol/chainlib/grpc.go @@ -21,7 +21,7 @@ import ( "github.com/lavanet/lava/protocol/chainlib/chainproxy/rpcclient" "github.com/lavanet/lava/protocol/chainlib/chainproxy/thirdparty" "github.com/lavanet/lava/protocol/common" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/metrics" "github.com/lavanet/lava/utils" pairingtypes "github.com/lavanet/lava/x/pairing/types" diff --git a/protocol/chainlib/jsonRPC.go b/protocol/chainlib/jsonRPC.go index acc5511c35..1e9842d524 100644 --- a/protocol/chainlib/jsonRPC.go +++ b/protocol/chainlib/jsonRPC.go @@ -14,7 +14,7 @@ import ( "github.com/gofiber/fiber/v2/middleware/favicon" "github.com/gofiber/websocket/v2" "github.com/lavanet/lava/protocol/common" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/metrics" "github.com/lavanet/lava/relayer/parser" diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index 0d2e840255..71577aa9f8 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -13,7 +13,7 @@ import ( "github.com/lavanet/lava/protocol/chainlib/chainproxy" "github.com/lavanet/lava/protocol/chainlib/chainproxy/rpcclient" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/utils" pairingtypes "github.com/lavanet/lava/x/pairing/types" diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index 5ac7615715..7b4a24d549 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -17,7 +17,7 @@ import ( "github.com/lavanet/lava/protocol/chainlib/chainproxy" "github.com/lavanet/lava/protocol/chainlib/chainproxy/rpcclient" "github.com/lavanet/lava/protocol/common" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/metrics" "github.com/lavanet/lava/relayer/parser" "github.com/lavanet/lava/utils" @@ -337,7 +337,7 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) { c.Status(fiber.StatusInternalServerError) // Construct json response - response := convertToJsonError(errMasking) + response := chainproxy.ConvertToTendermintError(errMasking, c.Body()) // Return error json response return c.SendString(response) @@ -515,7 +515,7 @@ func (cp *tendermintRpcChainProxy) SendRPC(ctx context.Context, nodeMessage *cha replyMsg = &chainproxy.RPCResponse{ JSONRPC: nodeMessage.Version, ID: id, - Error: chainproxy.ConvertErrorToRPCError(err), + Error: chainproxy.ConvertErrorToRPCError(err.Error(), -1), // TODO: extract code from error status / message } } else { replyMessage, err = chainproxy.ConvertTendermintMsg(rpcMessage) diff --git a/protocol/lavaprotocol/request_builder.go b/protocol/lavaprotocol/request_builder.go index c2ab741b49..bdb7ff0d33 100644 --- a/protocol/lavaprotocol/request_builder.go +++ b/protocol/lavaprotocol/request_builder.go @@ -9,7 +9,7 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/lavanet/lava/protocol/chainlib" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/sigs" "github.com/lavanet/lava/utils" conflicttypes "github.com/lavanet/lava/x/conflict/types" @@ -18,10 +18,11 @@ import ( ) const ( - TimePerCU = uint64(100 * time.Millisecond) - MinimumTimePerRelayDelay = time.Second - AverageWorldLatency = 200 * time.Millisecond - SupportedNumberOfVRFs = 2 + TimePerCU = uint64(100 * time.Millisecond) + MinimumTimePerRelayDelay = time.Second + AverageWorldLatency = 200 * time.Millisecond + DataReliabilityTimeoutIncrease = 5 * time.Second + SupportedNumberOfVRFs = 2 ) type RelayRequestCommonData struct { diff --git a/relayer/lavasession/common.go b/protocol/lavasession/common.go similarity index 100% rename from relayer/lavasession/common.go rename to protocol/lavasession/common.go diff --git a/relayer/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go similarity index 100% rename from relayer/lavasession/consumer_session_manager.go rename to protocol/lavasession/consumer_session_manager.go diff --git a/relayer/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go similarity index 100% rename from relayer/lavasession/consumer_session_manager_test.go rename to protocol/lavasession/consumer_session_manager_test.go diff --git a/relayer/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go similarity index 98% rename from relayer/lavasession/consumer_types.go rename to protocol/lavasession/consumer_types.go index 15627fd181..ee27179f5f 100644 --- a/relayer/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -334,3 +334,8 @@ func (cs *SingleConsumerSession) CalculateQoS(cu uint64, latency time.Duration, }) } } + +// validate if this is a data reliability session +func (scs *SingleConsumerSession) IsDataReliabilitySession() bool { + return scs.SessionId <= DataReliabilitySessionId +} diff --git a/relayer/lavasession/errors.go b/protocol/lavasession/errors.go similarity index 100% rename from relayer/lavasession/errors.go rename to protocol/lavasession/errors.go diff --git a/relayer/lavasession/provider_session_manager.go b/protocol/lavasession/provider_session_manager.go similarity index 100% rename from relayer/lavasession/provider_session_manager.go rename to protocol/lavasession/provider_session_manager.go diff --git a/relayer/lavasession/provider_session_manager_test.go b/protocol/lavasession/provider_session_manager_test.go similarity index 100% rename from relayer/lavasession/provider_session_manager_test.go rename to protocol/lavasession/provider_session_manager_test.go diff --git a/relayer/lavasession/provider_types.go b/protocol/lavasession/provider_types.go similarity index 100% rename from relayer/lavasession/provider_types.go rename to protocol/lavasession/provider_types.go diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index e1cba9c2f5..06db4951c9 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -13,8 +13,8 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/lavanet/lava/protocol/chainlib" "github.com/lavanet/lava/protocol/lavaprotocol" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/protocol/statetracker" - "github.com/lavanet/lava/relayer/lavasession" "github.com/lavanet/lava/relayer/performance" "github.com/lavanet/lava/relayer/sigs" "github.com/lavanet/lava/utils" diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 11edd491c7..dcc37c958c 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -12,7 +12,7 @@ import ( "github.com/lavanet/lava/protocol/chainlib" "github.com/lavanet/lava/protocol/common" "github.com/lavanet/lava/protocol/lavaprotocol" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/metrics" "github.com/lavanet/lava/relayer/performance" "github.com/lavanet/lava/utils" @@ -129,7 +129,10 @@ func (rpccs *RPCConsumerServer) SendRelay( enabled, dataReliabilityThreshold := rpccs.chainParser.DataReliabilityParams() if enabled { for _, relayResult := range relayResults { - go rpccs.sendDataReliabilityRelayIfApplicable(ctx, relayResult, chainMessage, dataReliabilityThreshold, &relayRequestCommonData) // runs asynchronously + // new context is needed for data reliability as some clients cancel the context they provide when the relay returns + // as data reliability happens in a go routine it will continue while the response returns. + dataReliabilityContext := context.Background() + go rpccs.sendDataReliabilityRelayIfApplicable(dataReliabilityContext, relayResult, chainMessage, dataReliabilityThreshold, &relayRequestCommonData) // runs asynchronously } } @@ -230,7 +233,11 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe existingSessionLatestBlock := singleConsumerSession.LatestBlock // we read it now because singleConsumerSession is locked, and later it's not endpointClient := *singleConsumerSession.Endpoint.Client relaySentTime := time.Now() - connectCtx, cancel := context.WithTimeout(ctx, lavaprotocol.GetTimePerCu(singleConsumerSession.LatestRelayCu)+lavaprotocol.AverageWorldLatency) + extraTimeForContext := time.Duration(0) + if singleConsumerSession.IsDataReliabilitySession() { // for data reliability session we add more time to timeout + extraTimeForContext += lavaprotocol.DataReliabilityTimeoutIncrease + } + connectCtx, cancel := context.WithTimeout(ctx, lavaprotocol.GetTimePerCu(singleConsumerSession.LatestRelayCu)+lavaprotocol.AverageWorldLatency+extraTimeForContext) defer cancel() relayRequest := relayResult.Request providerPublicAddress := relayResult.ProviderAddress diff --git a/protocol/rpcprovider/rewardserver/reward_server.go b/protocol/rpcprovider/rewardserver/reward_server.go index abaa12e5d7..98efbc7b68 100644 --- a/protocol/rpcprovider/rewardserver/reward_server.go +++ b/protocol/rpcprovider/rewardserver/reward_server.go @@ -3,7 +3,7 @@ package rewardserver import ( "context" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" pairingtypes "github.com/lavanet/lava/x/pairing/types" ) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index a2faa3666a..12a65c8a4f 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -12,10 +12,10 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/lavanet/lava/protocol/chainlib" "github.com/lavanet/lava/protocol/chaintracker" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/protocol/rpcprovider/reliabilitymanager" "github.com/lavanet/lava/protocol/rpcprovider/rewardserver" "github.com/lavanet/lava/protocol/statetracker" - "github.com/lavanet/lava/relayer/lavasession" "github.com/lavanet/lava/relayer/performance" "github.com/lavanet/lava/relayer/sigs" "github.com/lavanet/lava/utils" diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 4b74fa0d1d..82c429e1de 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -6,7 +6,7 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/lavanet/lava/protocol/chainlib" "github.com/lavanet/lava/protocol/chaintracker" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/performance" ) diff --git a/protocol/statetracker/consumer_state_tracker.go b/protocol/statetracker/consumer_state_tracker.go index 52c5fb921d..936bcb8a31 100644 --- a/protocol/statetracker/consumer_state_tracker.go +++ b/protocol/statetracker/consumer_state_tracker.go @@ -11,7 +11,7 @@ import ( "github.com/lavanet/lava/protocol/chainlib" "github.com/lavanet/lava/protocol/chaintracker" "github.com/lavanet/lava/protocol/lavaprotocol" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/utils" conflicttypes "github.com/lavanet/lava/x/conflict/types" spectypes "github.com/lavanet/lava/x/spec/types" diff --git a/protocol/statetracker/pairing_updater.go b/protocol/statetracker/pairing_updater.go index 0b1a9ae5c0..df1b77f1ee 100644 --- a/protocol/statetracker/pairing_updater.go +++ b/protocol/statetracker/pairing_updater.go @@ -2,7 +2,7 @@ package statetracker import ( sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" epochstoragetypes "github.com/lavanet/lava/x/epochstorage/types" ) diff --git a/relayer/chainproxy/chainproxy.go b/relayer/chainproxy/chainproxy.go index caeaa6161c..36919f5412 100644 --- a/relayer/chainproxy/chainproxy.go +++ b/relayer/chainproxy/chainproxy.go @@ -13,8 +13,8 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/gofiber/fiber/v2" "github.com/gofiber/websocket/v2" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/chainproxy/rpcclient" - "github.com/lavanet/lava/relayer/lavasession" "github.com/lavanet/lava/relayer/performance" "github.com/lavanet/lava/relayer/sentry" "github.com/lavanet/lava/relayer/sigs" @@ -24,11 +24,14 @@ import ( ) const ( - DefaultTimeout = 5 * time.Second - TimePerCU = uint64(100 * time.Millisecond) - ContextUserValueKeyDappID = "dappID" - MinimumTimePerRelayDelay = time.Second - AverageWorldLatency = 200 * time.Millisecond + DefaultTimeout = 10 * time.Second + TimePerCU = uint64(100 * time.Millisecond) + ContextUserValueKeyDappID = "dappID" + MinimumTimePerRelayDelay = time.Second + AverageWorldLatency = 200 * time.Millisecond + LavaErrorCode = 555 + InternalErrorString = "Internal Error" + dataReliabilityContextMultiplier = 20 ) type NodeMessage interface { @@ -247,7 +250,11 @@ func SendRelay( relayRequest.DataReliability.Sig = sig c := *consumerSession.Endpoint.Client relaySentTime := time.Now() - reply, err := c.Relay(ctx, relayRequest) + // create a new context for data reliability, it needs to be a new Background context because the ctx might be canceled by the user. + connectCtxDataReliability, cancel := context.WithTimeout(context.Background(), (getTimePerCu(consumerSession.LatestRelayCu)+AverageWorldLatency)*dataReliabilityContextMultiplier) + defer cancel() + + reply, err := c.Relay(connectCtxDataReliability, relayRequest) if err != nil { return nil, nil, 0, err } diff --git a/relayer/chainproxy/connector.go b/relayer/chainproxy/connector.go index 41a5e7f9e4..8fa75d62a7 100644 --- a/relayer/chainproxy/connector.go +++ b/relayer/chainproxy/connector.go @@ -36,6 +36,7 @@ type Connector struct { func NewConnector(ctx context.Context, nConns uint, addr string) *Connector { NumberOfParallelConnections = nConns // set number of parallel connections requested by user (or default.) + utils.LavaFormatInfo("Setting Number of Parallel Connections", &map[string]string{"nConns": strconv.FormatUint(uint64(NumberOfParallelConnections), 10)}) connector := &Connector{ freeClients: make([]*rpcclient.Client, 0, nConns), addr: addr, @@ -78,6 +79,9 @@ func NewConnector(ctx context.Context, nConns uint, addr string) *Connector { connector.freeClients = append(connector.freeClients, rpcClient) } utils.LavaFormatInfo("Number of parallel connections created: "+strconv.Itoa(len(connector.freeClients)), nil) + if len(connector.freeClients) == 0 { + utils.LavaFormatFatal("Could not create any connections to the node check address", nil, &map[string]string{"address": addr}) + } go connector.connectorLoop(ctx) return connector } @@ -192,6 +196,7 @@ func NewGRPCConnector(ctx context.Context, nConns uint, addr string) *GRPCConnec } NumberOfParallelConnections = nConns // set number of parallel connections requested by user (or default.) + utils.LavaFormatInfo("Setting Number of Parallel Connections", &map[string]string{"nConns": strconv.FormatUint(uint64(NumberOfParallelConnections), 10)}) reachedClientLimit := false for i := uint(0); i < nConns; i++ { @@ -226,6 +231,9 @@ func NewGRPCConnector(ctx context.Context, nConns uint, addr string) *GRPCConnec } connector.freeClients = append(connector.freeClients, grpcClient) } + if len(connector.freeClients) == 0 { + utils.LavaFormatFatal("Could not create any connections to the node check address", nil, &map[string]string{"address": addr}) + } go connector.connectorLoop(ctx) return connector } diff --git a/relayer/chainproxy/grpc.go b/relayer/chainproxy/grpc.go index 80587795d6..bb6c67be71 100644 --- a/relayer/chainproxy/grpc.go +++ b/relayer/chainproxy/grpc.go @@ -18,9 +18,9 @@ import ( "github.com/jhump/protoreflect/desc" "github.com/jhump/protoreflect/dynamic" "github.com/jhump/protoreflect/grpcreflect" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/chainproxy/rpcclient" "github.com/lavanet/lava/relayer/chainproxy/thirdparty" - "github.com/lavanet/lava/relayer/lavasession" "github.com/lavanet/lava/relayer/parser" "github.com/lavanet/lava/relayer/performance" "github.com/lavanet/lava/relayer/sentry" diff --git a/relayer/chainproxy/jsonRPC.go b/relayer/chainproxy/jsonRPC.go index d37fa4f0f7..e468de9dca 100644 --- a/relayer/chainproxy/jsonRPC.go +++ b/relayer/chainproxy/jsonRPC.go @@ -15,8 +15,8 @@ import ( "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/favicon" "github.com/gofiber/websocket/v2" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/chainproxy/rpcclient" - "github.com/lavanet/lava/relayer/lavasession" "github.com/lavanet/lava/relayer/parser" "github.com/lavanet/lava/relayer/performance" "github.com/lavanet/lava/relayer/sentry" diff --git a/relayer/chainproxy/rest.go b/relayer/chainproxy/rest.go index 4ba1cb903b..ddd85de92c 100644 --- a/relayer/chainproxy/rest.go +++ b/relayer/chainproxy/rest.go @@ -15,8 +15,8 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/favicon" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/chainproxy/rpcclient" - "github.com/lavanet/lava/relayer/lavasession" "github.com/lavanet/lava/relayer/parser" "github.com/lavanet/lava/relayer/performance" "github.com/lavanet/lava/relayer/sentry" diff --git a/relayer/chainproxy/tendermintRPC.go b/relayer/chainproxy/tendermintRPC.go index e99e808dad..7d0bedd8c8 100644 --- a/relayer/chainproxy/tendermintRPC.go +++ b/relayer/chainproxy/tendermintRPC.go @@ -16,8 +16,8 @@ import ( "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/favicon" "github.com/gofiber/websocket/v2" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/chainproxy/rpcclient" - "github.com/lavanet/lava/relayer/lavasession" "github.com/lavanet/lava/relayer/parser" "github.com/lavanet/lava/relayer/sentry" "github.com/lavanet/lava/utils" @@ -361,8 +361,8 @@ func (cp *tendermintRpcChainProxy) PortalStart(ctx context.Context, privKey *btc // Set status to internal error c.Status(fiber.StatusInternalServerError) - // Construct json response - response := convertToJsonError(errMasking) + // Construct json (tendermint) response + response := convertToTendermintError(errMasking, c.Body()) // Return error json response return c.SendString(response) @@ -423,6 +423,30 @@ func (cp *tendermintRpcChainProxy) PortalStart(ctx context.Context, privKey *btc } } +func convertToTendermintError(errString string, inputInfo []byte) string { + var msg JsonrpcMessage + err := json.Unmarshal(inputInfo, &msg) + if err == nil { + id, errId := idFromRawMessage(msg.ID) + if errId != nil { + utils.LavaFormatError("error idFromRawMessage", errId, nil) + return InternalErrorString + } + res, merr := json.Marshal(&RPCResponse{ + JSONRPC: msg.Version, + ID: id, + Error: convertErrorToRPCError(errString, LavaErrorCode), + }) + if merr != nil { + utils.LavaFormatError("convertToTendermintError json.Marshal", merr, nil) + return InternalErrorString + } + return string(res) + } + utils.LavaFormatError("error convertToTendermintError", err, nil) + return InternalErrorString +} + func getTendermintRPCError(jsonError *rpcclient.JsonError) (*tenderminttypes.RPCError, error) { var rpcError *tenderminttypes.RPCError if jsonError != nil { @@ -439,15 +463,15 @@ func getTendermintRPCError(jsonError *rpcclient.JsonError) (*tenderminttypes.RPC return rpcError, nil } -func convertErrorToRPCError(err error) *tenderminttypes.RPCError { +func convertErrorToRPCError(errString string, code int) *tenderminttypes.RPCError { var rpcError *tenderminttypes.RPCError - unmarshalError := json.Unmarshal([]byte(err.Error()), &rpcError) + unmarshalError := json.Unmarshal([]byte(errString), &rpcError) if unmarshalError != nil { - utils.LavaFormatWarning("Failed unmarshalling error tendermintrpc", unmarshalError, &map[string]string{"err": err.Error()}) + utils.LavaFormatWarning("Failed unmarshalling error tendermintrpc", unmarshalError, &map[string]string{"err": errString}) rpcError = &tenderminttypes.RPCError{ - Code: -1, // TODO get code from error + Code: code, Message: "Rpc Error", - Data: err.Error(), + Data: errString, } } return rpcError @@ -619,7 +643,7 @@ func (nm *TendemintRpcMessage) SendRPC(ctx context.Context, ch chan interface{}) replyMsg = &RPCResponse{ JSONRPC: nm.msg.Version, ID: id, - Error: convertErrorToRPCError(err), + Error: convertErrorToRPCError(err.Error(), -1), // TODO: fetch error code from err. } } else { replyMessage, err = convertTendermintMsg(rpcMessage) diff --git a/relayer/sentry/sentry.go b/relayer/sentry/sentry.go index 879bfee547..59cbea956a 100755 --- a/relayer/sentry/sentry.go +++ b/relayer/sentry/sentry.go @@ -21,7 +21,7 @@ import ( "github.com/cosmos/cosmos-sdk/client/rpc" "github.com/cosmos/cosmos-sdk/client/tx" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/lavanet/lava/relayer/lavasession" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/sigs" "github.com/lavanet/lava/utils" conflicttypes "github.com/lavanet/lava/x/conflict/types" diff --git a/relayer/server.go b/relayer/server.go index c4b431c4af..c41567a545 100644 --- a/relayer/server.go +++ b/relayer/server.go @@ -31,10 +31,10 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" "github.com/cosmos/cosmos-sdk/version" + "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/relayer/chainproxy" "github.com/lavanet/lava/relayer/chainproxy/rpcclient" "github.com/lavanet/lava/relayer/chainsentry" - "github.com/lavanet/lava/relayer/lavasession" "github.com/lavanet/lava/relayer/performance" "github.com/lavanet/lava/relayer/sentry" "github.com/lavanet/lava/relayer/sigs" diff --git a/testutil/e2e/e2e.go b/testutil/e2e/e2e.go index 475d8154cc..58c5f9dab8 100644 --- a/testutil/e2e/e2e.go +++ b/testutil/e2e/e2e.go @@ -225,9 +225,9 @@ func (lt *lavaTest) startJSONRPCProvider(rpcURL string, ctx context.Context) { utils.LavaFormatInfo("startJSONRPCProvider OK", nil) } -func (lt *lavaTest) startJSONRPCGateway(ctx context.Context) { +func (lt *lavaTest) startJSONRPCConsumer(ctx context.Context) { providerCommand := lt.lavadPath + " portal_server 127.0.0.1 3333 ETH1 jsonrpc --from user1 --geolocation 1 --log_level debug" - logName := "04_jsonGateway" + logName := "04_jsonConsumer" lt.logs[logName] = new(bytes.Buffer) cmd := exec.CommandContext(ctx, "", "") @@ -243,15 +243,15 @@ func (lt *lavaTest) startJSONRPCGateway(ctx context.Context) { lt.commands[logName] = cmd go func() { - lt.listenCmdCommand(cmd, "startJSONRPCGateway process returned unexpectedly", "startJSONRPCGateway") + lt.listenCmdCommand(cmd, "startJSONRPCConsumer process returned unexpectedly", "startJSONRPCConsumer") }() - utils.LavaFormatInfo("startJSONRPCGateway OK", nil) + utils.LavaFormatInfo("startJSONRPCConsumer OK", nil) } // If after timeout and the check does not return it means it failed -func (lt *lavaTest) checkJSONRPCGateway(rpcURL string, timeout time.Duration, message string) { +func (lt *lavaTest) checkJSONRPCConsumer(rpcURL string, timeout time.Duration, message string) { for start := time.Now(); time.Since(start) < timeout; { - utils.LavaFormatInfo("Waiting JSONRPC Gateway", nil) + utils.LavaFormatInfo("Waiting JSONRPC Consumer", nil) client, err := ethclient.Dial(rpcURL) if err != nil { continue @@ -263,7 +263,7 @@ func (lt *lavaTest) checkJSONRPCGateway(rpcURL string, timeout time.Duration, me } time.Sleep(time.Second) } - panic("checkJSONRPCGateway: JSONRPC Check Failed Gateway didn't respond") + panic("checkJSONRPCConsumer: JSONRPC Check Failed Consumer didn't respond") } func jsonrpcTests(rpcURL string, testDuration time.Duration) error { @@ -401,9 +401,9 @@ func (lt *lavaTest) startTendermintProvider(rpcURL string, ctx context.Context) utils.LavaFormatInfo("startTendermintProvider OK", nil) } -func (lt *lavaTest) startTendermintGateway(ctx context.Context) { +func (lt *lavaTest) startTendermintConsumer(ctx context.Context) { providerCommand := lt.lavadPath + " portal_server 127.0.0.1 3340 LAV1 tendermintrpc --from user2 --geolocation 1 --log_level debug" - logName := "06_tendermintGateway" + logName := "06_tendermintConsumer" lt.logs[logName] = new(bytes.Buffer) cmd := exec.CommandContext(ctx, "", "") @@ -418,21 +418,21 @@ func (lt *lavaTest) startTendermintGateway(ctx context.Context) { } lt.commands[logName] = cmd go func() { - lt.listenCmdCommand(cmd, "startTendermintGateway process returned unexpectedly", "startTendermintGateway") + lt.listenCmdCommand(cmd, "startTendermintConsumer process returned unexpectedly", "startTendermintConsumer") }() - utils.LavaFormatInfo("startTendermintGateway OK", nil) + utils.LavaFormatInfo("startTendermintConsumer OK", nil) } -func (lt *lavaTest) checkTendermintGateway(rpcURL string, timeout time.Duration) { +func (lt *lavaTest) checkTendermintConsumer(rpcURL string, timeout time.Duration) { for start := time.Now(); time.Since(start) < timeout; { - utils.LavaFormatInfo("Waiting TENDERMINT Gateway", nil) + utils.LavaFormatInfo("Waiting TENDERMINT Consumer", nil) client, err := tmclient.New(rpcURL, "/websocket") if err != nil { continue } _, err = client.Status(context.Background()) if err == nil { - utils.LavaFormatInfo("checkTendermintGateway OK", nil) + utils.LavaFormatInfo("checkTendermintConsumer OK", nil) return } time.Sleep(time.Second) @@ -545,9 +545,9 @@ func (lt *lavaTest) startRESTProvider(rpcURL string, ctx context.Context) { utils.LavaFormatInfo("startRESTProvider OK", nil) } -func (lt *lavaTest) startRESTGateway(ctx context.Context) { +func (lt *lavaTest) startRESTConsumer(ctx context.Context) { providerCommand := lt.lavadPath + " portal_server 127.0.0.1 3341 LAV1 rest --from user2 --geolocation 1 --log_level debug" - logName := "09_restGateway" + logName := "09_restConsumer" lt.logs[logName] = new(bytes.Buffer) cmd := exec.CommandContext(ctx, "", "") @@ -562,20 +562,20 @@ func (lt *lavaTest) startRESTGateway(ctx context.Context) { } lt.commands[logName] = cmd go func() { - lt.listenCmdCommand(cmd, "startRESTGateway process returned unexpectedly", "startRESTGateway") + lt.listenCmdCommand(cmd, "startRESTConsumer process returned unexpectedly", "startRESTConsumer") }() - utils.LavaFormatInfo("startRESTGateway OK", nil) + utils.LavaFormatInfo("startRESTConsumer OK", nil) } -func (lt *lavaTest) checkRESTGateway(rpcURL string, timeout time.Duration) { +func (lt *lavaTest) checkRESTConsumer(rpcURL string, timeout time.Duration) { for start := time.Now(); time.Since(start) < timeout; { - utils.LavaFormatInfo("Waiting REST Gateway", nil) + utils.LavaFormatInfo("Waiting REST Consumer", nil) reply, err := getRequest(fmt.Sprintf("%s/blocks/latest", rpcURL)) if err != nil || strings.Contains(string(reply), "error") { time.Sleep(time.Second) continue } else { - utils.LavaFormatInfo("checkRESTGateway OK", nil) + utils.LavaFormatInfo("checkRESTConsumer OK", nil) return } } @@ -658,9 +658,9 @@ func (lt *lavaTest) startGRPCProvider(rpcURL string, ctx context.Context) { utils.LavaFormatInfo("startGRPCProvider OK", nil) } -func (lt *lavaTest) startGRPCGateway(ctx context.Context) { +func (lt *lavaTest) startGRPCConsumer(ctx context.Context) { providerCommand := lt.lavadPath + " portal_server 127.0.0.1 3342 LAV1 grpc --from user2 --geolocation 1 --log_level debug" - logName := "11_grpcGateway" + logName := "11_grpcConsumer" lt.logs[logName] = new(bytes.Buffer) cmd := exec.CommandContext(ctx, "", "") @@ -675,14 +675,14 @@ func (lt *lavaTest) startGRPCGateway(ctx context.Context) { } lt.commands[logName] = cmd go func() { - lt.listenCmdCommand(cmd, "startGRPCGateway process returned unexpectedly", "startGRPCGateway") + lt.listenCmdCommand(cmd, "startGRPCConsumer process returned unexpectedly", "startGRPCConsumer") }() - utils.LavaFormatInfo("startGRPCGateway OK", nil) + utils.LavaFormatInfo("startGRPCConsumer OK", nil) } -func (lt *lavaTest) checkGRPCGateway(rpcURL string, timeout time.Duration) { +func (lt *lavaTest) checkGRPCConsumer(rpcURL string, timeout time.Duration) { for start := time.Now(); time.Since(start) < timeout; { - utils.LavaFormatInfo("Waiting GRPC Gateway", nil) + utils.LavaFormatInfo("Waiting GRPC Consumer", nil) grpcConn, err := grpc.Dial(rpcURL, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { continue @@ -690,7 +690,7 @@ func (lt *lavaTest) checkGRPCGateway(rpcURL string, timeout time.Duration) { specQueryClient := specTypes.NewQueryClient(grpcConn) _, err = specQueryClient.SpecAll(context.Background(), &specTypes.QueryAllSpecRequest{}) if err == nil { - utils.LavaFormatInfo("checkGRPCGateway OK", nil) + utils.LavaFormatInfo("checkGRPCConsumer OK", nil) return } time.Sleep(time.Second) @@ -897,25 +897,25 @@ func runE2E() { jsonCTX := context.Background() lt.startJSONRPCProxy(jsonCTX) - lt.checkJSONRPCGateway("http://127.0.0.1:1111", time.Minute*2, "JSONRPCProxy OK") + lt.checkJSONRPCConsumer("http://127.0.0.1:1111", time.Minute*2, "JSONRPCProxy OK") // checks proxy. lt.startJSONRPCProvider("http://127.0.0.1:1111", jsonCTX) - lt.startJSONRPCGateway(jsonCTX) - lt.checkJSONRPCGateway("http://127.0.0.1:3333/1", time.Minute*2, "JSONRPCGateway OK") + lt.startJSONRPCConsumer(jsonCTX) + lt.checkJSONRPCConsumer("http://127.0.0.1:3333/1", time.Minute*2, "JSONRPCConsumer OK") tendermintCTX := context.Background() lt.startTendermintProvider("http://0.0.0.0:26657", tendermintCTX) - lt.startTendermintGateway(tendermintCTX) - lt.checkTendermintGateway("http://127.0.0.1:3340/1", time.Second*30) + lt.startTendermintConsumer(tendermintCTX) + lt.checkTendermintConsumer("http://127.0.0.1:3340/1", time.Second*30) restCTX := context.Background() lt.startRESTProvider("http://127.0.0.1:1317", restCTX) - lt.startRESTGateway(restCTX) - lt.checkRESTGateway("http://127.0.0.1:3341/1", time.Second*30) + lt.startRESTConsumer(restCTX) + lt.checkRESTConsumer("http://127.0.0.1:3341/1", time.Second*30) grpcCTX := context.Background() lt.startGRPCProvider("127.0.0.1:9090", grpcCTX) - lt.startGRPCGateway(grpcCTX) - lt.checkGRPCGateway("127.0.0.1:3342", time.Second*30) + lt.startGRPCConsumer(grpcCTX) + lt.checkGRPCConsumer("127.0.0.1:3342", time.Second*30) jsonErr := jsonrpcTests("http://127.0.0.1:3333/1", time.Second*30) if jsonErr != nil { @@ -949,7 +949,7 @@ func runE2E() { lt.checkPayments(time.Minute * 10) - grpcErr := grpcTests("127.0.0.1:3342", time.Second*30) + grpcErr := grpcTests("127.0.0.1:3342", time.Second*5) // TODO: if set to 30 secs fails e2e need to investigate why. currently blocking PR's if grpcErr != nil { panic(grpcErr) } else {