Skip to content

Commit

Permalink
Adds endpoint-status file-writing and endpoint-status CNI waiting (#8469)
Browse files Browse the repository at this point in the history

CNI plugin can now delay pod startup until Felix has finished programming the dataplane.
  • Loading branch information
aaaaaaaalex authored Mar 11, 2024
1 parent a0d4556 commit b549e68
Show file tree
Hide file tree
Showing 40 changed files with 2,297 additions and 427 deletions.
11 changes: 11 additions & 0 deletions api/pkg/apis/projectcalico/v3/felixconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,14 @@ type FelixConfigurationSpec struct {
// +kubebuilder:validation:Pattern=`^([0-9]+(\\.[0-9]+)?(ms|s|m|h))*$`
EndpointReportingDelay *metav1.Duration `json:"endpointReportingDelay,omitempty" configv1timescale:"seconds" confignamev1:"EndpointReportingDelaySecs"`

// EndpointStatusPathPrefix is the path to the directory
// where endpoint status will be written. Endpoint status
// file reporting is disabled if field is left empty.
//
// Chosen directory should match the directory used by the CNI for PodStartupDelay.
// [Default: ""]
EndpointStatusPathPrefix string `json:"endpointStatusPathPrefix,omitempty"`

// IptablesMarkMask is the mask that Felix selects its IPTables Mark bits from. Should be a 32 bit hexadecimal
// number with at least 8 bits set, none of which clash with any other mark bits in use on the system.
// [Default: 0xff000000]
Expand Down Expand Up @@ -408,6 +416,9 @@ type FelixConfigurationSpec struct {
// +kubebuilder:validation:Type=string
// +kubebuilder:validation:Pattern=`^([0-9]+(\\.[0-9]+)?(ms|s|m|h))*$`
DebugSimulateDataplaneHangAfter *metav1.Duration `json:"debugSimulateDataplaneHangAfter,omitempty" configv1timescale:"seconds"`
// +kubebuilder:validation:Type=string
// +kubebuilder:validation:Pattern=`^([0-9]+(\\.[0-9]+)?(ms|s|m|h))*$`
DebugSimulateDataplaneApplyDelay *metav1.Duration `json:"debugSimulateDataplaneApplyDelay,omitempty" configv1timescale:"seconds"`
// DebugHost is the host IP or hostname to bind the debug port to. Only used
// if DebugPort is set. [Default:localhost]
DebugHost *string `json:"debugHost,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions api/pkg/apis/projectcalico/v3/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions api/pkg/openapi/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion calicoctl/calicoctl/commands/crds/crds.go

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions cni-plugin/internal/pkg/utils/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright (c) 2024 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils_test

import (
Expand Down
26 changes: 25 additions & 1 deletion cni-plugin/pkg/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"
"os"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -50,6 +51,7 @@ import (
"github.com/projectcalico/calico/cni-plugin/internal/pkg/utils/cri"
"github.com/projectcalico/calico/cni-plugin/pkg/dataplane"
"github.com/projectcalico/calico/cni-plugin/pkg/types"
"github.com/projectcalico/calico/cni-plugin/pkg/wait"
)

// CmdAddK8s performs the "ADD" operation on a kubernetes pod
Expand Down Expand Up @@ -490,7 +492,8 @@ func CmdAddK8s(ctx context.Context, args *skel.CmdArgs, conf types.NetConf, epID
// Pod resource. (In Enterprise) Felix also modifies the pod through a patch and setting this avoids patching the
// same fields as Felix so that we can't clobber Felix's updates.
ctxPatchCNI := k8sresources.ContextWithPatchMode(ctx, k8sresources.PatchModeCNI)
if _, err := utils.CreateOrUpdate(ctxPatchCNI, calicoClient, endpoint); err != nil {
var endpointOut *libapi.WorkloadEndpoint
if endpointOut, err = utils.CreateOrUpdate(ctxPatchCNI, calicoClient, endpoint); err != nil {
logger.WithError(err).Error("Error creating/updating endpoint in datastore.")
releaseIPAM()
return nil, err
Expand All @@ -502,6 +505,27 @@ func CmdAddK8s(ctx context.Context, args *skel.CmdArgs, conf types.NetConf, epID
Name: endpoint.Spec.InterfaceName},
)

// Conditionally wait for host-local Felix to program the policy for this WEP.
// Error if negative, ignore if 0.
if conf.PolicySetupTimeoutSeconds < 0 {
return nil, fmt.Errorf("invalid pod startup delay of %d", conf.PolicySetupTimeoutSeconds)
} else if conf.PolicySetupTimeoutSeconds > 0 {
if runtime.GOOS == "windows" {
logger.Warn("Config policy_setup_timeout_seconds is not supported on Windows. Ignoring...")
} else {
if conf.EndpointStatusDir == "" {
conf.EndpointStatusDir = "/var/run/calico/endpoint-status"
}
timeout := time.Duration(conf.PolicySetupTimeoutSeconds) * time.Second
// Must use endpointOut because we need the endpoint's Name field
// to be filled in.
err := wait.ForEndpointReadyWithTimeout(conf.EndpointStatusDir, endpointOut, timeout)
if err != nil {
logrus.WithError(err).Warn("Error waiting for endpoint to become ready. Unblocking pod creation...")
}
}
}

return result, nil
}

Expand Down
16 changes: 16 additions & 0 deletions cni-plugin/pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,22 @@ type NetConf struct {
// The CNI plugin waits until all the endpoints specified in ReadinessGates are ready
ReadinessGates []string `json:"readiness_gates"`

// PolicySetupTimeoutSeconds is the maximum duration to delay
// pod startup when waiting for Felix to program policy for the endpoint.
//
// When set to a positive integer, the CNI will watch the directory specified
// by EndpointStatusDir for status updates about workloads being created.
//
// Feature is off when set to 0.
//
// Default: 0
PolicySetupTimeoutSeconds int `json:"policy_setup_timeout_seconds,omitempty"`

// The directory to watch for workload status updates from Felix.
//
// Default: /var/run/calico/endpoint-status
EndpointStatusDir string `json:"endpoint_status_dir,omitempty"`

// Options below here are deprecated.
EtcdAuthority string `json:"etcd_authority"`
Hostname string `json:"hostname"`
Expand Down
225 changes: 225 additions & 0 deletions cni-plugin/pkg/wait/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Copyright (c) 2024 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wait

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"time"

"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"

libapi "github.com/projectcalico/calico/libcalico-go/lib/apis/v3"
"github.com/projectcalico/calico/libcalico-go/lib/names"
)

// ForEndpointReadyWithTimeout blocks until the status file belonging to the
// given workload endpoint shows up inside policyDir, or until timeout has
// elapsed, whichever happens first.
//
// The status filename is derived from the endpoint via the names package, so
// the endpoint must be convertible to a workload endpoint key; a conversion
// failure is returned as an error. A nil endpoint is treated as a programming
// error and triggers a panic through logrus.
//
// Returns nil once the file has been seen, or a wrapped error if the wait
// ended without the file being found.
func ForEndpointReadyWithTimeout(policyDir string, endpoint *libapi.WorkloadEndpoint, timeout time.Duration) error {
	if endpoint == nil {
		logrus.Panic("Endpoint is nil")
	}

	wepKey, keyErr := names.V3WorkloadEndpointToWorkloadEndpointKey(endpoint)
	if keyErr != nil {
		return fmt.Errorf("failed to convert endpoint to key: %w", keyErr)
	}
	statusFile := names.WorkloadEndpointKeyToStatusFilename(wepKey)

	logrus.WithFields(logrus.Fields{
		"policyDir":   policyDir,
		"namespace":   endpoint.Namespace,
		"workload":    endpoint.Name,
		"desiredFile": statusFile,
	}).Debug("Waiting for workload's status file to appear")

	// Bound the whole wait with a deadline; the helper below returns an
	// error once this context is done.
	waitCtx, stop := context.WithTimeout(context.Background(), timeout)
	defer stop()

	if waitErr := waitUntilFileExists(waitCtx, policyDir, statusFile); waitErr != nil {
		return fmt.Errorf("timed out waiting for endpoint status file '%s': %w", filepath.Join(policyDir, statusFile), waitErr)
	}
	return nil
}

// waitUntilFileExists blocks until filename is seen inside directory, or
// until ctx is done.
//
// Each attempt delegates to waitUntilFileExistsOrError. When an attempt fails
// (or ends without finding the file) the check is retried after a short fixed
// interval, for as long as ctx is still live.
//
// Returns nil once the file has been found. If ctx is done first, returns a
// "file not found" error that wraps ctx.Err(), so callers can distinguish a
// deadline from a cancellation with errors.Is.
func waitUntilFileExists(ctx context.Context, directory, filename string) error {
	log := logrus.WithFields(logrus.Fields{
		"directory":   directory,
		"desiredFile": filename,
	})

	const retryInterval = 500 * time.Millisecond
	for {
		found, err := waitUntilFileExistsOrError(ctx, directory, filename)
		if err != nil {
			log.WithError(err).Info("Filesystem check failed. Scheduling retry...")
		} else if found {
			return nil
		}

		// Not found: either ctx is done or the attempt failed. Give up in the
		// former case, otherwise pause briefly before the next attempt.
		select {
		case <-ctx.Done():
			return fmt.Errorf("file not found: %w", ctx.Err())
		case <-time.After(retryInterval):
			log.Info("Retrying filesystem check...")
		}
	}
}

// waitUntilFileExistsOrError makes a single attempt at waiting for filename
// to exist inside directory.
//
// It first tries to start a filesystem watch on the directory, then stats the
// file directly, and finally (if the watch could be started) blocks consuming
// watch events.
//
// Returns the last error encountered, though more than one error may occur
// (and be logged) before returning.
func waitUntilFileExistsOrError(ctx context.Context, directory, filename string) (found bool, err error) {
	logCtx := logrus.WithFields(logrus.Fields{
		"directory":   directory,
		"desiredFile": filename,
	})

	consumeEvents, stopWatch, watchErr := startWatchForFile(ctx, directory, filename)
	if watchErr == nil {
		defer stopWatch()
	} else {
		logCtx.WithError(watchErr).Warn("Encountered an error while starting a filesystem watch, polling filesystem instead...")
	}

	// Stat unconditionally before looking at watch events. With a broken
	// watch this turns the caller's retry loop into a poll; with a healthy
	// watch it covers the case where the create event was missed.
	exists, statErr := statFile(directory, filename)
	switch {
	case statErr != nil:
		logCtx.WithError(statErr).Warn("Encountered an error polling filesystem for file.")
	case exists:
		return true, nil
	}

	// Without a watcher there is nothing more to wait on; report whatever
	// the stat produced.
	if consumeEvents == nil {
		return false, statErr
	}

	logCtx.Debug("Progressing to watch event processing")
	return consumeEvents()
}

// startWatchForFile creates a filesystem watcher on directory and hands back
// the pieces needed to use it:
//
//   - A blocking function that consumes watch events. It reports true when a
//     create-type event is seen for a file whose name matches filename, and
//     a non-nil error if the watcher delivers an error.
//
//   - A cleanup function that closes the watcher, ignoring any error
//     encountered while doing so.
//
//   - An error if the watcher could not be set up, in which case both
//     returned functions are nil.
func startWatchForFile(ctx context.Context, directory, filename string) (func() (bool, error), func(), error) {
	fsWatcher, createErr := fsnotify.NewWatcher()
	if createErr != nil {
		return nil, nil, createErr
	}

	if addErr := fsWatcher.Add(directory); addErr != nil {
		// The watcher was created but is unusable; release it before
		// reporting the failure.
		closeWatcherAndIgnoreErr(fsWatcher)
		return nil, nil, addErr
	}

	consume := func() (bool, error) {
		return waitForCreateEvent(ctx, fsWatcher, filename)
	}
	release := func() {
		closeWatcherAndIgnoreErr(fsWatcher)
	}
	return consume, release, nil
}

// waitForCreateEvent consumes events from watcher until a create event is
// seen for a file whose base name equals filename.
//
// Returns:
//   - (true, nil) when a matching create event is received.
//   - (false, err) when the watcher reports an error, or when one of the
//     watcher's channels is closed while waiting.
//   - (false, nil) when ctx is done before a matching event is seen.
//
// Panics (via logrus) if watcher is nil.
func waitForCreateEvent(ctx context.Context, watcher *fsnotify.Watcher, filename string) (bool, error) {
	// The nil guard must run before any method is called on the watcher,
	// otherwise the guard can never be the thing that reports the problem.
	if watcher == nil {
		logrus.WithField("targetFilename", filename).Panic("watcher is nil")
	}

	log := logrus.WithFields(logrus.Fields{
		"watchedDirectories": watcher.WatchList(),
		"targetFilename":     filename,
	})

	for {
		select {
		case e, ok := <-watcher.Events:
			if !ok {
				// A closed channel yields zero-value events forever; bail out
				// rather than spinning until ctx expires.
				return false, errors.New("filesystem watch events channel closed")
			}
			log.WithField("eventName", e.Name).Debug("Received watch event")
			// Op is a bitmask and may carry several operations at once, so
			// test the Create bit instead of comparing for equality.
			if e.Op&fsnotify.Create != 0 && filepath.Base(e.Name) == filename {
				log.WithField("eventName", e.Name).Debug("FS 'Create' event seen for file. Firing notification and stopping watch")
				return true, nil
			}

		case err, ok := <-watcher.Errors:
			if !ok {
				// Closed channel: surface it as an error instead of a
				// nil-error "not found" result.
				return false, errors.New("filesystem watch errors channel closed")
			}
			return false, err

		case <-ctx.Done():
			return false, nil
		}
	}
}

// closeWatcherAndIgnoreErr closes w. A failure to close is only logged at
// debug level and is otherwise discarded.
func closeWatcherAndIgnoreErr(w *fsnotify.Watcher) {
	if closeErr := w.Close(); closeErr != nil {
		logrus.WithError(closeErr).Debug("Ignoring error encountered while closing filesystem watch")
	}
}

// statFile reports whether filename currently exists inside directory.
//
// Returns (true, nil) when os.Stat succeeds and the name it reports equals
// filename, (false, nil) when the path does not exist or the reported name
// differs, and (false, err) for any other stat failure.
func statFile(directory, filename string) (bool, error) {
	info, statErr := os.Stat(filepath.Join(directory, filename))
	switch {
	case statErr == nil:
		// Stat succeeded; it only counts as a hit when the reported name is
		// exactly the one that was asked for.
		return info != nil && info.Name() == filename, nil
	case errors.Is(statErr, fs.ErrNotExist):
		// A missing file is the expected "not yet" case, not a failure.
		return false, nil
	default:
		return false, statErr
	}
}
Loading

0 comments on commit b549e68

Please sign in to comment.