Skip to content

Commit

Permalink
feat: validate the name length of RayCluster, RayService, and RayJob
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <rueiancsie@gmail.com>
  • Loading branch information
rueian committed Mar 3, 2025
1 parent 6c4a77d commit c9c3e9c
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 16 deletions.
2 changes: 1 addition & 1 deletion ray-operator/controllers/ray/common/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func BuildIngressForHeadService(ctx context.Context, cluster rayv1.RayCluster) (

labels := map[string]string{
utils.RayClusterLabelKey: cluster.Name,
utils.RayIDLabelKey: utils.GenerateIdentifier(cluster.Name, rayv1.HeadNode),
utils.RayIDLabelKey: utils.CheckLabel(utils.GenerateIdentifier(cluster.Name, rayv1.HeadNode)),
utils.KubernetesApplicationNameLabelKey: utils.ApplicationName,
utils.KubernetesCreatedByLabelKey: utils.ComponentName,
}
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/controllers/ray/common/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func BuildRouteForHeadService(cluster rayv1.RayCluster) (*routev1.Route, error) {
labels := map[string]string{
utils.RayClusterLabelKey: cluster.Name,
utils.RayIDLabelKey: utils.GenerateIdentifier(cluster.Name, rayv1.HeadNode),
utils.RayIDLabelKey: utils.CheckLabel(utils.GenerateIdentifier(cluster.Name, rayv1.HeadNode)),
utils.KubernetesApplicationNameLabelKey: utils.ApplicationName,
utils.KubernetesCreatedByLabelKey: utils.ComponentName,
}
Expand Down
4 changes: 1 addition & 3 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance
return ctrl.Result{}, nil
}

if err := utils.ValidateRayClusterSpec(&instance.Spec, instance.Annotations); err != nil {
if err := utils.ValidateRayClusterSpec(instance.Name, &instance.Spec, instance.Annotations); err != nil {
logger.Error(err, fmt.Sprintf("The RayCluster spec is invalid %s/%s", instance.Namespace, instance.Name))
r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterSpec),
"The RayCluster spec is invalid %s/%s: %v", instance.Namespace, instance.Name, err)
Expand Down Expand Up @@ -1019,8 +1019,6 @@ func (r *RayClusterReconciler) createHeadRoute(ctx context.Context, route *route
func (r *RayClusterReconciler) createService(ctx context.Context, svc *corev1.Service, instance *rayv1.RayCluster) error {
logger := ctrl.LoggerFrom(ctx)

// making sure the name is valid
svc.Name = utils.CheckName(svc.Name)
if err := controllerutil.SetControllerReference(instance, svc, r.Scheme); err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions ray-operator/controllers/ray/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,17 @@ const (
KubeRayController = "ray.io/kuberay-operator"

ServeConfigLRUSize = 1000

// MaxRayClusterNameLength is the maximum length of a RayCluster name, chosen so that the
// k8s service names derived from it never need to be truncated. Currently, "-serve-svc" is the
// longest service suffix: 63 - len("-serve-svc") == 53, so the name must not exceed 53 characters.
MaxRayClusterNameLength = 53
// MaxRayServiceNameLength is the maximum length of a RayService name, chosen so that the generated
// RayCluster name passes the RayCluster validation. Minus 6 since we append 6 characters to the
// RayService name to create the cluster (GenerateRayClusterName).
MaxRayServiceNameLength = MaxRayClusterNameLength - 6
// MaxRayJobNameLength is the maximum length of a RayJob name, chosen so that the generated
// RayCluster name passes the RayCluster validation. Minus 6 since we append 6 characters to the
// RayJob name to create the cluster (GenerateRayClusterName).
MaxRayJobNameLength = MaxRayClusterNameLength - 6
)

type ServiceType string
Expand Down
6 changes: 3 additions & 3 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ func GetNamespace(metaData metav1.ObjectMeta) string {
func GenerateHeadServiceName(crdType CRDType, clusterSpec rayv1.RayClusterSpec, ownerName string) (string, error) {
switch crdType {
case RayServiceCRD:
return CheckName(fmt.Sprintf("%s-%s-%s", ownerName, rayv1.HeadNode, "svc")), nil
return fmt.Sprintf("%s-%s-%s", ownerName, rayv1.HeadNode, "svc"), nil
case RayClusterCRD:
headSvcName := CheckName(fmt.Sprintf("%s-%s-%s", ownerName, rayv1.HeadNode, "svc"))
headSvcName := fmt.Sprintf("%s-%s-%s", ownerName, rayv1.HeadNode, "svc")
if clusterSpec.HeadGroupSpec.HeadService != nil && clusterSpec.HeadGroupSpec.HeadService.Name != "" {
headSvcName = clusterSpec.HeadGroupSpec.HeadService.Name
}
Expand Down Expand Up @@ -293,7 +293,7 @@ func ExtractRayIPFromFQDN(fqdnRayIP string) string {

// GenerateServeServiceName generates name for serve service.
func GenerateServeServiceName(serviceName string) string {
return CheckName(fmt.Sprintf("%s-%s-%s", serviceName, ServeName, "svc"))
return fmt.Sprintf("%s-%s-%s", serviceName, ServeName, "svc")
}

// GenerateServeServiceLabel generates label value for serve service selector.
Expand Down
16 changes: 14 additions & 2 deletions ray-operator/controllers/ray/utils/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ func ValidateRayClusterStatus(instance *rayv1.RayCluster) error {
}

// ValidateRayClusterSpec returns an error if the RayCluster configuration is invalid.
func ValidateRayClusterSpec(spec *rayv1.RayClusterSpec, annotations map[string]string) error {
func ValidateRayClusterSpec(name string, spec *rayv1.RayClusterSpec, annotations map[string]string) error {
if len(name) > MaxRayClusterNameLength {
return fmt.Errorf("RayCluster name should be no more than %d characters", MaxRayClusterNameLength)
}

if len(spec.HeadGroupSpec.Template.Spec.Containers) == 0 {
return fmt.Errorf("headGroupSpec should have at least one container")
}
Expand Down Expand Up @@ -98,6 +102,10 @@ func ValidateRayJobStatus(rayJob *rayv1.RayJob) error {
}

func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
if len(rayJob.Name) > MaxRayJobNameLength {
return fmt.Errorf("RayJob name should be no more than %d characters", MaxRayJobNameLength)
}

// KubeRay has some limitations for the suspend operation. The limitations are a subset of the limitations of
// Kueue (https://kueue.sigs.k8s.io/docs/tasks/run_rayjobs/#c-limitations). For example, KubeRay allows users
// to suspend a RayJob with autoscaling enabled, but Kueue doesn't.
Expand All @@ -114,7 +122,7 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
}

if rayJob.Spec.RayClusterSpec != nil {
if err := ValidateRayClusterSpec(rayJob.Spec.RayClusterSpec, rayJob.Annotations); err != nil {
if err := ValidateRayClusterSpec("", rayJob.Spec.RayClusterSpec, rayJob.Annotations); err != nil {
return err
}
}
Expand Down Expand Up @@ -158,6 +166,10 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
}

func ValidateRayServiceSpec(rayService *rayv1.RayService) error {
if len(rayService.Name) > MaxRayServiceNameLength {
return fmt.Errorf("RayService name should be no more than %d characters", MaxRayServiceNameLength)
}

if headSvc := rayService.Spec.RayClusterSpec.HeadGroupSpec.HeadService; headSvc != nil && headSvc.Name != "" {
return fmt.Errorf("spec.rayClusterConfig.headGroupSpec.headService.metadata.name should not be set")
}
Expand Down
93 changes: 88 additions & 5 deletions ray-operator/controllers/ray/utils/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package utils

import (
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -206,7 +207,7 @@ func TestValidateRayClusterSpecGcsFaultToleranceOptions(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := ValidateRayClusterSpec(&rayv1.RayClusterSpec{
err := ValidateRayClusterSpec("", &rayv1.RayClusterSpec{
GcsFaultToleranceOptions: tt.gcsFaultToleranceOptions,
HeadGroupSpec: rayv1.HeadGroupSpec{
RayStartParams: tt.rayStartParams,
Expand Down Expand Up @@ -288,7 +289,7 @@ func TestValidateRayClusterSpecRedisPassword(t *testing.T) {
},
},
}
err := ValidateRayClusterSpec(&rayCluster.Spec, rayCluster.Annotations)
err := ValidateRayClusterSpec(rayCluster.Name, &rayCluster.Spec, rayCluster.Annotations)
if tt.expectError {
require.Error(t, err)
} else {
Expand Down Expand Up @@ -358,7 +359,56 @@ func TestValidateRayClusterSpecRedisUsername(t *testing.T) {
},
},
}
err := ValidateRayClusterSpec(&rayCluster.Spec, rayCluster.Annotations)
err := ValidateRayClusterSpec(rayCluster.Name, &rayCluster.Spec, rayCluster.Annotations)
if tt.expectError {
require.Error(t, err)
assert.EqualError(t, err, tt.errorMessage)
} else {
require.NoError(t, err)
}
})
}
}

func TestValidateRayClusterSpecNames(t *testing.T) {
tests := []struct {
rayCluster *rayv1.RayCluster
name string
errorMessage string
expectError bool
}{
{
name: "RayCluster name is too long (> MaxRayClusterNameLength characters)",
rayCluster: &rayv1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
Name: strings.Repeat("A", MaxRayClusterNameLength+1),
},
},
expectError: true,
errorMessage: fmt.Sprintf("RayCluster name should be no more than %d characters", MaxRayClusterNameLength),
},
{
name: "Both RayCluster name is ok (== MaxRayClusterNameLength)",
rayCluster: &rayv1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
Name: strings.Repeat("A", MaxRayClusterNameLength),
},
Spec: rayv1.RayClusterSpec{
HeadGroupSpec: rayv1.HeadGroupSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: "ray-head"}},
},
},
},
},
},
expectError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := ValidateRayClusterSpec(tt.rayCluster.Name, &tt.rayCluster.Spec, tt.rayCluster.Annotations)
if tt.expectError {
require.Error(t, err)
assert.EqualError(t, err, tt.errorMessage)
Expand Down Expand Up @@ -430,7 +480,7 @@ func TestValidateRayClusterSpecEmptyContainers(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := ValidateRayClusterSpec(&tt.rayCluster.Spec, tt.rayCluster.Annotations)
err := ValidateRayClusterSpec(tt.rayCluster.Name, &tt.rayCluster.Spec, tt.rayCluster.Annotations)
if tt.expectError {
require.Error(t, err)
assert.EqualError(t, err, tt.errorMessage)
Expand Down Expand Up @@ -507,7 +557,7 @@ func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer features.SetFeatureGateDuringTest(t, features.RayJobDeletionPolicy, tt.featureGate)()
err := ValidateRayClusterSpec(&tt.rayCluster.Spec, tt.rayCluster.Annotations)
err := ValidateRayClusterSpec(tt.rayCluster.Name, &tt.rayCluster.Spec, tt.rayCluster.Annotations)
if tt.expectError {
require.Error(t, err)
assert.EqualError(t, err, tt.errorMessage)
Expand Down Expand Up @@ -714,6 +764,25 @@ func TestValidateRayJobSpec(t *testing.T) {
},
})
require.ErrorContains(t, err, "headGroupSpec should have at least one container")

err = ValidateRayJobSpec(&rayv1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: strings.Repeat("j", MaxRayJobNameLength+1),
},
})
require.ErrorContains(t, err, fmt.Sprintf("RayJob name should be no more than %d characters", MaxRayJobNameLength))

err = ValidateRayJobSpec(&rayv1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: strings.Repeat("j", MaxRayJobNameLength),
},
Spec: rayv1.RayJobSpec{
RayClusterSpec: &rayv1.RayClusterSpec{
HeadGroupSpec: headGroupSpecWithOneContainer,
},
},
})
require.NoError(t, err)
}

func TestValidateRayServiceSpec(t *testing.T) {
Expand All @@ -737,6 +806,20 @@ func TestValidateRayServiceSpec(t *testing.T) {
})
require.NoError(t, err, "The RayService spec is valid.")

err = ValidateRayServiceSpec(&rayv1.RayService{
ObjectMeta: metav1.ObjectMeta{
Name: strings.Repeat("j", MaxRayServiceNameLength+1),
},
})
require.ErrorContains(t, err, fmt.Sprintf("RayService name should be no more than %d characters", MaxRayServiceNameLength))

err = ValidateRayServiceSpec(&rayv1.RayService{
ObjectMeta: metav1.ObjectMeta{
Name: strings.Repeat("j", MaxRayServiceNameLength),
},
})
require.NoError(t, err)

var upgradeStrat rayv1.RayServiceUpgradeType = "invalidStrategy"
err = ValidateRayServiceSpec(&rayv1.RayService{
Spec: rayv1.RayServiceSpec{
Expand Down
104 changes: 103 additions & 1 deletion ray-operator/test/e2e/raycluster_gcs_ft_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package e2e

import (
"strings"
"testing"
"time"

Expand All @@ -21,7 +22,7 @@ const (
redisAddress = "redis:6379"
)

func TestRayClusterGCSFaultTolerence(t *testing.T) {
func TestRayClusterGCSFaultTolerance(t *testing.T) {
test := With(t)
g := NewWithT(t)

Expand Down Expand Up @@ -128,6 +129,107 @@ func TestRayClusterGCSFaultTolerence(t *testing.T) {
})
}

// TestRayClusterGCSFTWithMaximumName creates a RayCluster whose name is exactly
// utils.MaxRayClusterNameLength characters long, configured with GCS fault tolerance
// options that point at a Redis instance deployed by the test. It checks that the
// cluster reaches the provisioned condition, that the "-head-svc", "-serve-svc" and
// "-headless" services can be fetched under the full cluster name, and that
// samples/test_detached_actor_2.py still succeeds after the head Pod is deleted and
// replaced.
func TestRayClusterGCSFTWithMaximumName(t *testing.T) {
test := With(t)
g := NewWithT(t)

// Create a namespace
namespace := test.NewTestNamespace()
// Publish the two Python test scripts as a ConfigMap; it is mounted at
// /home/ray/samples through mountConfigMap when the cluster spec is built below.
testScriptAC := newConfigMap(namespace.Name, files(test, "test_detached_actor_1.py", "test_detached_actor_2.py"))
testScript, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), testScriptAC, TestApplyOptions)
g.Expect(err).NotTo(HaveOccurred())

test.T().Run("Test Maximum Cluster Name and Group name (MaxRayClusterNameLength characters)", func(_ *testing.T) {
// A name made of MaxRayClusterNameLength 'r' characters, i.e. exactly at the limit.
maximumRayClusterName := strings.Repeat("r", utils.MaxRayClusterNameLength)

// NOTE(review): checkRedisDBSize presumably reports the number of keys left in
// Redis — confirm against deployRedis.
checkRedisDBSize := deployRedis(test, namespace.Name, redisPassword)
// Go evaluates g.Eventually(...) and the matcher right here; only the trailing
// Should call is deferred, so the check runs when this subtest function returns
// (after the RayCluster has been deleted at the end of the body).
defer g.Eventually(checkRedisDBSize, time.Second*30, time.Second).Should(BeEquivalentTo("0"))

// Cluster spec: GCS fault tolerance backed by the Redis instance above, a head
// group with ingress enabled, and one worker group named "group".
rayClusterSpecAC := rayv1ac.RayClusterSpec().
WithGcsFaultToleranceOptions(
rayv1ac.GcsFaultToleranceOptions().
WithRedisAddress(redisAddress).
WithRedisPassword(rayv1ac.RedisCredential().WithValue(redisPassword)),
).
WithRayVersion(GetRayVersion()).
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
WithEnableIngress(true).
WithRayStartParams(map[string]string{
"num-cpus": "0",
}).
WithTemplate(headPodTemplateApplyConfiguration()),
).
WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
WithRayStartParams(map[string]string{
"num-cpus": "1",
}).
WithGroupName("group").
WithReplicas(1).
WithMinReplicas(1).
WithMaxReplicas(2).
WithNumOfHosts(2).
WithTemplate(workerPodTemplateApplyConfiguration()),
)
// NOTE(review): the EnableServeServiceKey annotation presumably makes the operator
// create the "-serve-svc" service that is fetched below — confirm.
rayClusterAC := rayv1ac.RayCluster(maximumRayClusterName, namespace.Name).
WithAnnotations(map[string]string{
utils.EnableServeServiceKey: "true",
}).
WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](testScript, "/home/ray/samples")))

rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)

g.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)

LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready", rayCluster.Namespace, rayCluster.Name)
g.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutLong).
Should(WithTransform(StatusCondition(rayv1.RayClusterProvisioned), MatchCondition(metav1.ConditionTrue, rayv1.AllPodRunningAndReadyFirstTime)))

// Each lookup appends the suffix to the full cluster name, so it only succeeds
// if the service was created without shortening the name.
_, err = test.Client().Core().CoreV1().Services(namespace.Name).Get(test.Ctx(), rayCluster.Name+"-head-svc", metav1.GetOptions{})
g.Expect(err).NotTo(HaveOccurred())
_, err = test.Client().Core().CoreV1().Services(namespace.Name).Get(test.Ctx(), rayCluster.Name+"-serve-svc", metav1.GetOptions{})
g.Expect(err).NotTo(HaveOccurred())
_, err = test.Client().Core().CoreV1().Services(namespace.Name).Get(test.Ctx(), rayCluster.Name+"-headless", metav1.GetOptions{})
g.Expect(err).NotTo(HaveOccurred())

headPod, err := GetHeadPod(test, rayCluster)
g.Expect(err).NotTo(HaveOccurred())

LogWithTimestamp(test.T(), "HeadPod Name: %s", headPod.Name)

rayNamespace := "testing-ray-namespace"
LogWithTimestamp(test.T(), "Ray namespace: %s", rayNamespace)

// Run the first script, then the second one with the output it is expected to produce.
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "samples/test_detached_actor_1.py", rayNamespace})

expectedOutput := "3"
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "samples/test_detached_actor_2.py", rayNamespace, expectedOutput})

// Test 1: Delete the head Pod
err = test.Client().Core().CoreV1().Pods(namespace.Name).Delete(test.Ctx(), headPod.Name, metav1.DeleteOptions{})
g.Expect(err).NotTo(HaveOccurred())

PodUID := func(p *corev1.Pod) string { return string(p.UID) }
g.Eventually(HeadPod(test, rayCluster), TestTimeoutMedium).
ShouldNot(WithTransform(PodUID, Equal(string(headPod.UID)))) // Use UID to check if the new head pod is created.

// Pod Status should eventually become Running
PodState := func(p *corev1.Pod) string { return string(p.Status.Phase) }
g.Eventually(HeadPod(test, rayCluster), TestTimeoutMedium).
Should(WithTransform(PodState, Equal("Running")))

headPod, err = GetHeadPod(test, rayCluster) // Replace the old head pod
g.Expect(err).NotTo(HaveOccurred())

// Re-run the second script on the replacement head Pod with the next expected output.
expectedOutput = "4"

ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "samples/test_detached_actor_2.py", rayNamespace, expectedOutput})

// Delete the RayCluster; the deferred Redis check above runs once this function returns.
err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
g.Expect(err).NotTo(HaveOccurred())
})
}

func TestGcsFaultToleranceOptions(t *testing.T) {
// Each test uses a separate namespace to utilize different Redis instances
// for better isolation.
Expand Down
Loading

0 comments on commit c9c3e9c

Please sign in to comment.