From 2169d52babddf8634c8b1e3d552e123f99d9c135 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Wed, 22 May 2024 22:52:02 +0200 Subject: [PATCH] add support for customised timeout closes #263 --- README.md | 11 +++++++---- cmd/root.go | 14 ++++++++++++++ go.mod | 5 +++-- pkg/migrator/migrator.go | 3 +++ pkg/migrator/validate.go | 2 +- pkg/mover/wait.go | 18 +++++++++--------- pkg/strategies/copyTwiceName.go | 8 ++++---- pkg/strategies/export.go | 2 +- pkg/strategies/import.go | 4 ++-- pkg/strategies/strategy.go | 12 +++++++++++- 10 files changed, 55 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 824428d..e627c57 100644 --- a/README.md +++ b/README.md @@ -18,15 +18,17 @@ Download the binary of the latest release from https://github.com/BeryJu/korb/re ### Usage ``` -Error: requires at least 1 arg(s), only received 0 +Move data between Kubernetes PVCs on different Storage Classes. + Usage: korb [pvc [pvc]] [flags] Flags: - --container-image string Image to use for moving jobs (default "ghcr.io/beryju/korb-mover:latest") + --container-image string Image to use for moving jobs (default "ghcr.io/beryju/korb-mover:v2") --force Ignore warning which would normally halt the tool during validation. -h, --help help for korb --kube-config string (optional) absolute path to the kubeconfig file (default "/Users/jens/.kube/config") + --new-pvc-access-mode strings Access mode(s) for the new PVC. If empty, the access mode of the source will be used. Accepts formats like used in Kubernetes Manifests (ReadWriteOnce, ReadWriteMany, ...) --new-pvc-name string Name for the new PVC. If empty, same name will be reused. --new-pvc-namespace string Namespace for the new PVCs to be created in. If empty, the namespace from your kubeconfig file will be used. --new-pvc-size string Size for the new PVC. If empty, the size of the source will be used. Accepts formats like used in Kubernetes Manifests (Gi, Ti, ...) 
@@ -34,11 +36,12 @@ Flags: --skip-pvc-bind-wait Skip waiting for PVC to be bound. --source-namespace string Namespace where the old PVCs reside. If empty, the namespace from your kubeconfig file will be used. --strategy string Strategy to use, by default will try to auto-select - -requires at least 1 arg(s), only received 0 + --timeout string Overwrite auto-generated timeout (by default 60s for Pod to start, copy timeout is based on PVC size) + --tolerate-any-node Allow job to tolerate any node taints. ``` #### Strategies + To see existing [strategies](https://github.com/BeryJu/korb/tree/main/pkg/strategies) and what they do, please check out the comments in the source code of the strategy. ### Example (Moving from PVC to PVC) diff --git a/cmd/root.go b/cmd/root.go index 27dc68b..ee3ad48 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "time" "beryju.org/korb/pkg/config" "beryju.org/korb/pkg/migrator" @@ -26,6 +27,8 @@ var pvcNewAccessModes []string var force bool var skipWaitPVCBind bool var tolerateAllNodes bool +var timeout string + var Version string // rootCmd represents the base command when called without any subcommands @@ -35,10 +38,20 @@ var rootCmd = &cobra.Command{ Long: `Move data between Kubernetes PVCs on different Storage Classes.`, Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { + var t *time.Duration + if timeout != "" { + _t, err := time.ParseDuration(timeout) + if err != nil { + log.WithError(err).Panic("Failed to parse custom duration") + return + } + t = &_t + } for _, pvc := range args { m := migrator.New(kubeConfig, strategy, tolerateAllNodes) m.Force = force m.WaitForTempDestPVCBind = skipWaitPVCBind + m.Timeout = t // We can only support operating in a single namespace currently // Since cross-namespace PVC mounts are not a thing @@ -97,4 +110,5 @@ func init() { rootCmd.Flags().StringVar(&config.ContainerImage, "container-image", config.ContainerImage, "Image to 
use for moving jobs") rootCmd.Flags().StringVar(&strategy, "strategy", "", "Strategy to use, by default will try to auto-select") + rootCmd.Flags().StringVar(&timeout, "timeout", "", "Overwrite auto-generated timeout (by default 60s for Pod to start, copy timeout is based on PVC size)") } diff --git a/go.mod b/go.mod index 693a2ad..b6871c4 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,8 @@ module beryju.org/korb -go 1.21 -toolchain go1.22.2 +go 1.22.0 + +toolchain go1.22.3 require ( github.com/goware/prefixer v0.0.0-20160118172347-395022866408 diff --git a/pkg/migrator/migrator.go b/pkg/migrator/migrator.go index 545f15a..422d0b3 100644 --- a/pkg/migrator/migrator.go +++ b/pkg/migrator/migrator.go @@ -1,6 +1,8 @@ package migrator import ( + "time" + "beryju.org/korb/pkg/strategies" log "github.com/sirupsen/logrus" @@ -22,6 +24,7 @@ type Migrator struct { Force bool WaitForTempDestPVCBind bool TolerateAllNodes bool + Timeout *time.Duration kConfig *rest.Config kClient *kubernetes.Clientset diff --git a/pkg/migrator/validate.go b/pkg/migrator/validate.go index a4588f7..8bb41bc 100644 --- a/pkg/migrator/validate.go +++ b/pkg/migrator/validate.go @@ -14,7 +14,7 @@ func (m *Migrator) Validate() (*v1.PersistentVolumeClaim, []strategies.Strategy) if err != nil { m.log.WithError(err).Panic("Failed to get controllers") } - baseStrategy := strategies.NewBaseStrategy(m.kConfig, m.kClient, m.TolerateAllNodes) + baseStrategy := strategies.NewBaseStrategy(m.kConfig, m.kClient, m.TolerateAllNodes, m.Timeout) allStrategies := strategies.StrategyInstances(baseStrategy) compatibleStrategies := make([]strategies.Strategy, 0) ctx := strategies.MigrationContext{ diff --git a/pkg/mover/wait.go b/pkg/mover/wait.go index 45e91d8..85ed61b 100644 --- a/pkg/mover/wait.go +++ b/pkg/mover/wait.go @@ -11,9 +11,9 @@ import ( "k8s.io/apimachinery/pkg/util/wait" ) -func (m *MoverJob) getPods() []v1.Pod { +func (m *MoverJob) getPods(ctx context.Context) []v1.Pod { selector := 
fmt.Sprintf("job-name=%s", m.Name) - pods, err := m.kClient.CoreV1().Pods(m.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector}) + pods, err := m.kClient.CoreV1().Pods(m.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) if err != nil { m.log.WithError(err).Warning("Failed to get pods") return make([]v1.Pod, 0) @@ -21,11 +21,11 @@ func (m *MoverJob) getPods() []v1.Pod { return pods.Items } -func (m *MoverJob) WaitForRunning() *v1.Pod { +func (m *MoverJob) WaitForRunning(timeout time.Duration) *v1.Pod { // First we wait for all pods to be running var runningPod v1.Pod - err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) { - pods := m.getPods() + err := wait.PollUntilContextTimeout(context.Background(), 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) { + pods := m.getPods(ctx) if len(pods) != 1 { return false, nil } @@ -44,16 +44,16 @@ func (m *MoverJob) WaitForRunning() *v1.Pod { return &runningPod } -func (m *MoverJob) Wait(timeout time.Duration) error { - pod := m.WaitForRunning() +func (m *MoverJob) Wait(startTimeout time.Duration, moveTimeout time.Duration) error { + pod := m.WaitForRunning(startTimeout) if pod == nil { return errors.New("pod not in correct state") } runningPod := *pod go m.followLogs(runningPod) - err := wait.PollImmediate(2*time.Second, timeout, func() (bool, error) { - job, err := m.kClient.BatchV1().Jobs(m.Namespace).Get(context.TODO(), m.kJob.Name, metav1.GetOptions{}) + err := wait.PollUntilContextTimeout(context.Background(), 2*time.Second, moveTimeout, true, func(ctx context.Context) (bool, error) { + job, err := m.kClient.BatchV1().Jobs(m.Namespace).Get(ctx, m.kJob.Name, metav1.GetOptions{}) if err != nil { return false, err } diff --git a/pkg/strategies/copyTwiceName.go b/pkg/strategies/copyTwiceName.go index 2948710..07f7a16 100644 --- a/pkg/strategies/copyTwiceName.go +++ b/pkg/strategies/copyTwiceName.go @@ -89,7 +89,7 @@ func (c 
*CopyTwiceNameStrategy) Do(sourcePVC *v1.PersistentVolumeClaim, destTemp c.tempMover.SourceVolume = sourcePVC c.tempMover.DestVolume = c.TempDestPVC c.tempMover.Name = fmt.Sprintf("korb-job-%s", sourcePVC.UID) - err = c.tempMover.Start().Wait(c.MoveTimeout) + err = c.tempMover.Start().Wait(c.timeout, c.MoveTimeout) if err != nil { c.log.WithError(err).Warning("Failed to move data") c.pvcsToDelete = []*v1.PersistentVolumeClaim{c.TempDestPVC} @@ -118,7 +118,7 @@ func (c *CopyTwiceNameStrategy) Do(sourcePVC *v1.PersistentVolumeClaim, destTemp c.finalMover.SourceVolume = c.TempDestPVC c.finalMover.DestVolume = c.DestPVC c.finalMover.Name = fmt.Sprintf("korb-job-%s", tempDestInst.UID) - err = c.finalMover.Start().Wait(c.MoveTimeout) + err = c.finalMover.Start().Wait(c.timeout, c.MoveTimeout) if err != nil { c.log.WithError(err).Warning("Failed to move data") c.pvcsToDelete = []*v1.PersistentVolumeClaim{c.DestPVC} @@ -157,7 +157,7 @@ func (c *CopyTwiceNameStrategy) setTimeout(pvc *v1.PersistentVolumeClaim) { } func (c *CopyTwiceNameStrategy) waitForPVCDeletion(pvc *v1.PersistentVolumeClaim) error { - return wait.Poll(2*time.Second, 30*time.Second, func() (bool, error) { + return wait.Poll(2*time.Second, c.timeout, func() (bool, error) { _, err := c.kClient.CoreV1().PersistentVolumeClaims(pvc.ObjectMeta.Namespace).Get(context.TODO(), pvc.ObjectMeta.Name, metav1.GetOptions{}) if errors.IsNotFound(err) { return true, nil @@ -168,7 +168,7 @@ func (c *CopyTwiceNameStrategy) waitForPVCDeletion(pvc *v1.PersistentVolumeClaim } func (c *CopyTwiceNameStrategy) waitForBound(pvc *v1.PersistentVolumeClaim) error { - return wait.Poll(2*time.Second, 30*time.Second, func() (bool, error) { + return wait.Poll(2*time.Second, c.timeout, func() (bool, error) { pvc, err := c.kClient.CoreV1().PersistentVolumeClaims(pvc.ObjectMeta.Namespace).Get(context.TODO(), pvc.ObjectMeta.Name, metav1.GetOptions{}) if err != nil { return false, err diff --git a/pkg/strategies/export.go 
b/pkg/strategies/export.go index f64a11d..844491d 100644 --- a/pkg/strategies/export.go +++ b/pkg/strategies/export.go @@ -51,7 +51,7 @@ func (c *ExportStrategy) Do(sourcePVC *v1.PersistentVolumeClaim, destTemplate *v c.tempMover.SourceVolume = sourcePVC c.tempMover.Name = fmt.Sprintf("korb-job-%s", sourcePVC.UID) - pod := c.tempMover.Start().WaitForRunning() + pod := c.tempMover.Start().WaitForRunning(c.timeout) if pod == nil { c.log.Warning("Failed to move data") return c.Cleanup() diff --git a/pkg/strategies/import.go b/pkg/strategies/import.go index d35b3eb..0d6686f 100644 --- a/pkg/strategies/import.go +++ b/pkg/strategies/import.go @@ -37,7 +37,7 @@ func (c *ImportStrategy) CompatibleWithContext(ctx MigrationContext) error { path := fmt.Sprintf("%s.tar", ctx.SourcePVC.Name) _, err := os.Stat(path) if errors.Is(err, os.ErrNotExist) { - return fmt.Errorf("Expected import file '%s' does not exist", path) + return fmt.Errorf("expected import file '%s' does not exist", path) } return nil } @@ -55,7 +55,7 @@ func (c *ImportStrategy) Do(sourcePVC *v1.PersistentVolumeClaim, destTemplate *v c.tempMover.SourceVolume = sourcePVC c.tempMover.Name = fmt.Sprintf("korb-job-%s", sourcePVC.UID) - pod := c.tempMover.Start().WaitForRunning() + pod := c.tempMover.Start().WaitForRunning(c.timeout) if pod == nil { c.log.Warning("Failed to move data") return c.Cleanup() diff --git a/pkg/strategies/strategy.go b/pkg/strategies/strategy.go index a42bd5d..f761710 100644 --- a/pkg/strategies/strategy.go +++ b/pkg/strategies/strategy.go @@ -1,6 +1,8 @@ package strategies import ( + "time" + log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" @@ -14,13 +16,21 @@ type BaseStrategy struct { log *log.Entry tolerateAllNodes bool + timeout time.Duration } -func NewBaseStrategy(config *rest.Config, client *kubernetes.Clientset, tolerateAllNodes bool) BaseStrategy { +func NewBaseStrategy(config *rest.Config, client *kubernetes.Clientset, tolerateAllNodes bool, timeout *time.Duration) 
BaseStrategy { + var t time.Duration + if timeout == nil { + t = 60 * time.Second + } else { + t = *timeout + } return BaseStrategy{ kConfig: config, kClient: client, tolerateAllNodes: tolerateAllNodes, + timeout: t, log: log.WithField("component", "strategy"), } }