Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flows v1beta1 #2407

Merged
merged 7 commits into from
Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hack/update-codegen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ KNATIVE_CODEGEN_PKG=${KNATIVE_CODEGEN_PKG:-$(cd ${REPO_ROOT_DIR}; ls -d -1 $(dir
# instead of the $GOPATH directly. For normal projects this can be dropped.
${CODEGEN_PKG}/generate-groups.sh "deepcopy,client,informer,lister" \
knative.dev/eventing/pkg/client knative.dev/eventing/pkg/apis \
"eventing:v1alpha1 messaging:v1alpha1 messaging:v1beta1 flows:v1alpha1 sources:v1alpha1" \
"eventing:v1alpha1 messaging:v1alpha1 messaging:v1beta1 flows:v1alpha1 flows:v1beta1 sources:v1alpha1" \
--go-header-file ${REPO_ROOT_DIR}/hack/boilerplate/boilerplate.go.txt

# TODO(#2312): Remove this after v0.13.
Expand All @@ -48,7 +48,7 @@ ${CODEGEN_PKG}/generate-groups.sh "deepcopy" \
# Knative Injection
${KNATIVE_CODEGEN_PKG}/hack/generate-knative.sh "injection" \
knative.dev/eventing/pkg/client knative.dev/eventing/pkg/apis \
"eventing:v1alpha1 messaging:v1alpha1 messaging:v1beta1 flows:v1alpha1 sources:v1alpha1 duck:v1alpha1 duck:v1beta1" \
"eventing:v1alpha1 messaging:v1alpha1 messaging:v1beta1 flows:v1alpha1 flows:v1beta1 sources:v1alpha1 duck:v1alpha1 duck:v1beta1" \
--go-header-file ${REPO_ROOT_DIR}/hack/boilerplate/boilerplate.go.txt

# TODO(#2312): Remove this after v0.13.
Expand Down
20 changes: 20 additions & 0 deletions pkg/apis/flows/v1beta1/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package v1beta1 is the v1beta1 version of the API.
// +k8s:deepcopy-gen=package
// +groupName=flows.knative.dev
package v1beta1
41 changes: 41 additions & 0 deletions pkg/apis/flows/v1beta1/implements_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2020 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v1beta1

import (
"testing"

"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

func TestTypesImplements(t *testing.T) {
testCases := []struct {
instance interface{}
iface duck.Implementable
}{
// Sequence
{instance: &Sequence{}, iface: &duckv1.Conditions{}},
// Parallel
{instance: &Parallel{}, iface: &duckv1.Conditions{}},
}
for _, tc := range testCases {
if err := duck.VerifyType(tc.instance, tc.iface); err != nil {
t.Error(err)
}
}
}
37 changes: 37 additions & 0 deletions pkg/apis/flows/v1beta1/parallel_defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2020 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v1beta1

import (
"context"

messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
)

func (s *Parallel) SetDefaults(ctx context.Context) {
if s != nil && s.Spec.ChannelTemplate == nil {
// The singleton may not have been set, if so ignore it and validation will reject the
// Channel.
if cd := messagingv1beta1.ChannelDefaulterSingleton; cd != nil {
channelTemplate := cd.GetDefault(s.Namespace)
s.Spec.ChannelTemplate = channelTemplate
}
}
s.Spec.SetDefaults(ctx)
}

func (ss *ParallelSpec) SetDefaults(ctx context.Context) {}
200 changes: 200 additions & 0 deletions pkg/apis/flows/v1beta1/parallel_lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright 2020 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v1beta1

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
"knative.dev/pkg/apis"
pkgduckv1 "knative.dev/pkg/apis/duck/v1"
)

var pParallelCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable)

const (
// ParallelConditionReady has status True when all subconditions below have been set to True.
ParallelConditionReady = apis.ConditionReady

// ParallelConditionChannelsReady has status True when all the channels created as part of
// this parallel are ready.
ParallelConditionChannelsReady apis.ConditionType = "ChannelsReady"

// ParallelConditionSubscriptionsReady has status True when all the subscriptions created as part of
// this parallel are ready.
ParallelConditionSubscriptionsReady apis.ConditionType = "SubscriptionsReady"

// ParallelConditionAddressable has status true when this Parallel meets
// the Addressable contract and has a non-empty hostname.
ParallelConditionAddressable apis.ConditionType = "Addressable"
)

// GetGroupVersionKind returns GroupVersionKind for Parallel
func (p *Parallel) GetGroupVersionKind() schema.GroupVersionKind {
return SchemeGroupVersion.WithKind("Parallel")
}

// GetUntypedSpec returns the spec of the Parallel.
func (p *Parallel) GetUntypedSpec() interface{} {
return p.Spec
}

// GetCondition returns the condition currently associated with the given type, or nil.
func (ps *ParallelStatus) GetCondition(t apis.ConditionType) *apis.Condition {
return pParallelCondSet.Manage(ps).GetCondition(t)
}

// IsReady returns true if the resource is ready overall.
func (ps *ParallelStatus) IsReady() bool {
return pParallelCondSet.Manage(ps).IsHappy()
}

// InitializeConditions sets relevant unset conditions to Unknown state.
func (ps *ParallelStatus) InitializeConditions() {
pParallelCondSet.Manage(ps).InitializeConditions()
}

// PropagateSubscriptionStatuses sets the ParallelConditionSubscriptionsReady based on
// the status of the incoming subscriptions.
func (ps *ParallelStatus) PropagateSubscriptionStatuses(filterSubscriptions []*messagingv1beta1.Subscription, subscriptions []*messagingv1beta1.Subscription) {
if ps.BranchStatuses == nil {
ps.BranchStatuses = make([]ParallelBranchStatus, len(subscriptions))
}
allReady := true
// If there are no subscriptions, treat that as a False branch. Could go either way, but this seems right.
if len(subscriptions) == 0 {
allReady = false
}

for i, s := range subscriptions {
ps.BranchStatuses[i].SubscriptionStatus = ParallelSubscriptionStatus{
Subscription: corev1.ObjectReference{
APIVersion: s.APIVersion,
Kind: s.Kind,
Name: s.Name,
Namespace: s.Namespace,
},
}

readyCondition := s.Status.GetTopLevelCondition()
if readyCondition != nil {
ps.BranchStatuses[i].SubscriptionStatus.ReadyCondition = *readyCondition
if readyCondition.Status != corev1.ConditionTrue {
allReady = false
}
} else {
allReady = false
}

fs := filterSubscriptions[i]
ps.BranchStatuses[i].FilterSubscriptionStatus = ParallelSubscriptionStatus{
Subscription: corev1.ObjectReference{
APIVersion: fs.APIVersion,
Kind: fs.Kind,
Name: fs.Name,
Namespace: fs.Namespace,
},
}
readyCondition = fs.Status.GetCondition(messagingv1beta1.SubscriptionConditionReady)
if readyCondition != nil {
ps.BranchStatuses[i].FilterSubscriptionStatus.ReadyCondition = *readyCondition
if readyCondition.Status != corev1.ConditionTrue {
allReady = false
}
} else {
allReady = false
}

}
if allReady {
pParallelCondSet.Manage(ps).MarkTrue(ParallelConditionSubscriptionsReady)
} else {
ps.MarkSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none")
}
}

// PropagateChannelStatuses sets the ChannelStatuses and ParallelConditionChannelsReady based on the
// status of the incoming channels.
func (ps *ParallelStatus) PropagateChannelStatuses(ingressChannel *duckv1beta1.Channelable, channels []*duckv1beta1.Channelable) {
if ps.BranchStatuses == nil {
ps.BranchStatuses = make([]ParallelBranchStatus, len(channels))
}
allReady := true

ps.IngressChannelStatus.Channel = corev1.ObjectReference{
APIVersion: ingressChannel.APIVersion,
Kind: ingressChannel.Kind,
Name: ingressChannel.Name,
Namespace: ingressChannel.Namespace,
}

address := ingressChannel.Status.AddressStatus.Address
if address != nil {
ps.IngressChannelStatus.ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionTrue}
} else {
ps.IngressChannelStatus.ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionFalse, Reason: "NotAddressable", Message: "Channel is not addressable"}
allReady = false
}
ps.setAddress(address)

for i, c := range channels {
ps.BranchStatuses[i].FilterChannelStatus = ParallelChannelStatus{
Channel: corev1.ObjectReference{
APIVersion: c.APIVersion,
Kind: c.Kind,
Name: c.Name,
Namespace: c.Namespace,
},
}
// TODO: Once the addressable has a real status to dig through, use that here instead of
// addressable, because it might be addressable but not ready.
address := c.Status.AddressStatus.Address
if address != nil {
ps.BranchStatuses[i].FilterChannelStatus.ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionTrue}
} else {
ps.BranchStatuses[i].FilterChannelStatus.ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionFalse, Reason: "NotAddressable", Message: "Channel is not addressable"}
allReady = false
}
}
if allReady {
pParallelCondSet.Manage(ps).MarkTrue(ParallelConditionChannelsReady)
} else {
ps.MarkChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none")
}
}

func (ps *ParallelStatus) MarkChannelsNotReady(reason, messageFormat string, messageA ...interface{}) {
pParallelCondSet.Manage(ps).MarkFalse(ParallelConditionChannelsReady, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkSubscriptionsNotReady(reason, messageFormat string, messageA ...interface{}) {
pParallelCondSet.Manage(ps).MarkFalse(ParallelConditionSubscriptionsReady, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkAddressableNotReady(reason, messageFormat string, messageA ...interface{}) {
pParallelCondSet.Manage(ps).MarkFalse(ParallelConditionAddressable, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) setAddress(address *pkgduckv1.Addressable) {
ps.Address = address
if address == nil {
pParallelCondSet.Manage(ps).MarkFalse(ParallelConditionAddressable, "emptyAddress", "addressable is nil")
} else {
pParallelCondSet.Manage(ps).MarkTrue(ParallelConditionAddressable)
}
}
Loading