Skip to content

Commit

Permalink
* Preserve OVS datapath flows after an agent restart
Browse files Browse the repository at this point in the history
Without this patch, when the antrea-ovs container exits gracefully
(e.g. as part of a RollingUpdate for the DaemonSet), datapath flows are
deleted, which breaks existing connections. It takes 10 seconds for
exiting connections to start working again. A recent OVS patch changed
the default behavior of ovs-vswitchd so that datapath flows are no
longer deleted on exit. Because this patch is not yet available in any
released OVS version (and won't be for > 6 months), we apply it manually
when we build our base OVS Docker image.

This is a step towards non-disruptive upgrade. However, there are other
factors in play: 1) when the OVS daemons are stopped, the VXLAN device
(vxlan_sys_4789) goes away, which impacts connectivity between Nodes,
and 2) when ovs-vswitchd restarts, datapath flows are flushed and we
have to wait until the Antrea Agent has replayed the flows.

* Disable new e2e test for Kind

When using the netdev datapath, packet forwarding is done by the
ovs-vswitchd daemon itself, so even existing connections are broken.

* Use arping instead of ping for test

When using ping the test can be flaky: if one of the hosts need to send
an ARP request to refresh its ARP entry while the test is ongoing, odds
are that there is no cached flow in the datapath. If this happens, the
host will mark the ARP entry has "incomplete" and the ping will start
failing.

Fixes antrea-io#355
  • Loading branch information
antoninbas committed Feb 19, 2020
1 parent b6f87f3 commit e4a0467
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 35 deletions.
9 changes: 7 additions & 2 deletions build/images/ovs/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
FROM ubuntu:18.04 as ovs-debs

# Some patches may not apply cleanly if another version is provided.
ARG OVS_VERSION=2.11.1

# Install dependencies for building OVS deb packages
RUN apt-get update && \
apt-get install -y --no-install-recommends wget ca-certificates build-essential fakeroot graphviz \
apt-get install -y --no-install-recommends wget curl git ca-certificates build-essential fakeroot graphviz \
bzip2 autoconf automake debhelper dh-autoreconf libssl-dev libtool openssl procps \
python-all python-twisted-conch python-zopeinterface python-six libunbound-dev

COPY apply-patches.sh /

# Download OVS source code and build debs
RUN wget -q -O - https://www.openvswitch.org/releases/openvswitch-$OVS_VERSION.tar.gz | tar xz -C /tmp && \
rm -rf openvswitch-$OVS_VERSION.tar.gz && \
cd /tmp/openvswitch* && DEB_BUILD_OPTIONS='parallel=8 nocheck' fakeroot debian/rules binary && \
cd /tmp/openvswitch* && \
/apply-patches.sh && \
DEB_BUILD_OPTIONS='parallel=8 nocheck' fakeroot debian/rules binary && \
cd /tmp && mkdir ovs-debs && \
mv libopenvswitch_*.deb openvswitch-common_*.deb openvswitch-switch_*.deb python-openvswitch_*.deb \
openvswitch-ipsec_*.deb ovs-debs/ && \
Expand Down
35 changes: 35 additions & 0 deletions build/images/ovs/apply-patches.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env bash

# Copyright 2020 Antrea Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This script applies unreleased patches (or released in a more recent version
# of OVS than the one Antrea is using) to OVS before building it. It needs to be
# run from the root of the OVS source tree.

set -eo pipefail

# We cannot use 3-way merge unless we are in a git repository. If we need 3-way
# merge, we will need to clone the repository with git instead of downloading a
# release tarball (see Dockerfile).

# These 2 patches (post 2.13.0) ensures that datapath flows are not deleted on
# ovs-vswitchd exit by default. Antrea relies on this to support hitless upgrade
# of the Agent DaemonSet.
# The second patch depends on the first one.
curl https://github.com/openvswitch/ovs/commit/586cd3101e7fda54d14fb5bf12d847f35d968627.patch | \
git apply
# We exclude 2 files which are likely to cause conflicts.
curl https://github.com/openvswitch/ovs/commit/79eadafeb1b47a3871cb792aa972f6e4d89d1a0b.patch | \
git apply --exclude NEWS --exclude vswitchd/ovs-vswitchd.8.in
96 changes: 81 additions & 15 deletions test/e2e/connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package e2e

import (
"fmt"
"strings"
"testing"
"time"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
)
Expand Down Expand Up @@ -185,10 +187,73 @@ func TestPodConnectivityAfterAntreaRestart(t *testing.T) {
data.runPingMesh(t, podNames)
}

// TestOVSRestart checks that when OVS restarts unexpectedly the Antrea agent takes care of
// replaying flows. More precisely this tests check that Pod connectivity is not broken after a
// restart.
func TestOVSRestart(t *testing.T) {
// TestOVSRestartSameNode verifies that datapath flows are not removed when the Antrea Agent Pod is
// stopped gracefully (e.g. as part of a RollingUpdate). The test sends ARP requests every 1s and
// checks that there is no packet loss during the restart. This test does not apply to the userspace
// ndetdev datapath, since in this case the datapath functionality is implemented by the
// ovs-vswitchd daemon itself. When ovs-vswitchd restarts, datapath flows are flushed and it may
// take some time for the Agent to replay the flows. This will not impact this test, since we are
// just testing L2 connectivity betwwen 2 Pods on the same Node, and the default behavior of the
// br-int bridge is to implement normal L2 forwarding.
func TestOVSRestartSameNode(t *testing.T) {
skipIfProviderIs(t, "kind", "test not valid for the netdev datapath type")
data, err := setupTest(t)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
defer teardownTest(t, data)

workerNode := workerNodeName(1)
t.Logf("Creating two busybox test Pods on '%s'", workerNode)
podNames, podIPs, cleanupFn := createTestBusyboxPods(t, data, 2, workerNode)
defer cleanupFn()

resCh := make(chan error, 1)

runArping := func() error {
// we send arp pings for 25 seconds; this duration is a bit arbitrary and we assume
// that restarting Antrea takes less than that time. Unfortunately, the arping
// utility in busybox does not let us choose a smaller interval than 1 second.
count := 25
cmd := fmt.Sprintf("arping -c %d %s", count, podIPs[1])
stdout, stderr, err := data.runCommandFromPod(testNamespace, podNames[0], busyboxContainerName, strings.Fields(cmd))
if err != nil {
return fmt.Errorf("error when running arping command: %v - stdout: %s - stderr: %s", err, stdout, stderr)
}
// if the datapath flows have been flushed, there will be some unanswered ARP
// requests.
_, _, lossRate, err := parseArpingStdout(stdout)
if err != nil {
return err
}
t.Logf("Arping loss rate: %f%%", lossRate)
if lossRate > 0 {
t.Logf(stdout)
return fmt.Errorf("arping loss rate is %f%%", lossRate)
}
return nil
}
go func() {
resCh <- runArping()
}()
// make sure that by the time we delete the Antrea agent, at least one unicast ARP has been
// sent (and cached in the OVS kernel datapath).
time.Sleep(3 * time.Second)

t.Logf("Restarting antrea-agent on Node '%s'", workerNode)
if _, err := data.deleteAntreaAgentOnNode(workerNode, 30 /* grace period in seconds */, defaultTimeout); err != nil {
t.Fatalf("Error when restarting antrea-agent on Node '%s': %v", workerNode, err)
}

if err := <-resCh; err != nil {
t.Errorf("Arping test failed: %v", err)
}
}

// TestOVSFlowReplay checks that when OVS restarts unexpectedly the Antrea agent takes care of
// replaying flows. More precisely this test checks that Pod connectivity still works after deleting
// the flows and force-restarting the OVS dameons.
func TestOVSFlowReplay(t *testing.T) {
skipIfProviderIs(t, "kind", "stopping OVS daemons create connectivity issues")
data, err := setupTest(t)
if err != nil {
Expand Down Expand Up @@ -219,17 +284,18 @@ func TestOVSRestart(t *testing.T) {
}
t.Logf("The Antrea Pod for Node '%s' is '%s'", workerNode, antreaPodName)

t.Logf("Restarting OVS daemons on Node '%s'", workerNode)
// We cannot use "ovs-ctl restart" as it takes care of saving / restoring the flows, while
// we are trying to test whether the Antrea agent takes care of replaying the flows after an
// unscheduled restart.
stopCmd := []string{"/usr/share/openvswitch/scripts/ovs-ctl", "stop"}
if stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, stopCmd); err != nil {
t.Fatalf("Error when stopping OVS with ovs-ctl: %v - stdout: %s - stderr: %s", err, stdout, stderr)
}
startCmd := []string{"/usr/share/openvswitch/scripts/ovs-ctl", "--system-id=random", "start", "--db-file=/var/run/openvswitch/conf.db"}
if stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, startCmd); err != nil {
t.Fatalf("Error when starting OVS with ovs-ctl: %v - stdout: %s - stderr: %s", err, stdout, stderr)
t.Logf("Deleting flows and restarting OVS daemons on Node '%s'", workerNode)
delFlows := func() {
cmd := []string{"ovs-ofctl", "del-flows", defaultBridgeName}
_, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when deleting flows: <%v>, err: <%v>", stderr, err)
}
}
delFlows()
restartCmd := []string{"/usr/share/openvswitch/scripts/ovs-ctl", "--system-id=random", "restart", "--db-file=/var/run/openvswitch/conf.db"}
if stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, restartCmd); err != nil {
t.Fatalf("Error when restarting OVS with ovs-ctl: %v - stdout: %s - stderr: %s", err, stdout, stderr)
}

// This should give Antrea ~10s to restore flows, since we generate 10 "pings" with a 1s
Expand Down
67 changes: 67 additions & 0 deletions test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,70 @@ func deletePodWrapper(tb testing.TB, data *TestData, name string) {
tb.Logf("Error when deleting Pod: %v", err)
}
}

// createTestBusyboxPods creates the desired number of busybox Pods and wait for their IP address to
// become available. This is a common patter in our tests, so having this helper function makes
// sense. It calls Fatalf in case of error, so it must be called from the goroutine running the test
// or benchmark function. You can create all the Pods on the same Node by setting nodeName. If
// nodeName is the empty string, each Pod will be created on an arbitrary
// Node. createTestBusyboxPods returns the cleanupFn function which can be used to delete the
// created Pods. Pods are created in parallel to reduce the time required to run the tests.
func createTestBusyboxPods(tb testing.TB, data *TestData, num int, nodeName string) (
podNames []string, podIPs []string, cleanupFn func(),
) {
cleanupFn = func() {
for _, podName := range podNames {
deletePodWrapper(tb, data, podName)
}
}

type podData struct {
podName string
podIP string
err error
}

createPodAndGetIP := func() (string, string, error) {
podName := randName("test-pod-")

tb.Logf("Creating a busybox test Pod '%s' and waiting for IP", podName)
if err := data.createBusyboxPodOnNode(podName, nodeName); err != nil {
tb.Errorf("Error when creating busybox test Pod '%s': %v", podName, err)
return "", "", err
}

if podIP, err := data.podWaitForIP(defaultTimeout, podName); err != nil {
tb.Errorf("Error when waiting for IP for Pod '%s': %v", podName, err)
return podName, "", err
} else {
return podName, podIP, nil
}
}

podsCh := make(chan podData, num)

for i := 0; i < num; i++ {
go func() {
podName, podIP, err := createPodAndGetIP()
podsCh <- podData{podName, podIP, err}
}()
}

errCnt := 0
for i := 0; i < num; i++ {
pod := <-podsCh
if pod.podName != "" {
podNames = append(podNames, pod.podName)
podIPs = append(podIPs, pod.podIP)
}
if pod.err != nil {
errCnt++
}
}
if errCnt > 0 {
defer cleanupFn()
tb.Fatalf("%d / %d Pods could not be created successfully", errCnt, num)
}

return podNames, podIPs, cleanupFn
}
20 changes: 20 additions & 0 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,26 @@ func (data *TestData) forAllAntreaPods(fn func(nodeName, podName string) error)
return nil
}

func parseArpingStdout(out string) (sent uint32, received uint32, loss float32, err error) {
re := regexp.MustCompile(`Sent\s+(\d+)\s+probe.*\nReceived\s+(\d+)\s+response`)
matches := re.FindStringSubmatch(out)
if len(matches) == 0 {
return 0, 0, 0.0, fmt.Errorf("Unexpected arping output")
}
if v, err := strconv.ParseUint(matches[1], 10, 32); err != nil {
return 0, 0, 0.0, fmt.Errorf("Error when retrieving 'sent probes' from arpping output: %v", err)
} else {
sent = uint32(v)
}
if v, err := strconv.ParseUint(matches[2], 10, 32); err != nil {
return 0, 0, 0.0, fmt.Errorf("Error when retrieving 'received responses' from arpping output: %v", err)
} else {
received = uint32(v)
}
loss = 100. * float32(sent-received) / float32(sent)
return sent, received, loss, nil
}

func (data *TestData) runPingCommandFromTestPod(podName string, targetIP string, count int) error {
cmd := []string{"ping", "-c", strconv.Itoa(count), targetIP}
_, _, err := data.runCommandFromPod(testNamespace, podName, busyboxContainerName, cmd)
Expand Down
22 changes: 4 additions & 18 deletions test/e2e/networkpolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,10 @@ func TestIPBlockWithExcept(t *testing.T) {
}
}()

podName0 := randName("test-pod-networkpolicy-")
if err := data.createBusyboxPodOnNode(podName0, workerNode); err != nil {
t.Fatalf("Error when creating busybox test Pod: %v", err)
}
defer deletePodWrapper(t, data, podName0)
if _, err := data.podWaitForIP(defaultTimeout, podName0); err != nil {
t.Fatalf("Error when waiting for IP for Pod '%s': %v", podName0, err)
}

podName1 := randName("test-pod-networkpolicy-")
if err := data.createBusyboxPodOnNode(podName1, workerNode); err != nil {
t.Fatalf("Error when creating busybox test Pod: %v", err)
}
defer deletePodWrapper(t, data, podName1)
podIP1, err := data.podWaitForIP(defaultTimeout, podName1)
if err != nil {
t.Fatalf("Error when waiting for IP for Pod '%s': %v", podName1, err)
}
podNames, podIPs, cleanupFn := createTestBusyboxPods(t, data, 2, workerNode)
defer cleanupFn()
podName0 := podNames[0]
podName1, podIP1 := podNames[1], podIPs[1]

// Both pods cannot connect to service.
if err = data.runNetcatCommandFromTestPod(podName0, svcName, 80); err == nil {
Expand Down

0 comments on commit e4a0467

Please sign in to comment.