Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix high CPU usage when losing etcd connection and try to re-est… #2172

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 126 additions & 83 deletions pkg/subnet/etcd/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,107 +283,150 @@ func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, sn ip.IP4Net, s

func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, leaseWatchChan chan []lease.LeaseWatchResult, since int64) error {
key := path.Join(esr.etcdCfg.Prefix, "subnets")
initialBackoff := 100 * time.Millisecond // Initial backoff duration
exponentialBackoff := initialBackoff
maxBackoff := 5 * time.Second // Cap for backoff duration

wctx, cancel := context.WithCancel(ctx)
//release context ASAP to free resources
defer cancel()

log.Infof("registry: watching subnets starting from rev %d", since)
rch := esr.cli.Watch(etcd.WithRequireLeader(wctx), key, etcd.WithPrefix(), etcd.WithRev(since))
if rch == nil {
return errNoWatchChannel
}
for {
select {
case <-ctx.Done():
esr.cli.Close()
close(leaseWatchChan)
return ctx.Err()
case wresp := <-rch:
results := make([]lease.LeaseWatchResult, 0)
for _, etcdEvent := range wresp.Events {
subnetEvent, err := parseSubnetWatchResponse(ctx, esr.cli, etcdEvent)
switch {

case err == nil:
log.Infof("watchSubnets: got valid subnet event with revision %d", wresp.Header.Revision)
// TODO only vxlan backend and kube subnet manager support dual stack now.
subnetEvent.Lease.EnableIPv4 = true
wr := lease.LeaseWatchResult{
Events: []lease.Event{subnetEvent},
Cursor: watchCursor{wresp.Header.Revision},
}
results = append(results, wr)
wctx, cancel := context.WithCancel(ctx)
defer cancel()
log.Infof("registry: watching subnets starting from rev %d", since)
rch := esr.cli.Watch(etcd.WithRequireLeader(wctx), key, etcd.WithPrefix(), etcd.WithRev(since))
if rch == nil {
log.Errorf("Failed to establish etcd watch channel")
cancel()
time.Sleep(exponentialBackoff) // Avoid CPU spinning
exponentialBackoff = min(exponentialBackoff*2, maxBackoff) // Exponential backoff
continue
}

case isIndexTooSmall(err):
log.Warning("Watch of subnet leases failed because etcd index outside history window")
wr, err := esr.leasesWatchReset(ctx)
innerLoop:
for {
select {
case <-ctx.Done():
esr.cli.Close()
close(leaseWatchChan)
return ctx.Err()
case wresp, ok := <-rch:
err := wresp.Err()
if !ok || err != nil {
if err != nil {
log.Warningf("etcd watch channel for %s closed with error %v, reconnecting...", key, err)
} else {
log.Warningf("etcd watch channel for %s closed, reconnecting...", key)
}
cancel()
time.Sleep(exponentialBackoff)
exponentialBackoff = min(exponentialBackoff*2, maxBackoff)
break innerLoop
}
exponentialBackoff = initialBackoff // Reset backoff on success
results := make([]lease.LeaseWatchResult, 0)
for _, etcdEvent := range wresp.Events {
subnetEvent, err := parseSubnetWatchResponse(ctx, esr.cli, etcdEvent)
switch {

case err == nil:
log.Infof("watchSubnets: got valid subnet event with revision %d", wresp.Header.Revision)
// TODO only vxlan backend and kube subnet manager support dual stack now.
subnetEvent.Lease.EnableIPv4 = true
wr := lease.LeaseWatchResult{
Events: []lease.Event{subnetEvent},
Cursor: watchCursor{wresp.Header.Revision},
}
results = append(results, wr)

case isIndexTooSmall(err):
log.Warning("Watch of subnet leases failed because etcd index outside history window")
wr, err := esr.leasesWatchReset(ctx)
if err != nil {
log.Errorf("error resetting etcd watch: %s", err)
}
results = append(results, wr)
case wresp.Header.Revision != 0:
log.Warning("Watch of subnet leases failed because header revision != 0")
results = append(results, lease.LeaseWatchResult{Cursor: watchCursor{wresp.Header.Revision}})

default:
log.Warningf("Watch of subnet failed with error %s", err)
results = append(results, lease.LeaseWatchResult{})
}
if err != nil {
log.Errorf("error resetting etcd watch: %s", err)
log.Errorf("error parsing etcd event: %s", err)
}
results = append(results, wr)
case wresp.Header.Revision != 0:
log.Warning("Watch of subnet leases failed because header revision != 0")
results = append(results, lease.LeaseWatchResult{Cursor: watchCursor{wresp.Header.Revision}})

default:
log.Warningf("Watch of subnet failed with error %s", err)
results = append(results, lease.LeaseWatchResult{})
}
if err != nil {
log.Errorf("error parsing etcd event: %s", err)
if len(results) > 0 {
leaseWatchChan <- results
}
}
if len(results) > 0 {
leaseWatchChan <- results
}
}

}
}

func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, since int64, sn ip.IP4Net, sn6 ip.IP6Net, leaseWatchChan chan []lease.LeaseWatchResult) error {
key := path.Join(esr.etcdCfg.Prefix, "subnets", subnet.MakeSubnetKey(sn, sn6))

wctx, cancel := context.WithCancel(ctx)
//release context ASAP to free resources
defer cancel()

rch := esr.cli.Watch(etcd.WithRequireLeader(wctx), key, etcd.WithPrefix(), etcd.WithRev(since))
if rch == nil {
return errNoWatchChannel
}
subnetKey := subnet.MakeSubnetKey(sn, sn6)
key := path.Join(esr.etcdCfg.Prefix, "subnets", subnetKey)
initialBackoff := 100 * time.Millisecond // Initial backoff duration
exponentialBackoff := initialBackoff
maxBackoff := 5 * time.Second // Cap for backoff duration

for {
select {
case <-ctx.Done():
esr.cli.Close()
close(leaseWatchChan)
return ctx.Err()
case wresp := <-rch:
batch := make([]lease.LeaseWatchResult, 0)
for _, etcdEvent := range wresp.Events {
subnetEvent, err := parseSubnetWatchResponse(ctx, esr.cli, etcdEvent)
switch {
case err == nil:
wr := lease.LeaseWatchResult{
Events: []lease.Event{subnetEvent},
Cursor: watchCursor{wresp.Header.Revision},
}
batch = append(batch, wr)
case isIndexTooSmall(err):
log.Warning("Watch of subnet leases failed because etcd index outside history window")
wr, err := esr.leasesWatchReset(ctx)
wctx, cancel := context.WithCancel(ctx)
defer cancel()
rch := esr.cli.Watch(etcd.WithRequireLeader(wctx), key, etcd.WithPrefix(), etcd.WithRev(since))
if rch == nil {
log.Errorf("Failed to establish etcd watch channel")
cancel()
time.Sleep(exponentialBackoff) // Avoid CPU spinning
exponentialBackoff = min(exponentialBackoff*2, maxBackoff) // Exponential backoff
continue
}

innerLoop:
for {
select {
case <-ctx.Done():
esr.cli.Close()
close(leaseWatchChan)
return ctx.Err()
case wresp, ok := <-rch:
err := wresp.Err()
if !ok || err != nil {
if err != nil {
log.Errorf("error resetting etcd watch: %s", err)
log.Warningf("etcd watch channel for %s closed with error %v, reconnecting...", key, err)
} else {
log.Warningf("etcd watch channel for %s closed, reconnecting...", key)
}
batch = append(batch, wr)
default:
log.Errorf("couldn't read etcd event: %s", err)
cancel()
time.Sleep(exponentialBackoff)
exponentialBackoff = min(exponentialBackoff*2, maxBackoff)
break innerLoop
}
exponentialBackoff = initialBackoff // Reset backoff on success
batch := make([]lease.LeaseWatchResult, 0)
for _, etcdEvent := range wresp.Events {
subnetEvent, err := parseSubnetWatchResponse(ctx, esr.cli, etcdEvent)
switch {
case err == nil:
wr := lease.LeaseWatchResult{
Events: []lease.Event{subnetEvent},
Cursor: watchCursor{wresp.Header.Revision},
}
batch = append(batch, wr)
case isIndexTooSmall(err):
log.Warning("Watch of subnet leases failed because etcd index outside history window")
wr, err := esr.leasesWatchReset(ctx)
if err != nil {
log.Errorf("error resetting etcd watch: %s", err)
}
batch = append(batch, wr)
default:
log.Errorf("couldn't read etcd event: %s", err)
}
}
if len(batch) > 0 {
leaseWatchChan <- batch
}
}
if len(batch) > 0 {
leaseWatchChan <- batch
}
}
}
Expand Down
Loading