Skip to content

Commit

Permalink
feat(backend): Add Semaphore and Mutex fields to Workflow CR
Browse files Browse the repository at this point in the history
- Added `Semaphore` and `Mutex` fields to the Workflow Spec to support concurrency control mechanisms directly within workflows.
- Introduced a new environment variable, `SEMAPHORE_CONFIGMAP_NAME`, to the API Server deployment for managing semaphore configurations.
- Added an empty ConfigMap manifest for semaphores to facilitate initial setup and testing.

Signed-off-by: ddalvi <ddalvi@redhat.com>
  • Loading branch information
DharmitD committed Feb 18, 2025
1 parent 87498e8 commit d98f3c5
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 13 deletions.
36 changes: 34 additions & 2 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,33 @@ func NewGenericScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Schedu
}, nil
}

// func getKubernetesSpec(platformSpec map[string]*pipelinespec.SinglePlatformSpec) *pipelinespec.SinglePlatformSpec {
// var kubernetesSpec *pipelinespec.SinglePlatformSpec

// // Check for "kubernetes" key in the platformSpec map
// if platformSpec != nil {
// if platform, ok := platformSpec["kubernetes"]; ok && platform != nil {
// kubernetesSpec = platform
// }
// }
// return kubernetesSpec
// }

func getPipelineOptions(platform *pipelinespec.SinglePlatformSpec) *argocompiler.Options {
var pipelineOptions *argocompiler.Options

if platform != nil && platform.PipelineConfig != nil {
pipelineOptions = &argocompiler.Options{}
if platform.PipelineConfig.SemaphoreKey != "" {
pipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey
}
if platform.PipelineConfig.MutexName != "" {
pipelineOptions.MutexName = platform.PipelineConfig.MutexName
}
}
return pipelineOptions
}

// Converts modelJob to ScheduledWorkflow.
func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) {
job := &pipelinespec.PipelineJob{}
Expand All @@ -98,17 +125,19 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
return nil, util.Wrap(err, "invalid pipeline job inputs")
}

// kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms)
// Pick out Kubernetes platform configs
var kubernetesSpec *pipelinespec.SinglePlatformSpec
if t.platformSpec != nil {
if _, ok := t.platformSpec.Platforms["kubernetes"]; ok {
kubernetesSpec = t.platformSpec.Platforms["kubernetes"]
}
}
pipelineOptions := getPipelineOptions(kubernetesSpec)

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, pipelineOptions)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher})
}
Expand Down Expand Up @@ -305,17 +334,20 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u
if err = t.validatePipelineJobInputs(job); err != nil {
return nil, util.Wrap(err, "invalid pipeline job inputs")
}

// Pick out Kubernetes platform configs
var kubernetesSpec *pipelinespec.SinglePlatformSpec
if t.platformSpec != nil {
if _, ok := t.platformSpec.Platforms["kubernetes"]; ok {
kubernetesSpec = t.platformSpec.Platforms["kubernetes"]
}
}
// kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms)
pipelineOptions := getPipelineOptions(kubernetesSpec)

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, pipelineOptions)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil)
}
Expand Down
46 changes: 36 additions & 10 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"os"
"strings"

"github.com/kubeflow/pipelines/backend/src/apiserver/common"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/compiler"
Expand All @@ -41,6 +43,16 @@ type Options struct {
// optional
PipelineRoot string
// TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode.
SemaphoreKey string
MutexName string
}

func getSemaphoreConfigMapName() string {
const defaultConfigMapName = "semaphore-config"
if name := os.Getenv("SEMAPHORE_CONFIGMAP_NAME"); name != "" {
return name
}
return defaultConfigMapName
}

func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) {
Expand Down Expand Up @@ -87,22 +99,13 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
}
}

// initialization
wf := &wfapi.Workflow{
TypeMeta: k8smeta.TypeMeta{
APIVersion: "argoproj.io/v1alpha1",
Kind: "Workflow",
},
ObjectMeta: k8smeta.ObjectMeta{
GenerateName: retrieveLastValidString(spec.GetPipelineInfo().GetName()) + "-",
// Note, uncomment the following during development to view argo inputs/outputs in KFP UI.
// TODO(Bobgy): figure out what annotations we should use for v2 engine.
// For now, comment this annotation, so that in KFP UI, it shows argo input/output params/artifacts
// suitable for debugging.
//
// Annotations: map[string]string{
// "pipelines.kubeflow.org/v2_pipeline": "true",
// },
},
Spec: wfapi.WorkflowSpec{
PodMetadata: &wfapi.Metadata{
Expand All @@ -120,6 +123,29 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
Entrypoint: tmplEntrypoint,
},
}

sync := &wfapi.Synchronization{}
if opts != nil && opts.SemaphoreKey != "" {
sync.Semaphore = &wfapi.SemaphoreRef{
ConfigMapKeyRef: &k8score.ConfigMapKeySelector{
LocalObjectReference: k8score.LocalObjectReference{
Name: getSemaphoreConfigMapName(),
},
Key: opts.SemaphoreKey,
},
}
}

if opts != nil && opts.MutexName != "" {
sync.Mutex = &wfapi.Mutex{
Name: opts.MutexName,
}
}

if sync.Semaphore != nil || sync.Mutex != nil {
wf.Spec.Synchronization = sync
}

c := &workflowCompiler{
wf: wf,
templates: make(map[string]*wfapi.Template),
Expand Down
2 changes: 1 addition & 1 deletion backend/src/v2/compiler/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (state *pipelineDFS) dfs(name string, component *pipelinespec.ComponentSpec
}

// Add kubernetes spec to annotation
if state.kubernetesSpec != nil {
if state.kubernetesSpec != nil && state.kubernetesSpec.DeploymentSpec != nil {
kubernetesExecSpec, ok := state.kubernetesSpec.DeploymentSpec.Executors[executorLabel]
if ok {
state.visitor.AddKubernetesSpec(name, kubernetesExecSpec)
Expand Down
1 change: 1 addition & 0 deletions manifests/kustomize/base/pipeline/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ resources:
- ml-pipeline-scheduledworkflow-role.yaml
- ml-pipeline-scheduledworkflow-rolebinding.yaml
- ml-pipeline-scheduledworkflow-sa.yaml
- ml-pipeline-semaphore-configmap.yaml
- ml-pipeline-ui-deployment.yaml
- ml-pipeline-ui-configmap.yaml
- ml-pipeline-ui-role.yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ spec:
fieldPath: metadata.namespace
- name: OBJECTSTORECONFIG_SECURE
value: "false"
- name: SEMAPHORE_CONFIGMAP_NAME
value: "semaphore-config"
- name: OBJECTSTORECONFIG_BUCKETNAME
valueFrom:
configMapKeyRef:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
kind: ConfigMap
apiVersion: v1
metadata:
name: semaphore-config
data: {}
---
apiVersion: batch/v1
kind: Job
metadata:
name: semaphore-configmap-init
namespace: kubeflow
spec:
template:
spec:
containers:
- name: create-configmap
image: bitnami/kubectl:latest
command:
- /bin/sh
- -c
- |
if ! kubectl get configmap semaphore-config -n kubeflow > /dev/null 2>&1; then
echo "Creating semaphore-config ConfigMap..."
kubectl create configmap semaphore-config -n kubeflow --from-literal=init=""
else
echo "ConfigMap semaphore-config already exists. Skipping creation."
fi
restartPolicy: OnFailure
backoffLimit: 3

0 comments on commit d98f3c5

Please sign in to comment.