Skip to content

Commit

Permalink
[release-1.14] Upgrade knative.dev/eventing to latest 1.14 (#4113)
Browse files Browse the repository at this point in the history
* Upgrade knative.dev/eventing to latest 1.14

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Pass PodLister as expected

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Migrate to library SchedulerFunc to use the new signature

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi authored Sep 24, 2024
1 parent 456b2d7 commit 1754676
Show file tree
Hide file tree
Showing 19 changed files with 275 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr
return cg.MarkScheduleConsumerFailed("Schedule", err)
}

placements, err := statefulSetScheduler.Schedule(cg)
placements, err := statefulSetScheduler.Schedule(ctx, cg)
if err != nil {
return cg.MarkScheduleConsumerFailed("Schedule", err)
}
Expand Down
48 changes: 21 additions & 27 deletions control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,6 @@ import (
kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client/fake"
)

type SchedulerFunc func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error)

func (f SchedulerFunc) Schedule(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return f(vpod)
}

const (
testSchedulerKey = "scheduler"
noTestScheduler = "no-scheduler"
Expand Down Expand Up @@ -102,7 +96,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -189,7 +183,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -307,7 +301,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -402,7 +396,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -528,7 +522,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -702,7 +696,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -877,7 +871,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1034,7 +1028,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
}, nil
Expand Down Expand Up @@ -1121,7 +1115,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1208,7 +1202,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1303,7 +1297,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 2},
Expand Down Expand Up @@ -1426,7 +1420,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 2},
Expand Down Expand Up @@ -1533,7 +1527,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1630,7 +1624,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, io.EOF
}),
},
Expand Down Expand Up @@ -1762,7 +1756,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1926,7 +1920,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
},
Expand Down Expand Up @@ -1995,7 +1989,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
},
Expand Down Expand Up @@ -2121,7 +2115,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
},
Expand Down Expand Up @@ -2167,7 +2161,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrUnknownTopicOrPartition,
Expand Down Expand Up @@ -2214,7 +2208,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrGroupIDNotFound,
Expand Down Expand Up @@ -2262,7 +2256,7 @@ func TestFinalizeKind(t *testing.T) {
WantErr: true,
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrClusterAuthorizationFailed,
Expand Down
17 changes: 10 additions & 7 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/storage/names"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
Expand Down Expand Up @@ -106,6 +107,9 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
logger.Panicf("unable to process required environment variables: %v", err)
}

dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)
dispatcherPodLister := dispatcherPodInformer.Lister()

c := SchedulerConfig{
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
Expand All @@ -114,13 +118,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
}

schedulers := map[string]Scheduler{
KafkaSourceScheduler: createKafkaScheduler(ctx, c, kafkainternals.SourceStatefulSetName),
KafkaTriggerScheduler: createKafkaScheduler(ctx, c, kafkainternals.BrokerStatefulSetName),
//KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName), //To be added with channel/v2 reconciler version only
KafkaSourceScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.SourceStatefulSetName),
KafkaTriggerScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.BrokerStatefulSetName),
}

dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)

r := &Reconciler{
SchedulerFunc: func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok },
ConsumerLister: consumer.Get(ctx).Lister(),
Expand Down Expand Up @@ -299,10 +300,11 @@ func enqueueConsumerGroupFromConsumer(enqueue func(name types.NamespacedName)) f
}
}

func createKafkaScheduler(ctx context.Context, c SchedulerConfig, ssName string) Scheduler {
func createKafkaScheduler(ctx context.Context, c SchedulerConfig, podLister corelisters.PodLister, ssName string) Scheduler {
lister := consumergroup.Get(ctx).Lister()
return createStatefulSetScheduler(
ctx,
podLister,
SchedulerConfig{
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Expand Down Expand Up @@ -344,7 +346,7 @@ func getSelectorLabel(ssName string) map[string]string {
return selectorLabel
}

func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister scheduler.VPodLister) Scheduler {
func createStatefulSetScheduler(ctx context.Context, podLister corelisters.PodLister, c SchedulerConfig, lister scheduler.VPodLister) Scheduler {
ss, _ := statefulsetscheduler.New(ctx, &statefulsetscheduler.Config{
StatefulSetNamespace: system.Namespace(),
StatefulSetName: c.StatefulSetName,
Expand All @@ -357,6 +359,7 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
Evictor: newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict,
VPodLister: lister,
NodeLister: nodeinformer.Get(ctx).Lister(),
PodLister: podLister.Pods(system.Namespace()),
})

return Scheduler{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
k8s.io/apiserver v0.29.2
k8s.io/client-go v0.29.2
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
knative.dev/eventing v0.41.0
knative.dev/eventing v0.41.7-0.20240923180940-09cb6334226f
knative.dev/hack v0.0.0-20240404013450-1133b37da8d7
knative.dev/pkg v0.0.0-20240416145024-0f34a8815650
knative.dev/reconciler-test v0.0.0-20240417065737-ca905cbb09a9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1270,8 +1270,8 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/eventing v0.41.0 h1:e38nejJiwEpFQI5JgcaT8JRUGZsQn05h1vBWSwNkroY=
knative.dev/eventing v0.41.0/go.mod h1:/DjKZGRcZtBx8FOSvOy3mhxzm1Wem7H3aofb6kHq/68=
knative.dev/eventing v0.41.7-0.20240923180940-09cb6334226f h1:OEpj00D31CAlJTm8xfimpK1haCrDIzVP+3axlNgY4bE=
knative.dev/eventing v0.41.7-0.20240923180940-09cb6334226f/go.mod h1:JjZn90agmsqMIQOqZ4oToWRrcAIvlqn7hlharxcTv/w=
knative.dev/hack v0.0.0-20240404013450-1133b37da8d7 h1:fkWYWvdHm1mVHevKW2vVJnZtxH0NzOlux8imesweKwE=
knative.dev/hack v0.0.0-20240404013450-1133b37da8d7/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/pkg v0.0.0-20240416145024-0f34a8815650 h1:m2ahFUO0L2VrgGDYdyOUFdE6xBd3pLXAJozLJwqLRQM=
Expand Down
4 changes: 3 additions & 1 deletion vendor/knative.dev/eventing/pkg/apis/feature/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package feature

import (
"fmt"
"log"
"strings"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -148,7 +149,8 @@ func NewFlagsConfigFromMap(data map[string]string) (Flags, error) {
} else if strings.Contains(k, NodeSelectorLabel) {
flags[sanitizedKey] = Flag(v)
} else {
return flags, fmt.Errorf("cannot parse the feature flag '%s' = '%s'", k, v)
flags[k] = Flag(v)
log.Printf("Warning: unknown feature flag value %q=%q\n", k, v)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@ const (

// ContainerSourceConditionReceiveAdapterReady has status True when the ContainerSource's ReceiveAdapter is ready.
ContainerSourceConditionReceiveAdapterReady apis.ConditionType = "ReceiveAdapterReady"

ContainerConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated"
)

var containerCondSet = apis.NewLivingConditionSet(
ContainerSourceConditionSinkBindingReady,
ContainerSourceConditionReceiveAdapterReady,
ContainerConditionOIDCIdentityCreated,
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -66,23 +63,7 @@ func (s *ContainerSourceStatus) InitializeConditions() {
containerCondSet.Manage(s).InitializeConditions()
}

func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedSucceeded() {
containerCondSet.Manage(s).MarkTrue(ContainerConditionOIDCIdentityCreated)
}

func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) {
containerCondSet.Manage(s).MarkTrueWithReason(ContainerConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) {
containerCondSet.Manage(s).MarkFalse(ContainerConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) {
containerCondSet.Manage(s).MarkUnknown(ContainerConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

// PropagateSinkBindingStatus uses the availability of the provided Deployment to determine if
// PropagateSinkBindingStatus uses the SinkBinding to determine if
// ContainerSourceConditionSinkBindingReady should be marked as true, false or unknown.
func (s *ContainerSourceStatus) PropagateSinkBindingStatus(status *SinkBindingStatus) {
// Do not copy conditions nor observedGeneration
Expand All @@ -105,6 +86,9 @@ func (s *ContainerSourceStatus) PropagateSinkBindingStatus(status *SinkBindingSt
default:
containerCondSet.Manage(s).MarkUnknown(ContainerSourceConditionSinkBindingReady, cond.Reason, cond.Message)
}

// Propagate SinkBindings AuthStatus to containersources AuthStatus
s.Auth = status.Auth
}

// PropagateReceiveAdapterStatus uses the availability of the provided Deployment to determine if
Expand Down
Loading

0 comments on commit 1754676

Please sign in to comment.