Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ks-327 remote target wait for initiated threads to finish on close #13524

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/sour-pigs-develop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal remote target wait until initiated threads exit on close
74 changes: 44 additions & 30 deletions core/capabilities/remote/target/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,48 +52,61 @@ func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commo
}
}

// expireRequests cancels every tracked request that reports itself as expired
// and removes it from messageIDToCallerRequest. The whole sweep runs while
// holding c.mutex.
func (c *client) expireRequests() {
c.mutex.Lock()
defer c.mutex.Unlock()

for messageID, req := range c.messageIDToCallerRequest {
if req.Expired() {
req.Cancel(errors.New("request expired"))
// Deleting the current key while ranging over a map is permitted in Go.
delete(c.messageIDToCallerRequest, messageID)
}
}
}

func (c *client) Start(ctx context.Context) error {
return c.StartOnce(c.Name(), func() error {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ticker := time.NewTicker(c.requestTimeout)
defer ticker.Stop()
c.lggr.Info("TargetClient started")
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
c.expireRequests()
}
}
c.checkForExpiredRequests()
}()
c.lggr.Info("TargetClient started")
return nil
})
}

// Close runs the client's shutdown sequence through StopOnce: it closes
// stopCh, cancels all tracked requests with a "client closed" error, waits on
// c.wg, and then logs that the client has closed. The order of these steps is
// significant.
func (c *client) Close() error {
return c.StopOnce(c.Name(), func() error {
// Signal anything selecting on stopCh to stop.
close(c.stopCh)
c.cancelAllRequests(errors.New("client closed"))
// Block until every goroutine registered on c.wg has called Done.
c.wg.Wait()
c.lggr.Info("TargetClient closed")
return nil
})
}

// checkForExpiredRequests sweeps for expired requests once per requestTimeout
// interval. It blocks until stopCh is closed, so callers are expected to run
// it on its own goroutine.
func (c *client) checkForExpiredRequests() {
	expiryTicker := time.NewTicker(c.requestTimeout)
	// Release the ticker's resources when the loop exits.
	defer expiryTicker.Stop()

	for {
		select {
		case <-expiryTicker.C:
			c.expireRequests()
		case <-c.stopCh:
			return
		}
	}
}

// expireRequests walks the tracked requests under c.mutex, cancelling each one
// that reports Expired() and dropping it from messageIDToCallerRequest.
func (c *client) expireRequests() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	for id, pending := range c.messageIDToCallerRequest {
		if !pending.Expired() {
			continue
		}
		pending.Cancel(errors.New("request expired"))
		// Removing the current entry during a range over a map is safe in Go.
		delete(c.messageIDToCallerRequest, id)
	}
}

// cancelAllRequests cancels every tracked request with err while holding
// c.mutex. Entries are left in messageIDToCallerRequest; only Cancel is
// invoked on them.
func (c *client) cancelAllRequests(err error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	for _, pending := range c.messageIDToCallerRequest {
		pending.Cancel(err)
	}
}

// Info returns the capability info stored on the client. ctx is not used and
// the returned error is always nil.
func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
	info := c.remoteCapabilityInfo
	return info, nil
}
Expand Down Expand Up @@ -121,8 +134,11 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
return nil, fmt.Errorf("request for message ID %s already exists", messageID)
}

ettec marked this conversation as resolved.
Show resolved Hide resolved
cCtx, _ := c.stopCh.NewCtx()
req, err := request.NewClientRequest(cCtx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
// TODO: confirm the reasons for the workaround below and see whether it can be resolved.
// The context passed in by the workflow engine is cancelled before the results are read from the response channel.
// Wrapping the context with 'WithoutCancel' is a workaround for that behaviour.
requestCtx := context.WithoutCancel(ctx)
req, err := request.NewClientRequest(requestCtx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)
if err != nil {
return nil, fmt.Errorf("failed to create client request: %w", err)
Expand All @@ -146,11 +162,9 @@ func (c *client) Receive(msg *types.MessageBody) {
return
}

go func() {
if err := req.OnMessage(ctx, msg); err != nil {
c.lggr.Errorw("failed to add response to request", "messageID", messageID, "err", err)
}
}()
if err := req.OnMessage(ctx, msg); err != nil {
bolekk marked this conversation as resolved.
Show resolved Hide resolved
c.lggr.Errorw("failed to add response to request", "messageID", messageID, "err", err)
}
}

func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) {
Expand Down
4 changes: 3 additions & 1 deletion core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
ID: "workflow-don",
}

broker := newTestMessageBroker()
broker := newTestAsyncMessageBroker(100)

receivers := make([]remotetypes.Receiver, numCapabilityPeers)
for i := 0; i < numCapabilityPeers; i++ {
Expand All @@ -172,6 +172,8 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
callers[i] = caller
}

servicetest.Run(t, broker)

executeInputs, err := values.NewMap(
map[string]any{
"executeValue1": "aValue1",
Expand Down
91 changes: 71 additions & 20 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
Expand Down Expand Up @@ -214,7 +215,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
F: workflowDonF,
}

broker := newTestMessageBroker()
broker := newTestAsyncMessageBroker(1000)

workflowDONs := map[string]commoncap.DON{
workflowDonInfo.ID: workflowDonInfo,
Expand All @@ -240,6 +241,8 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
workflowNodes[i] = workflowNode
}

servicetest.Run(t, broker)

executeInputs, err := values.NewMap(
map[string]any{
"executeValue1": "aValue1",
Expand Down Expand Up @@ -271,49 +274,97 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
wg.Wait()
}

type testMessageBroker struct {
type testAsyncMessageBroker struct {
services.StateMachine
nodes map[p2ptypes.PeerID]remotetypes.Receiver

sendCh chan *remotetypes.MessageBody

stopCh services.StopChan
wg sync.WaitGroup
}

// HealthReport always returns a nil map; the test broker reports no health
// information.
func (a *testAsyncMessageBroker) HealthReport() map[string]error {
	var report map[string]error
	return report
}

// Name returns the fixed name of the test broker.
func (a *testAsyncMessageBroker) Name() string {
	const brokerName = "testAsyncMessageBroker"
	return brokerName
}

func newTestMessageBroker() *testMessageBroker {
return &testMessageBroker{
nodes: make(map[p2ptypes.PeerID]remotetypes.Receiver),
func newTestAsyncMessageBroker(sendChBufferSize int) *testAsyncMessageBroker {
return &testAsyncMessageBroker{
nodes: make(map[p2ptypes.PeerID]remotetypes.Receiver),
stopCh: make(services.StopChan),
sendCh: make(chan *remotetypes.MessageBody, sendChBufferSize),
}
}

func (r *testMessageBroker) NewDispatcherForNode(nodePeerID p2ptypes.PeerID) remotetypes.Dispatcher {
// Start launches, through StartOnce, a single goroutine that drains sendCh and
// hands each message to the receiver registered for the message's Receiver
// peer ID. The goroutine is tracked by a.wg and returns once stopCh is closed.
// ctx is not used.
//
// The goroutine panics if a message is addressed to an unregistered peer.
// NOTE(review): a.nodes is read here without locking — presumably all
// receivers are registered before Start is called; confirm against callers.
func (a *testAsyncMessageBroker) Start(ctx context.Context) error {
	return a.StartOnce("testAsyncMessageBroker", func() error {
		a.wg.Add(1)
		go func() {
			defer a.wg.Done()

			for {
				select {
				case msg := <-a.sendCh:
					target, found := a.nodes[toPeerID(msg.Receiver)]
					if !found {
						panic("server not found for peer id")
					}
					target.Receive(msg)
				case <-a.stopCh:
					return
				}
			}
		}()
		return nil
	})
}

// Close runs the broker's shutdown through StopOnce: it closes stopCh and then
// waits on a.wg before returning nil.
func (a *testAsyncMessageBroker) Close() error {
return a.StopOnce("testAsyncMessageBroker", func() error {
// Signal anything selecting on stopCh to stop.
close(a.stopCh)

// Block until every goroutine registered on a.wg has called Done.
a.wg.Wait()
return nil
})
}

func (a *testAsyncMessageBroker) NewDispatcherForNode(nodePeerID p2ptypes.PeerID) remotetypes.Dispatcher {
return &nodeDispatcher{
callerPeerID: nodePeerID,
broker: r,
broker: a,
}
}

func (r *testMessageBroker) RegisterReceiverNode(nodePeerID p2ptypes.PeerID, node remotetypes.Receiver) {
if _, ok := r.nodes[nodePeerID]; ok {
func (a *testAsyncMessageBroker) RegisterReceiverNode(nodePeerID p2ptypes.PeerID, node remotetypes.Receiver) {
if _, ok := a.nodes[nodePeerID]; ok {
panic("node already registered")
}

r.nodes[nodePeerID] = node
a.nodes[nodePeerID] = node
}

func (r *testMessageBroker) Send(msg *remotetypes.MessageBody) {
receiverId := toPeerID(msg.Receiver)

receiver, ok := r.nodes[receiverId]
if !ok {
panic("server not found for peer id")
}

receiver.Receive(msg)
func (a *testAsyncMessageBroker) Send(msg *remotetypes.MessageBody) {
a.sendCh <- msg
}

// toPeerID converts the first 32 bytes of id into a peer ID using a
// slice-to-array conversion, which panics at runtime when len(id) < 32.
func toPeerID(id []byte) p2ptypes.PeerID {
	peerID := [32]byte(id)
	return peerID
}

// broker is the message-sending surface that nodeDispatcher holds in its
// broker field, letting the dispatcher depend on an interface rather than a
// concrete broker type.
type broker interface {
// Send hands msg to the broker.
Send(msg *remotetypes.MessageBody)
}

type nodeDispatcher struct {
callerPeerID p2ptypes.PeerID
broker *testMessageBroker
broker broker
}

func (t *nodeDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error {
Expand Down
17 changes: 14 additions & 3 deletions core/capabilities/remote/target/request/client_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
)

type ClientRequest struct {
cancelFn context.CancelFunc
responseCh chan commoncap.CapabilityResponse
createdAt time.Time
responseIDCount map[[32]byte]int
Expand All @@ -33,6 +34,7 @@ type ClientRequest struct {

respSent bool
mux sync.Mutex
wg *sync.WaitGroup
}

func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.CapabilityRequest, messageID string,
Expand All @@ -56,9 +58,14 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap
lggr.Debugw("sending request to peers", "execID", req.Metadata.WorkflowExecutionID, "schedule", peerIDToTransmissionDelay)

responseReceived := make(map[p2ptypes.PeerID]bool)

ctxWithCancel, cancelFn := context.WithCancel(ctx)
wg := &sync.WaitGroup{}
for peerID, delay := range peerIDToTransmissionDelay {
responseReceived[peerID] = false
go func(peerID ragep2ptypes.PeerID, delay time.Duration) {
wg.Add(1)
go func(ctx context.Context, peerID ragep2ptypes.PeerID, delay time.Duration) {
defer wg.Done()
message := &types.MessageBody{
CapabilityId: remoteCapabilityInfo.ID,
CapabilityDonId: remoteCapabilityDonInfo.ID,
Expand All @@ -69,7 +76,7 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap
}

select {
case <-ctx.Done():
case <-ctxWithCancel.Done():
lggr.Debugw("context done, not sending request to peer", "execID", req.Metadata.WorkflowExecutionID, "peerID", peerID)
return
case <-time.After(delay):
Expand All @@ -79,17 +86,19 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap
lggr.Errorw("failed to send message", "peerID", peerID, "err", err)
}
}
}(peerID, delay)
}(ctxWithCancel, peerID, delay)
}

return &ClientRequest{
cancelFn: cancelFn,
createdAt: time.Now(),
requestTimeout: requestTimeout,
requiredIdenticalResponses: int(remoteCapabilityDonInfo.F + 1),
responseIDCount: make(map[[32]byte]int),
errorCount: make(map[string]int),
responseReceived: responseReceived,
responseCh: make(chan commoncap.CapabilityResponse, 1),
wg: wg,
}, nil
}

Expand All @@ -102,6 +111,8 @@ func (c *ClientRequest) Expired() bool {
}

func (c *ClientRequest) Cancel(err error) {
c.cancelFn()
ettec marked this conversation as resolved.
Show resolved Hide resolved
c.wg.Wait()
c.mux.Lock()
defer c.mux.Unlock()
if !c.respSent {
Expand Down
Loading
Loading