Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V1beta1 reconcile triggers mt #3093

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ require (
k8s.io/apimachinery v0.16.5-beta.1
k8s.io/apiserver v0.16.4
k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible
k8s.io/kubernetes v1.14.7 // indirect
k8s.io/utils v0.0.0-20191010214722-8d271d903fe4
knative.dev/pkg v0.0.0-20200501005942-d980c0865972
knative.dev/test-infra v0.0.0-20200430225942-f7c1fafc1cde
Expand Down
1 change: 0 additions & 1 deletion pkg/apis/eventing/v1alpha1/trigger_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ const (
)

// +genclient
// +genreconciler
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Trigger represents a request to have events delivered to a consumer from a
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/eventing/v1beta1/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func (t testHelper) ReadyBrokerStatus() *BrokerStatus {
return bs
}

func (t testHelper) ReadyBrokerCondition() *apis.Condition {
return &apis.Condition{
Type: apis.ConditionReady,
Status: corev1.ConditionTrue,
Severity: apis.ConditionSeverityError,
}
}

func (t testHelper) UnknownBrokerStatus() *BrokerStatus {
bs := &BrokerStatus{}
return bs
Expand Down
3 changes: 1 addition & 2 deletions pkg/apis/eventing/v1beta1/trigger_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func (ts *TriggerStatus) InitializeConditions() {
triggerCondSet.Manage(ts).InitializeConditions()
}

func (ts *TriggerStatus) PropagateBrokerStatus(bs *BrokerStatus) {
bc := bs.GetTopLevelCondition()
func (ts *TriggerStatus) PropagateBrokerCondition(bc *apis.Condition) {
if bc == nil {
ts.MarkBrokerNotConfigured()
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/eventing/v1beta1/trigger_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func TestTriggerConditionStatus(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
ts := &TriggerStatus{}
if test.brokerStatus != nil {
ts.PropagateBrokerStatus(test.brokerStatus)
ts.PropagateBrokerCondition(test.brokerStatus.GetTopLevelCondition())
}
if test.subscriptionCondition != nil {
ts.PropagateSubscriptionCondition(test.subscriptionCondition)
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/eventing/v1beta1/trigger_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
)

// +genclient
// +genreconciler
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Trigger represents a request to have events delivered to a consumer from a
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 18 additions & 17 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,19 @@ import (
"knative.dev/pkg/logging"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
clientset "knative.dev/eventing/pkg/client/clientset/versioned"
brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1alpha1/broker"
pkgduckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1"
pkgduckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"

// Need this for Brokers since we reconcile them still as v1alpha1
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1alpha1"
eventingv1beta1listers "knative.dev/eventing/pkg/client/listers/eventing/v1beta1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1beta1"
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/reconciler/broker/resources"
"knative.dev/eventing/pkg/reconciler/names"
Expand All @@ -71,7 +77,7 @@ type Reconciler struct {
endpointsLister corev1listers.EndpointsLister
deploymentLister appsv1listers.DeploymentLister
subscriptionLister messaginglisters.SubscriptionLister
triggerLister eventinglisters.TriggerLister
triggerLister eventingv1beta1listers.TriggerLister

channelableTracker duck.ListableTracker

Expand Down Expand Up @@ -159,21 +165,16 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (kme
return nil, fmt.Errorf("Failed to reconcile trigger channel: %v", err)
}

if triggerChan.Status.Address == nil {
logging.FromContext(ctx).Debug("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan))
b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.")
// Ok to return nil for error here, once channel address becomes available, this will get requeued.
return nil, nil
}
if url := triggerChan.Status.Address.GetURL(); url.Host == "" {
// We check the trigger Channel's address here because it is needed to create the Ingress Deployment.
if triggerChan.Status.Address == nil || triggerChan.Status.Address.URL == nil {
logging.FromContext(ctx).Debug("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan))
b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.")
// Ok to return nil for error here, once channel address becomes available, this will get requeued.
return nil, nil
}
b.Status.TriggerChannel = &chanMan.ref
b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status)
// We need to downconvert from v1beta1 -> v1alpha1 since broker only handles v1alpha1.
channelStatus := &duckv1alpha1.ChannelableStatus{AddressStatus: pkgduckv1alpha1.AddressStatus{Address: &pkgduckv1alpha1.Addressable{Addressable: pkgduckv1beta1.Addressable{URL: triggerChan.Status.Address.URL}}}}
b.Status.PropagateTriggerChannelReadiness(channelStatus)

if err := r.reconcileFilterDeployment(ctx, b); err != nil {
logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err))
Expand Down Expand Up @@ -305,7 +306,7 @@ func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Bro
}

// reconcileChannel reconciles Broker's 'b' underlying channel.
func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, channelObjRef corev1.ObjectReference, newChannel *unstructured.Unstructured, b *v1alpha1.Broker) (*duckv1alpha1.Channelable, error) {
func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, channelObjRef corev1.ObjectReference, newChannel *unstructured.Unstructured, b *v1alpha1.Broker) (*duckv1beta1.Channelable, error) {
lister, err := r.channelableTracker.ListerFor(channelObjRef)
if err != nil {
logging.FromContext(ctx).Error(fmt.Sprintf("Error getting lister for Channel: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Error(err))
Expand All @@ -322,7 +323,7 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
return nil, err
}
logging.FromContext(ctx).Info(fmt.Sprintf("Created Channel: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Any("NewChannel", newChannel))
channelable := &duckv1alpha1.Channelable{}
channelable := &duckv1beta1.Channelable{}
err = duckapis.FromUnstructured(created, channelable)
if err != nil {
logging.FromContext(ctx).Error(fmt.Sprintf("Failed to convert to Channelable Object: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Any("createdChannel", created), zap.Error(err))
Expand All @@ -335,7 +336,7 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
return nil, err
}
logging.FromContext(ctx).Debug(fmt.Sprintf("Found Channel: %s/%s", channelObjRef.Namespace, channelObjRef.Name))
channelable, ok := c.(*duckv1alpha1.Channelable)
channelable, ok := c.(*duckv1beta1.Channelable)
if !ok {
logging.FromContext(ctx).Error(fmt.Sprintf("Failed to convert to Channelable Object: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -403,12 +404,12 @@ func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service)
}

// reconcileIngressDeploymentCRD reconciles the Ingress Deployment for a CRD backed channel.
func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) error {
func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1beta1.Channelable) error {
expected := resources.MakeIngressDeployment(&resources.IngressArgs{
Broker: b,
Image: r.ingressImage,
ServiceAccountName: r.ingressServiceAccountName,
ChannelAddress: c.Status.Address.GetURL().Host,
ChannelAddress: c.Status.Address.URL.Host,
})
return r.reconcileDeployment(ctx, expected)
}
Expand Down Expand Up @@ -472,7 +473,7 @@ func (r *Reconciler) propagateBrokerStatusToTriggers(ctx context.Context, namesp
if bs == nil {
trigger.Status.MarkBrokerFailed("BrokerDoesNotExist", "Broker %q does not exist", name)
} else {
trigger.Status.PropagateBrokerStatus(bs)
trigger.Status.PropagateBrokerCondition(bs.GetTopLevelCondition())
}
if _, updateStatusErr := r.updateTriggerStatus(ctx, trigger); updateStatusErr != nil {
logging.FromContext(ctx).Error("Failed to update Trigger status", zap.Error(updateStatusErr))
Expand Down
Loading