-
Notifications
You must be signed in to change notification settings - Fork 374
/
agent.go
1340 lines (1238 loc) · 51.7 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2019 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net"
"os"
"strconv"
"strings"
"time"
"github.com/containernetworking/plugins/pkg/ip"
"github.com/spf13/afero"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
clockutils "k8s.io/utils/clock"
"antrea.io/antrea/pkg/agent/cniserver"
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/controller/noderoute"
"antrea.io/antrea/pkg/agent/controller/trafficcontrol"
"antrea.io/antrea/pkg/agent/externalnode"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/agent/wireguard"
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
"antrea.io/antrea/pkg/client/clientset/versioned"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/ovs/ovsctl"
"antrea.io/antrea/pkg/util/env"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
utilwait "antrea.io/antrea/pkg/util/wait"
)
const (
// defaultTunInterfaceName is the default name of the default tunnel interface on the OVS bridge.
defaultTunInterfaceName = "antrea-tun0"
// maxRetryForHostLink is the maximum number of attempts made by configureGatewayInterface to
// bring up the gateway's host link while the link cannot be found yet.
maxRetryForHostLink = 5
// ipsecPSKEnvKey is the name of an environment variable.
// NOTE(review): presumably it carries the IPsec pre-shared key; confirm against readIPSecPSK.
ipsecPSKEnvKey = "ANTREA_IPSEC_PSK"
roundNumKey = "roundNum" // round number key in externalIDs.
// initialRoundNum is presumably the round number picked when none is available from OVSDB
// (see the restart sequence described on initOpenFlowPipeline); confirm against getRoundInfo.
initialRoundNum = 1
// maxRetryForRoundNumSave is the retry budget passed to persistRoundNum by initOpenFlowPipeline.
maxRetryForRoundNumSave = 5
// On Linux, OVS configures the MTU for tunnel interfaces to 65000.
// See https://github.com/openvswitch/ovs/blame/3e666ba000b5eff58da8abb4e8c694ac3f7b08d6/lib/dpif-netlink-rtnl.c#L348-L360
// There are some edge cases (e.g., Kind clusters) where the transport Node's MTU may be
// larger than that (e.g., 65535), and packets may be dropped. To account for this, we use
// 65000 as an upper bound for the MTU calculated in getInterfaceMTU, when encap is
// supported. For simplicity's sake, we also use this upper bound for Windows, even if it
// does not apply.
ovsTunnelMaxMTU = 65000
)
// The variables below alias helper functions so that unit tests can replace them.
var (
// getIPNetDeviceFromIP is meant to be overridden for testing.
getIPNetDeviceFromIP = util.GetIPNetDeviceFromIP
// getIPNetDeviceByCIDRs is meant to be overridden for testing.
getIPNetDeviceByCIDRs = util.GetIPNetDeviceByCIDRs
// getTransportIPNetDeviceByNameFn is meant to be overridden for testing.
getTransportIPNetDeviceByNameFn = getTransportIPNetDeviceByName
// setLinkUp is meant to be overridden for testing.
setLinkUp = util.SetLinkUp
// configureLinkAddresses is meant to be overridden for testing.
configureLinkAddresses = util.ConfigureLinkAddresses
)
// otherConfigKeysForIPsecCertificates lists the OVS other_config keys that are added to the OVS
// bridge when the IPsec AuthenticationMode is "cert". Initialize deletes all of them whenever the
// agent is not running in IPsec certificate mode (e.g. after switching to "psk").
var otherConfigKeysForIPsecCertificates = []string{"certificate", "private_key", "ca_cert", "remote_cert", "remote_name"}
var (
// Declared as variables (rather than constants or direct calls) so that tests can
// substitute their own filesystem, clock and timeout.
defaultFs = afero.NewOsFs()
clock clockutils.WithTicker = &clockutils.RealClock{}
getNodeTimeout = 30 * time.Second
)
// Initializer knows how to set up host networking, Open vSwitch, and OpenFlow.
// All fields are populated by NewInitializer except nodeConfig and wireGuardClient, which
// NewInitializer leaves unset (presumably they are assigned while Initialize runs).
type Initializer struct {
// Clients used to talk to Kubernetes, OVS and the host network stack.
client clientset.Interface
crdClient versioned.Interface
ovsBridgeClient ovsconfig.OVSBridgeClient
ovsCtlClient ovsctl.OVSCtlClient
ofClient openflow.Client
routeClient route.Interface
wireGuardClient wireguard.Interface
// ifaceStore caches the interfaces discovered on, or added to, the OVS bridge.
ifaceStore interfacestore.InterfaceStore
ovsBridge string // name of the OVS bridge
hostGateway string // name of gateway port on the OVS bridge
mtu int
networkConfig *config.NetworkConfig
nodeConfig *config.NodeConfig
wireGuardConfig *config.WireGuardConfig
egressConfig *config.EgressConfig
serviceConfig *config.ServiceConfig
// l7NetworkPolicyConfig starts out as an empty config (see NewInitializer) and is passed
// to the OpenFlow client when the pipeline is initialized.
l7NetworkPolicyConfig *config.L7NetworkPolicyConfig
enableL7NetworkPolicy bool
enableL7FlowExporter bool
connectUplinkToBridge bool
// enableAntreaProxy additionally makes the CTStateNAT datapath feature mandatory.
enableAntreaProxy bool
// podNetworkWait should be decremented once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
podNetworkWait *utilwait.Group
// flowRestoreCompleteWait is used to indicate that required flows have
// been installed. We use it to determine whether flows from previous
// rounds can be deleted.
flowRestoreCompleteWait *utilwait.Group
stopCh <-chan struct{}
// nodeType selects the K8sNode-specific or ExternalNode-specific initialization steps.
nodeType config.NodeType
externalNodeNamespace string
}
// NewInitializer builds an Initializer from the given clients, configuration objects and feature
// flags. Each argument is stored unchanged in the field of the same name (k8sClient is stored as
// client). The L7 NetworkPolicy config starts out empty, and nodeConfig and wireGuardClient are
// not set by this constructor.
func NewInitializer(
	k8sClient clientset.Interface,
	crdClient versioned.Interface,
	ovsBridgeClient ovsconfig.OVSBridgeClient,
	ovsCtlClient ovsctl.OVSCtlClient,
	ofClient openflow.Client,
	routeClient route.Interface,
	ifaceStore interfacestore.InterfaceStore,
	ovsBridge string,
	hostGateway string,
	mtu int,
	networkConfig *config.NetworkConfig,
	wireGuardConfig *config.WireGuardConfig,
	egressConfig *config.EgressConfig,
	serviceConfig *config.ServiceConfig,
	podNetworkWait *utilwait.Group,
	flowRestoreCompleteWait *utilwait.Group,
	stopCh <-chan struct{},
	nodeType config.NodeType,
	externalNodeNamespace string,
	connectUplinkToBridge bool,
	enableAntreaProxy bool,
	enableL7NetworkPolicy bool,
	enableL7FlowExporter bool,
) *Initializer {
	// Fields are listed in the order in which the Initializer struct declares them.
	return &Initializer{
		client:                  k8sClient,
		crdClient:               crdClient,
		ovsBridgeClient:         ovsBridgeClient,
		ovsCtlClient:            ovsCtlClient,
		ofClient:                ofClient,
		routeClient:             routeClient,
		ifaceStore:              ifaceStore,
		ovsBridge:               ovsBridge,
		hostGateway:             hostGateway,
		mtu:                     mtu,
		networkConfig:           networkConfig,
		wireGuardConfig:         wireGuardConfig,
		egressConfig:            egressConfig,
		serviceConfig:           serviceConfig,
		l7NetworkPolicyConfig:   &config.L7NetworkPolicyConfig{},
		enableL7NetworkPolicy:   enableL7NetworkPolicy,
		enableL7FlowExporter:    enableL7FlowExporter,
		connectUplinkToBridge:   connectUplinkToBridge,
		enableAntreaProxy:       enableAntreaProxy,
		podNetworkWait:          podNetworkWait,
		flowRestoreCompleteWait: flowRestoreCompleteWait,
		stopCh:                  stopCh,
		nodeType:                nodeType,
		externalNodeNamespace:   externalNodeNamespace,
	}
}
// GetNodeConfig returns the NodeConfig held by the Initializer. NewInitializer does not set it,
// so the result is nil until something assigns it (presumably while Initialize runs).
func (i *Initializer) GetNodeConfig() *config.NodeConfig {
return i.nodeConfig
}
// GetWireGuardClient returns the WireGuard client held by the Initializer. NewInitializer does
// not set it, so the result is nil until something assigns it (presumably initializeWireGuard,
// which Initialize only calls in WireGuard encryption mode).
func (i *Initializer) GetWireGuardClient() wireguard.Interface {
return i.wireGuardClient
}
// setupOVSBridge creates the OVS bridge, checks that the datapath offers the features the agent
// needs, prepares the bridge and loads the existing OVS ports into the interface store. On a
// Kubernetes Node it then sets up the default tunnel interface and the host gateway interface.
// The first failing step aborts the sequence and its error is returned.
func (i *Initializer) setupOVSBridge() error {
	// How long to wait for the bridge's datapath ID to show up.
	const datapathIDTimeout = 5 * time.Second

	if createErr := i.ovsBridgeClient.Create(); createErr != nil {
		klog.ErrorS(createErr, "Failed to create OVS bridge")
		return createErr
	}

	// The datapath ID becoming available tells us that the bridge has been configured, which
	// in turn means that the supported datapath features can be queried.
	_, waitErr := i.ovsBridgeClient.WaitForDatapathID(datapathIDTimeout)
	if waitErr != nil {
		return fmt.Errorf("error when waiting for OVS bridge datapath ID: %w", waitErr)
	}

	if featErr := i.validateSupportedDPFeatures(); featErr != nil {
		return featErr
	}

	if prepErr := i.prepareOVSBridge(); prepErr != nil {
		return prepErr
	}

	// Populate the interface cache from the ports already present on the bridge.
	if storeErr := i.initInterfaceStore(); storeErr != nil {
		return storeErr
	}

	// The tunnel and gateway interfaces are only needed on Kubernetes Nodes.
	if i.nodeType != config.K8sNode {
		return nil
	}
	if tunErr := i.setupDefaultTunnelInterface(); tunErr != nil {
		return tunErr
	}
	return i.setupGatewayInterface()
}
// validateSupportedDPFeatures queries the datapath features reported by OVS and verifies that
// every feature the agent relies on is both reported and supported. The conntrack state, zone,
// mark and label features are always required; the conntrack NAT state feature is required as
// well when AntreaProxy is enabled. It returns the query error, or an error naming the first
// required feature that is unknown or unsupported.
func (i *Initializer) validateSupportedDPFeatures() error {
	dpFeatures, err := i.ovsCtlClient.GetDPFeatures()
	if err != nil {
		return err
	}

	// Features needed regardless of configuration.
	required := []ovsctl.DPFeature{
		ovsctl.CTStateFeature,
		ovsctl.CTZoneFeature,
		ovsctl.CTMarkFeature,
		ovsctl.CTLabelFeature,
	}
	if i.enableAntreaProxy {
		// AntreaProxy additionally depends on the CTStateNAT feature.
		required = append(required, ovsctl.CTStateNATFeature)
	}

	for _, name := range required {
		switch isSupported, isKnown := dpFeatures[name]; {
		case !isKnown:
			return fmt.Errorf("the required OVS DP feature '%s' support is unknown", name)
		case !isSupported:
			return fmt.Errorf("the required OVS DP feature '%s' is not supported", name)
		}
	}
	return nil
}
// initInterfaceStore initializes InterfaceStore with all OVS ports retrieved
// from the OVS bridge.
//
// Each port is classified by the interface type recorded in its external IDs and converted into
// the matching InterfaceConfig. Ports without a type, with an unknown type, or whose parser
// yields nil are left out of the store. As a side effect, i.hostGateway and
// i.nodeConfig.DefaultTunName are updated when a gateway port or a "regular" tunnel port with a
// different name is discovered.
//
// An error is returned when the ports cannot be listed or when a host interface cannot be parsed.
func (i *Initializer) initInterfaceStore() error {
	ovsPorts, err := i.ovsBridgeClient.GetPortList()
	if err != nil {
		klog.ErrorS(err, "Failed to list OVS ports")
		return err
	}

	parseGatewayInterfaceFunc := func(port *ovsconfig.OVSPortData, ovsPort *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig {
		intf := &interfacestore.InterfaceConfig{
			Type:          interfacestore.GatewayInterface,
			InterfaceName: port.Name,
			MAC:           port.MAC,
			OVSPortConfig: ovsPort,
		}
		if intf.InterfaceName != i.hostGateway {
			klog.InfoS("The discovered gateway interface name is different from the configured value", "discovered", intf.InterfaceName, "configured", i.hostGateway)
			// Set the gateway interface name to the discovered name.
			i.hostGateway = intf.InterfaceName
		}
		return intf
	}
	parseUplinkInterfaceFunc := func(port *ovsconfig.OVSPortData, ovsPort *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig {
		return &interfacestore.InterfaceConfig{
			Type:          interfacestore.UplinkInterface,
			InterfaceName: port.Name,
			OVSPortConfig: ovsPort,
		}
	}
	parseTunnelInterfaceFunc := func(port *ovsconfig.OVSPortData, ovsPort *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig {
		intf := noderoute.ParseTunnelInterfaceConfig(port, ovsPort)
		// This function can be called for both the "regular" tunnel port or an IPsec tunnel
		// port, but the rest of the function only applies to the "regular" tunnel port.
		if intf != nil && intf.Type == interfacestore.TunnelInterface && intf.InterfaceName != i.nodeConfig.DefaultTunName {
			klog.InfoS("The discovered default tunnel interface name is different from the default value", "discovered", intf.InterfaceName, "default", i.nodeConfig.DefaultTunName)
			// Set the default tunnel interface name to the discovered name.
			i.nodeConfig.DefaultTunName = intf.InterfaceName
		}
		return intf
	}

	ifaceList := make([]*interfacestore.InterfaceConfig, 0, len(ovsPorts))
	for index := range ovsPorts {
		port := &ovsPorts[index]
		ovsPort := &interfacestore.OVSPortConfig{
			PortUUID: port.UUID,
			OFPort:   port.OFPort,
		}
		interfaceType, ok := port.ExternalIDs[interfacestore.AntreaInterfaceTypeKey]
		if !ok {
			// No InterfaceConfig has been built at this point, so the port is identified by
			// its own name (dereferencing the still-nil config here would panic).
			klog.ErrorS(nil, "Interface type is not set, you may be trying to upgrade from an Antrea version which is too old", "interfaceName", port.Name)
			continue
		}

		var intf *interfacestore.InterfaceConfig
		switch interfaceType {
		case interfacestore.AntreaGateway:
			intf = parseGatewayInterfaceFunc(port, ovsPort)
		case interfacestore.AntreaUplink:
			intf = parseUplinkInterfaceFunc(port, ovsPort)
		case interfacestore.AntreaTunnel, interfacestore.AntreaIPsecTunnel:
			intf = parseTunnelInterfaceFunc(port, ovsPort)
		case interfacestore.AntreaHost:
			// The OVS bridge port itself does not need to be loaded into the interfaceStore,
			// so intf stays nil for it.
			if port.Name != i.ovsBridge {
				var err error
				intf, err = externalnode.ParseHostInterfaceConfig(i.ovsBridgeClient, port, ovsPort)
				if err != nil {
					return fmt.Errorf("failed to get interfaceConfig by port %s: %w", port.Name, err)
				}
			}
		case interfacestore.AntreaContainer:
			// The port should be for a container interface.
			intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort)
		case interfacestore.AntreaTrafficControl:
			intf = trafficcontrol.ParseTrafficControlInterfaceConfig(port, ovsPort)
		default:
			klog.InfoS("Unknown Antrea interface type", "type", interfaceType)
		}
		if intf != nil {
			klog.V(2).InfoS("Adding interface to cache", "interfaceName", intf.InterfaceName)
			ifaceList = append(ifaceList, intf)
		}
	}

	i.ifaceStore.Initialize(ifaceList)
	return nil
}
// restorePortConfigs re-applies the no-flood setting on every IPsec tunnel and traffic control
// interface found in the interface store. Interfaces with a negative ofPort are skipped with a
// log message. The first failure to set no-flood aborts the loop and is returned.
func (i *Initializer) restorePortConfigs() error {
	for _, iface := range i.ifaceStore.ListInterfaces() {
		// Only these two interface types carry the no-flood setting.
		if iface.Type != interfacestore.IPSecTunnelInterface && iface.Type != interfacestore.TrafficControlInterface {
			continue
		}
		if iface.OFPort < 0 {
			klog.InfoS("Skipped setting no-flood for port due to invalid ofPort", "port", iface.InterfaceName, "ofport", iface.OFPort)
			continue
		}
		err := i.ovsCtlClient.SetPortNoFlood(int(iface.OFPort))
		if err != nil {
			return fmt.Errorf("failed to set no-flood for port %s: %w", iface.InterfaceName, err)
		}
		klog.InfoS("Set no-flood for port", "port", iface.InterfaceName)
	}
	return nil
}
// Initialize sets up the agent's initial configuration. In order, it:
//  1. initializes the local Node config and prepares the host network;
//  2. sets up the OVS bridge and restores port configs;
//  3. prepares the L7 engine interfaces when L7 NetworkPolicy or the L7 flow exporter is enabled;
//  4. initializes WireGuard or waits for the IPsec monitor daemon, depending on the traffic
//     encryption mode, and removes stale IPsec certificate material when certificate
//     authentication is not in use;
//  5. on a Kubernetes Node, initializes the route client;
//  6. installs the OpenFlow pipeline.
//
// The first failing step aborts initialization and its error is returned.
func (i *Initializer) Initialize() error {
	klog.Info("Setting up node network")

	if err := i.initNodeLocalConfig(); err != nil {
		return err
	}

	if err := i.prepareHostNetwork(); err != nil {
		return err
	}

	if err := i.setupOVSBridge(); err != nil {
		return err
	}

	if err := i.restorePortConfigs(); err != nil {
		return err
	}

	// prepareL7EngineInterfaces must be executed after setupOVSBridge since it requires interfaceStore.
	if i.enableL7NetworkPolicy || i.enableL7FlowExporter {
		if err := i.prepareL7EngineInterfaces(); err != nil {
			return err
		}
	}

	// initializeWireGuard must be executed after setupOVSBridge as it requires gateway addresses on the OVS bridge.
	if i.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeWireGuard {
		if err := i.initializeWireGuard(); err != nil {
			return err
		}
	}
	// TODO: clean up WireGuard related configurations.

	// IPsec with a pre-shared key: wait for the monitor daemon, then load the key.
	if i.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec &&
		i.networkConfig.IPsecConfig.AuthenticationMode == config.IPsecAuthenticationModePSK {
		if err := i.waitForIPsecMonitorDaemon(); err != nil {
			return err
		}
		if err := i.readIPSecPSK(); err != nil {
			return err
		}
	}

	if i.networkConfig.TrafficEncryptionMode != config.TrafficEncryptionModeIPSec ||
		i.networkConfig.IPsecConfig.AuthenticationMode != config.IPsecAuthenticationModeCert {
		// Not in IPsec certificate mode: remove whatever a previous certificate-mode run
		// may have left behind, both on disk and in the OVS database.
		otherConfig, err := i.ovsBridgeClient.GetOVSOtherConfig()
		if err != nil {
			return fmt.Errorf("failed to get OVS other configs: %w", err)
		}

		// Files referenced by the stale configs. Deletion is best-effort: failures are
		// logged but do not abort initialization.
		staleFiles := []struct {
			configKey  string
			failureMsg string
		}{
			{"certificate", "Failed to delete unused IPsec certificate"},
			{"private_key", "Failed to delete unused IPsec private key"},
		}
		for _, stale := range staleFiles {
			path := otherConfig[stale.configKey]
			if path == "" {
				continue
			}
			if rmErr := os.Remove(path); rmErr != nil && !os.IsNotExist(rmErr) {
				klog.ErrorS(rmErr, stale.failureMsg, "file", path)
			}
		}

		// Clean up stale configs in OVS database.
		staleKeys := make(map[string]interface{})
		for _, key := range otherConfigKeysForIPsecCertificates {
			staleKeys[key] = ""
		}
		if err := i.ovsBridgeClient.DeleteOVSOtherConfig(staleKeys); err != nil {
			return fmt.Errorf("failed to clean up OVS other configs: %w", err)
		}
	} else {
		// IPsec with certificates only needs the monitor daemon to be up.
		if err := i.waitForIPsecMonitorDaemon(); err != nil {
			return err
		}
	}

	if i.nodeType == config.K8sNode {
		i.podNetworkWait.Increment()
		// routeClient.Initialize() should be after i.setupOVSBridge() which
		// creates the host gateway interface.
		if err := i.routeClient.Initialize(i.nodeConfig, i.podNetworkWait.Done); err != nil {
			return err
		}
	}

	// Install OpenFlow entries on OVS bridge (needed for every Node type).
	if err := i.initOpenFlowPipeline(); err != nil {
		return err
	}

	klog.Infof("Agent initialized NodeConfig=%v, NetworkConfig=%v", i.nodeConfig, i.networkConfig)
	return nil
}
// persistRoundNum will save the provided round number to OVSDB as an external ID. To account for
// transient failures, this (synchronous) function includes a retry mechanism: after a failed
// attempt it sleeps for interval and tries again, up to maxRetries additional times (so at most
// maxRetries+1 attempts in total). Failures are only logged; nothing is returned to the caller.
func persistRoundNum(num uint64, bridgeClient ovsconfig.OVSBridgeClient, interval time.Duration, maxRetries int) {
	klog.Infof("Persisting round number %d to OVSDB", num)
	// The attempt counter must advance on every failure, otherwise the retry budget is never
	// exhausted and a persistent OVSDB failure would keep this loop spinning forever.
	for attempt := 0; ; attempt++ {
		err := saveRoundNum(num, bridgeClient)
		if err == nil {
			klog.Infof("Round number %d was persisted to OVSDB", num)
			return // success
		}
		klog.Errorf("Error when writing round number to OVSDB: %v", err)
		if attempt >= maxRetries {
			break
		}
		time.Sleep(interval)
	}
	klog.Errorf("Unable to persist round number %d to OVSDB after %d tries", num, maxRetries+1)
}
// initOpenFlowPipeline sets up necessary Openflow entries, including pipeline, classifiers, conn_track, and gateway flows
// Every time the agent is (re)started, we go through the following sequence:
//  1. agent determines the new round number (this is done by incrementing the round number
//     persisted in OVSDB, or if it's not available by picking round 1).
//  2. any existing flow for which the round number matches the round number obtained from step 1
//     is deleted.
//  3. all required flows are installed, using the round number obtained from step 1.
//  4. after convergence, all existing flows for which the round number matches the previous round
//     number (i.e. the round number which was persisted in OVSDB, if any) are deleted.
//  5. the new round number obtained from step 1 is persisted to OVSDB.
//
// The rationale for not persisting the new round number until after all previous flows have been
// deleted is to avoid a situation in which some stale flows are never deleted because of successive
// agent restarts (with the agent crashing before step 4 can be completed). With the sequence
// described above, We guarantee that at most two rounds of flows exist in the switch at any given
// time.
//
// The function returns as soon as the basic flows (and, on an external Node, the VM initial
// flows) are installed. Two goroutines are started before returning: one deletes stale flows and
// persists the round number, the other replays flows each time a value is received on the
// channel returned by the OpenFlow client, until that channel is closed.
func (i *Initializer) initOpenFlowPipeline() error {
	roundInfo := getRoundInfo(i.ovsBridgeClient)

	// Set up all basic flows.
	ofConnCh, err := i.ofClient.Initialize(roundInfo, i.nodeConfig, i.networkConfig, i.egressConfig, i.serviceConfig, i.l7NetworkPolicyConfig)
	if err != nil {
		klog.Errorf("Failed to initialize openflow client: %v", err)
		return err
	}

	if i.nodeType == config.ExternalNode {
		if err := i.installVMInitialFlows(); err != nil {
			return err
		}
	}

	go func() {
		// Delete stale flows from previous round. We need to wait long enough to ensure
		// that all the flow which are still required have received an updated cookie (with
		// the new round number), otherwise we would disrupt the dataplane. Unfortunately,
		// the time required for convergence may be large and there is no simple way to
		// determine when is a right time to perform the cleanup task.
		// We took a first step towards introducing a deterministic mechanism through which
		// the different entities responsible for installing flows can notify the agent that
		// this deletion operation can take place. i.flowRestoreCompleteWait.Wait() will
		// block until some key flows (NetworkPolicy flows, Pod flows, Node route flows)
		// have been installed. But not all entities responsible for installing flows
		// currently use this wait group, so we block for a minimum of 10 seconds.
		time.Sleep(10 * time.Second)
		i.flowRestoreCompleteWait.Wait()

		klog.Info("Deleting stale flows from previous round if any")
		if err := i.ofClient.DeleteStaleFlows(); err != nil {
			klog.Errorf("Error when deleting stale flows from previous round: %v", err)
			return
		}
		persistRoundNum(roundInfo.RoundNum, i.ovsBridgeClient, 1*time.Second, maxRetryForRoundNumSave)
	}()

	go func() {
		// Each value received on ofConnCh triggers one replay; the goroutine exits once
		// the channel is closed.
		for range ofConnCh {
			klog.Info("Replaying OF flows to OVS bridge")
			i.ofClient.ReplayFlows()
			klog.Info("Flow replay completed")

			klog.InfoS("Restoring OF port configs to OVS bridge")
			if err := i.restorePortConfigs(); err != nil {
				klog.ErrorS(err, "Failed to restore OF port configs")
			} else {
				klog.InfoS("Port configs restoration completed")
			}
			// ofClient and ovsBridgeClient have their own mechanisms to restore connections with OVS, and it could
			// happen that ovsBridgeClient's connection is not ready when ofClient completes flow replay. We retry it
			// with a timeout that is longer time than ovsBridgeClient's maximum connecting retry interval (8 seconds)
			// to ensure the flag can be removed successfully.
			//
			// The poll result and the most recent FlowRestoreComplete error are kept in variables local to this
			// iteration (rather than in the enclosing function's err), so that a timeout can be logged together
			// with the failure that caused the retries.
			var lastErr error
			pollErr := wait.PollUntilContextTimeout(context.TODO(), 200*time.Millisecond, 10*time.Second, true,
				func(ctx context.Context) (bool, error) {
					if lastErr = i.FlowRestoreComplete(); lastErr != nil {
						// Keep polling: the failure may be transient.
						return false, nil
					}
					return true, nil
				})
			// This shouldn't happen unless OVS is disconnected again after replaying flows. If it happens, we will try
			// to clean up the config again so an error log should be fine.
			if pollErr != nil {
				klog.Errorf("Failed to clean up flow-restore-wait config: %v (last error: %v)", pollErr, lastErr)
			}
		}
	}()

	return nil
}
// FlowRestoreComplete removes the "flow-restore-wait" key from the OVS other_config once the
// required flows have been restored.
//
// It first polls (for up to 2 seconds) until the flag reads "true". If it never does, the
// cleanup is skipped and nil is returned. Otherwise the flag is deleted and read back, up to
// 3 times, until it no longer reads "true". An error is returned if the OVS other_config cannot
// be read, if a delete call fails, or if the flag is still "true" after the last attempt.
func (i *Initializer) FlowRestoreComplete() error {
	// Issue #1600: A rare case has been found that the "flow-restore-wait" config was still true even though the delete
	// call below was considered success. At the moment we don't know if it's a race condition caused by "ovs-vsctl set
	// --no-wait" or a problem with OVSDB golang lib or OVSDB itself. To work around it, we check if the config is true
	// before deleting it and if it is false after deleting it, and we will log warnings and retry a few times if
	// anything unexpected happens.
	// If the issue can still happen, it must be that some other code sets the config back after it's deleted.
	getFlowRestoreWait := func() (bool, error) {
		otherConfig, err := i.ovsBridgeClient.GetOVSOtherConfig()
		if err != nil {
			// Report the cause instead of dropping it. It is formatted with %v rather than
			// wrapped with %w so that the wait.Interrupted check below can only ever match
			// errors produced by the poll itself.
			return false, fmt.Errorf("error when getting OVS other config: %v", err)
		}
		return otherConfig["flow-restore-wait"] == "true", nil
	}

	// "flow-restore-wait" is supposed to be true here.
	err := wait.PollUntilContextTimeout(context.TODO(), 200*time.Millisecond, 2*time.Second, true,
		func(ctx context.Context) (bool, error) {
			flowRestoreWait, err := getFlowRestoreWait()
			if err != nil {
				return false, err
			}
			if !flowRestoreWait {
				// If the log is seen and the config becomes true later, we should look at why "ovs-vsctl set --no-wait"
				// doesn't take effect on ovsdb immediately.
				klog.Warning("flow-restore-wait was not true before the delete call was made, will retry")
				return false, nil
			}
			return true, nil
		})
	if err != nil {
		if wait.Interrupted(err) {
			// This could happen if the method is triggered by OVS disconnection event, in which OVS doesn't restart.
			klog.Info("flow-restore-wait was not true, skip cleaning it up")
			return nil
		}
		return err
	}

	for retries := 0; retries < 3; retries++ {
		// ovs-vswitchd is started with flow-restore-wait set to true for the following reasons:
		// 1. It prevents packets from being mishandled by ovs-vswitchd in its default fashion,
		//    which could affect existing connections' conntrack state and cause issues like #625.
		// 2. It prevents ovs-vswitchd from flushing or expiring previously set datapath flows,
		//    so existing connections can achieve 0 downtime during OVS restart.
		// As a result, we remove the config here after restoring necessary flows.
		klog.Info("Cleaning up flow-restore-wait config")
		if err := i.ovsBridgeClient.DeleteOVSOtherConfig(map[string]interface{}{"flow-restore-wait": "true"}); err != nil {
			return fmt.Errorf("error when cleaning up flow-restore-wait config: %v", err)
		}
		// Read the flag back to verify that the delete actually took effect.
		flowRestoreWait, err := getFlowRestoreWait()
		if err != nil {
			return err
		}
		if flowRestoreWait {
			// If it is seen, we should look at OVSDB golang lib and OVS.
			klog.Warning("flow-restore-wait was still true even though the delete call was considered success")
			continue
		}
		klog.Info("Cleaned up flow-restore-wait config")
		return nil
	}
	return fmt.Errorf("error when cleaning up flow-restore-wait config: delete calls failed to take effect")
}
// setupGatewayInterface makes sure the host gateway interface, which is an internal port on OVS,
// exists and is configured. When the interface store has no entry for it, the port is created
// with a randomly generated MAC and a predefined ofport request (config.DefaultHostGatewayOFPort)
// and then added to the store. In every case the gateway is then configured, its MTU is set and,
// if it has an IPv4 address, arp_announce is set to 1 on it.
func (i *Initializer) setupGatewayInterface() error {
	gwIface, found := i.ifaceStore.GetInterface(i.hostGateway)
	if found {
		klog.V(2).InfoS("Gateway port already exists on OVS bridge", "name", i.hostGateway, "ofPort", gwIface.OFPort)
	} else {
		// The gateway port is missing from the store: create it on the bridge.
		klog.V(2).Infof("Creating gateway port %s on OVS bridge", i.hostGateway)
		gwMAC := util.GenerateRandomMAC()
		portUUID, err := i.ovsBridgeClient.CreateInternalPort(
			i.hostGateway,
			config.DefaultHostGatewayOFPort,
			gwMAC.String(),
			map[string]interface{}{
				interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaGateway,
			},
		)
		if err != nil {
			klog.ErrorS(err, "Failed to create gateway port on OVS bridge", "port", i.hostGateway)
			return err
		}
		ofPort, err := i.ovsBridgeClient.GetOFPort(i.hostGateway, false)
		if err != nil {
			klog.ErrorS(err, "Failed to get gateway ofport", "port", i.hostGateway)
			return err
		}
		klog.InfoS("Allocated OpenFlow port for gateway interface", "port", i.hostGateway, "ofPort", ofPort)

		gwIface = interfacestore.NewGatewayInterface(i.hostGateway, gwMAC)
		gwIface.OVSPortConfig = &interfacestore.OVSPortConfig{PortUUID: portUUID, OFPort: ofPort}
		i.ifaceStore.AddInterface(gwIface)
	}

	klog.V(4).Infof("Setting gateway interface %s MTU to %d", i.hostGateway, i.networkConfig.InterfaceMTU)

	if cfgErr := i.configureGatewayInterface(gwIface); cfgErr != nil {
		return cfgErr
	}
	// Setting the MTU is idempotent and is done whether or not the gateway interface already
	// existed, because the desired MTU may change across restarts.
	if mtuErr := i.setInterfaceMTU(i.hostGateway, i.networkConfig.InterfaceMTU); mtuErr != nil {
		return mtuErr
	}

	// On Linux, arp_announce is set to 1 so that ARP requests sent on the gateway interface
	// always use the gateway IP as the source IP; otherwise the ARP requests would be dropped
	// by the ARP SpoofGuard flow.
	if i.nodeConfig.GatewayConfig.IPv4 == nil {
		return nil
	}
	if arpErr := setInterfaceARPAnnounce(gwIface.InterfaceName, 1); arpErr != nil {
		return arpErr
	}
	return nil
}
// configureGatewayInterface brings the host gateway link up and records the gateway settings.
//
// It retries setLinkUp (up to maxRetryForHostLink times, one second apart) while the link cannot
// be found, persists the link's MAC to OVS when it differs from the MAC stored in gatewayIface,
// and fills in i.nodeConfig.GatewayConfig. In NetworkPolicyOnly mode the gateway takes the Node's
// transport IPs; otherwise gateway addresses are allocated from the Node's Pod CIDRs.
//
// gatewayIface is updated in place (MAC and IPs). An error is returned when the link cannot be
// brought up or when allocating the gateway addresses fails.
func (i *Initializer) configureGatewayInterface(gatewayIface *interfacestore.InterfaceConfig) error {
	var gwMAC net.HardwareAddr
	var gwLinkIdx int
	var err error
	// Host link might not be queried at once after creating OVS internal port; retry max 5 times with 1s
	// delay each time to ensure the link is ready.
	for retry := 0; retry < maxRetryForHostLink; retry++ {
		gwMAC, gwLinkIdx, err = setLinkUp(i.hostGateway)
		if err == nil {
			break
		}
		if _, ok := err.(util.LinkNotFound); ok {
			klog.V(2).Infof("Not found host link for gateway %s, retry after 1s", i.hostGateway)
			time.Sleep(1 * time.Second)
			continue
		}
		// Any other error is not expected to go away by waiting.
		return err
	}
	// err is still set here only if every attempt ended with LinkNotFound.
	if err != nil {
		klog.Errorf("Failed to find host link for gateway %s: %v", i.hostGateway, err)
		return err
	}

	// Persist the MAC configured in the network interface when the gatewayIface.MAC is not set. This may
	// happen in upgrade case.
	// Note the "mac" field in Windows OVS internal Interface has no impact on the network adapter's actual MAC,
	// set it to the same value just to keep consistency.
	if !bytes.Equal(gatewayIface.MAC, gwMAC) {
		gatewayIface.MAC = gwMAC
		// Best effort: a failure to persist the MAC is logged but does not abort the setup.
		if err := i.ovsBridgeClient.SetInterfaceMAC(gatewayIface.InterfaceName, gwMAC); err != nil {
			klog.ErrorS(err, "Failed to persist interface MAC address", "interface", gatewayIface.InterfaceName, "mac", gwMAC)
		}
	}
	i.nodeConfig.GatewayConfig = &config.GatewayConfig{
		Name:      i.hostGateway,
		MAC:       gwMAC,
		LinkIndex: gwLinkIdx,
		OFPort:    uint32(gatewayIface.OFPort),
	}
	gatewayIface.IPs = []net.IP{}
	if i.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
		// Assign IP to gw as required by SpoofGuard.
		// The transport address is the one being dereferenced, so it is nil-checked in addition
		// to the Node address to avoid a panic should only the latter be set.
		if i.nodeConfig.NodeIPv4Addr != nil && i.nodeConfig.NodeTransportIPv4Addr != nil {
			i.nodeConfig.GatewayConfig.IPv4 = i.nodeConfig.NodeTransportIPv4Addr.IP
			gatewayIface.IPs = append(gatewayIface.IPs, i.nodeConfig.NodeTransportIPv4Addr.IP)
		}
		if i.nodeConfig.NodeIPv6Addr != nil && i.nodeConfig.NodeTransportIPv6Addr != nil {
			i.nodeConfig.GatewayConfig.IPv6 = i.nodeConfig.NodeTransportIPv6Addr.IP
			gatewayIface.IPs = append(gatewayIface.IPs, i.nodeConfig.NodeTransportIPv6Addr.IP)
		}
		// No need to assign local CIDR to gw0 because local CIDR is not managed by Antrea
		return nil
	}

	// Allocate the gateway IP address for each Pod CIDR allocated to the Node. For each CIDR,
	// the first address in the subnet is assigned to the host gateway interface.
	podCIDRs := []*net.IPNet{i.nodeConfig.PodIPv4CIDR, i.nodeConfig.PodIPv6CIDR}
	if err := i.allocateGatewayAddresses(podCIDRs, gatewayIface); err != nil {
		return err
	}

	return nil
}
// setupDefaultTunnelInterface reconciles the default tunnel port on the OVS bridge with the
// current network configuration:
//   - an existing port whose type, destination port and local IP already match is reused, and
//     only its csum option is updated when it differs from the desired value;
//   - any other existing port is deleted from the bridge and from the interface store;
//   - a new port is created when the configuration needs a tunnel interface.
//
// On success with a tunnel interface in place, i.nodeConfig.TunnelOFPort holds its OpenFlow port.
func (i *Initializer) setupDefaultTunnelInterface() error {
	portName := i.nodeConfig.DefaultTunName
	existingIface, found := i.ifaceStore.GetInterface(portName)
	localIP := i.getTunnelPortLocalIP()
	var localIPStr string
	if localIP != nil {
		localIPStr = localIP.String()
	}
	tunnelType := i.networkConfig.TunnelType

	// GRE over an IPv6 overlay requires the OVS tunnel type "ip6gre" rather than "gre".
	// Supporting it for an IPv6-only cluster would be a matter of switching the type, but a
	// dual-stack cluster has both IPv4 and IPv6 tunnels for inter-Node traffic and would need
	// two default tunnel ports ("gre" and "ip6gre"), while the code currently assumes a single
	// default tunnel port. Until that is addressed, a GRE tunnel is rejected whenever the Node
	// network supports IPv6.
	// See https://github.com/antrea-io/antrea/issues/3150
	isGRE := tunnelType == ovsconfig.GRETunnel
	if i.networkConfig.TrafficEncapMode.SupportsEncap() && isGRE && i.nodeConfig.NodeIPv6Addr != nil {
		return fmt.Errorf("GRE tunnel type is not supported for IPv6 overlay")
	}

	// UDP checksums on Geneve and VXLAN tunnels trigger GRO on the receiver on old Linux
	// kernels, which can greatly improve performance. Newer kernels carrying the patch below
	// do not need it:
	// https://github.com/torvalds/linux/commit/89e5c58fc1e2857ccdaae506fb8bc5fed57ee063.
	csumCapable := tunnelType == ovsconfig.GeneveTunnel || tunnelType == ovsconfig.VXLANTunnel
	wantCsum := i.networkConfig.TunnelCsum && csumCapable
	needsTunnel := i.networkConfig.NeedsTunnelInterface()

	// Deal with a default tunnel port left over from a previous run.
	if found {
		reusable := needsTunnel &&
			existingIface.TunnelInterfaceConfig.Type == tunnelType &&
			existingIface.TunnelInterfaceConfig.DestinationPort == i.networkConfig.TunnelPort &&
			existingIface.TunnelInterfaceConfig.LocalIP.Equal(localIP)
		if reusable {
			klog.V(2).InfoS("Tunnel port already exists on OVS bridge", "name", portName, "ofPort", existingIface.OFPort)
			if existingIface.TunnelInterfaceConfig.Csum != wantCsum {
				klog.InfoS("Updating csum for tunnel port", "port", portName, "csum", wantCsum)
				if err := i.setTunnelCsum(portName, wantCsum); err != nil {
					return fmt.Errorf("failed to update csum for tunnel port %s to %v: %v", portName, wantCsum, err)
				}
				existingIface.TunnelInterfaceConfig.Csum = wantCsum
			}
			i.nodeConfig.TunnelOFPort = uint32(existingIface.OFPort)
			return nil
		}

		// The port is either misconfigured or no longer wanted: remove it.
		err := i.ovsBridgeClient.DeletePort(existingIface.PortUUID)
		switch {
		case err == nil:
			klog.Infof("Removed tunnel port %s with tunnel type: %s", portName, existingIface.TunnelInterfaceConfig.Type)
			i.ifaceStore.DeleteInterface(existingIface)
		case needsTunnel:
			// A replacement port has to be created, so the stale one must be gone.
			return fmt.Errorf("failed to remove tunnel port %s with wrong tunnel type: %s", portName, err)
		default:
			// No tunnel port is needed; a leftover port is only reported.
			klog.Errorf("Failed to remove tunnel port %s in NoEncapMode: %v", portName, err)
		}
	}

	if !needsTunnel {
		return nil
	}

	// Create the default tunnel port and interface, always under the desired default name.
	if portName != defaultTunInterfaceName {
		portName = defaultTunInterfaceName
		i.nodeConfig.DefaultTunName = portName
	}
	externalIDs := map[string]interface{}{
		interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaTunnel,
	}
	extraOptions := map[string]interface{}{}
	if i.networkConfig.TunnelPort != 0 {
		extraOptions["dst_port"] = strconv.Itoa(int(i.networkConfig.TunnelPort))
	}
	portUUID, err := i.ovsBridgeClient.CreateTunnelPortExt(portName,
		tunnelType, config.DefaultTunOFPort, wantCsum, localIPStr, "", "", "", extraOptions, externalIDs)
	if err != nil {
		klog.ErrorS(err, "Failed to create tunnel port on OVS bridge", "port", portName, "type", tunnelType)
		return err
	}
	ofPort, err := i.ovsBridgeClient.GetOFPort(portName, false)
	if err != nil {
		klog.ErrorS(err, "Failed to get tunnel ofport on OVS bridge", "port", portName, "type", tunnelType)
		return err
	}
	klog.InfoS("Allocated OpenFlow port for tunnel interface", "port", portName, "ofPort", ofPort)

	// Record the new port in the interface store and in the Node configuration.
	newIface := interfacestore.NewTunnelInterface(portName, tunnelType, i.networkConfig.TunnelPort, localIP, wantCsum,
		&interfacestore.OVSPortConfig{PortUUID: portUUID, OFPort: ofPort})
	i.ifaceStore.AddInterface(newIface)
	i.nodeConfig.TunnelOFPort = uint32(ofPort)
	return nil
}
// setTunnelCsum sets the "csum" option of the interface named tunnelPortName to enable,
// keeping all other options currently reported for that interface. It returns an error if the
// current options cannot be read or the updated options cannot be written.
func (i *Initializer) setTunnelCsum(tunnelPortName string, enable bool) error {
	current, err := i.ovsBridgeClient.GetInterfaceOptions(tunnelPortName)
	if err != nil {
		return fmt.Errorf("error getting interface options: %w", err)
	}

	// Build the full option set to write back: every existing option plus the new csum value.
	merged := make(map[string]interface{}, len(current)+1)
	for name, value := range current {
		merged[name] = value
	}
	merged["csum"] = strconv.FormatBool(enable)

	return i.ovsBridgeClient.SetInterfaceOptions(tunnelPortName, merged)
}
// initK8sNodeLocalConfig retrieves node's subnet CIDR from node.spec.PodCIDR, which is used for IPAM and setup
// host gateway interface.
func (i *Initializer) initK8sNodeLocalConfig(nodeName string) error {
var node *v1.Node
if err := wait.PollUntilContextTimeout(context.TODO(), 5*time.Second, getNodeTimeout, true,
func(ctx context.Context) (bool, error) {
var err error
node, err = i.client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("failed to get Node with name %s from K8s: %w", nodeName, err)
}
// Except in networkPolicyOnly mode, we need a PodCIDR for the Node.
if !i.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
// Validate that PodCIDR has been configured.
if node.Spec.PodCIDRs == nil && node.Spec.PodCIDR == "" {
klog.InfoS("Waiting for Node PodCIDR configuration to complete", "nodeName", nodeName)
return false, nil
}
}
return true, nil
}); err != nil {
if wait.Interrupted(err) {
klog.ErrorS(err, "Spec.PodCIDR is empty for Node. Please make sure --allocate-node-cidrs is enabled "+
"for kube-controller-manager and --cluster-cidr specifies a sufficient CIDR range, or nodeIPAM is "+
"enabled for antrea-controller", "nodeName", nodeName)
return fmt.Errorf("Spec.PodCIDR is empty for Node %s", nodeName)
}
return err
}
// nodeInterface is the interface that has K8s Node IP. transportInterface is the interface that is used for
// tunneling or routing the traffic across Nodes. It defaults to nodeInterface and can be overridden by the
// configuration parameters TransportInterface and TransportInterfaceCIDRs.
var nodeInterface, transportInterface *net.Interface
// nodeIPv4Addr and nodeIPv6Addr are the IP addresses of nodeInterface.
// transportIPv4Addr and transportIPv6Addr are the IP addresses of transportInterface.
var nodeIPv4Addr, nodeIPv6Addr, transportIPv4Addr, transportIPv6Addr *net.IPNet
// Find the interface configured with Node IP and use it for Pod traffic.
ipAddrs, err := k8s.GetNodeAddrs(node)
if err != nil {
return fmt.Errorf("failed to obtain local IP addresses from K8s: %w", err)
}
nodeIPv4Addr, nodeIPv6Addr, nodeInterface, err = i.getNodeInterfaceFromIP(ipAddrs)
if err != nil {
return fmt.Errorf("failed to get local IPNet device with IP %v: %v", ipAddrs, err)
}
transportIPv4Addr = nodeIPv4Addr
transportIPv6Addr = nodeIPv6Addr
transportInterface = nodeInterface
if i.networkConfig.TransportIface != "" {
// Find the configured transport interface, and update its IP address in Node's annotation.
transportIPv4Addr, transportIPv6Addr, transportInterface, err = getTransportIPNetDeviceByNameFn(i.networkConfig.TransportIface, i.ovsBridge)
if err != nil {
return fmt.Errorf("failed to get local IPNet device with transport interface %s: %v", i.networkConfig.TransportIface, err)
}
klog.InfoS("Updating Node transport addresses annotation")
var ips []string
if transportIPv4Addr != nil {
ips = append(ips, transportIPv4Addr.IP.String())
}
if transportIPv6Addr != nil {
ips = append(ips, transportIPv6Addr.IP.String())
}
if err := i.patchNodeAnnotations(nodeName, types.NodeTransportAddressAnnotationKey, strings.Join(ips, ",")); err != nil {
return err
}
} else if len(i.networkConfig.TransportIfaceCIDRs) > 0 {
transportIPv4Addr, transportIPv6Addr, transportInterface, err = getIPNetDeviceByCIDRs(i.networkConfig.TransportIfaceCIDRs)
if err != nil {
return fmt.Errorf("failed to get local IPNet device with transport Address CIDR %s: %v", i.networkConfig.TransportIfaceCIDRs, err)
}
var ips []string
if transportIPv4Addr != nil {
ips = append(ips, transportIPv4Addr.IP.String())
}
if transportIPv6Addr != nil {
ips = append(ips, transportIPv6Addr.IP.String())
}
klog.InfoS("Updating Node transport addresses annotation")
if err := i.patchNodeAnnotations(nodeName, types.NodeTransportAddressAnnotationKey, strings.Join(ips, ",")); err != nil {
return err
}
} else {
// Remove the existing annotation "transport-address" if transportInterface is not set in the configuration.
if node.Annotations[types.NodeTransportAddressAnnotationKey] != "" {
klog.InfoS("Removing Node transport address annotation")
i.patchNodeAnnotations(nodeName, types.NodeTransportAddressAnnotationKey, nil)
}
}
// Update the Node's MAC address in the annotations of the Node. The MAC address will be used for direct routing by
// OVS in noencap case on Windows Nodes. As a mixture of Linux and Windows nodes is possible, Linux Nodes' MAC
// addresses should be reported too to make them discoverable for Windows Nodes.
if i.networkConfig.TrafficEncapMode.SupportsNoEncap() {
klog.InfoS("Updating Node MAC annotation")
if err := i.patchNodeAnnotations(nodeName, types.NodeMACAddressAnnotationKey, transportInterface.HardwareAddr.String()); err != nil {
return err
}
}
i.nodeConfig = &config.NodeConfig{
Name: nodeName,
Type: config.K8sNode,
OVSBridge: i.ovsBridge,
DefaultTunName: defaultTunInterfaceName,
NodeIPv4Addr: nodeIPv4Addr,
NodeIPv6Addr: nodeIPv6Addr,
NodeTransportInterfaceName: transportInterface.Name,
NodeTransportIPv4Addr: transportIPv4Addr,
NodeTransportIPv6Addr: transportIPv6Addr,
UplinkNetConfig: new(config.AdapterNetConfig),
NodeTransportInterfaceMTU: transportInterface.MTU,