diff --git a/pkg/apis/flows/v1/parallel_lifecycle.go b/pkg/apis/flows/v1/parallel_lifecycle.go index 18857479ab3..cccd20503e0 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle.go +++ b/pkg/apis/flows/v1/parallel_lifecycle.go @@ -77,7 +77,7 @@ func (ps *ParallelStatus) InitializeConditions() { // PropagateSubscriptionStatuses sets the ParallelConditionSubscriptionsReady based on // the status of the incoming subscriptions. func (ps *ParallelStatus) PropagateSubscriptionStatuses(filterSubscriptions []*messagingv1.Subscription, subscriptions []*messagingv1.Subscription) { - if ps.BranchStatuses == nil { + if ps.BranchStatuses == nil || len(subscriptions) != len(ps.BranchStatuses) { ps.BranchStatuses = make([]ParallelBranchStatus, len(subscriptions)) } allReady := true @@ -136,7 +136,7 @@ func (ps *ParallelStatus) PropagateSubscriptionStatuses(filterSubscriptions []*m // PropagateChannelStatuses sets the ChannelStatuses and ParallelConditionChannelsReady based on the // status of the incoming channels. func (ps *ParallelStatus) PropagateChannelStatuses(ingressChannel *duckv1.Channelable, channels []*duckv1.Channelable) { - if ps.BranchStatuses == nil { + if ps.BranchStatuses == nil || len(channels) != len(ps.BranchStatuses) { ps.BranchStatuses = make([]ParallelBranchStatus, len(channels)) } allReady := true diff --git a/pkg/apis/flows/v1/parallel_lifecycle_test.go b/pkg/apis/flows/v1/parallel_lifecycle_test.go index d5fd532ec48..95320fd8f6c 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle_test.go +++ b/pkg/apis/flows/v1/parallel_lifecycle_test.go @@ -293,6 +293,37 @@ func TestParallelPropagateChannelStatuses(t *testing.T) { } } +func TestParallelPropagateChannelStatusUpdated(t *testing.T) { + inChannel := getChannelable(true) + initialChannels := []*eventingduckv1.Channelable{getChannelable(true)} + afterChannels := []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)} + ps := ParallelStatus{} + ps.PropagateChannelStatuses(inChannel, initialChannels) + if len(ps.BranchStatuses) != 1 { + t.Errorf("unexpected branchstatuses want 1 got %d", len(ps.BranchStatuses)) + } + ps.PropagateChannelStatuses(inChannel, afterChannels) + if len(ps.BranchStatuses) != 2 { + t.Errorf("unexpected branchstatuses want 2 got %d", len(ps.BranchStatuses)) + } +} + +func TestParallelPropagateSubscriptionStatusUpdated(t *testing.T) { + initialFsubs := []*messagingv1.Subscription{getSubscription("fsub0", true)} + initialSubs := []*messagingv1.Subscription{getSubscription("sub0", true)} + afterFsubs := []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)} + afterSubs := []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)} + ps := ParallelStatus{} + ps.PropagateSubscriptionStatuses(initialFsubs, initialSubs) + if len(ps.BranchStatuses) != 1 { + t.Errorf("unexpected branchstatuses want 1 got %d", len(ps.BranchStatuses)) + } + ps.PropagateSubscriptionStatuses(afterFsubs, afterSubs) + if len(ps.BranchStatuses) != 2 { + t.Errorf("unexpected branchstatuses want 2 got %d", len(ps.BranchStatuses)) + } +} + func TestParallelReady(t *testing.T) { tests := []struct { name string