Skip to content

Commit

Permalink
add support for customised timeout
Browse files Browse the repository at this point in the history
closes #263
  • Loading branch information
BeryJu committed May 22, 2024
1 parent 79c45eb commit 2169d52
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 24 deletions.
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,30 @@ Download the binary of the latest release from https://github.com/BeryJu/korb/re
### Usage

```
Error: requires at least 1 arg(s), only received 0
Move data between Kubernetes PVCs on different Storage Classes.
Usage:
korb [pvc [pvc]] [flags]
Flags:
--container-image string Image to use for moving jobs (default "ghcr.io/beryju/korb-mover:latest")
--container-image string Image to use for moving jobs (default "ghcr.io/beryju/korb-mover:v2")
--force Ignore warnings that would normally halt the tool during validation.
-h, --help help for korb
--kube-config string (optional) absolute path to the kubeconfig file (default "/Users/jens/.kube/config")
--new-pvc-access-mode strings Access mode(s) for the new PVC. If empty, the access mode of the source will be used. Accepts the same values as Kubernetes manifests (ReadWriteOnce, ReadWriteMany, ...)
--new-pvc-name string Name for the new PVC. If empty, same name will be reused.
--new-pvc-namespace string Namespace for the new PVCs to be created in. If empty, the namespace from your kubeconfig file will be used.
--new-pvc-size string Size for the new PVC. If empty, the size of the source will be used. Accepts the same quantity formats as Kubernetes manifests (Gi, Ti, ...)
--new-pvc-storage-class string Storage class to use for the new PVC. If empty, the storage class of the source will be used.
--skip-pvc-bind-wait Skip waiting for PVC to be bound.
--source-namespace string Namespace where the old PVCs reside. If empty, the namespace from your kubeconfig file will be used.
--strategy string Strategy to use, by default will try to auto-select
requires at least 1 arg(s), only received 0
--timeout string Overwrite auto-generated timeout (by default 60s for Pod to start, copy timeout is based on PVC size)
--tolerate-any-node Allow the job to tolerate any node taints.
```

#### Strategies

To see the existing [strategies](https://github.com/BeryJu/korb/tree/main/pkg/strategies) and what they do, check the comments in each strategy's source code.

### Example (Moving from PVC to PVC)
Expand Down
14 changes: 14 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"time"

"beryju.org/korb/pkg/config"
"beryju.org/korb/pkg/migrator"
Expand All @@ -26,6 +27,8 @@ var pvcNewAccessModes []string
var force bool
var skipWaitPVCBind bool
var tolerateAllNodes bool
var timeout string

var Version string

// rootCmd represents the base command when called without any subcommands
Expand All @@ -35,10 +38,20 @@ var rootCmd = &cobra.Command{
Long: `Move data between Kubernetes PVCs on different Storage Classes.`,
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
var t *time.Duration
if timeout != "" {
_t, err := time.ParseDuration(timeout)
if err != nil {
log.WithError(err).Panic("Failed to parse custom duration")
return
}
t = &_t
}
for _, pvc := range args {
m := migrator.New(kubeConfig, strategy, tolerateAllNodes)
m.Force = force
m.WaitForTempDestPVCBind = skipWaitPVCBind
m.Timeout = t

// We can currently only operate within a single namespace,
// since a PVC cannot be mounted from a different namespace.
Expand Down Expand Up @@ -97,4 +110,5 @@ func init() {

rootCmd.Flags().StringVar(&config.ContainerImage, "container-image", config.ContainerImage, "Image to use for moving jobs")
rootCmd.Flags().StringVar(&strategy, "strategy", "", "Strategy to use, by default will try to auto-select")
rootCmd.Flags().StringVar(&timeout, "timeout", "", "Overwrite auto-generated timeout (by default 60s for Pod to start, copy timeout is based on PVC size)")
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module beryju.org/korb

go 1.21
toolchain go1.22.2
go 1.22.0

toolchain go1.22.3

require (
github.com/goware/prefixer v0.0.0-20160118172347-395022866408
Expand Down
3 changes: 3 additions & 0 deletions pkg/migrator/migrator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package migrator

import (
"time"

"beryju.org/korb/pkg/strategies"
log "github.com/sirupsen/logrus"

Expand All @@ -22,6 +24,7 @@ type Migrator struct {
Force bool
WaitForTempDestPVCBind bool
TolerateAllNodes bool
Timeout *time.Duration

kConfig *rest.Config
kClient *kubernetes.Clientset
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrator/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (m *Migrator) Validate() (*v1.PersistentVolumeClaim, []strategies.Strategy)
if err != nil {
m.log.WithError(err).Panic("Failed to get controllers")
}
baseStrategy := strategies.NewBaseStrategy(m.kConfig, m.kClient, m.TolerateAllNodes)
baseStrategy := strategies.NewBaseStrategy(m.kConfig, m.kClient, m.TolerateAllNodes, m.Timeout)
allStrategies := strategies.StrategyInstances(baseStrategy)
compatibleStrategies := make([]strategies.Strategy, 0)
ctx := strategies.MigrationContext{
Expand Down
18 changes: 9 additions & 9 deletions pkg/mover/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)

func (m *MoverJob) getPods() []v1.Pod {
func (m *MoverJob) getPods(ctx context.Context) []v1.Pod {
selector := fmt.Sprintf("job-name=%s", m.Name)
pods, err := m.kClient.CoreV1().Pods(m.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector})
pods, err := m.kClient.CoreV1().Pods(m.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
if err != nil {
m.log.WithError(err).Warning("Failed to get pods")
return make([]v1.Pod, 0)
}
return pods.Items
}

func (m *MoverJob) WaitForRunning() *v1.Pod {
func (m *MoverJob) WaitForRunning(timeout time.Duration) *v1.Pod {
// First we wait for all pods to be running
var runningPod v1.Pod
err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
pods := m.getPods()
err := wait.PollUntilContextTimeout(context.Background(), 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
pods := m.getPods(ctx)
if len(pods) != 1 {
return false, nil
}
Expand All @@ -44,16 +44,16 @@ func (m *MoverJob) WaitForRunning() *v1.Pod {
return &runningPod
}

func (m *MoverJob) Wait(timeout time.Duration) error {
pod := m.WaitForRunning()
func (m *MoverJob) Wait(startTimeout time.Duration, moveTimeout time.Duration) error {
pod := m.WaitForRunning(startTimeout)
if pod == nil {
return errors.New("pod not in correct state")
}
runningPod := *pod
go m.followLogs(runningPod)

err := wait.PollImmediate(2*time.Second, timeout, func() (bool, error) {
job, err := m.kClient.BatchV1().Jobs(m.Namespace).Get(context.TODO(), m.kJob.Name, metav1.GetOptions{})
err := wait.PollUntilContextTimeout(context.Background(), 2*time.Second, moveTimeout, true, func(ctx context.Context) (bool, error) {
job, err := m.kClient.BatchV1().Jobs(m.Namespace).Get(ctx, m.kJob.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/strategies/copyTwiceName.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (c *CopyTwiceNameStrategy) Do(sourcePVC *v1.PersistentVolumeClaim, destTemp
c.tempMover.SourceVolume = sourcePVC
c.tempMover.DestVolume = c.TempDestPVC
c.tempMover.Name = fmt.Sprintf("korb-job-%s", sourcePVC.UID)
err = c.tempMover.Start().Wait(c.MoveTimeout)
err = c.tempMover.Start().Wait(c.timeout, c.MoveTimeout)
if err != nil {
c.log.WithError(err).Warning("Failed to move data")
c.pvcsToDelete = []*v1.PersistentVolumeClaim{c.TempDestPVC}
Expand Down Expand Up @@ -118,7 +118,7 @@ func (c *CopyTwiceNameStrategy) Do(sourcePVC *v1.PersistentVolumeClaim, destTemp
c.finalMover.SourceVolume = c.TempDestPVC
c.finalMover.DestVolume = c.DestPVC
c.finalMover.Name = fmt.Sprintf("korb-job-%s", tempDestInst.UID)
err = c.finalMover.Start().Wait(c.MoveTimeout)
err = c.finalMover.Start().Wait(c.timeout, c.MoveTimeout)
if err != nil {
c.log.WithError(err).Warning("Failed to move data")
c.pvcsToDelete = []*v1.PersistentVolumeClaim{c.DestPVC}
Expand Down Expand Up @@ -157,7 +157,7 @@ func (c *CopyTwiceNameStrategy) setTimeout(pvc *v1.PersistentVolumeClaim) {
}

func (c *CopyTwiceNameStrategy) waitForPVCDeletion(pvc *v1.PersistentVolumeClaim) error {
return wait.Poll(2*time.Second, 30*time.Second, func() (bool, error) {
return wait.Poll(2*time.Second, c.timeout, func() (bool, error) {
_, err := c.kClient.CoreV1().PersistentVolumeClaims(pvc.ObjectMeta.Namespace).Get(context.TODO(), pvc.ObjectMeta.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
return true, nil
Expand All @@ -168,7 +168,7 @@ func (c *CopyTwiceNameStrategy) waitForPVCDeletion(pvc *v1.PersistentVolumeClaim
}

func (c *CopyTwiceNameStrategy) waitForBound(pvc *v1.PersistentVolumeClaim) error {
return wait.Poll(2*time.Second, 30*time.Second, func() (bool, error) {
return wait.Poll(2*time.Second, c.timeout, func() (bool, error) {
pvc, err := c.kClient.CoreV1().PersistentVolumeClaims(pvc.ObjectMeta.Namespace).Get(context.TODO(), pvc.ObjectMeta.Name, metav1.GetOptions{})
if err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/strategies/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c *ExportStrategy) Do(sourcePVC *v1.PersistentVolumeClaim, destTemplate *v
c.tempMover.SourceVolume = sourcePVC
c.tempMover.Name = fmt.Sprintf("korb-job-%s", sourcePVC.UID)

pod := c.tempMover.Start().WaitForRunning()
pod := c.tempMover.Start().WaitForRunning(c.timeout)
if pod == nil {
c.log.Warning("Failed to move data")
return c.Cleanup()
Expand Down
4 changes: 2 additions & 2 deletions pkg/strategies/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *ImportStrategy) CompatibleWithContext(ctx MigrationContext) error {
path := fmt.Sprintf("%s.tar", ctx.SourcePVC.Name)
_, err := os.Stat(path)
if errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("Expected import file '%s' does not exist", path)
return fmt.Errorf("expected import file '%s' does not exist", path)
}
return nil
}
Expand All @@ -55,7 +55,7 @@ func (c *ImportStrategy) Do(sourcePVC *v1.PersistentVolumeClaim, destTemplate *v
c.tempMover.SourceVolume = sourcePVC
c.tempMover.Name = fmt.Sprintf("korb-job-%s", sourcePVC.UID)

pod := c.tempMover.Start().WaitForRunning()
pod := c.tempMover.Start().WaitForRunning(c.timeout)
if pod == nil {
c.log.Warning("Failed to move data")
return c.Cleanup()
Expand Down
12 changes: 11 additions & 1 deletion pkg/strategies/strategy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package strategies

import (
"time"

log "github.com/sirupsen/logrus"

v1 "k8s.io/api/core/v1"
Expand All @@ -14,13 +16,21 @@ type BaseStrategy struct {

log *log.Entry
tolerateAllNodes bool
timeout time.Duration
}

func NewBaseStrategy(config *rest.Config, client *kubernetes.Clientset, tolerateAllNodes bool) BaseStrategy {
func NewBaseStrategy(config *rest.Config, client *kubernetes.Clientset, tolerateAllNodes bool, timeout *time.Duration) BaseStrategy {
var t time.Duration
if timeout == nil {
t = 60 * time.Second
} else {
t = *timeout
}
return BaseStrategy{
kConfig: config,
kClient: client,
tolerateAllNodes: tolerateAllNodes,
timeout: t,
log: log.WithField("component", "strategy"),
}
}
Expand Down

0 comments on commit 2169d52

Please sign in to comment.