From 733d6fa02794819babd890d88c44fe6320f28523 Mon Sep 17 00:00:00 2001 From: James Peach Date: Mon, 16 Aug 2021 09:14:04 +1000 Subject: [PATCH] fix(kuma-cp) Fix DP tracking lock consistency In the DataplaneLifecycle tracker, consistently lock the map that tracks whether the dataplane has been successfully registered. This fixes #2566. Signed-off-by: James Peach --- .../server/callbacks/dataplane_lifecycle.go | 32 ++++---- .../callbacks/dataplane_lifecycle_test.go | 74 +++++++++++++++++++ 2 files changed, 91 insertions(+), 15 deletions(-) diff --git a/pkg/xds/server/callbacks/dataplane_lifecycle.go b/pkg/xds/server/callbacks/dataplane_lifecycle.go index 4e8a5bcb5a7f..3997da99a643 100644 --- a/pkg/xds/server/callbacks/dataplane_lifecycle.go +++ b/pkg/xds/server/callbacks/dataplane_lifecycle.go @@ -43,25 +43,27 @@ func (d *DataplaneLifecycle) OnProxyReconnected(streamID core_xds.StreamID, dpKe } func (d *DataplaneLifecycle) register(streamID core_xds.StreamID, dpKey model.ResourceKey, md core_xds.DataplaneMetadata) error { - if md.GetProxyType() == mesh_proto.DataplaneProxyType && md.GetDataplaneResource() != nil { + switch { + case md.GetProxyType() == mesh_proto.DataplaneProxyType && md.GetDataplaneResource() != nil: dp := md.GetDataplaneResource() lifecycleLog.Info("registering dataplane", "dataplane", dp, "dataplaneKey", dpKey, "streamID", streamID) if err := d.registerDataplane(dp); err != nil { return errors.Wrap(err, "could not register dataplane passed in kuma-dp run") } - d.createdDpByCallbacks[dpKey] = mesh_proto.DataplaneProxyType - return nil - } - - if md.GetProxyType() == mesh_proto.IngressProxyType && md.GetZoneIngressResource() != nil { + case md.GetProxyType() == mesh_proto.IngressProxyType && md.GetZoneIngressResource() != nil: zi := md.GetZoneIngressResource() lifecycleLog.Info("registering zone ingress", "zoneIngress", zi, "zoneIngressKey", dpKey, "streamID", streamID) if err := d.registerZoneIngress(zi); err != nil { return errors.Wrap(err, "could not register zone ingress passed in kuma-dp run") } - d.createdDpByCallbacks[dpKey] = mesh_proto.IngressProxyType + default: return nil } + + d.Lock() + d.createdDpByCallbacks[dpKey] = md.GetProxyType() + d.Unlock() + return nil } @@ -78,27 +80,27 @@ func (d *DataplaneLifecycle) OnProxyDisconnected(streamID core_xds.StreamID, dpK } d.Lock() - defer d.Unlock() proxyType, createdByCallbacks := d.createdDpByCallbacks[dpKey] + if createdByCallbacks { + delete(d.createdDpByCallbacks, dpKey) + } + d.Unlock() + if !createdByCallbacks { return } - delete(d.createdDpByCallbacks, dpKey) - if proxyType == mesh_proto.DataplaneProxyType { + switch proxyType { + case mesh_proto.DataplaneProxyType: lifecycleLog.Info("unregistering dataplane", "dataplaneKey", dpKey, "streamID", streamID) if err := d.unregisterDataplane(dpKey); err != nil { lifecycleLog.Error(err, "could not unregister dataplane") } - return - } - - if proxyType == mesh_proto.IngressProxyType { + case mesh_proto.IngressProxyType: lifecycleLog.Info("unregistering zone ingress", "zoneIngressKey", dpKey, "streamID", streamID) if err := d.unregisterZoneIngress(dpKey); err != nil { lifecycleLog.Error(err, "could not unregister zone ingress") } - return } } diff --git a/pkg/xds/server/callbacks/dataplane_lifecycle_test.go b/pkg/xds/server/callbacks/dataplane_lifecycle_test.go index 58b68ec1eb52..dfc754dbc8b4 100644 --- a/pkg/xds/server/callbacks/dataplane_lifecycle_test.go +++ b/pkg/xds/server/callbacks/dataplane_lifecycle_test.go @@ -2,6 +2,8 @@ package callbacks_test import ( "context" + "fmt" + "sync" v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" @@ -184,4 +186,76 @@ var _ = Describe("Dataplane Lifecycle", func() { err = resManager.Get(context.Background(), core_mesh.NewDataplaneResource(), core_store.GetByKey("backend-01", "default")) Expect(err).ToNot(HaveOccurred()) }) + + It("should not race when registering concurrently", func() { + const streamID = 123 + + wg := sync.WaitGroup{} + + for i := 0; i < 10; i++ { + wg.Add(1) + + go func(num int) { + defer GinkgoRecover() + + streamID := int64(streamID + num) + nodeID := fmt.Sprintf("default.backend-%d", num) + + // given + req := v2.DiscoveryRequest{ + Node: &envoy_core.Node{ + Id: nodeID, + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "dataplane.resource": &structpb.Value{ + Kind: &structpb.Value_StringValue{ + StringValue: fmt.Sprintf(` + { + "type": "Dataplane", + "mesh": "default", + "name": "%s", + "networking": { + "address": "127.0.0.%d", + "inbound": [ + { + "port": 22022, + "servicePort": 8443, + "tags": { + "kuma.io/service": "backend" + } + }, + ] + } + } + `, nodeID, num), + }, + }, + }, + }, + }, + } + + // when + err := callbacks.OnStreamRequest(streamID, &req) + Expect(err).ToNot(HaveOccurred()) + + // then dp is created + err = resManager.Get(context.Background(), core_mesh.NewDataplaneResource(), core_store.GetByKey(nodeID, "default")) + Expect(err).ToNot(HaveOccurred()) + + // when + callbacks.OnStreamClosed(streamID) + + // then dataplane should be deleted + err = resManager.Get(context.Background(), core_mesh.NewDataplaneResource(), core_store.GetByKey("backend-01", "default")) + Expect(core_store.IsResourceNotFound(err)).To(BeTrue()) + + wg.Done() + }(i) + + } + + wg.Wait() + + }) })