diff --git a/pkg/subnet/etcd/registry.go b/pkg/subnet/etcd/registry.go index 8b2f62f3d0..088eebe04c 100644 --- a/pkg/subnet/etcd/registry.go +++ b/pkg/subnet/etcd/registry.go @@ -283,107 +283,150 @@ func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, sn ip.IP4Net, s func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, leaseWatchChan chan []lease.LeaseWatchResult, since int64) error { key := path.Join(esr.etcdCfg.Prefix, "subnets") + initialBackoff := 100 * time.Millisecond // Initial backoff duration + exponentialBackoff := initialBackoff + maxBackoff := 5 * time.Second // Cap for backoff duration - wctx, cancel := context.WithCancel(ctx) - //release context ASAP to free resources - defer cancel() - - log.Infof("registry: watching subnets starting from rev %d", since) - rch := esr.cli.Watch(etcd.WithRequireLeader(wctx), key, etcd.WithPrefix(), etcd.WithRev(since)) - if rch == nil { - return errNoWatchChannel - } for { - select { - case <-ctx.Done(): - esr.cli.Close() - close(leaseWatchChan) - return ctx.Err() - case wresp := <-rch: - results := make([]lease.LeaseWatchResult, 0) - for _, etcdEvent := range wresp.Events { - subnetEvent, err := parseSubnetWatchResponse(ctx, esr.cli, etcdEvent) - switch { - - case err == nil: - log.Infof("watchSubnets: got valid subnet event with revision %d", wresp.Header.Revision) - // TODO only vxlan backend and kube subnet manager support dual stack now. - subnetEvent.Lease.EnableIPv4 = true - wr := lease.LeaseWatchResult{ - Events: []lease.Event{subnetEvent}, - Cursor: watchCursor{wresp.Header.Revision}, - } - results = append(results, wr) + wctx, cancel := context.WithCancel(ctx) + defer cancel() + log.Infof("registry: watching subnets starting from rev %d", since) + rch := esr.cli.Watch(etcd.WithRequireLeader(wctx), key, etcd.WithPrefix(), etcd.WithRev(since)) + if rch == nil { + log.Errorf("Failed to establish etcd watch channel") + cancel() + time.Sleep(exponentialBackoff) // Avoid CPU spinning + exponentialBackoff = min(exponentialBackoff*2, maxBackoff) // Exponential backoff + continue + } - case isIndexTooSmall(err): - log.Warning("Watch of subnet leases failed because etcd index outside history window") - wr, err := esr.leasesWatchReset(ctx) + innerLoop: + for { + select { + case <-ctx.Done(): + esr.cli.Close() + close(leaseWatchChan) + return ctx.Err() + case wresp, ok := <-rch: + err := wresp.Err() + if !ok || err != nil { + if err != nil { + log.Warningf("etcd watch channel for %s closed with error %v, reconnecting...", key, err) + } else { + log.Warningf("etcd watch channel for %s closed, reconnecting...", key) + } + cancel() + time.Sleep(exponentialBackoff) + exponentialBackoff = min(exponentialBackoff*2, maxBackoff) + break innerLoop + } + exponentialBackoff = initialBackoff // Reset backoff on success + results := make([]lease.LeaseWatchResult, 0) + for _, etcdEvent := range wresp.Events { + subnetEvent, err := parseSubnetWatchResponse(ctx, esr.cli, etcdEvent) + switch { + + case err == nil: + log.Infof("watchSubnets: got valid subnet event with revision %d", wresp.Header.Revision) + // TODO only vxlan backend and kube subnet manager support dual stack now. + subnetEvent.Lease.EnableIPv4 = true + wr := lease.LeaseWatchResult{ + Events: []lease.Event{subnetEvent}, + Cursor: watchCursor{wresp.Header.Revision}, + } + results = append(results, wr) + + case isIndexTooSmall(err): + log.Warning("Watch of subnet leases failed because etcd index outside history window") + wr, err := esr.leasesWatchReset(ctx) + if err != nil { + log.Errorf("error resetting etcd watch: %s", err) + } + results = append(results, wr) + case wresp.Header.Revision != 0: + log.Warning("Watch of subnet leases failed because header revision != 0") + results = append(results, lease.LeaseWatchResult{Cursor: watchCursor{wresp.Header.Revision}}) + + default: + log.Warningf("Watch of subnet failed with error %s", err) + results = append(results, lease.LeaseWatchResult{}) + } if err != nil { - log.Errorf("error resetting etcd watch: %s", err) + log.Errorf("error parsing etcd event: %s", err) } - results = append(results, wr) - case wresp.Header.Revision != 0: - log.Warning("Watch of subnet leases failed because header revision != 0") - results = append(results, lease.LeaseWatchResult{Cursor: watchCursor{wresp.Header.Revision}}) - - default: - log.Warningf("Watch of subnet failed with error %s", err) - results = append(results, lease.LeaseWatchResult{}) } - if err != nil { - log.Errorf("error parsing etcd event: %s", err) + if len(results) > 0 { + leaseWatchChan <- results } } - if len(results) > 0 { - leaseWatchChan <- results - } } - } } func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, since int64, sn ip.IP4Net, sn6 ip.IP6Net, leaseWatchChan chan []lease.LeaseWatchResult) error { - key := path.Join(esr.etcdCfg.Prefix, "subnets", subnet.MakeSubnetKey(sn, sn6)) - - wctx, cancel := context.WithCancel(ctx) - //release context ASAP to free resources - defer cancel() - - rch := esr.cli.Watch(etcd.WithRequireLeader(wctx), key, etcd.WithPrefix(), etcd.WithRev(since)) - if rch == nil { - return errNoWatchChannel - } + subnetKey := subnet.MakeSubnetKey(sn, sn6) + key := path.Join(esr.etcdCfg.Prefix, "subnets", subnetKey) + initialBackoff := 100 * time.Millisecond // Initial backoff duration + exponentialBackoff := initialBackoff + maxBackoff := 5 * time.Second // Cap for backoff duration for { - select { - case <-ctx.Done(): - esr.cli.Close() - close(leaseWatchChan) - return ctx.Err() - case wresp := <-rch: - batch := make([]lease.LeaseWatchResult, 0) - for _, etcdEvent := range wresp.Events { - subnetEvent, err := parseSubnetWatchResponse(ctx, esr.cli, etcdEvent) - switch { - case err == nil: - wr := lease.LeaseWatchResult{ - Events: []lease.Event{subnetEvent}, - Cursor: watchCursor{wresp.Header.Revision}, - } - batch = append(batch, wr) - case isIndexTooSmall(err): - log.Warning("Watch of subnet leases failed because etcd index outside history window") - wr, err := esr.leasesWatchReset(ctx) + wctx, cancel := context.WithCancel(ctx) + defer cancel() + rch := esr.cli.Watch(etcd.WithRequireLeader(wctx), key, etcd.WithPrefix(), etcd.WithRev(since)) + if rch == nil { + log.Errorf("Failed to establish etcd watch channel") + cancel() + time.Sleep(exponentialBackoff) // Avoid CPU spinning + exponentialBackoff = min(exponentialBackoff*2, maxBackoff) // Exponential backoff + continue + } + + innerLoop: + for { + select { + case <-ctx.Done(): + esr.cli.Close() + close(leaseWatchChan) + return ctx.Err() + case wresp, ok := <-rch: + err := wresp.Err() + if !ok || err != nil { if err != nil { - log.Errorf("error resetting etcd watch: %s", err) + log.Warningf("etcd watch channel for %s closed with error %v, reconnecting...", key, err) + } else { + log.Warningf("etcd watch channel for %s closed, reconnecting...", key) } - batch = append(batch, wr) - default: - log.Errorf("couldn't read etcd event: %s", err) + cancel() + time.Sleep(exponentialBackoff) + exponentialBackoff = min(exponentialBackoff*2, maxBackoff) + break innerLoop + } + exponentialBackoff = initialBackoff // Reset backoff on success + batch := make([]lease.LeaseWatchResult, 0) + for _, etcdEvent := range wresp.Events { + subnetEvent, err := parseSubnetWatchResponse(ctx, esr.cli, etcdEvent) + switch { + case err == nil: + wr := lease.LeaseWatchResult{ + Events: []lease.Event{subnetEvent}, + Cursor: watchCursor{wresp.Header.Revision}, + } + batch = append(batch, wr) + case isIndexTooSmall(err): + log.Warning("Watch of subnet leases failed because etcd index outside history window") + wr, err := esr.leasesWatchReset(ctx) + if err != nil { + log.Errorf("error resetting etcd watch: %s", err) + } + batch = append(batch, wr) + default: + log.Errorf("couldn't read etcd event: %s", err) + } + } + if len(batch) > 0 { + leaseWatchChan <- batch } - } - if len(batch) > 0 { - leaseWatchChan <- batch } } }