Skip to content

Commit

Permalink
# This is a combination of 2 commits.
Browse files Browse the repository at this point in the history
# This is the 1st commit message:

apache#1199: use kubernetes events during reconciliation

# This is the commit message apache#2:

apache#1199: use kubernetes events during reconciliation
  • Loading branch information
nicolaferraro committed Feb 21, 2020
1 parent 9a7c4f7 commit 1fd56a6
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
/kamel
/license-check

# Config files
/kamel-config.yaml

# Released Packages
*.tar.gz

Expand Down
3 changes: 3 additions & 0 deletions akamel-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kamel:
install:
maven-repositories: '[https://repository.apache.org/content/repositories/orgapachecamel-1171]'
10 changes: 8 additions & 2 deletions pkg/apis/camel/v1/integration_types_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package v1

import (
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -209,12 +210,17 @@ func (in *Integration) SetIntegrationPlatform(platform *IntegrationPlatform) {
// SetIntegrationKit --
func (in *Integration) SetIntegrationKit(kit *IntegrationKit) {
cs := corev1.ConditionTrue

message := kit.Name
if kit.Status.Phase != IntegrationKitPhaseReady {
cs = corev1.ConditionFalse
if kit.Status.Phase == IntegrationKitPhaseNone {
message = fmt.Sprintf("creating a new integration kit")
} else {
message = fmt.Sprintf("integration kit %s is in state %q", kit.Name, kit.Status.Phase)
}
}

in.Status.SetCondition(IntegrationConditionKitAvailable, cs, IntegrationConditionKitAvailableReason, kit.Name)
in.Status.SetCondition(IntegrationConditionKitAvailable, cs, IntegrationConditionKitAvailableReason, message)
in.Status.Kit = kit.Name
in.Status.Image = kit.Status.Image
}
Expand Down
21 changes: 20 additions & 1 deletion pkg/cmd/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ import (
"runtime"
"time"

"github.com/apache/camel-k/pkg/client"
camellog "github.com/apache/camel-k/pkg/util/log"
"github.com/operator-framework/operator-sdk/pkg/k8sutil"
"github.com/operator-framework/operator-sdk/pkg/leader"
"github.com/operator-framework/operator-sdk/pkg/ready"
sdkVersion "github.com/operator-framework/operator-sdk/version"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"

"sigs.k8s.io/controller-runtime/pkg/client/config"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -43,6 +47,8 @@ import (
)

var log = logf.Log.WithName("cmd")

// GitCommit --
var GitCommit string

func printVersion() {
Expand Down Expand Up @@ -98,8 +104,21 @@ func Run() {
}
defer r.Unset() // nolint: errcheck

// Configure an event broadcaster
c, err := client.NewClient(false)
if err != nil {
log.Error(err, "cannot initialize client")
os.Exit(1)
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(camellog.WithName("events").Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.CoreV1().Events(namespace)})

// Create a new Cmd to provide shared dependencies and start components
mgr, err := manager.New(cfg, manager.Options{Namespace: namespace})
mgr, err := manager.New(cfg, manager.Options{
Namespace: namespace,
EventBroadcaster: eventBroadcaster,
})
if err != nil {
log.Error(err, "")
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) {
}

if err = o.resetIntegrationPlatform(c); err != nil {
fmt.Print(err)
fmt.Println(err)
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/gzip"
"github.com/apache/camel-k/pkg/trait"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/gzip"
"github.com/apache/camel-k/pkg/util/kubernetes"
k8slog "github.com/apache/camel-k/pkg/util/kubernetes/log"
"github.com/apache/camel-k/pkg/util/sync"
Expand Down
19 changes: 13 additions & 6 deletions pkg/controller/integration/integration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ package integration
import (
"context"

"github.com/apache/camel-k/pkg/events"
appsv1 "k8s.io/api/apps/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"

k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -58,8 +60,9 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler {
return &ReconcileIntegration{
client: c,
scheme: mgr.GetScheme(),
client: c,
scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor("camel-k-integration-controller"),
}
}

Expand Down Expand Up @@ -206,8 +209,9 @@ var _ reconcile.Reconciler = &ReconcileIntegration{}
type ReconcileIntegration struct {
// This client, initialized using mgr.Client() above, is a split client
// that reads objects from the cache and writes to the apiserver
client client.Client
scheme *runtime.Scheme
client client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
}

// Reconcile reads that state of the cluster for a Integration object and makes changes based on the state read
Expand Down Expand Up @@ -256,12 +260,14 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R

newTarget, err := a.Handle(ctx, target)
if err != nil {
events.NotifyIntegrationError(r.recorder, &instance, newTarget, err)
return reconcile.Result{}, err
}

if newTarget != nil {
if r, err := r.update(ctx, &instance, newTarget); err != nil {
return r, err
if res, err := r.update(ctx, &instance, newTarget); err != nil {
events.NotifyIntegrationError(r.recorder, &instance, newTarget, err)
return res, err
}

if newTarget.Status.Phase != target.Status.Phase {
Expand All @@ -275,6 +281,7 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R

// handle one action at time so the resource
// is always at its latest state
events.NotifyIntegrationUpdated(r.recorder, &instance, newTarget)
break
}
}
Expand Down
95 changes: 95 additions & 0 deletions pkg/events/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package events

import (
"fmt"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
)

const (
// ReasonIntegrationPhaseUpdated --
ReasonIntegrationPhaseUpdated = "IntegrationPhaseUpdated"
// ReasonIntegrationConditionChanged --
ReasonIntegrationConditionChanged = "IntegrationConditionChanged"
// ReasonIntegrationError
ReasonIntegrationError = "IntegrationError"
)

// NotifyIntegrationError automatically generates error events when the integration reconcile cycle phase has an error
func NotifyIntegrationError(recorder record.EventRecorder, old, new *v1.Integration, err error) {
it := old
if new != nil {
it = new
}
if it == nil {
return
}
recorder.Eventf(it, corev1.EventTypeWarning, ReasonIntegrationError, "Cannot reconcile integration %s: %v", it.Name, err)
}

// NotifyIntegrationUpdated automatically generates events when the integration changes
func NotifyIntegrationUpdated(recorder record.EventRecorder, old, new *v1.Integration) {
if new == nil {
return
}

// Update information about phase changes
if old == nil || old.Status.Phase != new.Status.Phase {
phase := new.Status.Phase
if phase == v1.IntegrationPhaseNone {
phase = "[none]"
}
recorder.Eventf(new, corev1.EventTypeNormal, ReasonIntegrationPhaseUpdated, "Integration %s in phase %s", new.Name, phase)
}

// Update information about changes in conditions
if new.Status.Phase != v1.IntegrationPhaseNone {
for _, cond := range getChangedConditions(old, new) {
head := ""
if cond.Status == corev1.ConditionFalse {
head = "No "
}
tail := ""
if cond.Message != "" {
tail = fmt.Sprintf(": %s", cond.Message)
}
recorder.Eventf(new, corev1.EventTypeNormal, ReasonIntegrationConditionChanged, "%s%s for integration %s%s", head, cond.Type, new.Name, tail)
}
}

}

func getChangedConditions(old, new *v1.Integration) (res []v1.IntegrationCondition) {
if old == nil {
old = &v1.Integration{}
}
if new == nil {
new = &v1.Integration{}
}
for _, newCond := range new.Status.Conditions {
oldCond := old.Status.GetCondition(newCond.Type)
if oldCond == nil || oldCond.Status != newCond.Status || oldCond.Message != newCond.Message {
res = append(res, newCond)
}
}
return res
}
2 changes: 1 addition & 1 deletion pkg/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/scylladb/go-set/strset"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/gzip"
"github.com/apache/camel-k/pkg/util/camel"
"github.com/apache/camel-k/pkg/util/gzip"
"github.com/apache/camel-k/pkg/util/log"

src "github.com/apache/camel-k/pkg/util/source"
Expand Down
4 changes: 3 additions & 1 deletion pkg/trait/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ limitations under the License.
package trait

import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -119,7 +121,7 @@ func (t *deploymentTrait) Apply(e *Environment) error {
v1.IntegrationConditionDeploymentAvailable,
corev1.ConditionTrue,
v1.IntegrationConditionDeploymentAvailableReason,
deployment.Name,
fmt.Sprintf("deployment name is %s", deployment.Name),
)

if e.IntegrationInPhase(v1.IntegrationPhaseRunning) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/trait/knative_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package trait

import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -114,7 +115,7 @@ func (t *knativeServiceTrait) Configure(e *Environment) (bool, error) {
v1.IntegrationConditionKnativeServiceAvailable,
corev1.ConditionFalse,
v1.IntegrationConditionKnativeServiceNotAvailableReason,
"controller strategy: "+string(ControllerStrategyDeployment),
fmt.Sprintf("different controller strategy chosen (%s)", string(ControllerStrategyDeployment)),
)

// A controller is already present for the integration
Expand Down Expand Up @@ -183,7 +184,7 @@ func (t *knativeServiceTrait) Apply(e *Environment) error {
v1.IntegrationConditionKnativeServiceAvailable,
corev1.ConditionTrue,
v1.IntegrationConditionKnativeServiceAvailableReason,
ksvc.Name,
fmt.Sprintf("Knative service name is %s", ksvc.Name),
)

if e.IntegrationInPhase(v1.IntegrationPhaseRunning) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/trait/rest-dsl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/gzip"
"github.com/apache/camel-k/pkg/util/defaults"
"github.com/apache/camel-k/pkg/util/gzip"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/maven"
)
Expand Down
File renamed without changes.
File renamed without changes.

0 comments on commit 1fd56a6

Please sign in to comment.