Skip to content

Commit

Permalink
Fix apache#1785: propagate klb changes to integrations
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Oct 26, 2020
1 parent 747d2be commit 34c9165
Show file tree
Hide file tree
Showing 8 changed files with 364 additions and 118 deletions.
20 changes: 20 additions & 0 deletions e2e/knative/files/display.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

from('knative:channel/messages')
.convertBodyTo(String.class)
.to('log:info?showAll=false')
60 changes: 60 additions & 0 deletions e2e/knative/kamelet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// +build integration

// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration"

/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package knative

import (
"testing"

. "github.com/apache/camel-k/e2e/support"
camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
messaging "knative.dev/eventing/pkg/apis/messaging/v1beta1"
)

// Test that kamelet binding can be changed and changes propagated to integrations
func TestKameletChange(t *testing.T) {

WithNewTestNamespace(t, func(ns string) {
RegisterTestingT(t)

Expect(Kamel("install", "-n", ns).Execute()).Should(BeNil())
Expect(CreateTimerKamelet(ns, "timer-source")()).Should(BeNil())
Expect(CreateKnativeChannelv1Beta1(ns, "messages")()).Should(BeNil())
Expect(Kamel("run", "-n", ns, "files/display.groovy", "-w").Execute()).Should(BeNil())
ref := v1.ObjectReference{
Kind: "InMemoryChannel",
Name: "messages",
APIVersion: messaging.SchemeGroupVersion.String(),
}
Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hello"})()).Should(BeNil())
Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hello"))

Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hi"})()).Should(BeNil())
Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hi"))
})

}
90 changes: 89 additions & 1 deletion e2e/support/test_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package support

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -33,9 +34,10 @@ import (
"testing"
"time"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/google/uuid"
"github.com/onsi/gomega"

"github.com/spf13/cobra"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/batch/v1beta1"
Expand Down Expand Up @@ -890,6 +892,92 @@ func CreateKnativeChannelv1Beta1(ns string, name string) func() error {
}
}

/*
Kamelets
*/

func CreateTimerKamelet(ns string, name string) func() error {
return func() error {
kamelet := v1alpha1.Kamelet{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Spec: v1alpha1.KameletSpec{
Definition: v1alpha1.JSONSchemaProps{
Properties: map[string]v1alpha1.JSONSchemaProps{
"message": {
Type: "string",
},
},
},
Flow: asFlow(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
"steps": []map[string]interface{}{
{
"set-body": map[string]interface{}{
"constant": "{{message}}",
},
},
{
"to": "kamelet:sink",
},
},
},
}),
},
}
return TestClient.Create(TestContext, &kamelet)
}
}

func BindKameletTo(ns, name, from string, to corev1.ObjectReference, properties map[string]string) func() error {
return func() error {
kb := v1alpha1.KameletBinding{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Spec: v1alpha1.KameletBindingSpec{
Source: v1alpha1.Endpoint{
Ref: &corev1.ObjectReference{
Kind: "Kamelet",
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Name: from,
},
Properties: asEndpointProperties(properties),
},
Sink: v1alpha1.Endpoint{
Ref: &to,
Properties: asEndpointProperties(map[string]string{}),
},
},
}
return kubernetes.ReplaceResource(TestContext, TestClient, &kb)
}
}

func asFlow(source map[string]interface{}) *v1.Flow {
bytes, err := json.Marshal(source)
if err != nil {
panic(err)
}
return &v1.Flow{
RawMessage: bytes,
}
}

func asEndpointProperties(props map[string]string) v1alpha1.EndpointProperties {
bytes, err := json.Marshal(props)
if err != nil {
panic(err)
}
return v1alpha1.EndpointProperties{
RawMessage: bytes,
}
}

/*
Namespace testing functions
*/
Expand Down
16 changes: 8 additions & 8 deletions pkg/cmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) {
}

var n int
if !o.SkipKameletBindings {
if n, err = o.deleteAllKameletBindings(c); err != nil {
fmt.Print(err)
return
}
fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace)
}

if !o.SkipIntegrations {
if n, err = o.deleteAllIntegrations(c); err != nil {
fmt.Print(err)
Expand All @@ -77,14 +85,6 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) {
fmt.Printf("%d integration kits deleted from namespace %s\n", n, o.Namespace)
}

if !o.SkipKameletBindings {
if n, err = o.deleteAllKameletBindings(c); err != nil {
fmt.Print(err)
return
}
fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace)
}

if err = o.resetIntegrationPlatform(c); err != nil {
fmt.Println(err)
return
Expand Down
139 changes: 139 additions & 0 deletions pkg/controller/kameletbinding/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kameletbinding

import (
"context"
"encoding/json"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/platform"
"github.com/apache/camel-k/pkg/util/bindings"
"github.com/apache/camel-k/pkg/util/knative"
"github.com/pkg/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *v1alpha1.KameletBinding) (*v1.Integration, error) {
controller := true
blockOwnerDeletion := true
it := v1.Integration{
ObjectMeta: metav1.ObjectMeta{
Namespace: kameletbinding.Namespace,
Name: kameletbinding.Name,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: kameletbinding.APIVersion,
Kind: kameletbinding.Kind,
Name: kameletbinding.Name,
UID: kameletbinding.UID,
Controller: &controller,
BlockOwnerDeletion: &blockOwnerDeletion,
},
},
},
}
// start from the integration spec defined in the binding
if kameletbinding.Spec.Integration != nil {
it.Spec = *kameletbinding.Spec.Integration.DeepCopy()
}

profile, err := determineProfile(ctx, c, kameletbinding)
if err != nil {
return nil, err
}
it.Spec.Profile = profile

bindingContext := bindings.BindingContext{
Ctx: ctx,
Client: c,
Namespace: it.Namespace,
Profile: profile,
}

from, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSource, kameletbinding.Spec.Source)
if err != nil {
return nil, errors.Wrap(err, "could not determine source URI")
}
to, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSink, kameletbinding.Spec.Sink)
if err != nil {
return nil, errors.Wrap(err, "could not determine sink URI")
}

if len(from.Traits) > 0 || len(to.Traits) > 0 {
if it.Spec.Traits == nil {
it.Spec.Traits = make(map[string]v1.TraitSpec)
}
for k, v := range from.Traits {
it.Spec.Traits[k] = v
}
for k, v := range to.Traits {
it.Spec.Traits[k] = v
}
}

flow := map[string]interface{}{
"from": map[string]interface{}{
"uri": from.URI,
"steps": []map[string]interface{}{
{
"to": to.URI,
},
},
},
}
encodedFlow, err := json.Marshal(flow)
if err != nil {
return nil, err
}
it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow})

return &it, nil
}

func determineProfile(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
return binding.Spec.Integration.Profile, nil
}
pl, err := platform.GetCurrentPlatform(ctx, c, binding.Namespace)
if err != nil && !k8serrors.IsNotFound(err) {
return "", errors.Wrap(err, "error while retrieving the integration platform")
}
if pl != nil {
if pl.Status.Profile != "" {
return pl.Status.Profile, nil
}
if pl.Spec.Profile != "" {
return pl.Spec.Profile, nil
}
}
if knative.IsEnabledInNamespace(ctx, c, binding.Namespace) {
return v1.TraitProfileKnative, nil
}
if pl != nil {
// Determine profile from cluster type
plProfile := platform.GetProfile(pl)
if plProfile != "" {
return plProfile, nil
}
}
return v1.DefaultTraitProfile, nil
}
Loading

0 comments on commit 34c9165

Please sign in to comment.