Skip to content

Commit

Permalink
Add shutdown listener
Browse files Browse the repository at this point in the history
  • Loading branch information
Claes Mogren committed Oct 9, 2019
1 parent 04a795f commit 624eb22
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 9 deletions.
1 change: 0 additions & 1 deletion ipamd/datastore/data_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,6 @@ func (ds *DataStore) RemoveUnusedENIFromStore(warmIPTarget int) string {

deletableENI := ds.getDeletableENI(warmIPTarget)
if deletableENI == nil {
log.Debugf("No ENI can be deleted at this time")
return ""
}

Expand Down
33 changes: 26 additions & 7 deletions ipamd/ipamd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

log "github.com/cihub/seelog"
Expand Down Expand Up @@ -161,10 +162,10 @@ type IPAMContext struct {
primaryIP map[string]string
lastNodeIPPoolAction time.Time
lastDecreaseIPPool time.Time

// reconcileCooldownCache keeps timestamps of the last time an IP address was unassigned from an ENI,
// so that we don't reconcile and add it back too quickly if IMDS lags behind reality.
reconcileCooldownCache ReconcileCooldownCache
terminating int32 // Flag to warn that the pod is about to shut down.
}

// Keep track of recently freed IPs to avoid reading stale EC2 metadata
Expand Down Expand Up @@ -231,10 +232,13 @@ func New(k8sapiClient k8sapi.K8SAPIs, eniConfig *eniconfig.ENIConfigController)
log.Errorf("Failed to initialize awsutil interface %v", err)
return nil, errors.Wrap(err, "ipamd: can not initialize with AWS SDK interface")
}

c.awsClient = client

c.primaryIP = make(map[string]string)
c.reconcileCooldownCache.cache = make(map[string]time.Time)
c.warmENITarget = getWarmENITarget()
c.warmIPTarget = getWarmIPTarget()
c.useCustomNetworking = UseCustomNetworkCfg()

err = c.nodeInit()
if err != nil {
Expand Down Expand Up @@ -265,10 +269,6 @@ func (c *IPAMContext) nodeInit() error {
}
ipMax.Set(float64(c.maxIPsPerENI * c.maxENI))

c.useCustomNetworking = UseCustomNetworkCfg()
c.primaryIP = make(map[string]string)
c.reconcileCooldownCache.cache = make(map[string]time.Time)

enis, err := c.awsClient.GetAttachedENIs()
if err != nil {
log.Error("Failed to retrieve ENI info")
Expand Down Expand Up @@ -453,11 +453,16 @@ func (c *IPAMContext) decreaseIPPool(interval time.Duration) {

// tryFreeENI always tries to free one ENI
func (c *IPAMContext) tryFreeENI() {
if c.isTerminating() {
log.Debug("AWS CNI is terminating, not detaching any ENIs")
return
}

eni := c.dataStore.RemoveUnusedENIFromStore(c.warmIPTarget)
if eni == "" {
log.Info("No ENI to remove, all ENIs have IPs in use")
return
}

log.Debugf("Start freeing ENI %s", eni)
err := c.awsClient.FreeENI(eni)
if err != nil {
Expand Down Expand Up @@ -560,6 +565,11 @@ func (c *IPAMContext) increaseIPPool() {
return
}

if c.isTerminating() {
log.Debug("AWS CNI is terminating, will not try to attach any new IPs or ENIs right now")
return
}

// Try to add more IPs to existing ENIs first.
increasedPool, err := c.tryAssignIPs()
if err != nil {
Expand Down Expand Up @@ -1050,6 +1060,15 @@ func (c *IPAMContext) ipTargetState() (short int, over int, enabled bool) {
return short, over, true
}

// setTerminating atomically sets the terminating flag.
func (c *IPAMContext) setTerminating() {
atomic.StoreInt32(&c.terminating, 1)
}

func (c *IPAMContext) isTerminating() bool {
return atomic.LoadInt32(&c.terminating) > 0
}

// GetConfigForDebug returns the active values of the configuration env vars (for debugging purposes).
func GetConfigForDebug() map[string]interface{} {
return map[string]interface{}{
Expand Down
11 changes: 11 additions & 0 deletions ipamd/ipamd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,16 @@ func TestNodeInit(t *testing.T) {
ctrl, mockAWS, mockK8S, mockNetwork, _ := setup(t)
defer ctrl.Finish()

term := int32(0) // Initializing to 0, meaning 'false'
mockContext := &IPAMContext{
awsClient: mockAWS,
k8sClient: mockK8S,
maxIPsPerENI: 14,
maxENI: 4,
warmENITarget: 1,
warmIPTarget: 3,
primaryIP: make(map[string]string),
terminating: &term,
networkClient: mockNetwork}

eni1 := awsutils.ENIMetadata{
Expand Down Expand Up @@ -165,6 +168,7 @@ func testIncreaseIPPool(t *testing.T, useENIConfig bool) {
ctrl, mockAWS, mockK8S, mockNetwork, mockENIConfig := setup(t)
defer ctrl.Finish()

term := int32(0) // Initializing to 0, meaning 'false'
mockContext := &IPAMContext{
awsClient: mockAWS,
k8sClient: mockK8S,
Expand All @@ -175,6 +179,7 @@ func testIncreaseIPPool(t *testing.T, useENIConfig bool) {
useCustomNetworking: UseCustomNetworkCfg(),
eniConfig: mockENIConfig,
primaryIP: make(map[string]string),
terminating: &term,
}

mockContext.dataStore = datastore.NewDataStore()
Expand Down Expand Up @@ -242,6 +247,7 @@ func TestTryAddIPToENI(t *testing.T) {
defer ctrl.Finish()

warmIpTarget := 3
term := int32(0) // Initializing to 0, meaning 'false'
mockContext := &IPAMContext{
awsClient: mockAWS,
k8sClient: mockK8S,
Expand All @@ -252,6 +258,7 @@ func TestTryAddIPToENI(t *testing.T) {
networkClient: mockNetwork,
eniConfig: mockENIConfig,
primaryIP: make(map[string]string),
terminating: &term,
}

mockContext.dataStore = datastore.NewDataStore()
Expand Down Expand Up @@ -305,11 +312,13 @@ func TestNodeIPPoolReconcile(t *testing.T) {
ctrl, mockAWS, mockK8S, mockNetwork, _ := setup(t)
defer ctrl.Finish()

term := int32(0) // Initializing to 0, meaning 'false'
mockContext := &IPAMContext{
awsClient: mockAWS,
k8sClient: mockK8S,
networkClient: mockNetwork,
primaryIP: make(map[string]string),
terminating: &term,
}

mockContext.dataStore = datastore.NewDataStore()
Expand Down Expand Up @@ -392,11 +401,13 @@ func TestGetWarmIPTargetState(t *testing.T) {
ctrl, mockAWS, mockK8S, mockNetwork, _ := setup(t)
defer ctrl.Finish()

term := int32(0) // Initializing to 0, meaning 'false'
mockContext := &IPAMContext{
awsClient: mockAWS,
k8sClient: mockK8S,
networkClient: mockNetwork,
primaryIP: make(map[string]string),
terminating: &term,
}

mockContext.dataStore = datastore.NewDataStore()
Expand Down
21 changes: 21 additions & 0 deletions ipamd/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package ipamd

import (
"net"
"os"
"os/signal"
"syscall"

"github.com/pkg/errors"

Expand Down Expand Up @@ -124,9 +127,27 @@ func (c *IPAMContext) RunRPCHandler() error {
healthpb.RegisterHealthServer(s, hs)
// Register reflection service on gRPC server.
reflection.Register(s)
// Add shutdown hook
go c.shutdownListener(s)
if err := s.Serve(lis); err != nil {
log.Errorf("Failed to start server on gRPC port: %v", err)
return errors.Wrap(err, "ipamd: failed to start server on gPRC port")
}
return nil
}

// shutdownListener - Listen to signals and set ipamd to be in status "terminating"
func (c *IPAMContext) shutdownListener(s *grpc.Server) {
log.Info("Setting up shutdown hook.")
sig := make(chan os.Signal, 1)

// Interrupt signal sent from terminal
signal.Notify(sig, syscall.SIGINT)
// Terminate signal sent from Kubernetes
signal.Notify(sig, syscall.SIGTERM)

<-sig
log.Info("Received shutdown signal, setting 'terminating' to true")
// We received an interrupt signal, shut down.
c.setTerminating()
}
2 changes: 1 addition & 1 deletion scripts/install-aws.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ if [[ -f /host/etc/cni/net.d/aws.conf ]]; then
fi

echo "====== Starting amazon-k8s-agent ======"
/app/aws-k8s-agent
exec /app/aws-k8s-agent

0 comments on commit 624eb22

Please sign in to comment.