Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a -w flag to kamel install #331

Merged
merged 1 commit into from
Jan 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions pkg/cmd/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"fmt"
"strings"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/util/watch"

"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/install"
"github.com/apache/camel-k/pkg/util/kubernetes"
Expand All @@ -40,6 +43,7 @@ func newCmdInstall(rootCmdOptions *RootCmdOptions) *cobra.Command {
RunE: impl.install,
}

cmd.Flags().BoolVarP(&impl.wait, "wait", "w", false, "Waits for the platform to be running")
cmd.Flags().BoolVar(&impl.clusterSetupOnly, "cluster-setup", false, "Execute cluster-wide operations only (may require admin rights)")
cmd.Flags().BoolVar(&impl.skipClusterSetup, "skip-cluster-setup", false, "Skip the cluster-setup phase")
cmd.Flags().BoolVar(&impl.exampleSetup, "example", false, "Install example integration")
Expand All @@ -66,6 +70,7 @@ func newCmdInstall(rootCmdOptions *RootCmdOptions) *cobra.Command {

type installCmdOptions struct {
*RootCmdOptions
wait bool
clusterSetupOnly bool
skipClusterSetup bool
exampleSetup bool
Expand Down Expand Up @@ -155,6 +160,13 @@ func (o *installCmdOptions) install(cmd *cobra.Command, args []string) error {
}

if collection == nil {
if o.wait {
err = o.waitForPlatformReady(platform)
if err != nil {
return err
}
}

fmt.Println("Camel K installed in namespace", namespace)
}
}
Expand Down Expand Up @@ -186,3 +198,25 @@ func (o *installCmdOptions) printOutput(collection *kubernetes.Collection) error
}
return nil
}

func (o *installCmdOptions) waitForPlatformReady(platform *v1alpha1.IntegrationPlatform) error {
handler := func(i *v1alpha1.IntegrationPlatform) bool {
if i.Status.Phase != "" {
fmt.Println("platform \""+platform.Name+"\" in phase", i.Status.Phase)

if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseReady {
// TODO display some error info when available in the status
return false
}

if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseError {
fmt.Println("platform installation failed")
return false
}
}

return true
}

return watch.HandlePlatformStateChanges(o.Context, platform, handler)
}
2 changes: 1 addition & 1 deletion pkg/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (o *runCmdOptions) waitForIntegrationReady(integration *v1alpha1.Integratio
return true
}

return watch.HandleStateChanges(o.Context, integration, handler)
return watch.HandleIntegrationStateChanges(o.Context, integration, handler)
}

func (o *runCmdOptions) syncIntegration(c client.Client, sources []string) error {
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/kubernetes/customclient/customclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package customclient

import (
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -65,3 +66,20 @@ func GetDynamicClientFor(group string, version string, kind string, namespace st
Resource: kind,
}).Namespace(namespace), nil
}

// GetDefaultDynamicClientFor returns a dynamic client for a given kind
func GetDefaultDynamicClientFor(kind string, namespace string) (dynamic.ResourceInterface, error) {
conf, err := config.GetConfig()
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfig(conf)
if err != nil {
return nil, err
}
return dynamicClient.Resource(schema.GroupVersionResource{
Group: v1alpha1.SchemeGroupVersion.Group,
Version: v1alpha1.SchemeGroupVersion.Version,
Resource: kind,
}).Namespace(namespace), nil
}
88 changes: 75 additions & 13 deletions pkg/util/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@ package watch
import (
"context"

"github.com/apache/camel-k/pkg/util/kubernetes"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/util/kubernetes/customclient"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
)

//
// HandleStateChanges watches a integration resource and invoke the given handler when its status changes.
// HandleIntegrationStateChanges watches a integration resource and invoke the given handler when its status changes.
//
// err := watch.HandleStateChanges(ctx, integration, func(i *v1alpha1.Integration) bool {
// err := watch.HandleIntegrationStateChanges(ctx, integration, func(i *v1alpha1.Integration) bool {
// if i.Status.Phase == v1alpha1.IntegrationPhaseRunning {
// return false
// }
Expand All @@ -42,8 +43,8 @@ import (
//
// This function blocks until the handler function returns true or either the events channel or the context is closed.
//
func HandleStateChanges(ctx context.Context, integration *v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) error {
dynamicClient, err := customclient.GetDynamicClientFor(v1alpha1.SchemeGroupVersion.Group, v1alpha1.SchemeGroupVersion.Version, "integrations", integration.Namespace)
func HandleIntegrationStateChanges(ctx context.Context, integration *v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) error {
dynamicClient, err := customclient.GetDefaultDynamicClientFor("integrations", integration.Namespace)
if err != nil {
return err
}
Expand All @@ -70,23 +71,84 @@ func HandleStateChanges(ctx context.Context, integration *v1alpha1.Integration,

if e.Object != nil {
if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
unstr := unstructured.Unstructured{
Object: runtimeUnstructured.UnstructuredContent(),
jsondata, err := kubernetes.ToJSON(runtimeUnstructured)
if err != nil {
return err
}
copy := integration.DeepCopy()
err = json.Unmarshal(jsondata, copy)
if err != nil {
logrus.Error("Unexpected error detected when watching resource", err)
return nil
}
jsondata, err := unstr.MarshalJSON()

if lastObservedState == nil || *lastObservedState != copy.Status.Phase {
lastObservedState = &copy.Status.Phase
if !handler(copy) {
return nil
}
}
}
}
}
}
}

//
// HandlePlatformStateChanges watches a platform resource and invoke the given handler when its status changes.
//
// err := watch.HandlePlatformStateChanges(ctx, platform, func(i *v1alpha1.IntegrationPlatform) bool {
// if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseReady {
// return false
// }
//
// return true
// })
//
// This function blocks until the handler function returns true or either the events channel or the context is closed.
//
func HandlePlatformStateChanges(ctx context.Context, platform *v1alpha1.IntegrationPlatform, handler func(platform *v1alpha1.IntegrationPlatform) bool) error {
dynamicClient, err := customclient.GetDefaultDynamicClientFor("integrationplatforms", platform.Namespace)
if err != nil {
return err
}
watcher, err := dynamicClient.Watch(metav1.ListOptions{
FieldSelector: "metadata.name=" + platform.Name,
})
if err != nil {
return err
}

defer watcher.Stop()
events := watcher.ResultChan()

var lastObservedState *v1alpha1.IntegrationPlatformPhase

for {
select {
case <-ctx.Done():
return nil
case e, ok := <-events:
if !ok {
return nil
}

if e.Object != nil {
if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
jsondata, err := kubernetes.ToJSON(runtimeUnstructured)
if err != nil {
return err
}
icopy := integration.DeepCopy()
err = json.Unmarshal(jsondata, icopy)
copy := platform.DeepCopy()
err = json.Unmarshal(jsondata, copy)
if err != nil {
logrus.Error("Unexpected error detected when watching resource", err)
return nil
}

if lastObservedState == nil || *lastObservedState != icopy.Status.Phase {
lastObservedState = &icopy.Status.Phase
if !handler(icopy) {
if lastObservedState == nil || *lastObservedState != copy.Status.Phase {
lastObservedState = &copy.Status.Phase
if !handler(copy) {
return nil
}
}
Expand Down