Skip to content

Commit

Permalink
implement HTTP target capability and connector handler (#14491)
Browse files Browse the repository at this point in the history
* implement HTTP target capability and connector handler

* self-review

* fix linter

* more linting fixes

* fix build

* regenerate mocks

* Update core/capabilities/webapi/target/connector_handler.go

Co-authored-by: Street <5597260+MStreet3@users.noreply.github.com>

* address feedback

* address comments

* fix failing test

---------

Co-authored-by: Street <5597260+MStreet3@users.noreply.github.com>
  • Loading branch information
jinhoonbang and MStreet3 authored Sep 26, 2024
1 parent 207b0d5 commit c6e6e21
Show file tree
Hide file tree
Showing 12 changed files with 896 additions and 7 deletions.
5 changes: 5 additions & 0 deletions .changeset/six-frogs-juggle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added HTTP target capability and gateway connector handler
123 changes: 123 additions & 0 deletions core/capabilities/webapi/target/connector_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package target

import (
"context"
"encoding/json"
"sort"
"sync"

"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webapicapabilities"
)

var _ connector.GatewayConnectorHandler = &ConnectorHandler{}

type ConnectorHandler struct {
gc connector.GatewayConnector
lggr logger.Logger
responseChs map[string]chan *api.Message
responseChsMu sync.Mutex
rateLimiter *common.RateLimiter
}

func NewConnectorHandler(gc connector.GatewayConnector, config Config, lgger logger.Logger) (*ConnectorHandler, error) {
rateLimiter, err := common.NewRateLimiter(config.RateLimiter)
if err != nil {
return nil, err
}
responseChs := make(map[string]chan *api.Message)
return &ConnectorHandler{
gc: gc,
responseChs: responseChs,
responseChsMu: sync.Mutex{},
rateLimiter: rateLimiter,
lggr: lgger,
}, nil
}

// HandleSingleNodeRequest sends a request to first available gateway node and blocks until response is received
// TODO: handle retries and timeouts
func (c *ConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageID string, payload []byte) (*api.Message, error) {
ch := make(chan *api.Message, 1)
c.responseChsMu.Lock()
c.responseChs[messageID] = ch
c.responseChsMu.Unlock()
l := logger.With(c.lggr, "messageID", messageID)
l.Debugw("sending request to gateway")

body := &api.MessageBody{
MessageId: messageID,
DonId: c.gc.DonID(),
Method: webapicapabilities.MethodWebAPITarget,
Payload: payload,
}

// simply, send request to first available gateway node from sorted list
// this allows for deterministic selection of gateway node receiver for easier debugging
gatewayIDs := c.gc.GatewayIDs()
if len(gatewayIDs) == 0 {
return nil, errors.New("no gateway nodes available")
}
sort.Strings(gatewayIDs)

err := c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body)
if err != nil {
return nil, errors.Wrap(err, "failed to send request to gateway")
}

select {
case resp := <-ch:
return resp, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (c *ConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
body := &msg.Body
l := logger.With(c.lggr, "gatewayID", gatewayID, "method", body.Method, "messageID", msg.Body.MessageId)
if !c.rateLimiter.Allow(body.Sender) {
// error is logged here instead of warning because if a message from gateway is rate-limited,
// the workflow will eventually fail with timeout as there are no retries in place yet
c.lggr.Errorw("request rate-limited")
return
}
l.Debugw("handling gateway request")
switch body.Method {
case webapicapabilities.MethodWebAPITarget:
var payload webapicapabilities.TargetResponsePayload
err := json.Unmarshal(body.Payload, &payload)
if err != nil {
l.Errorw("failed to unmarshal payload", "err", err)
return
}
c.responseChsMu.Lock()
defer c.responseChsMu.Unlock()
ch, ok := c.responseChs[body.MessageId]
if !ok {
l.Errorw("no response channel found")
return
}
select {
case ch <- msg:
delete(c.responseChs, body.MessageId)
case <-ctx.Done():
return
}
default:
l.Errorw("unsupported method")
}
}

func (c *ConnectorHandler) Start(ctx context.Context) error {
return c.gc.AddHandler([]string{webapicapabilities.MethodWebAPITarget}, c)
}

func (c *ConnectorHandler) Close() error {
return nil
}
152 changes: 152 additions & 0 deletions core/capabilities/webapi/target/target.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package target

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webapicapabilities"
)

const ID = "web-api-target@1.0.0"

var _ capabilities.TargetCapability = &Capability{}

var capabilityInfo = capabilities.MustNewCapabilityInfo(
ID,
capabilities.CapabilityTypeTarget,
"A target that sends HTTP requests to external clients via the Chainlink Gateway.",
)

// Capability is a target capability that sends HTTP requests to external clients via the Chainlink Gateway.
type Capability struct {
capabilityInfo capabilities.CapabilityInfo
connectorHandler *ConnectorHandler
lggr logger.Logger
registry core.CapabilitiesRegistry
config Config
}

func NewCapability(config Config, registry core.CapabilitiesRegistry, connectorHandler *ConnectorHandler, lggr logger.Logger) (*Capability, error) {
return &Capability{
capabilityInfo: capabilityInfo,
config: config,
registry: registry,
connectorHandler: connectorHandler,
lggr: lggr,
}, nil
}

func (c *Capability) Start(ctx context.Context) error {
return c.registry.Add(ctx, c)
}

func (c *Capability) Close() error {
return nil
}

func (c *Capability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
return capabilityInfo, nil
}

func getMessageID(req capabilities.CapabilityRequest) (string, error) {
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowID); err != nil {
return "", fmt.Errorf("workflow ID is invalid: %w", err)
}
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionID); err != nil {
return "", fmt.Errorf("workflow execution ID is invalid: %w", err)
}
messageID := []string{
req.Metadata.WorkflowID,
req.Metadata.WorkflowExecutionID,
webapicapabilities.MethodWebAPITarget,
}
return strings.Join(messageID, "/"), nil
}

func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
c.lggr.Debugw("executing http target", "capabilityRequest", req)

var input Input
err := req.Inputs.UnwrapTo(&input)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

var workflowCfg WorkflowConfig
err = req.Config.UnwrapTo(&workflowCfg)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

messageID, err := getMessageID(req)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

payload := webapicapabilities.TargetRequestPayload{
URL: input.URL,
Method: input.Method,
Headers: input.Headers,
Body: input.Body,
TimeoutMs: workflowCfg.TimeoutMs,
}

payloadBytes, err := json.Marshal(payload)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

// Default to SingleNode delivery mode
deliveryMode := SingleNode
if workflowCfg.DeliveryMode != "" {
deliveryMode = workflowCfg.DeliveryMode
}

switch deliveryMode {
case SingleNode:
// blocking call to handle single node request. waits for response from gateway
resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
if err != nil {
return capabilities.CapabilityResponse{}, err
}
c.lggr.Debugw("received gateway response", "resp", resp)
var payload webapicapabilities.TargetResponsePayload
err = json.Unmarshal(resp.Body.Payload, &payload)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

// TODO: check target response format and fields CM-473
values, err := values.NewMap(map[string]any{
"statusCode": payload.StatusCode,
"headers": payload.Headers,
"body": payload.Body,
})
if err != nil {
return capabilities.CapabilityResponse{}, err
}
return capabilities.CapabilityResponse{
Value: values,
}, nil
default:
return capabilities.CapabilityResponse{}, fmt.Errorf("unsupported delivery mode: %v", workflowCfg.DeliveryMode)
}
}

func (c *Capability) RegisterToWorkflow(ctx context.Context, req capabilities.RegisterToWorkflowRequest) error {
// Workflow engine guarantees registration requests are valid
// TODO: handle retry configuration CM-472
return nil
}

func (c *Capability) UnregisterFromWorkflow(ctx context.Context, req capabilities.UnregisterFromWorkflowRequest) error {
// Workflow engine guarantees deregistration requests are valid
return nil
}
Loading

0 comments on commit c6e6e21

Please sign in to comment.