diff --git a/e2e/knative/files/display.groovy b/e2e/knative/files/display.groovy new file mode 100644 index 0000000000..43a595f6c1 --- /dev/null +++ b/e2e/knative/files/display.groovy @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +from('knative:channel/messages') + .convertBodyTo(String.class) + .to('log:info?showAll=false') diff --git a/e2e/knative/kamelet_test.go b/e2e/knative/kamelet_test.go new file mode 100644 index 0000000000..cfc90f3696 --- /dev/null +++ b/e2e/knative/kamelet_test.go @@ -0,0 +1,60 @@ +// +build integration + +// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration" + +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package knative + +import ( + "testing" + + . "github.com/apache/camel-k/e2e/support" + camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + messaging "knative.dev/eventing/pkg/apis/messaging/v1beta1" +) + +// Test that kamelet binding can be changed and changes propagated to integrations +func TestKameletChange(t *testing.T) { + + WithNewTestNamespace(t, func(ns string) { + RegisterTestingT(t) + + Expect(Kamel("install", "-n", ns).Execute()).Should(BeNil()) + Expect(CreateTimerKamelet(ns, "timer-source")()).Should(BeNil()) + Expect(CreateKnativeChannelv1Beta1(ns, "messages")()).Should(BeNil()) + Expect(Kamel("run", "-n", ns, "files/display.groovy", "-w").Execute()).Should(BeNil()) + ref := v1.ObjectReference{ + Kind: "InMemoryChannel", + Name: "messages", + APIVersion: messaging.SchemeGroupVersion.String(), + } + Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hello"})()).Should(BeNil()) + Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning)) + Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue)) + Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hello")) + + Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hi"})()).Should(BeNil()) + Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning)) + Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue)) + Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hi")) + }) + +} diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go index c070c87c59..e57260202c 100644 --- a/e2e/support/test_support.go +++ b/e2e/support/test_support.go @@ -23,6 +23,7 @@ package support import ( "context" + "encoding/json" "errors" "fmt" "io" @@ -33,9 +34,10 @@ import ( "testing" "time" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" + "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/google/uuid" "github.com/onsi/gomega" - "github.com/spf13/cobra" appsv1 "k8s.io/api/apps/v1" "k8s.io/api/batch/v1beta1" @@ -890,6 +892,92 @@ func CreateKnativeChannelv1Beta1(ns string, name string) func() error { } } +/* + Kamelets +*/ + +func CreateTimerKamelet(ns string, name string) func() error { + return func() error { + kamelet := v1alpha1.Kamelet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Spec: v1alpha1.KameletSpec{ + Definition: v1alpha1.JSONSchemaProps{ + Properties: map[string]v1alpha1.JSONSchemaProps{ + "message": { + Type: "string", + }, + }, + }, + Flow: asFlow(map[string]interface{}{ + "from": map[string]interface{}{ + "uri": "timer:tick", + "steps": []map[string]interface{}{ + { + "set-body": map[string]interface{}{ + "constant": "{{message}}", + }, + }, + { + "to": "kamelet:sink", + }, + }, + }, + }), + }, + } + return TestClient.Create(TestContext, &kamelet) + } +} + +func BindKameletTo(ns, name, from string, to corev1.ObjectReference, properties map[string]string) func() error { + return func() error { + kb := v1alpha1.KameletBinding{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Spec: v1alpha1.KameletBindingSpec{ + Source: v1alpha1.Endpoint{ + Ref: &corev1.ObjectReference{ + Kind: "Kamelet", + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Name: from, + }, + Properties: asEndpointProperties(properties), + }, + Sink: v1alpha1.Endpoint{ + Ref: &to, + Properties: asEndpointProperties(map[string]string{}), + }, + }, + } + return kubernetes.ReplaceResource(TestContext, TestClient, &kb) + } +} + +func asFlow(source map[string]interface{}) *v1.Flow { + bytes, err := json.Marshal(source) + if err != nil { + panic(err) + } + return &v1.Flow{ + RawMessage: bytes, + } +} + +func asEndpointProperties(props map[string]string) v1alpha1.EndpointProperties { + bytes, err := json.Marshal(props) + if err != nil { + panic(err) + } + return v1alpha1.EndpointProperties{ + RawMessage: bytes, + } +} + /* Namespace testing functions */ diff --git a/pkg/cmd/reset.go b/pkg/cmd/reset.go index f6fa6f6cb8..5831d8537e 100644 --- a/pkg/cmd/reset.go +++ b/pkg/cmd/reset.go @@ -61,6 +61,14 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) { } var n int + if !o.SkipKameletBindings { + if n, err = o.deleteAllKameletBindings(c); err != nil { + fmt.Print(err) + return + } + fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace) + } + if !o.SkipIntegrations { if n, err = o.deleteAllIntegrations(c); err != nil { fmt.Print(err) @@ -77,14 +85,6 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) { fmt.Printf("%d integration kits deleted from namespace %s\n", n, o.Namespace) } - if !o.SkipKameletBindings { - if n, err = o.deleteAllKameletBindings(c); err != nil { - fmt.Print(err) - return - } - fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace) - } - if err = o.resetIntegrationPlatform(c); err != nil { fmt.Println(err) return diff --git a/pkg/controller/kameletbinding/common.go b/pkg/controller/kameletbinding/common.go new file mode 100644 index 0000000000..a50b2023fc --- /dev/null +++ b/pkg/controller/kameletbinding/common.go @@ -0,0 +1,139 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kameletbinding + +import ( + "context" + "encoding/json" + + v1 "github.com/apache/camel-k/pkg/apis/camel/v1" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" + "github.com/apache/camel-k/pkg/client" + "github.com/apache/camel-k/pkg/platform" + "github.com/apache/camel-k/pkg/util/bindings" + "github.com/apache/camel-k/pkg/util/knative" + "github.com/pkg/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *v1alpha1.KameletBinding) (*v1.Integration, error) { + controller := true + blockOwnerDeletion := true + it := v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: kameletbinding.Namespace, + Name: kameletbinding.Name, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: kameletbinding.APIVersion, + Kind: kameletbinding.Kind, + Name: kameletbinding.Name, + UID: kameletbinding.UID, + Controller: &controller, + BlockOwnerDeletion: &blockOwnerDeletion, + }, + }, + }, + } + // start from the integration spec defined in the binding + if kameletbinding.Spec.Integration != nil { + it.Spec = *kameletbinding.Spec.Integration.DeepCopy() + } + + profile, err := determineProfile(ctx, c, kameletbinding) + if err != nil { + return nil, err + } + it.Spec.Profile = profile + + bindingContext := bindings.BindingContext{ + Ctx: ctx, + Client: c, + Namespace: it.Namespace, + Profile: profile, + } + + from, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSource, kameletbinding.Spec.Source) + if err != nil { + return nil, errors.Wrap(err, "could not determine source URI") + } + to, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSink, kameletbinding.Spec.Sink) + if err != nil { + return nil, errors.Wrap(err, "could not determine sink URI") + } + + if len(from.Traits) > 0 || len(to.Traits) > 0 { + if it.Spec.Traits == nil { + it.Spec.Traits = make(map[string]v1.TraitSpec) + } + for k, v := range from.Traits { + it.Spec.Traits[k] = v + } + for k, v := range to.Traits { + it.Spec.Traits[k] = v + } + } + + flow := map[string]interface{}{ + "from": map[string]interface{}{ + "uri": from.URI, + "steps": []map[string]interface{}{ + { + "to": to.URI, + }, + }, + }, + } + encodedFlow, err := json.Marshal(flow) + if err != nil { + return nil, err + } + it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow}) + + return &it, nil +} + +func determineProfile(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) { + if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" { + return binding.Spec.Integration.Profile, nil + } + pl, err := platform.GetCurrentPlatform(ctx, c, binding.Namespace) + if err != nil && !k8serrors.IsNotFound(err) { + return "", errors.Wrap(err, "error while retrieving the integration platform") + } + if pl != nil { + if pl.Status.Profile != "" { + return pl.Status.Profile, nil + } + if pl.Spec.Profile != "" { + return pl.Spec.Profile, nil + } + } + if knative.IsEnabledInNamespace(ctx, c, binding.Namespace) { + return v1.TraitProfileKnative, nil + } + if pl != nil { + // Determine profile from cluster type + plProfile := platform.GetProfile(pl) + if plProfile != "" { + return plProfile, nil + } + } + return v1.DefaultTraitProfile, nil +} diff --git a/pkg/controller/kameletbinding/initialize.go b/pkg/controller/kameletbinding/initialize.go index 8f794d69fc..cc9c38f16b 100644 --- a/pkg/controller/kameletbinding/initialize.go +++ b/pkg/controller/kameletbinding/initialize.go @@ -19,20 +19,13 @@ package kameletbinding import ( "context" - "encoding/json" "strings" - v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" - "github.com/apache/camel-k/pkg/platform" - "github.com/apache/camel-k/pkg/util/bindings" - "github.com/apache/camel-k/pkg/util/knative" "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/apache/camel-k/pkg/util/patch" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -55,80 +48,12 @@ func (action *initializeAction) CanHandle(kameletbinding *v1alpha1.KameletBindin } func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error) { - controller := true - blockOwnerDeletion := true - it := v1.Integration{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: kameletbinding.Namespace, - Name: kameletbinding.Name, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: kameletbinding.APIVersion, - Kind: kameletbinding.Kind, - Name: kameletbinding.Name, - UID: kameletbinding.UID, - Controller: &controller, - BlockOwnerDeletion: &blockOwnerDeletion, - }, - }, - }, - } - // start from the integration spec defined in the binding - if kameletbinding.Spec.Integration != nil { - it.Spec = *kameletbinding.Spec.Integration.DeepCopy() - } - - profile, err := action.determineProfile(ctx, kameletbinding) - if err != nil { - return nil, err - } - it.Spec.Profile = profile - - bindingContext := bindings.BindingContext{ - Ctx: ctx, - Client: action.client, - Namespace: it.Namespace, - Profile: profile, - } - - from, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSource, kameletbinding.Spec.Source) - if err != nil { - return nil, errors.Wrap(err, "could not determine source URI") - } - to, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSink, kameletbinding.Spec.Sink) - if err != nil { - return nil, errors.Wrap(err, "could not determine sink URI") - } - - if len(from.Traits) > 0 || len(to.Traits) > 0 { - if it.Spec.Traits == nil { - it.Spec.Traits = make(map[string]v1.TraitSpec) - } - for k, v := range from.Traits { - it.Spec.Traits[k] = v - } - for k, v := range to.Traits { - it.Spec.Traits[k] = v - } - } - - flow := map[string]interface{}{ - "from": map[string]interface{}{ - "uri": from.URI, - "steps": []map[string]interface{}{ - { - "to": to.URI, - }, - }, - }, - } - encodedFlow, err := json.Marshal(flow) + it, err := createIntegrationFor(ctx, action.client, kameletbinding) if err != nil { return nil, err } - it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow}) - if err := kubernetes.ReplaceResource(ctx, action.client, &it); err != nil { + if err := kubernetes.ReplaceResource(ctx, action.client, it); err != nil { return nil, errors.Wrap(err, "could not create integration for kamelet binding") } @@ -193,32 +118,3 @@ func (action *initializeAction) findIcon(ctx context.Context, binding *v1alpha1. } return kamelet.Annotations[v1alpha1.AnnotationIcon], nil } - -func (action *initializeAction) determineProfile(ctx context.Context, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) { - if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" { - return binding.Spec.Integration.Profile, nil - } - pl, err := platform.GetCurrentPlatform(ctx, action.client, binding.Namespace) - if err != nil && !k8serrors.IsNotFound(err) { - return "", errors.Wrap(err, "error while retrieving the integration platform") - } - if pl != nil { - if pl.Status.Profile != "" { - return pl.Status.Profile, nil - } - if pl.Spec.Profile != "" { - return pl.Spec.Profile, nil - } - } - if knative.IsEnabledInNamespace(ctx, action.client, binding.Namespace) { - return v1.TraitProfileKnative, nil - } - if pl != nil { - // Determine profile from cluster type - plProfile := platform.GetProfile(pl) - if plProfile != "" { - return plProfile, nil - } - } - return v1.DefaultTraitProfile, nil -} diff --git a/pkg/controller/kameletbinding/kamelet_binding_controller.go b/pkg/controller/kameletbinding/kamelet_binding_controller.go index 459c69ca64..dbebfc35a3 100644 --- a/pkg/controller/kameletbinding/kamelet_binding_controller.go +++ b/pkg/controller/kameletbinding/kamelet_binding_controller.go @@ -21,6 +21,7 @@ import ( "context" "time" + v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/apache/camel-k/pkg/client" camelevent "github.com/apache/camel-k/pkg/event" @@ -84,6 +85,15 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return err } + // Watch Integration to propagate changes downstream + err = c.Watch(&source.Kind{Type: &v1.Integration{}}, &handler.EnqueueRequestForOwner{ + OwnerType: &v1alpha1.KameletBinding{}, + IsController: false, + }) + if err != nil { + return err + } + return nil } diff --git a/pkg/controller/kameletbinding/monitor.go b/pkg/controller/kameletbinding/monitor.go index 9980dc515a..9cd768ae3c 100644 --- a/pkg/controller/kameletbinding/monitor.go +++ b/pkg/controller/kameletbinding/monitor.go @@ -19,12 +19,14 @@ package kameletbinding import ( "context" + v1 "github.com/apache/camel-k/pkg/apis/camel/v1" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" ) // NewMonitorAction returns an action that monitors the kamelet binding after it's fully initialized @@ -52,10 +54,41 @@ func (action *monitorAction) Handle(ctx context.Context, kameletbinding *v1alpha Name: kameletbinding.Name, } it := v1.Integration{} - if err := action.client.Get(ctx, key, &it); err != nil { + if err := action.client.Get(ctx, key, &it); err != nil && k8serrors.IsNotFound(err) { + target := kameletbinding.DeepCopy() + // Rebuild the integration + target.Status.Phase = v1alpha1.KameletBindingPhaseNone + target.Status.SetCondition( + v1alpha1.KameletBindingConditionReady, + corev1.ConditionFalse, + "", + "", + ) + return target, nil + } else if err != nil { return nil, errors.Wrapf(err, "could not load integration for KameletBinding %q", kameletbinding.Name) } + // Check if the integration needs to be changed + expected, err := createIntegrationFor(ctx, action.client, kameletbinding) + if err != nil { + return nil, err + } + + if !equality.Semantic.DeepDerivative(expected.Spec, it.Spec) { + // KameletBinding has changed and needs rebuild + target := kameletbinding.DeepCopy() + // Rebuild the integration + target.Status.Phase = v1alpha1.KameletBindingPhaseNone + target.Status.SetCondition( + v1alpha1.KameletBindingConditionReady, + corev1.ConditionFalse, + "", + "", + ) + return target, nil + } + // Map integration phases to KameletBinding phases target := kameletbinding.DeepCopy() if it.Status.Phase == v1.IntegrationPhaseRunning {