Skip to content

Commit

Permalink
fix(kuma-cp) Fix DP tracking lock consistency (#2567) (#2568)
Browse files Browse the repository at this point in the history
In the DataplaneLifecycle tracker, consistently lock the map that
tracks whether the dataplane has been successfully registered.

This fixes #2566.

Signed-off-by: James Peach <james.peach@konghq.com>
(cherry picked from commit 1591de6)

Co-authored-by: James Peach <james.peach@konghq.com>
  • Loading branch information
mergify[bot] and jpeach authored Aug 16, 2021
1 parent cfc610a commit 1f3438a
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 15 deletions.
32 changes: 17 additions & 15 deletions pkg/xds/server/callbacks/dataplane_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,27 @@ func (d *DataplaneLifecycle) OnProxyReconnected(streamID core_xds.StreamID, dpKe
}

// register creates the resource carried in the proxy metadata (a Dataplane
// via registerDataplane, or a ZoneIngress via registerZoneIngress) and records
// dpKey in createdDpByCallbacks together with the proxy type. It returns a
// wrapped error when the underlying registration fails, and nil otherwise,
// including when the metadata carries no resource for its proxy type.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// pre-change `if` statements and their unguarded map writes appear interleaved
// with the post-change `switch` and the locked write at the end; it is not
// compilable exactly as shown.
func (d *DataplaneLifecycle) register(streamID core_xds.StreamID, dpKey model.ResourceKey, md core_xds.DataplaneMetadata) error {
if md.GetProxyType() == mesh_proto.DataplaneProxyType && md.GetDataplaneResource() != nil {
switch {
case md.GetProxyType() == mesh_proto.DataplaneProxyType && md.GetDataplaneResource() != nil:
dp := md.GetDataplaneResource()
lifecycleLog.Info("registering dataplane", "dataplane", dp, "dataplaneKey", dpKey, "streamID", streamID)
if err := d.registerDataplane(dp); err != nil {
return errors.Wrap(err, "could not register dataplane passed in kuma-dp run")
}
d.createdDpByCallbacks[dpKey] = mesh_proto.DataplaneProxyType
return nil
}

if md.GetProxyType() == mesh_proto.IngressProxyType && md.GetZoneIngressResource() != nil {
case md.GetProxyType() == mesh_proto.IngressProxyType && md.GetZoneIngressResource() != nil:
zi := md.GetZoneIngressResource()
lifecycleLog.Info("registering zone ingress", "zoneIngress", zi, "zoneIngressKey", dpKey, "streamID", streamID)
if err := d.registerZoneIngress(zi); err != nil {
return errors.Wrap(err, "could not register zone ingress passed in kuma-dp run")
}
d.createdDpByCallbacks[dpKey] = mesh_proto.IngressProxyType
default:
// Nothing was registered, so there is nothing to track for this key.
return nil
}

// Record the successful registration while holding the lock, so that the
// tracking map is accessed consistently under the same lock everywhere.
d.Lock()
d.createdDpByCallbacks[dpKey] = md.GetProxyType()
d.Unlock()

return nil
}

Expand All @@ -78,27 +80,27 @@ func (d *DataplaneLifecycle) OnProxyDisconnected(streamID core_xds.StreamID, dpK
}

d.Lock()
defer d.Unlock()
proxyType, createdByCallbacks := d.createdDpByCallbacks[dpKey]
if createdByCallbacks {
delete(d.createdDpByCallbacks, dpKey)
}
d.Unlock()

if !createdByCallbacks {
return
}
delete(d.createdDpByCallbacks, dpKey)

if proxyType == mesh_proto.DataplaneProxyType {
switch proxyType {
case mesh_proto.DataplaneProxyType:
lifecycleLog.Info("unregistering dataplane", "dataplaneKey", dpKey, "streamID", streamID)
if err := d.unregisterDataplane(dpKey); err != nil {
lifecycleLog.Error(err, "could not unregister dataplane")
}
return
}

if proxyType == mesh_proto.IngressProxyType {
case mesh_proto.IngressProxyType:
lifecycleLog.Info("unregistering zone ingress", "zoneIngressKey", dpKey, "streamID", streamID)
if err := d.unregisterZoneIngress(dpKey); err != nil {
lifecycleLog.Error(err, "could not unregister zone ingress")
}
return
}
}

Expand Down
74 changes: 74 additions & 0 deletions pkg/xds/server/callbacks/dataplane_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package callbacks_test

import (
"context"
"fmt"
"sync"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
Expand Down Expand Up @@ -184,4 +186,76 @@ var _ = Describe("Dataplane Lifecycle", func() {
err = resManager.Get(context.Background(), core_mesh.NewDataplaneResource(), core_store.GetByKey("backend-01", "default"))
Expect(err).ToNot(HaveOccurred())
})

// Registers and unregisters ten distinct dataplanes from concurrent
// goroutines, each on its own stream, to exercise the lifecycle tracker's
// shared state. Intended to be run with the race detector enabled.
It("should not race when registering concurrently", func() {
	const baseStreamID = 123

	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)

		go func(num int) {
			defer GinkgoRecover()
			// Deferred so that a failed expectation (which panics and is
			// then recovered by GinkgoRecover) still releases the
			// WaitGroup instead of leaving wg.Wait() blocked forever.
			defer wg.Done()

			streamID := int64(baseStreamID + num)
			nodeID := fmt.Sprintf("default.backend-%d", num)

			// given
			req := v2.DiscoveryRequest{
				Node: &envoy_core.Node{
					Id: nodeID,
					Metadata: &structpb.Struct{
						Fields: map[string]*structpb.Value{
							"dataplane.resource": &structpb.Value{
								Kind: &structpb.Value_StringValue{
									StringValue: fmt.Sprintf(`
{
"type": "Dataplane",
"mesh": "default",
"name": "%s",
"networking": {
"address": "127.0.0.%d",
"inbound": [
{
"port": 22022,
"servicePort": 8443,
"tags": {
"kuma.io/service": "backend"
}
},
]
}
}
`, nodeID, num),
								},
							},
						},
					},
				},
			}

			// when
			err := callbacks.OnStreamRequest(streamID, &req)
			Expect(err).ToNot(HaveOccurred())

			// then dp is created
			err = resManager.Get(context.Background(), core_mesh.NewDataplaneResource(), core_store.GetByKey(nodeID, "default"))
			Expect(err).ToNot(HaveOccurred())

			// when
			callbacks.OnStreamClosed(streamID)

			// then dataplane should be deleted; look up the same key that was
			// verified as created above, so the check cannot pass vacuously
			// against a dataplane this goroutine never registered.
			err = resManager.Get(context.Background(), core_mesh.NewDataplaneResource(), core_store.GetByKey(nodeID, "default"))
			Expect(core_store.IsResourceNotFound(err)).To(BeTrue())
		}(i)
	}

	wg.Wait()
})
})

0 comments on commit 1f3438a

Please sign in to comment.