Skip to content

Commit

Permalink
Fix apache#1234: add a master trait (as addon)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Feb 16, 2020
1 parent 9f48cf8 commit 98588b1
Show file tree
Hide file tree
Showing 22 changed files with 327 additions and 50 deletions.
156 changes: 153 additions & 3 deletions addons/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,180 @@ limitations under the License.
package master

import (
"fmt"
"strings"

"github.com/apache/camel-k/deploy"
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/metadata"
"github.com/apache/camel-k/pkg/trait"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/uri"
"k8s.io/apimachinery/pkg/runtime"
)

// The Master trait allows to configure the integration to automatically leverage Kubernetes resources for doing
// leader election and starting *master* routes only on certain instances.
//
// It's activated automatically when using the master endpoint in a route, e.g. `from("master:telegram:bots")...`.
// It's activated automatically when using the master endpoint in a route, e.g. `from("master:lockname:telegram:bots")...`.
//
// NOTE: this trait adds special permissions to the integration service account in order to read/write configmaps and read pods.
// It's recommended to use a different service account than "default" when running the integration.
//
// +camel-k:trait=master
type masterTrait struct {
trait.BaseTrait `property:",squash"`
// Enables automatic configuration of the trait.
Auto *bool `property:"auto"`
// When this flag is active, the operator analyzes the source code to add dependencies required by delegate endpoints.
// E.g. when using `master:lockname:timer`, then `camel:timer` is automatically added to the set of dependencies.
// It's enabled by default.
IncludeDelegateDependencies *bool `property:"include-delegate-dependencies"`
// Name of the configmap that will be used to store the lock. Defaults to "<integration-name>-lock".
Configmap string `property:"configmap"`
// Label that will be used to identify all pods contending the lock. Defaults to "camel.apache.org/integration".
LabelKey string `property:"label-key"`
// Label value that will be used to identify all pods contending the lock. Defaults to the integration name.
LabelValue string `property:"label-value"`
delegateDependencies []string
}

// NewMasterTrait --
func NewMasterTrait() trait.Trait {
return &masterTrait{
BaseTrait: trait.NewBaseTrait("master", 2500),
BaseTrait: trait.NewBaseTrait("master", 850),
}
}

const (
masterComponent = "master"
)

func (t *masterTrait) Configure(e *trait.Environment) (bool, error) {
return false, nil
if t.Enabled != nil && !*t.Enabled {
return false, nil
}

if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization, v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning) {
return false, nil
}

if t.Auto == nil || *t.Auto {
// Check if the master component has been used
sources, err := kubernetes.ResolveIntegrationSources(t.Ctx, t.Client, e.Integration, e.Resources)
if err != nil {
return false, err
}

meta := metadata.ExtractAll(e.CamelCatalog, sources)

if t.Enabled == nil {
for _, endpoint := range meta.FromURIs {
if uri.GetComponent(endpoint) == masterComponent {
enabled := true
t.Enabled = &enabled
}
}
}

if t.Enabled == nil || !*t.Enabled {
return false, nil
}

if t.IncludeDelegateDependencies == nil || *t.IncludeDelegateDependencies {
t.delegateDependencies = findAdditionalDependencies(e, meta)
}

if t.Configmap == "" {
t.Configmap = fmt.Sprintf("%s-lock", e.Integration.Name)
}

if t.LabelKey == "" {
t.LabelKey = "camel.apache.org/integration"
}

if t.LabelValue == "" {
t.LabelValue = e.Integration.Name
}
}

return t.Enabled != nil && *t.Enabled, nil
}

func (t *masterTrait) Apply(e *trait.Environment) error {

if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-runtime-master")

// Master sub endpoints need to be added to the list of dependencies
for _, dep := range t.delegateDependencies {
util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, dep)
}

} else if e.IntegrationInPhase(v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning) {
serviceAccount := e.Integration.Spec.ServiceAccountName
if serviceAccount == "" {
serviceAccount = "default"
}

var templateData = struct {
Namespace string
Name string
ServiceAccount string
}{
Namespace: e.Integration.Namespace,
Name: fmt.Sprintf("%s-master", e.Integration.Name),
ServiceAccount: serviceAccount,
}

role, err := loadResource(e, "master-role.tmpl", templateData)
if err != nil {
return err
}
roleBinding, err := loadResource(e, "master-role-binding.tmpl", templateData)
if err != nil {
return err
}

e.Resources.Add(role)
e.Resources.Add(roleBinding)

e.Integration.Status.Configuration = append(e.Integration.Status.Configuration,
v1.ConfigurationSpec{Type: "property", Value: "customizer.master.enabled=true"},
v1.ConfigurationSpec{Type: "property", Value: fmt.Sprintf("customizer.master.configMapName=%s", t.Configmap)},
v1.ConfigurationSpec{Type: "property", Value: fmt.Sprintf("customizer.master.labelKey=%s", t.LabelKey)},
v1.ConfigurationSpec{Type: "property", Value: fmt.Sprintf("customizer.master.labelValue=%s", t.LabelValue)},
)
}

return nil
}

func findAdditionalDependencies(e *trait.Environment, meta metadata.IntegrationMetadata) (dependencies []string) {
for _, endpoint := range meta.FromURIs {
if uri.GetComponent(endpoint) == masterComponent {
parts := strings.Split(endpoint, ":")
if len(parts) > 2 {
// syntax "master:lockname:endpoint:..."
childComponent := parts[2]
if artifact := e.CamelCatalog.GetArtifactByScheme(childComponent); artifact != nil {
dependencies = append(dependencies, artifact.GetDependencyID())
}
}
}
}
return dependencies
}

func loadResource(e *trait.Environment, name string, params interface{}) (runtime.Object, error) {
data, err := deploy.TemplateResource(fmt.Sprintf("/addons/master/%s", name), params)
if err != nil {
return nil, err
}
obj, err := kubernetes.LoadResourceFromYaml(e.Client.GetScheme(), data)
if err != nil {
return nil, err
}
return obj, nil
}
5 changes: 5 additions & 0 deletions build/maven/pom-runtime.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@
<artifactId>camel-k-runtime-knative</artifactId>
<version>${runtime.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.k</groupId>
<artifactId>camel-k-runtime-master</artifactId>
<version>${runtime.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.k</groupId>
<artifactId>camel-k-runtime-cron</artifactId>
Expand Down
16 changes: 16 additions & 0 deletions deploy/addons/master/master-role-binding.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
name: {{ .Name }}
namespace: {{ .Namespace }}
labels:
app: "camel-k"
subjects:
- kind: ServiceAccount
namespace: {{ .Namespace }}
name: {{ .ServiceAccount }}
roleRef:
kind: Role
namespace: {{ .Namespace }}
name: {{ .Name }}
apiGroup: rbac.authorization.k8s.io
27 changes: 27 additions & 0 deletions deploy/addons/master/master-role.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
kind: Role
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
name: {{ .Name }}
namespace: {{ .Namespace }}
labels:
app: "camel-k"
rules:
- apiGroups:
- ""
resources:
- configmaps
verbs:
- create
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
30 changes: 30 additions & 0 deletions deploy/resources.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions deploy/resources_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ limitations under the License.
package deploy

import (
"bytes"
"io/ioutil"
"strings"
"text/template"

"github.com/apache/camel-k/pkg/util/log"
)
Expand Down Expand Up @@ -51,6 +53,24 @@ func Resource(name string) []byte {
return data
}

// TemplateResource loads a file resource as go template and processes it using the given parameters
func TemplateResource(name string, params interface{}) (string, error) {
rawData := ResourceAsString(name)
if rawData == "" {
return "", nil
}
tmpl, err := template.New(name).Parse(rawData)
if err != nil {
return "", err
}

var buf bytes.Buffer
if err := tmpl.Execute(&buf, params); err != nil {
return "", err
}
return buf.String(), nil
}

// Resources lists all file names in the given path (starts with '/')
func Resources(dirName string) []string {
dir, err := assets.Open(dirName)
Expand Down
20 changes: 20 additions & 0 deletions examples/Master.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// camel-k: language=java

import org.apache.camel.builder.RouteBuilder;

/**
* This example shows how to start a route on a single instance of the integration.
* Increase the number of replicas to see it in action (the route will be started on a single pod only).
*/
public class Master extends RouteBuilder {
@Override
public void configure() throws Exception {

// Write your routes here, for example:
from("master:lock:timer:master?period=1s")
.setBody()
.simple("This message is printed by a single pod, even if you increase the number of replicas!")
.to("log:info");

}
}
18 changes: 17 additions & 1 deletion pkg/apis/camel/v1/camelcatalog_types_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ limitations under the License.

package v1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
import (
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// NewCamelCatalog --
func NewCamelCatalog(namespace string, name string) CamelCatalog {
Expand Down Expand Up @@ -57,3 +61,15 @@ func NewCamelCatalogList() CamelCatalogList {
},
}
}

// GetDependencyID returns a Camel K recognizable maven dependency for the artifact
func (a CamelArtifact) GetDependencyID() string {
artifactID := a.ArtifactID
if a.GroupID == "org.apache.camel" && strings.HasPrefix(artifactID, "camel-") {
return "camel:" + artifactID[6:]
}
if a.GroupID == "org.apache.camel.quarkus" && strings.HasPrefix(artifactID, "camel-quarkus-") {
return "camel-quarkus:" + artifactID[14:]
}
return "mvn:" + a.GroupID + ":" + artifactID + ":" + a.Version
}
4 changes: 2 additions & 2 deletions pkg/trait/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
}

// CronJob strategy requires common schedule
strategy, err := e.DetermineControllerStrategy(t.ctx, t.client)
strategy, err := e.DetermineControllerStrategy(t.Ctx, t.Client)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -315,7 +315,7 @@ func (t *cronTrait) getGlobalCron(e *Environment) (*cronInfo, error) {
func (t *cronTrait) getSourcesFromURIs(e *Environment) ([]string, error) {
var sources []v1.SourceSpec
var err error
if sources, err = kubernetes.ResolveIntegrationSources(t.ctx, t.client, e.Integration, e.Resources); err != nil {
if sources, err = kubernetes.ResolveIntegrationSources(t.Ctx, t.Client, e.Integration, e.Resources); err != nil {
return nil, err
}
meta := metadata.ExtractAll(e.CamelCatalog, sources)
Expand Down
Loading

0 comments on commit 98588b1

Please sign in to comment.