Skip to content

Commit

Permalink
unit tests for broker + rebase to matts endpoints change
Browse files Browse the repository at this point in the history
  • Loading branch information
vaikas committed Mar 18, 2020
1 parent 3980649 commit ab94ec3
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 182 deletions.
28 changes: 13 additions & 15 deletions pkg/reconciler/mtbroker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/dynamic"
appsv1listers "k8s.io/client-go/listers/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
Expand Down Expand Up @@ -62,8 +61,7 @@ type Reconciler struct {

// listers index properties about resources
brokerLister eventinglisters.BrokerLister
serviceLister corev1listers.ServiceLister
deploymentLister appsv1listers.DeploymentLister
endpointsLister corev1listers.EndpointsLister
subscriptionLister messaginglisters.SubscriptionLister
triggerLister eventinglisters.TriggerLister

Expand Down Expand Up @@ -164,13 +162,21 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) pkgr
b.Status.TriggerChannel = &chanMan.ref
b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status)

filterDeployment, err := r.deploymentLister.Deployments(system.Namespace()).Get("broker-filter")
filterEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get("broker-filter")
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err))
b.Status.MarkFilterFailed("DeploymentFailure", "%v", err)
logging.FromContext(ctx).Error("Problem getting endpoints for filter", zap.String("namespace", system.Namespace()), zap.Error(err))
b.Status.MarkFilterFailed("ServiceFailure", "%v", err)
return err
}
b.Status.PropagateFilterDeploymentAvailability(filterDeployment)
b.Status.PropagateFilterAvailability(filterEndpoints)

ingressEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get("broker-ingress")
if err != nil {
logging.FromContext(ctx).Error("Problem getting endpoints for ingress", zap.String("namespace", system.Namespace()), zap.Error(err))
b.Status.MarkIngressFailed("ServiceFailure", "%v", err)
return err
}
b.Status.PropagateIngressAvailability(ingressEndpoints)

// Route everything to shared ingress, just tack on the namespace/name as path
// so we can route there appropriately.
Expand All @@ -180,14 +186,6 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) pkgr
Path: fmt.Sprintf("/%s/%s", b.Namespace, b.Name),
})

ingressDeployment, err := r.deploymentLister.Deployments(system.Namespace()).Get("broker-ingress")
if err != nil {
logging.FromContext(ctx).Error("Problem fetching ingress Deployment", zap.Error(err))
b.Status.MarkIngressFailed("DeploymentFailure", "%v", err)
return err
}
b.Status.PropagateIngressDeploymentAvailability(ingressDeployment)

// So, at this point the Broker is ready and everything should be solid
// for the triggers to act upon.
return nil
Expand Down
208 changes: 55 additions & 153 deletions pkg/reconciler/mtbroker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/scheme"

clientgotesting "k8s.io/client-go/testing"
Expand All @@ -54,7 +53,6 @@ import (
"knative.dev/pkg/controller"
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/resolver"
"knative.dev/pkg/system"

_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/trigger/fake"
. "knative.dev/eventing/pkg/reconciler/testing"
Expand All @@ -65,6 +63,7 @@ import (
type channelType string

const (
systemNS = "knative-testing"
testNS = "test-namespace"
brokerName = "test-broker"

Expand Down Expand Up @@ -112,10 +111,8 @@ var (

triggerChannelHostname = fmt.Sprintf("foo.bar.svc.%s", utils.GetClusterDomainName())

filterDeploymentName = fmt.Sprintf("%s-broker-filter", brokerName)
filterServiceName = fmt.Sprintf("%s-broker-filter", brokerName)
ingressDeploymentName = fmt.Sprintf("%s-broker-ingress", brokerName)
ingressServiceName = fmt.Sprintf("%s-broker", brokerName)
filterServiceName = "broker-filter"
ingressServiceName = "broker-ingress"

ingressSubscriptionGenerateName = fmt.Sprintf("internal-ingress-%s-", brokerName)
subscriptionName = fmt.Sprintf("%s-%s-%s", brokerName, triggerName, triggerUID)
Expand Down Expand Up @@ -164,6 +161,12 @@ var (
sinkDNS = "sink.mynamespace.svc." + utils.GetClusterDomainName()
sinkURI = "http://" + sinkDNS
finalizerUpdatedEvent = Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "test-broker" finalizers`)

brokerAddress = &apis.URL{
Scheme: "http",
Host: fmt.Sprintf("%s.%s.svc.%s", ingressServiceName, systemNS, utils.GetClusterDomainName()),
Path: fmt.Sprintf("/%s/%s", testNS, brokerName),
}
)

func init() {
Expand Down Expand Up @@ -308,15 +311,20 @@ func TestReconcile(t *testing.T) {
WithBrokerChannel(channel()),
WithInitBrokerConditions),
createChannel(testNS, true),
NewEndpoints(filterServiceName, systemNS,
WithEndpointsLabels(FilterLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
NewEndpoints(ingressServiceName, systemNS,
WithEndpointsLabels(IngressLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerChannel(channel()),
WithBrokerReady,
WithBrokerTriggerChannel(createTriggerChannelRef()),
WithBrokerAddress(fmt.Sprintf("%s.%s.svc.%s", ingressServiceName, testNS, utils.GetClusterDomainName())),
),
WithBrokerAddressURI(brokerAddress)),
}},
WantEvents: []string{
finalizerUpdatedEvent,
Expand All @@ -333,6 +341,12 @@ func TestReconcile(t *testing.T) {
WithBrokerChannel(channel()),
WithInitBrokerConditions),
createChannel(testNS, true),
NewEndpoints(filterServiceName, systemNS,
WithEndpointsLabels(FilterLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
NewEndpoints(ingressServiceName, systemNS,
WithEndpointsLabels(IngressLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
},
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("update", "brokers"),
Expand All @@ -343,8 +357,7 @@ func TestReconcile(t *testing.T) {
WithBrokerChannel(channel()),
WithBrokerReady,
WithBrokerTriggerChannel(createTriggerChannelRef()),
WithBrokerAddress(fmt.Sprintf("%s.%s.svc.%s", ingressServiceName, testNS, utils.GetClusterDomainName())),
),
WithBrokerAddressURI(brokerAddress)),
}},
WantEvents: []string{
finalizerUpdatedEvent,
Expand All @@ -363,6 +376,12 @@ func TestReconcile(t *testing.T) {
WithBrokerChannel(channel()),
WithInitBrokerConditions),
createChannel(testNS, true),
NewEndpoints(filterServiceName, systemNS,
WithEndpointsLabels(FilterLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
NewEndpoints(ingressServiceName, systemNS,
WithEndpointsLabels(IngressLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
NewTrigger(triggerName, testNS, brokerName,
WithTriggerUID(triggerUID),
WithTriggerSubscriberURI(subscriberURI)),
Expand All @@ -385,8 +404,8 @@ func TestReconcile(t *testing.T) {
WithBrokerChannel(channel()),
WithBrokerReady,
WithBrokerTriggerChannel(createTriggerChannelRef()),
WithBrokerAddress(fmt.Sprintf("%s.%s.svc.%s", ingressServiceName, testNS, utils.GetClusterDomainName())))},
},
WithBrokerAddressURI(brokerAddress)),
}},
WantEvents: []string{
finalizerUpdatedEvent,
Eventf(corev1.EventTypeNormal, "TriggerReconciled", "Trigger reconciled"),
Expand Down Expand Up @@ -1008,8 +1027,8 @@ func TestReconcile(t *testing.T) {
subscriptionLister: listers.GetSubscriptionLister(),
triggerLister: listers.GetTriggerLister(),
brokerLister: listers.GetBrokerLister(),
serviceLister: listers.GetK8sServiceLister(),
deploymentLister: listers.GetDeploymentLister(),

endpointsLister: listers.GetEndpointsLister(),
kresourceTracker: duck.NewListableTracker(ctx, conditions.Get, func(types.NamespacedName) {}, 0),
channelableTracker: duck.NewListableTracker(ctx, channelable.Get, func(types.NamespacedName) {}, 0),
addressableTracker: duck.NewListableTracker(ctx, v1a1addr.Get, func(types.NamespacedName) {}, 0),
Expand Down Expand Up @@ -1040,143 +1059,6 @@ func channel() metav1.TypeMeta {
}
}

func livenessProbe() *corev1.Probe {
return &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/healthz",
Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 2,
}
}

func readinessProbe() *corev1.Probe {
return &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/readyz",
Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 2,
}
}

func envVars(containerName string) []corev1.EnvVar {
switch containerName {
case filterContainerName:
return []corev1.EnvVar{
{
Name: system.NamespaceEnvKey,
Value: system.Namespace(),
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "CONTAINER_NAME",
Value: filterContainerName,
},
{
Name: "BROKER",
Value: brokerName,
},
{
Name: "METRICS_DOMAIN",
Value: "knative.dev/internal/eventing",
},
}
case ingressContainerName:
return []corev1.EnvVar{
{
Name: system.NamespaceEnvKey,
Value: system.Namespace(),
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "CONTAINER_NAME",
Value: ingressContainerName,
},
{
Name: "FILTER",
Value: "",
},
{
Name: "CHANNEL",
Value: triggerChannelHostname,
},
{
Name: "BROKER",
Value: brokerName,
},
{
Name: "METRICS_DOMAIN",
Value: "knative.dev/internal/eventing",
},
}
}
return []corev1.EnvVar{}
}

func containerPorts(httpInternal int32) []corev1.ContainerPort {
return []corev1.ContainerPort{
{
Name: "http",
ContainerPort: httpInternal,
},
{
Name: "metrics",
ContainerPort: 9090,
},
}
}

func servicePorts(httpInternal int) []corev1.ServicePort {
svcPorts := []corev1.ServicePort{
{
Name: "http",
Port: 80,
TargetPort: intstr.FromInt(httpInternal),
}, {
Name: "http-metrics",
Port: 9090,
},
}
return svcPorts
}

func createChannel(namespace string, ready bool) *unstructured.Unstructured {
var labels map[string]interface{}
var annotations map[string]interface{}
Expand Down Expand Up @@ -1262,7 +1144,7 @@ func createTriggerChannelRef() *corev1.ObjectReference {
func makeServiceURI() *apis.URL {
return &apis.URL{
Scheme: "http",
Host: fmt.Sprintf("broker-filter.knative-eventing.svc.%s", utils.GetClusterDomainName()),
Host: fmt.Sprintf("broker-filter.%s.svc.%s", systemNS, utils.GetClusterDomainName()),
Path: fmt.Sprintf("/triggers/%s/%s/%s", testNS, triggerName, triggerUID),
}
}
Expand Down Expand Up @@ -1336,8 +1218,14 @@ func allBrokerObjectsReadyPlus(objs ...runtime.Object) []runtime.Object {
WithBrokerFinalizers("brokers.eventing.knative.dev"),
WithBrokerResourceVersion(""),
WithBrokerTriggerChannel(createTriggerChannelRef()),
WithBrokerAddress(fmt.Sprintf("%s.%s.svc.%s", ingressServiceName, testNS, utils.GetClusterDomainName()))),
WithBrokerAddressURI(brokerAddress)),
createChannel(testNS, true),
NewEndpoints(filterServiceName, systemNS,
WithEndpointsLabels(FilterLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
NewEndpoints(ingressServiceName, systemNS,
WithEndpointsLabels(IngressLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
}
return append(brokerObjs[:], objs...)
}
Expand Down Expand Up @@ -1451,3 +1339,17 @@ func patchRemoveFinalizers(namespace, name string) clientgotesting.PatchActionIm
action.Patch = []byte(patch)
return action
}

// FilterLabels generates the labels present on all resources representing the filter of the given
// Broker.
func FilterLabels() map[string]string {
return map[string]string{
"eventing.knative.dev/brokerRole": "filter",
}
}

func IngressLabels() map[string]string {
return map[string]string{
"eventing.knative.dev/brokerRole": "ingress",
}
}
Loading

0 comments on commit ab94ec3

Please sign in to comment.