Skip to content

Commit

Permalink
Fix broker_test.go, except 'Successful reconcile'.
Browse files Browse the repository at this point in the history
  • Loading branch information
Harwayne committed Apr 17, 2019
1 parent 8ccb32a commit 33d0f43
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 69 deletions.
38 changes: 21 additions & 17 deletions pkg/reconciler/v1alpha1/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ const (
controllerAgentName = "broker-controller"

// Name of the corev1.Events emitted from the reconciliation process.
brokerReconciled = "BrokerReconciled"
brokerReadinessChanged = "BrokerReadinessChanged"
brokerReconcileError = "BrokerReconcileError"
brokerUpdateStatusFailed = "BrokerUpdateStatusFailed"
ingressSubscriptionDeleteFailed = "IngressSubscriptionDeleteFailed"
ingressSubscriptionCreateFailed = "IngressSubscriptionCreateFailed"
Expand Down Expand Up @@ -141,16 +142,19 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
return reconcile.Result{}, err
}

originalReadiness := broker.Status.IsReady()

// Reconcile this copy of the Broker and then write back any status updates regardless of
// whether the reconcile error out.
result, reconcileErr := r.reconcile(ctx, broker)
reconcileErr := r.reconcile(ctx, broker)
if reconcileErr != nil {
logging.FromContext(ctx).Error("Error reconciling Broker", zap.Error(reconcileErr))
} else if result.Requeue || result.RequeueAfter > 0 {
logging.FromContext(ctx).Debug("Broker reconcile requeuing")
r.recorder.Event(broker, corev1.EventTypeWarning, brokerReconcileError, fmt.Sprintf("Broker reconcile error: %v", reconcileErr))
} else {
logging.FromContext(ctx).Debug("Broker reconciled")
r.recorder.Event(broker, corev1.EventTypeNormal, brokerReconciled, "Broker reconciled")
if originalReadiness != broker.Status.IsReady() {
r.recorder.Event(broker, corev1.EventTypeNormal, brokerReadinessChanged, fmt.Sprintf("Broker readiness changed to %v", broker.Status.IsReady()))
}
}

if _, err = r.updateStatus(broker); err != nil {
Expand All @@ -160,10 +164,10 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
}

// Requeue if the resource is not ready:
return result, reconcileErr
return reconcile.Result{}, reconcileErr
}

func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconcile.Result, error) {
func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) error {
b.Status.InitializeConditions()

// 1. Trigger Channel is created for all events. Triggers will Subscribe to this Channel.
Expand All @@ -179,49 +183,49 @@ func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconci

if b.DeletionTimestamp != nil {
// Everything is cleaned up by the garbage collector.
return reconcile.Result{}, nil
return nil
}

triggerChan, err := r.reconcileTriggerChannel(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling the trigger channel", zap.Error(err))
b.Status.MarkTriggerChannelFailed("ChannelFailure", "%v", err)
return reconcile.Result{}, err
return err
} else if triggerChan.Status.Address.Hostname == "" {
// We check the trigger Channel's address here because it is needed to create the Ingress
// Deployment.
logging.FromContext(ctx).Debug("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan))
b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.")
return reconcile.Result{}, nil
return nil
}
b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status)

filterDeployment, err := r.reconcileFilterDeployment(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err))
b.Status.MarkFilterFailed("DeploymentFailure", "%v", err)
return reconcile.Result{}, err
return err
}
_, err = r.reconcileFilterService(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling filter Service", zap.Error(err))
b.Status.MarkFilterFailed("ServiceFailure", "%v", err)
return reconcile.Result{}, err
return err
}
b.Status.PropagateFilterDeploymentAvailability(filterDeployment)

ingressDeployment, err := r.reconcileIngressDeployment(ctx, b, triggerChan)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling ingress Deployment", zap.Error(err))
b.Status.MarkIngressFailed("DeploymentFailure", "%v", err)
return reconcile.Result{}, err
return err
}

svc, err := r.reconcileIngressService(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling ingress Service", zap.Error(err))
b.Status.MarkIngressFailed("ServiceFailure", "%v", err)
return reconcile.Result{}, err
return err
}
b.Status.PropagateIngressDeploymentAvailability(ingressDeployment)
b.Status.SetAddress(names.ServiceHostName(svc.Name, svc.Namespace))
Expand All @@ -230,19 +234,19 @@ func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconci
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling the ingress channel", zap.Error(err))
b.Status.MarkIngressChannelFailed("ChannelFailure", "%v", err)
return reconcile.Result{}, err
return err
}
b.Status.PropagateIngressChannelReadiness(&ingressChan.Status)

ingressSub, err := r.reconcileIngressSubscription(ctx, b, ingressChan, svc)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling the ingress subscription", zap.Error(err))
b.Status.MarkIngressSubscriptionFailed("SubscriptionFailure", "%v", err)
return reconcile.Result{}, err
return err
}
b.Status.PropagateIngressSubscriptionReadiness(&ingressSub.Status)

return reconcile.Result{}, nil
return nil
}

// updateStatus may in fact update the broker's finalizers in addition to the status.
Expand Down
Loading

0 comments on commit 33d0f43

Please sign in to comment.