Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow v1 conversion #3417

Merged
merged 2 commits into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"

"knative.dev/eventing/pkg/logconfig"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
Expand All @@ -46,6 +45,8 @@ import (
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
"knative.dev/eventing/pkg/apis/flows"
flowsv1 "knative.dev/eventing/pkg/apis/flows/v1"
flowsv1beta1 "knative.dev/eventing/pkg/apis/flows/v1beta1"
"knative.dev/eventing/pkg/apis/messaging"
channeldefaultconfig "knative.dev/eventing/pkg/apis/messaging/config"
Expand All @@ -55,6 +56,7 @@ import (
sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1"
sourcesv1alpha2 "knative.dev/eventing/pkg/apis/sources/v1alpha2"
"knative.dev/eventing/pkg/leaderelection"
"knative.dev/eventing/pkg/logconfig"
"knative.dev/eventing/pkg/reconciler/sinkbinding"
)

Expand Down Expand Up @@ -93,6 +95,9 @@ var ourTypes = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
// v1beta1
flowsv1beta1.SchemeGroupVersion.WithKind("Parallel"): &flowsv1beta1.Parallel{},
flowsv1beta1.SchemeGroupVersion.WithKind("Sequence"): &flowsv1beta1.Sequence{},
// v1
flowsv1.SchemeGroupVersion.WithKind("Parallel"): &flowsv1.Parallel{},
flowsv1.SchemeGroupVersion.WithKind("Sequence"): &flowsv1.Sequence{},

// For group configs.knative.dev
configsv1alpha1.SchemeGroupVersion.WithKind("ConfigMapPropagation"): &configsv1alpha1.ConfigMapPropagation{},
Expand Down Expand Up @@ -226,6 +231,8 @@ func NewConversionController(ctx context.Context, cmw configmap.Watcher) *contro
eventingv1_ = eventingv1.SchemeGroupVersion.Version
messagingv1beta1_ = messagingv1beta1.SchemeGroupVersion.Version
messagingv1_ = messagingv1.SchemeGroupVersion.Version
flowsv1beta1_ = flowsv1beta1.SchemeGroupVersion.Version
flowsv1_ = flowsv1.SchemeGroupVersion.Version
sourcesv1alpha1_ = sourcesv1alpha1.SchemeGroupVersion.Version
sourcesv1alpha2_ = sourcesv1alpha2.SchemeGroupVersion.Version
)
Expand Down Expand Up @@ -264,6 +271,24 @@ func NewConversionController(ctx context.Context, cmw configmap.Watcher) *contro
},
},

// flows
flowsv1.Kind("Sequence"): {
DefinitionName: flows.SequenceResource.String(),
HubVersion: flowsv1beta1_,
Zygotes: map[string]conversion.ConvertibleObject{
flowsv1beta1_: &flowsv1beta1.Sequence{},
flowsv1_: &flowsv1.Sequence{},
},
},
flowsv1.Kind("Parallel"): {
DefinitionName: flows.ParallelResource.String(),
HubVersion: flowsv1beta1_,
Zygotes: map[string]conversion.ConvertibleObject{
flowsv1beta1_: &flowsv1beta1.Parallel{},
flowsv1_: &flowsv1.Parallel{},
},
},

// Sources
sourcesv1alpha2.Kind("ApiServerSource"): {
DefinitionName: sources.ApiServerSourceResource.String(),
Expand Down
28 changes: 16 additions & 12 deletions pkg/apis/duck/v1beta1/delivery_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ func (source *DeliverySpec) ConvertTo(ctx context.Context, to apis.Convertible)
case *eventingduckv1.DeliverySpec:
sink.Retry = source.Retry
sink.BackoffDelay = source.BackoffDelay
if *source.BackoffPolicy == BackoffPolicyLinear {
linear := eventingduckv1.BackoffPolicyLinear
sink.BackoffPolicy = &linear
} else if *source.BackoffPolicy == BackoffPolicyExponential {
exponential := eventingduckv1.BackoffPolicyExponential
sink.BackoffPolicy = &exponential
if source.BackoffPolicy != nil {
if *source.BackoffPolicy == BackoffPolicyLinear {
linear := eventingduckv1.BackoffPolicyLinear
sink.BackoffPolicy = &linear
} else if *source.BackoffPolicy == BackoffPolicyExponential {
exponential := eventingduckv1.BackoffPolicyExponential
sink.BackoffPolicy = &exponential
}
}
sink.DeadLetterSink = source.DeadLetterSink
return nil
Expand All @@ -51,12 +53,14 @@ func (sink *DeliverySpec) ConvertFrom(ctx context.Context, from apis.Convertible
case *eventingduckv1.DeliverySpec:
sink.Retry = source.Retry
sink.BackoffDelay = source.BackoffDelay
if *source.BackoffPolicy == eventingduckv1.BackoffPolicyLinear {
linear := BackoffPolicyLinear
sink.BackoffPolicy = &linear
} else if *source.BackoffPolicy == eventingduckv1.BackoffPolicyExponential {
exponential := BackoffPolicyExponential
sink.BackoffPolicy = &exponential
if source.BackoffPolicy != nil {
if *source.BackoffPolicy == eventingduckv1.BackoffPolicyLinear {
linear := BackoffPolicyLinear
sink.BackoffPolicy = &linear
} else if *source.BackoffPolicy == eventingduckv1.BackoffPolicyExponential {
exponential := BackoffPolicyExponential
sink.BackoffPolicy = &exponential
}
}
sink.DeadLetterSink = source.DeadLetterSink
return nil
Expand Down
132 changes: 128 additions & 4 deletions pkg/apis/flows/v1beta1/parallel_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,138 @@ import (
"fmt"

"knative.dev/pkg/apis"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/apis/flows/v1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format Go code:

Suggested change
"knative.dev/eventing/pkg/apis/flows/v1"

messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
)

// ConvertTo implements apis.Convertible
func (source *Parallel) ConvertTo(ctx context.Context, sink apis.Convertible) error {
return fmt.Errorf("v1beta1 is the highest known version, got: %T", sink)
// Converts obj from v1beta1.Parallel into v1.Parallel
func (source *Parallel) ConvertTo(ctx context.Context, obj apis.Convertible) error {
switch sink := obj.(type) {
case *v1.Parallel:
sink.ObjectMeta = source.ObjectMeta

sink.Spec.Branches = make([]v1.ParallelBranch, len(source.Spec.Branches))
for i, b := range source.Spec.Branches {
sink.Spec.Branches[i] = v1.ParallelBranch{
Filter: b.Filter,
Subscriber: b.Subscriber,
Reply: b.Reply,
}

if b.Delivery != nil {
sink.Spec.Branches[i].Delivery = &eventingduckv1.DeliverySpec{}
if err := b.Delivery.ConvertTo(ctx, sink.Spec.Branches[i].Delivery); err != nil {
return err
}
}
}

if source.Spec.ChannelTemplate != nil {
sink.Spec.ChannelTemplate = &messagingv1.ChannelTemplateSpec{
TypeMeta: source.Spec.ChannelTemplate.TypeMeta,
Spec: source.Spec.ChannelTemplate.Spec,
}
}
sink.Spec.Reply = source.Spec.Reply

sink.Status.Status = source.Status.Status
sink.Status.AddressStatus = source.Status.AddressStatus

sink.Status.IngressChannelStatus = v1.ParallelChannelStatus{
Channel: source.Status.IngressChannelStatus.Channel,
ReadyCondition: source.Status.IngressChannelStatus.ReadyCondition,
}

if source.Status.BranchStatuses != nil {
sink.Status.BranchStatuses = make([]v1.ParallelBranchStatus, len(source.Status.BranchStatuses))
for i, b := range source.Status.BranchStatuses {
sink.Status.BranchStatuses[i] = v1.ParallelBranchStatus{
FilterSubscriptionStatus: v1.ParallelSubscriptionStatus{
Subscription: b.FilterSubscriptionStatus.Subscription,
ReadyCondition: b.FilterSubscriptionStatus.ReadyCondition,
},
FilterChannelStatus: v1.ParallelChannelStatus{
Channel: b.FilterChannelStatus.Channel,
ReadyCondition: b.FilterChannelStatus.ReadyCondition,
},
SubscriptionStatus: v1.ParallelSubscriptionStatus{
Subscription: b.SubscriptionStatus.Subscription,
ReadyCondition: b.SubscriptionStatus.ReadyCondition,
},
}
}
}

return nil
default:
return fmt.Errorf("Unknown conversion, got: %T", sink)
}
}

// ConvertFrom implements apis.Convertible
func (sink *Parallel) ConvertFrom(ctx context.Context, source apis.Convertible) error {
return fmt.Errorf("v1beta1 is the highest known version, got: %T", source)
// Converts obj from v1.Parallel into v1beta1.Parallel
func (sink *Parallel) ConvertFrom(ctx context.Context, obj apis.Convertible) error {
switch source := obj.(type) {
case *v1.Parallel:
sink.ObjectMeta = source.ObjectMeta

sink.Spec.Branches = make([]ParallelBranch, len(source.Spec.Branches))
for i, b := range source.Spec.Branches {
sink.Spec.Branches[i] = ParallelBranch{
Filter: b.Filter,
Subscriber: b.Subscriber,
Reply: b.Reply,
}
if b.Delivery != nil {
sink.Spec.Branches[i].Delivery = &eventingduckv1beta1.DeliverySpec{}
if err := sink.Spec.Branches[i].Delivery.ConvertFrom(ctx, b.Delivery); err != nil {
return err
}
}
}
if source.Spec.ChannelTemplate != nil {
sink.Spec.ChannelTemplate = &messagingv1beta1.ChannelTemplateSpec{
TypeMeta: source.Spec.ChannelTemplate.TypeMeta,
Spec: source.Spec.ChannelTemplate.Spec,
}
}
sink.Spec.Reply = source.Spec.Reply

sink.Status.Status = source.Status.Status
sink.Status.AddressStatus = source.Status.AddressStatus

sink.Status.IngressChannelStatus = ParallelChannelStatus{
Channel: source.Status.IngressChannelStatus.Channel,
ReadyCondition: source.Status.IngressChannelStatus.ReadyCondition,
}

if source.Status.BranchStatuses != nil {
sink.Status.BranchStatuses = make([]ParallelBranchStatus, len(source.Status.BranchStatuses))
for i, b := range source.Status.BranchStatuses {
sink.Status.BranchStatuses[i] = ParallelBranchStatus{
FilterSubscriptionStatus: ParallelSubscriptionStatus{
Subscription: b.FilterSubscriptionStatus.Subscription,
ReadyCondition: b.FilterSubscriptionStatus.ReadyCondition,
},
FilterChannelStatus: ParallelChannelStatus{
Channel: b.FilterChannelStatus.Channel,
ReadyCondition: b.FilterChannelStatus.ReadyCondition,
},
SubscriptionStatus: ParallelSubscriptionStatus{
Subscription: b.SubscriptionStatus.Subscription,
ReadyCondition: b.SubscriptionStatus.ReadyCondition,
},
}
}
}

return nil
default:
return fmt.Errorf("unknown version, got: %T", source)
}
}
Loading