Skip to content

Commit

Permalink
add network connectivity test (#1424)
Browse files Browse the repository at this point in the history
* add network connectivity test

- tests traffic for following scenarios on same node and on different node
  - Primary ENI IP - Primary ENI IP
  - Primary ENI IP - Secondary ENI IP
  - Secondary ENI IP - Secondary ENI IP
- traffic type - TCP, UPD, ICMP

* add readme for integration testing
  • Loading branch information
abhipth authored Apr 14, 2021
1 parent 1ddd82f commit 0013eba
Show file tree
Hide file tree
Showing 13 changed files with 1,013 additions and 5 deletions.
2 changes: 1 addition & 1 deletion test/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ func New(options Options) *Framework {
Options: options,
K8sClient: k8sClient,
CloudServices: aws.NewCloud(cloudConfig),
K8sResourceManagers: k8s.NewResourceManager(k8sClient),
K8sResourceManagers: k8s.NewResourceManager(k8sClient, k8sSchema, config),
}
}
99 changes: 98 additions & 1 deletion test/framework/resources/aws/services/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@ import (

type EC2 interface {
DescribeInstanceType(instanceType string) ([]*ec2.InstanceTypeInfo, error)
DescribeInstance(instanceID string) (*ec2.Instance, error)
AuthorizeSecurityGroupIngress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error
RevokeSecurityGroupIngress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error
AuthorizeSecurityGroupEgress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error
RevokeSecurityGroupEgress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error
}

type defaultEC2 struct {
ec2iface.EC2API
}

func (d defaultEC2) DescribeInstanceType(instanceType string) ([]*ec2.InstanceTypeInfo, error) {
func (d *defaultEC2) DescribeInstanceType(instanceType string) ([]*ec2.InstanceTypeInfo, error) {
describeInstanceTypeIp := &ec2.DescribeInstanceTypesInput{
InstanceTypes: aws.StringSlice([]string{instanceType}),
}
Expand All @@ -44,6 +49,98 @@ func (d defaultEC2) DescribeInstanceType(instanceType string) ([]*ec2.InstanceTy
return describeInstanceOp.InstanceTypes, nil
}

func (d *defaultEC2) DescribeInstance(instanceID string) (*ec2.Instance, error) {
describeInstanceInput := &ec2.DescribeInstancesInput{
InstanceIds: aws.StringSlice([]string{instanceID}),
}
describeInstanceOutput, err := d.EC2API.DescribeInstances(describeInstanceInput)
if err != nil {
return nil, err
}
if describeInstanceOutput == nil || len(describeInstanceOutput.Reservations) == 0 ||
len(describeInstanceOutput.Reservations[0].Instances) == 0 {
return nil, fmt.Errorf("failed to find instance %s", instanceID)
}
return describeInstanceOutput.Reservations[0].Instances[0], nil
}

func (d *defaultEC2) AuthorizeSecurityGroupIngress(groupID string, protocol string,
fromPort int, toPort int, cidrIP string) error {
ipPermissions := &ec2.IpPermission{
FromPort: aws.Int64(int64(fromPort)),
ToPort: aws.Int64(int64(toPort)),
IpProtocol: aws.String(protocol),
IpRanges: []*ec2.IpRange{
{
CidrIp: aws.String(cidrIP),
},
},
}
authorizeSecurityGroupIngressInput := &ec2.AuthorizeSecurityGroupIngressInput{
GroupId: aws.String(groupID),
IpPermissions: []*ec2.IpPermission{ipPermissions},
}
_, err := d.EC2API.AuthorizeSecurityGroupIngress(authorizeSecurityGroupIngressInput)
return err
}

func (d *defaultEC2) RevokeSecurityGroupIngress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error {
ipPermissions := &ec2.IpPermission{
FromPort: aws.Int64(int64(fromPort)),
ToPort: aws.Int64(int64(toPort)),
IpProtocol: aws.String(protocol),
IpRanges: []*ec2.IpRange{
{
CidrIp: aws.String(cidrIP),
},
},
}
revokeSecurityGroupIngressInput := &ec2.RevokeSecurityGroupIngressInput{
GroupId: aws.String(groupID),
IpPermissions: []*ec2.IpPermission{ipPermissions},
}
_, err := d.EC2API.RevokeSecurityGroupIngress(revokeSecurityGroupIngressInput)
return err
}

func (d *defaultEC2) AuthorizeSecurityGroupEgress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error {
ipPermissions := &ec2.IpPermission{
FromPort: aws.Int64(int64(fromPort)),
ToPort: aws.Int64(int64(toPort)),
IpProtocol: aws.String(protocol),
IpRanges: []*ec2.IpRange{
{
CidrIp: aws.String(cidrIP),
},
},
}
authorizeSecurityGroupEgressInput := &ec2.AuthorizeSecurityGroupEgressInput{
GroupId: aws.String(groupID),
IpPermissions: []*ec2.IpPermission{ipPermissions},
}
_, err := d.EC2API.AuthorizeSecurityGroupEgress(authorizeSecurityGroupEgressInput)
return err
}

func (d *defaultEC2) RevokeSecurityGroupEgress(groupID string, protocol string, fromPort int, toPort int, cidrIP string) error {
ipPermissions := &ec2.IpPermission{
FromPort: aws.Int64(int64(fromPort)),
ToPort: aws.Int64(int64(toPort)),
IpProtocol: aws.String(protocol),
IpRanges: []*ec2.IpRange{
{
CidrIp: aws.String(cidrIP),
},
},
}
revokeSecurityGroupEgressInput := &ec2.RevokeSecurityGroupEgressInput{
GroupId: aws.String(groupID),
IpPermissions: []*ec2.IpPermission{ipPermissions},
}
_, err := d.EC2API.RevokeSecurityGroupEgress(revokeSecurityGroupEgressInput)
return err
}

func NewEC2(session *session.Session) EC2 {
return &defaultEC2{
EC2API: ec2.New(session),
Expand Down
19 changes: 18 additions & 1 deletion test/framework/resources/k8s/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package k8s
import (
"github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/k8s/resources"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -26,6 +28,8 @@ type ResourceManagers interface {
NamespaceManager() resources.NamespaceManager
ServiceManager() resources.ServiceManager
NodeManager() resources.NodeManager
PodManager() resources.PodManager
DaemonSetManager() resources.DaemonSetManager
}

type defaultManager struct {
Expand All @@ -35,16 +39,21 @@ type defaultManager struct {
namespaceManager resources.NamespaceManager
serviceManager resources.ServiceManager
nodeManager resources.NodeManager
podManager resources.PodManager
daemonSetManager resources.DaemonSetManager
}

func NewResourceManager(k8sClient client.DelegatingClient) ResourceManagers {
func NewResourceManager(k8sClient client.DelegatingClient,
scheme *runtime.Scheme, config *rest.Config) ResourceManagers {
return &defaultManager{
jobManager: resources.NewDefaultJobManager(k8sClient),
deploymentManager: resources.NewDefaultDeploymentManager(k8sClient),
customResourceManager: resources.NewCustomResourceManager(k8sClient),
namespaceManager: resources.NewDefaultNamespaceManager(k8sClient),
serviceManager: resources.NewDefaultServiceManager(k8sClient),
nodeManager: resources.NewDefaultNodeManager(k8sClient),
podManager: resources.NewDefaultPodManager(k8sClient, scheme, config),
daemonSetManager: resources.NewDefaultDaemonSetManager(k8sClient),
}
}

Expand All @@ -71,3 +80,11 @@ func (m *defaultManager) ServiceManager() resources.ServiceManager {
func (m *defaultManager) NodeManager() resources.NodeManager {
return m.nodeManager
}

func (m *defaultManager) PodManager() resources.PodManager {
return m.podManager
}

func (m *defaultManager) DaemonSetManager() resources.DaemonSetManager {
return m.daemonSetManager
}
12 changes: 10 additions & 2 deletions test/framework/resources/k8s/manifest/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package manifest

import (
utils "github.com/aws/amazon-vpc-cni-k8s/test/framework/utils"
"github.com/aws/amazon-vpc-cni-k8s/test/framework/utils"

"github.com/aws/aws-sdk-go/aws"
v1 "k8s.io/api/apps/v1"
Expand All @@ -38,11 +38,19 @@ func NewBusyBoxDeploymentBuilder() *DeploymentBuilder {
name: "deployment-test",
replicas: 10,
container: NewBusyBoxContainerBuilder().Build(),
labels: map[string]string{"d": "d"},
labels: map[string]string{"role": "test"},
terminationGracePeriod: 0,
}
}

func NewDefaultDeploymentBuilder() *DeploymentBuilder {
return &DeploymentBuilder{
namespace: utils.DefaultTestNamespace,
terminationGracePeriod: 0,
labels: map[string]string{"role": "test"},
}
}

func (d *DeploymentBuilder) Namespace(namespace string) *DeploymentBuilder {
d.namespace = namespace
return d
Expand Down
69 changes: 69 additions & 0 deletions test/framework/resources/k8s/resources/daemonset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package resources

import (
"context"

"github.com/aws/amazon-vpc-cni-k8s/test/framework/utils"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type DaemonSetManager interface {
GetDaemonSet(namespace string, name string) (*v1.DaemonSet, error)
UpdateAndWaitTillDaemonSetReady(old *v1.DaemonSet, new *v1.DaemonSet) (*v1.DaemonSet, error)
}

type defaultDaemonSetManager struct {
k8sClient client.DelegatingClient
}

func NewDefaultDaemonSetManager(k8sClient client.DelegatingClient) DaemonSetManager {
return &defaultDaemonSetManager{k8sClient: k8sClient}
}

func (d *defaultDaemonSetManager) GetDaemonSet(namespace string, name string) (*v1.DaemonSet, error) {
ctx := context.Background()
daemonSet := &v1.DaemonSet{}
err := d.k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespace,
Name: name,
}, daemonSet)
return daemonSet, err
}

func (d *defaultDaemonSetManager) UpdateAndWaitTillDaemonSetReady(old *v1.DaemonSet, new *v1.DaemonSet) (*v1.DaemonSet, error) {
ctx := context.Background()
err := d.k8sClient.Patch(ctx, new, client.MergeFrom(old))
if err != nil {
return nil, err
}

observed := &v1.DaemonSet{}
return observed, wait.PollImmediateUntil(utils.PollIntervalShort, func() (bool, error) {
if err := d.k8sClient.Get(ctx, utils.NamespacedName(new), observed); err != nil {
return false, err
}
if observed.Status.NumberReady == (new.Status.DesiredNumberScheduled) &&
observed.Status.NumberAvailable == (new.Status.DesiredNumberScheduled) &&
observed.Status.UpdatedNumberScheduled == (new.Status.DesiredNumberScheduled) &&
observed.Status.ObservedGeneration >= new.Generation {
return true, nil
}
return false, nil
}, ctx.Done())
}
105 changes: 105 additions & 0 deletions test/framework/resources/k8s/resources/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package resources

import (
"bytes"
"context"
"net/http"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

type PodManager interface {
PodExec(namespace string, name string, command []string) (string, string, error)
GetPodsWithLabelSelector(labelKey string, labelVal string) (v1.PodList, error)
}

type defaultPodManager struct {
k8sClient client.DelegatingClient
k8sSchema *runtime.Scheme
config *rest.Config
}

func NewDefaultPodManager(k8sClient client.DelegatingClient, k8sSchema *runtime.Scheme,
config *rest.Config) PodManager {

return &defaultPodManager{
k8sClient: k8sClient,
k8sSchema: k8sSchema,
config: config,
}
}

func (d *defaultPodManager) PodExec(namespace string, name string, command []string) (string, string, error) {
pod := &v1.Pod{}
err := d.k8sClient.Get(context.Background(), types.NamespacedName{
Namespace: namespace,
Name: name,
}, pod)
if err != nil {
return "", "", err
}

gkv, err := apiutil.GVKForObject(pod, d.k8sSchema)
if err != nil {
return "", "", err
}
restClient, err := apiutil.RESTClientForGVK(gkv, d.config, serializer.NewCodecFactory(d.k8sSchema))
if err != nil {
return "", "", err
}

execOptions := &v1.PodExecOptions{
Stdout: true,
Stderr: true,
Command: command,
}

req := restClient.Post().
Resource("pods").
Name(pod.Name).
Namespace(pod.Namespace).
SubResource("exec").
VersionedParams(execOptions, runtime.NewParameterCodec(d.k8sSchema))

exec, err := remotecommand.NewSPDYExecutor(d.config, http.MethodPost, req.URL())
if err != nil {
return "", "", err
}

var stdout, stderr bytes.Buffer
err = exec.Stream(remotecommand.StreamOptions{
Stdout: &stdout,
Stderr: &stderr,
})

return stdout.String(), stderr.String(), err
}

func (d *defaultPodManager) GetPodsWithLabelSelector(labelKey string, labelVal string) (v1.PodList, error) {
ctx := context.Background()
podList := v1.PodList{}
err := d.k8sClient.List(ctx, &podList, client.MatchingLabels{
labelKey: labelVal,
})
return podList, err
}
Loading

0 comments on commit 0013eba

Please sign in to comment.