Skip to content

Commit

Permalink
Merge pull request #33 from seaneagan/job_deps_labels
Browse files Browse the repository at this point in the history
Support labels in job dependencies
  • Loading branch information
pprokop authored Apr 6, 2018
2 parents e24ee79 + 84de742 commit fca2c73
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 32 deletions.
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ Example:
Simple example how to use downward API to get `POD_NAME` can be found [here](https://raw.githubusercontent.com/kubernetes/kubernetes.github.io/master/docs/user-guide/downward-api/dapi-pod.yaml).

### Job
Checks if a given job succeded at least once.
Example:
Checks if a given job or set of jobs with matching name and/or labels succeeded at least once.
In order to use labels DEPENDENCY_JOBS_JSON must be used, but DEPENDENCY_JOBS is supported
as well for backward compatibility.
Examples:

`DEPENDENCY_JOBS=nova-init,neutron-init`
`DEPENDENCY_JOBS_JSON='[{"namespace": "foo", "name": "nova-init"}, {"labels": {"initializes": "neutron"}}]'`
`DEPENDENCY_JOBS=nova-init,neutron-init'`

### Config
This dependency performs a container level templating of configuration files. It can template an ip address `{{ .IP }}` and hostname `{{ .HOSTNAME }}`.
Expand All @@ -91,13 +94,12 @@ Example:
Checks if at least one pod matching the specified labels is already running, by
default anywhere in the cluster, or use `"requireSameNode": true` to require a
a pod on the same node.
In contrast to other dependencies, the syntax uses json in order to avoid inventing a new
format to specify labels and the parsing complexities that would come with that.
As seen below the syntax uses JSON to allow for label support.
This dependency requires a `POD_NAME` env which can be easily passed through the
[downward api](http://kubernetes.io/docs/user-guide/downward-api/). The `POD_NAME` variable is mandatory and is used to resolve dependencies.
Example:

`DEPENDENCY_POD="[{\"namespace\": \"foo\", \"labels\": {\"k1\": \"v1\", \"k2\": \"v2\"}}, {\"labels\": {\"k1\": \"v1\", \"k2\": \"v2\"}, \"requireSameNode\": true}]"`
`DEPENDENCY_POD_JSON='[{"namespace": "foo", "labels": {"k1": "v1", "k2": "v2"}}, {"labels": {"k1": "v1", "k2": "v2"}, "requireSameNode": true}]'`

## Image

Expand Down
63 changes: 52 additions & 11 deletions dependencies/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,87 @@ import (
"fmt"

entry "github.com/stackanetes/kubernetes-entrypoint/entrypoint"
"github.com/stackanetes/kubernetes-entrypoint/logger"
"github.com/stackanetes/kubernetes-entrypoint/util/env"
api "k8s.io/client-go/1.5/pkg/api"
"k8s.io/client-go/1.5/pkg/apis/batch/v1"
"k8s.io/client-go/1.5/pkg/labels"
)

const FailingStatusFormat = "Job %s is not completed yet"

type Job struct {
name string
namespace string
labels map[string]string
}

func init() {
jobsEnv := fmt.Sprintf("%sJOBS", entry.DependencyPrefix)
if jobsDeps := env.SplitEnvToDeps(jobsEnv); jobsDeps != nil {
jobsJsonEnv := fmt.Sprintf("%s%s", jobsEnv, entry.JsonSuffix)
if jobsDeps := env.SplitJobEnvToDeps(jobsEnv, jobsJsonEnv); jobsDeps != nil {
if len(jobsDeps) > 0 {
for _, dep := range jobsDeps {
entry.Register(NewJob(dep.Name, dep.Namespace))
job := NewJob(dep.Name, dep.Namespace, dep.Labels)
if job != nil {
entry.Register(*job)
}
}
}
}
}

func NewJob(name string, namespace string) Job {
return Job{
func NewJob(name string, namespace string, labels map[string]string) *Job {
if name != "" && labels != nil {
logger.Warning.Printf("Cannot specify both name and labels for job depependency")
return nil
}
return &Job{
name: name,
namespace: namespace,
labels: labels,
}

}

func (j Job) IsResolved(entrypoint entry.EntrypointInterface) (bool, error) {
job, err := entrypoint.Client().Jobs(j.namespace).Get(j.name)
if err != nil {
return false, err
iface := entrypoint.Client().Jobs(j.namespace)
var jobs []v1.Job

if j.name != "" {
job, err := iface.Get(j.name)
if err != nil {
return false, err
}
jobs = []v1.Job{*job}
} else if j.labels != nil {
label := labels.SelectorFromSet(j.labels)
opts := api.ListOptions{LabelSelector: label}
jobList, err := iface.List(opts)
if err != nil {
return false, err
}
jobs = jobList.Items
}
if job.Status.Succeeded == 0 {
return false, fmt.Errorf(FailingStatusFormat, j)
if len(jobs) == 0 {
return false, fmt.Errorf("No matching jobs found: %v", j)
}

for _, job := range jobs {
if job.Status.Succeeded == 0 {
return false, fmt.Errorf(FailingStatusFormat, j)
}
}
return true, nil
}

func (j Job) String() string {
return fmt.Sprintf("Job %s in namespace %s", j.name, j.namespace)
var prefix string
if j.name != "" {
prefix = fmt.Sprintf("Job %s", j.name)
} else if j.labels != nil {
prefix = fmt.Sprintf("Jobs with labels %s", j.labels)
} else {
prefix = "Jobs"
}
return fmt.Sprintf("%s in namespace %s", prefix, j.namespace)
}
48 changes: 40 additions & 8 deletions dependencies/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
const testJobName = "TEST_JOB_NAME"
const testJobNamespace = "TEST_JOB_NAMESPACE"

var testLabels = map[string]string{
"k1": "v1",
}

var testEntrypoint entrypoint.EntrypointInterface

var _ = Describe("Job", func() {
Expand All @@ -21,29 +25,57 @@ var _ = Describe("Job", func() {
testEntrypoint = mocks.NewEntrypoint()
})

It("checks the name of a newly created job", func() {
job := NewJob(testJobName, testJobNamespace)
It("constructor correctly assigns fields", func() {
nameJob := NewJob(testJobName, testJobNamespace, nil)

Expect(nameJob.name).To(Equal(testJobName))
Expect(nameJob.namespace).To(Equal(testJobNamespace))

labelsJob := NewJob("", testJobNamespace, testLabels)

Expect(labelsJob.labels).To(Equal(testLabels))
})

It("constructor returns nil when both name and labels specified", func() {
job := NewJob(testJobName, testJobNamespace, testLabels)

Expect(job.name).To(Equal(testJobName))
Expect(job.namespace).To(Equal(testJobNamespace))
Expect(job).To(BeNil())
})

It("checks resolution of a succeeding job", func() {
job := NewJob(mocks.SucceedingJobName, mocks.SucceedingJobName)
It("checks resolution of a succeeding job by name", func() {
job := NewJob(mocks.SucceedingJobName, mocks.SucceedingJobName, nil)

isResolved, err := job.IsResolved(testEntrypoint)

Expect(isResolved).To(Equal(true))
Expect(err).NotTo(HaveOccurred())
})

It("checks resolution failure of a failing job", func() {
job := NewJob(mocks.FailingJobName, mocks.FailingJobName)
It("checks resolution failure of a failing job by name", func() {
job := NewJob(mocks.FailingJobName, mocks.FailingJobName, nil)

isResolved, err := job.IsResolved(testEntrypoint)

Expect(isResolved).To(Equal(false))
Expect(err.Error()).To(Equal(fmt.Sprintf(FailingStatusFormat, job)))
})

It("checks resolution of a succeeding job by labels", func() {
job := NewJob("", mocks.SucceedingJobName, map[string]string{"name": mocks.SucceedingJobLabel})

isResolved, err := job.IsResolved(testEntrypoint)

Expect(isResolved).To(Equal(true))
Expect(err).NotTo(HaveOccurred())
})

It("checks resolution failure of a failing job by labels", func() {
job := NewJob("", mocks.FailingJobName, map[string]string{"name": mocks.FailingJobLabel})

isResolved, err := job.IsResolved(testEntrypoint)

Expect(isResolved).To(Equal(false))
Expect(err.Error()).To(Equal(fmt.Sprintf(FailingStatusFormat, job)))
})

})
2 changes: 1 addition & 1 deletion dependencies/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Pod struct {
}

func init() {
podEnv := fmt.Sprintf("%sPOD", entry.DependencyPrefix)
podEnv := fmt.Sprintf("%sPOD%s", entry.DependencyPrefix, entry.JsonSuffix)
if podDeps := env.SplitPodEnvToDeps(podEnv); podDeps != nil {
for _, dep := range podDeps {
pod, err := NewPod(dep.Labels, dep.Namespace, dep.RequireSameNode)
Expand Down
1 change: 1 addition & 0 deletions entrypoint/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var dependencies []Resolver // List containing all dependencies to be resolved
const (
//DependencyPrefix is a prefix for env variables
DependencyPrefix = "DEPENDENCY_"
JsonSuffix = "_JSON"
resolverSleepInterval = 2
)

Expand Down
Binary file modified kubernetes-entrypoint
Binary file not shown.
24 changes: 21 additions & 3 deletions mocks/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
)

const (
SucceedingJobName = "succeed"
FailingJobName = "fail"
SucceedingJobName = "succeed"
FailingJobName = "fail"
SucceedingJobLabel = "succeed"
FailingJobLabel = "fail"
)

type jClient struct {
Expand Down Expand Up @@ -41,7 +43,17 @@ func (j jClient) DeleteCollection(options *api.DeleteOptions, listOptions api.Li
return fmt.Errorf("Not implemented")
}
func (j jClient) List(options api.ListOptions) (*batch.JobList, error) {
return nil, fmt.Errorf("Not implemented")
var jobs []batch.Job
if options.LabelSelector.String() == fmt.Sprintf("name=%s", SucceedingJobLabel) {
jobs = []batch.Job{NewJob(1)}
} else if options.LabelSelector.String() == fmt.Sprintf("name=%s", FailingJobLabel) {
jobs = []batch.Job{NewJob(1), NewJob(0)}
} else {
return nil, fmt.Errorf("Mock job didnt work")
}
return &batch.JobList{
Items: jobs,
}, nil
}

func (j jClient) Update(job *batch.Job) (*batch.Job, error) {
Expand All @@ -62,3 +74,9 @@ func (j jClient) Patch(name string, pt api.PatchType, data []byte, subresources
func NewJClient() v1batch.JobInterface {
return jClient{}
}

func NewJob(succeeded int32) batch.Job {
return batch.Job{
Status: batch.JobStatus{Succeeded: succeeded},
}
}
50 changes: 50 additions & 0 deletions util/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ type PodDependency struct {
RequireSameNode bool
}

type JobDependency struct {
Name string
Labels map[string]string
Namespace string
}

func SplitCommand() []string {
command := os.Getenv("COMMAND")
if command == "" {
Expand Down Expand Up @@ -96,6 +102,50 @@ func SplitPodEnvToDeps(env string) []PodDependency {
return deps
}

//SplitJobEnvToDeps returns list of JobDependency
func SplitJobEnvToDeps(env string, jsonEnv string) []JobDependency {
deps := []JobDependency{}

namespace := GetBaseNamespace()

envVal := os.Getenv(env)
jsonEnvVal := os.Getenv(jsonEnv)
if jsonEnvVal != "" {
if envVal != "" {
logger.Warning.Printf("Ignoring %s since %s was specified", env, jsonEnv)
}
err := json.Unmarshal([]byte(jsonEnvVal), &deps)
if err != nil {
logger.Warning.Printf("Invalid format: ", jsonEnvVal)
return []JobDependency{}
}

valid := []JobDependency{}
for _, dep := range deps {
if dep.Namespace == "" {
dep.Namespace = namespace
}

valid = append(valid, dep)
}

return valid
}

if envVal != "" {
plainDeps := SplitEnvToDeps(env)

deps = []JobDependency{}
for _, dep := range plainDeps {
deps = append(deps, JobDependency{Name: dep.Name, Namespace: dep.Namespace})
}

return deps
}

return deps
}

//GetBaseNamespace returns default namespace when user set empty one
func GetBaseNamespace() string {
namespace := os.Getenv("NAMESPACE")
Expand Down
Loading

0 comments on commit fca2c73

Please sign in to comment.