diff --git a/pkg/registry/etcd/context.go b/pkg/registry/etcd/context.go index 1aa2bb8..1c06dff 100644 --- a/pkg/registry/etcd/context.go +++ b/pkg/registry/etcd/context.go @@ -32,13 +32,6 @@ func nseVersionFromContext(ctx context.Context) (string, bool) { return version, ok } -func max(a, b time.Duration) time.Duration { - if a > b { - return a - } - return b -} - func min(a, b time.Duration) time.Duration { if a > b { return b diff --git a/pkg/registry/etcd/ns_server.go b/pkg/registry/etcd/ns_server.go index bbed77a..ff3838b 100644 --- a/pkg/registry/etcd/ns_server.go +++ b/pkg/registry/etcd/ns_server.go @@ -85,7 +85,7 @@ func (n *etcdNSRegistryServer) watchRemoteStorage() { sleepTime = min(sleepTime, maxSleepTime) continue } - sleepTime = max(sleepTime, minSleepTime) + sleepTime = minSleepTime isWatcherFine := true for isWatcherFine { diff --git a/pkg/registry/etcd/nse_server.go b/pkg/registry/etcd/nse_server.go index d210d10..a291aae 100644 --- a/pkg/registry/etcd/nse_server.go +++ b/pkg/registry/etcd/nse_server.go @@ -88,7 +88,7 @@ func (n *etcdNSERegistryServer) watchRemoteStorage() { sleepTime = min(sleepTime, maxSleepTime) continue } - sleepTime = max(sleepTime, minSleepTime) + sleepTime = minSleepTime isWatcherFine := true for isWatcherFine { @@ -110,15 +110,13 @@ func (n *etcdNSERegistryServer) watchRemoteStorage() { if item.Name == "" { item.Name = model.GetName() } - if v, ok := n.versions.Load(item.Name); ok && v == model.ResourceVersion { - continue - } resp := ®istry.NetworkServiceEndpointResponse{ NetworkServiceEndpoint: item, Deleted: deleted, } n.sendEvent(resp) if !deleted && item.ExpirationTime != nil && item.ExpirationTime.AsTime().Local().Before(time.Now()) { + n.versions.Delete(item.GetName()) n.deleteExecutor.AsyncExec(func() { _ = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(n.chainContext, item.GetName(), metav1.DeleteOptions{ Preconditions: &metav1.Preconditions{ @@ -252,6 +250,7 @@ func (n *etcdNSERegistryServer) Unregister(ctx context.Context, request *registr if err != nil { return nil, errors.Wrapf(err, "failed to delete a NetworkServiceEndpoints %s in a namespace %s", request.Name, n.ns) } + n.versions.Delete(request.GetName()) } return resp, nil } diff --git a/pkg/registry/etcd/nse_server_test.go b/pkg/registry/etcd/nse_server_test.go index 8399e7c..4123e8a 100644 --- a/pkg/registry/etcd/nse_server_test.go +++ b/pkg/registry/etcd/nse_server_test.go @@ -127,10 +127,13 @@ func Test_K8sNSERegistry_FindWatch(t *testing.T) { require.NoError(t, err) require.Equal(t, "nse-1", nseResp.NetworkServiceEndpoint.Name) - // NSE reregisteration. We shouldn't get any updates _, err = s.Register(ctx, nse.Clone()) require.NoError(t, err) + nseResp, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, "nse-1", nseResp.NetworkServiceEndpoint.Name) + // Update NSE again - add labels updatedNSE := nse.Clone() updatedNSE.NetworkServiceLabels = map[string]*registry.NetworkServiceLabels{"label": {}}