Skip to content

Commit

Permalink
Implement #4057 Channeleable.Delivery (#4652)
Browse files Browse the repository at this point in the history
* Implement #4057

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Fix

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Retry

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Try to fix that ducktype

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Reverted yaml usage

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Now this should just works

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Now it works!

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Just some doc!

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper authored Dec 17, 2020
1 parent 11b4b71 commit 9643baf
Show file tree
Hide file tree
Showing 13 changed files with 475 additions and 40 deletions.
68 changes: 66 additions & 2 deletions pkg/apis/messaging/v1/channel_template_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ limitations under the License.
package v1

import (
"encoding/json"

"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

v1 "knative.dev/eventing/pkg/apis/duck/v1"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand All @@ -40,8 +45,67 @@ type ChannelTemplateSpecInternal struct {
// +optional
metav1.ObjectMeta `json:"metadata,omitempty"`

// Spec defines the Spec to use for each channel created. Passed
// Spec includes the Channel CR ChannelableSpec and the physical channel spec.
// In order to create a new ChannelTemplateSpecInternalSpec, you must use NewChannelTemplateSpecInternalSpec
Spec *ChannelTemplateSpecInternalSpec `json:"spec,omitempty"`
}

// ChannelTemplateSpecInternalSpec merges the "general" spec from Channel CR and the template of the physical channel spec.
// Note that this struct properly implements only Marshalling, unmarshalling doesn't work!
type ChannelTemplateSpecInternalSpec struct {
// ChannelableSpec includes the fields from the Channel Spec section
v1.ChannelableSpec

// PhysicalChannelSpec includes the fields from the physical channel Spec. Passed
// in verbatim to the Channel CRD as Spec section.
// +optional
Spec *runtime.RawExtension `json:"spec,omitempty"`
PhysicalChannelSpec *runtime.RawExtension
}

// NewChannelTemplateSpecInternalSpec creates a new ChannelTemplateSpecInternalSpec, returning nil if channelableSpec is empty and physicalChannelSpec is nil.
func NewChannelTemplateSpecInternalSpec(channelableSpec v1.ChannelableSpec, physicalChannelSpec *runtime.RawExtension) *ChannelTemplateSpecInternalSpec {
if physicalChannelSpec == nil && equality.Semantic.DeepEqual(channelableSpec, v1.ChannelableSpec{}) {
return nil
}
return &ChannelTemplateSpecInternalSpec{
ChannelableSpec: channelableSpec,
PhysicalChannelSpec: physicalChannelSpec,
}
}

func (s ChannelTemplateSpecInternalSpec) MarshalJSON() ([]byte, error) {
// Check if empty
if s.PhysicalChannelSpec == nil && equality.Semantic.DeepEqual(s.ChannelableSpec, v1.ChannelableSpec{}) {
return []byte{}, nil
}

// Let's merge the channel template spec and the channelable spec from channel
channelableSpec := make(map[string]interface{})
physicalChannelTemplateSpec := make(map[string]interface{})

rawChannelSpec, err := json.Marshal(s.ChannelableSpec)
if err != nil {
return nil, err
}
if err := json.Unmarshal(rawChannelSpec, &channelableSpec); err != nil {
return nil, err
}

if s.PhysicalChannelSpec != nil {
rawPhysicalChannelTemplateSpec, err := json.Marshal(s.PhysicalChannelSpec)
if err != nil {
return nil, err
}
if err := json.Unmarshal(rawPhysicalChannelTemplateSpec, &physicalChannelTemplateSpec); err != nil {
return nil, err
}
}

// Merge the two maps into channelableSpec
for k, v := range physicalChannelTemplateSpec {
channelableSpec[k] = v
}

// Just return the merged map marshalled
return json.Marshal(channelableSpec)
}
24 changes: 23 additions & 1 deletion pkg/apis/messaging/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 63 additions & 8 deletions pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/utils/pointer"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake"
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/network"
. "knative.dev/pkg/reconciler/testing"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
v1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
Expand All @@ -35,14 +45,6 @@ import (
channelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/channel"
"knative.dev/eventing/pkg/duck"
. "knative.dev/eventing/pkg/reconciler/testing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake"
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/network"
. "knative.dev/pkg/reconciler/testing"
)

const (
Expand All @@ -54,6 +56,10 @@ var (
testKey = fmt.Sprintf("%s/%s", testNS, channelName)

backingChannelHostname = network.GetServiceHostname("foo", "bar")

deliverySpec = &eventingduckv1.DeliverySpec{
Retry: pointer.Int32Ptr(10),
}
)

func init() {
Expand Down Expand Up @@ -176,6 +182,55 @@ func TestReconcile(t *testing.T) {
WithChannelNoAddress(),
WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled.")),
}},
}, {
Name: "Backing channel created with delivery",
Key: testKey,
Objects: []runtime.Object{
NewChannel(channelName, testNS,
WithChannelTemplate(channelCRD()),
WithInitChannelConditions,
WithBackingChannelObjRef(backingChannelObjRef()),
WithBackingChannelReady,
WithChannelDelivery(deliverySpec),
WithChannelAddress(backingChannelHostname)),
},
WantCreates: []runtime.Object{
&unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "messaging.knative.dev/v1",
"kind": "InMemoryChannel",
"metadata": map[string]interface{}{
"creationTimestamp": nil,
"namespace": testNS,
"name": channelName,
"ownerReferences": []interface{}{
map[string]interface{}{
"apiVersion": "messaging.knative.dev/v1",
"blockOwnerDeletion": true,
"controller": true,
"kind": "Channel",
"name": channelName,
"uid": "",
},
},
},
"spec": map[string]interface{}{
"delivery": map[string]interface{}{
"retry": int64(10),
},
},
},
},
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewChannel(channelName, testNS,
WithChannelTemplate(channelCRD()),
WithInitChannelConditions,
WithBackingChannelObjRef(backingChannelObjRef()),
WithChannelNoAddress(),
WithChannelDelivery(deliverySpec),
WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled.")),
}},
}, {
Name: "Generation Bump",
Key: testKey,
Expand Down
7 changes: 5 additions & 2 deletions pkg/reconciler/channel/resources/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package resources
import (
"encoding/json"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"knative.dev/pkg/kmeta"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
)

Expand All @@ -41,7 +41,10 @@ func NewChannel(c *v1.Channel) (*unstructured.Unstructured, error) {
Name: c.Name,
Namespace: c.Namespace,
},
Spec: c.Spec.ChannelTemplate.Spec,
Spec: v1.NewChannelTemplateSpecInternalSpec(
c.Spec.ChannelableSpec,
c.Spec.ChannelTemplate.Spec,
),
}
raw, err := json.Marshal(template)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/reconciler/mtbroker/resources/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/pkg/kmeta"

Expand Down Expand Up @@ -53,7 +54,7 @@ func NewChannel(channelType string, owner kmeta.OwnerRefable, channelTemplate *m
Labels: l,
Annotations: map[string]string{eventing.ScopeAnnotationKey: eventing.ScopeCluster},
},
Spec: channelTemplate.Spec,
Spec: messagingv1.NewChannelTemplateSpecInternalSpec(duckv1.ChannelableSpec{}, channelTemplate.Spec),
}
raw, err := json.Marshal(template)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/reconciler/parallel/resources/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"knative.dev/pkg/kmeta"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
)
Expand Down Expand Up @@ -54,7 +55,7 @@ func NewChannel(name string, p *v1.Parallel) (*unstructured.Unstructured, error)
Name: name,
Namespace: p.Namespace,
},
Spec: p.Spec.ChannelTemplate.Spec,
Spec: messagingv1.NewChannelTemplateSpecInternalSpec(duckv1.ChannelableSpec{}, p.Spec.ChannelTemplate.Spec),
}
raw, err := json.Marshal(template)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/reconciler/sequence/resources/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"knative.dev/pkg/kmeta"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
)
Expand All @@ -49,7 +50,7 @@ func NewChannel(name string, p *v1.Sequence) (*unstructured.Unstructured, error)
Name: name,
Namespace: p.Namespace,
},
Spec: p.Spec.ChannelTemplate.Spec,
Spec: messagingv1.NewChannelTemplateSpecInternalSpec(duckv1.ChannelableSpec{}, p.Spec.ChannelTemplate.Spec),
}
raw, err := json.Marshal(template)
if err != nil {
Expand Down
Loading

0 comments on commit 9643baf

Please sign in to comment.