Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Jan 22, 2024
1 parent 55425a7 commit a68a83d
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 38 deletions.
2 changes: 1 addition & 1 deletion pkg/index/job/readreplica/rotate/service/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/vdaas/vald/internal/errors"
client "github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/sync/errgroup"
Expand Down
60 changes: 24 additions & 36 deletions tests/e2e/crud/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,12 @@ func init() {
flag.Parse()

var err error
kubeClient, err = client.New(*kubeConfig)
if err != nil {
panic(err)
}
if *pf {
kubeClient, err = client.New(*kubeConfig)
if err != nil {
panic(err)
}

forwarder = kubeClient.Portforward(namespace, *pfPodName, port, *pfPodPort)

err = forwarder.Start()
if err != nil {
panic(err)
Expand Down Expand Up @@ -843,46 +841,36 @@ func TestE2EReadReplica(t *testing.T) {
sleep(t, waitAfterInsertDuration)

t.Log("starting to restart all the agent pods to make it backup index to pvc...")
cmd := exec.CommandContext(ctx, "sh", "-c",
"kubectl delete pod -l app=vald-agent-ngt && kubectl wait --timeout=120s --for=condition=Ready pod -l app=vald-agent-ngt")
out, err := cmd.Output()
if err != nil {
parseCmdErrorAndFail(t, out, err)
if err := kubeClient.RolloutResource(ctx, "statefulsets/vald-agent-ngt"); err != nil {
t.Fatalf("failed to restart all the agent pods: %s", err)

}
t.Log(string(out))

t.Log("getting agent statefulset replicas...")
cmd = exec.CommandContext(ctx, "sh", "-c", "kubectl get statefulset vald-agent-ngt -o=jsonpath='{.spec.replicas}'")
out, err = cmd.Output()
t.Log("starting to create read replica rotators...")
pods, err := kubeClient.GetPods(ctx, namespace, "app=vald-agent-ngt")
if err != nil {
parseCmdErrorAndFail(t, out, err)
t.Fatalf("GetPods failed: %s", err)
}
replicasStr := string(out)
replicas, err := strconv.Atoi(replicasStr)
cronJobs, err := kubeClient.ListCronJob(ctx, namespace, "app=vald-readreplica-rotate")
if err != nil {
t.Fatalf("failed to parse replicas: %s", err)
t.Fatalf("ListCronJob failed: %s", err)
}
t.Log("statefulset replicas found as:" + string(out))

t.Log("starting to create read replica rotators...")
for id := 0; id < replicas; id++ {
patchCmd := fmt.Sprintf(`kubectl patch cronjob vald-readreplica-rotate --namespace=default --type='json' -p='[{"op": "replace", "path": "/spec/jobTemplate/spec/template/spec/containers/0/env/0/value", "value": "%d"}]'`, id)
createCmd := fmt.Sprintf("kubectl create job vald-readreplica-rotate-%d --from=cronjob/vald-readreplica-rotate", id)
cmd := exec.CommandContext(ctx, "sh", "-c", patchCmd+" && "+createCmd)
out, err := cmd.Output()
if err != nil {
parseCmdErrorAndFail(t, out, err)
}
t.Log(string(out))
cronJob := cronJobs[0]
for id := 0; id < len(pods); id++ {
cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env[0].Value = strconv.Itoa(id)
kubeClient.CreateJobFromCronJob(ctx, "vald-readreplica-rotate-"+strconv.Itoa(id), namespace, &cronJob)
}

t.Log("waiting for read replica rotator jobs to complete...")
cmd = exec.CommandContext(ctx, "sh", "-c", "kubectl wait --timeout=120s --for=condition=complete job -l app=vald-readreplica-rotate")
out, err = cmd.Output()
if err != nil {
parseCmdErrorAndFail(t, out, err)
// cmd := exec.CommandContext(ctx, "sh", "-c", "kubectl wait --timeout=120s --for=condition=complete job -l app=vald-readreplica-rotate")
// out, err := cmd.Output()
// if err != nil {
// parseCmdErrorAndFail(t, out, err)
// }
// t.Log(string(out))
if err := kubeClient.WaitResources(ctx, "job", "app=vald-readreplica-rotate", "complete", "120s"); err != nil {
t.Fatalf("failed to wait for read replica rotator jobs to complete: %s", err)
}
t.Log(string(out))

err = op.Search(t, ctx, operation.Dataset{
Test: ds.Test[searchFrom : searchFrom+searchNum],
Expand Down
106 changes: 105 additions & 1 deletion tests/e2e/kubernetes/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ package client

import (
"context"
"fmt"
"os"
"os/exec"
"time"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/file"
"github.com/vdaas/vald/internal/strings"
"github.com/vdaas/vald/tests/e2e/kubernetes/portforward"
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -58,6 +62,29 @@ type Client interface {
namespace, name string,
timeout time.Duration,
) (ok bool, err error)
WaitForPodsReady(
ctx context.Context,
namespace, labelSelector, timeout string,
) error
ListCronJob(
ctx context.Context,
namespace, labelSelector string,
) ([]v1.CronJob, error)
CreateJob(
ctx context.Context,
namespace string,
job *v1.Job,
) error
CreateJobFromCronJob(
ctx context.Context,
name, namespace string,
cronJob *v1.CronJob,
) error
RolloutResource(
ctx context.Context,
resource string,
) error
WaitResources(ctx context.Context, resource, labelSelector, condition, timeout string) error
}

type client struct {
Expand Down Expand Up @@ -109,7 +136,6 @@ func (cli *client) GetPod(
if err != nil {
return nil, err
}

return pod, nil
}

Expand Down Expand Up @@ -174,3 +200,81 @@ func (cli *client) WaitForPodReady(
}
}
}

func (cli *client) RolloutResource(ctx context.Context, resource string) error {
cmd := exec.CommandContext(ctx, "sh", "-c",
fmt.Sprintf("kubectl rollout restart %s && kubectl rollout status %s", resource, resource),
)
out, err := cmd.Output()
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
return errors.New(string(exitErr.Stderr))
} else {
return fmt.Errorf("unexpected error: %w", err)
}
}
fmt.Println(string(out))
return nil
}

func (cli *client) WaitResources(ctx context.Context, resource, labelSelector, condition, timeout string) error {
cmd := exec.CommandContext(ctx, "sh", "-c",
fmt.Sprintf("kubectl wait --for=condition=%s %s -l %s --timeout %s", condition, resource, labelSelector, timeout),
)
out, err := cmd.Output()
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
return errors.New(string(exitErr.Stderr))
} else {
return fmt.Errorf("unexpected error: %w", err)
}
}
fmt.Println(string(out))
return nil
}

func (cli *client) WaitForPodsReady(ctx context.Context, namespace, labelSelector, timeout string) error {
// use kubectl wait because it's complicated to implement this with client-go
cmd := exec.CommandContext(ctx, "sh", "-c",
fmt.Sprintf("kubectl wait --timeout=%s --for=condition=Ready pod -n %s -l %s", timeout, namespace, labelSelector),
)
out, err := cmd.Output()
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
return errors.New(string(exitErr.Stderr))
} else {
return fmt.Errorf("unexpected error: %w", err)
}
}
fmt.Println(string(out))
return nil
}

func (cli *client) ListCronJob(ctx context.Context, namespace, labelSelector string) ([]v1.CronJob, error) {
cronJobs, err := cli.clientset.BatchV1().CronJobs(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return nil, err
}

return cronJobs.Items, nil
}

func (cli *client) CreateJob(ctx context.Context, namespace string, job *v1.Job) error {
_, err := cli.clientset.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{})
return err
}

func (cli *client) CreateJobFromCronJob(ctx context.Context, name, namespace string, cronJob *v1.CronJob) error {
job := &v1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: cronJob.Spec.JobTemplate.Spec,
}

_, err := cli.clientset.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{})
return err
}

0 comments on commit a68a83d

Please sign in to comment.