Skip to content

Commit

Permalink
feat: support cancelling restores
Browse files Browse the repository at this point in the history
  • Loading branch information
shreddedbacon committed Dec 30, 2024
1 parent 45fff5a commit 5596039
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 43 deletions.
32 changes: 32 additions & 0 deletions internal/helpers/helper_types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package helpers

import (
"context"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// LagoonEnvironmentVariable is used to define Lagoon environment variables.
type LagoonEnvironmentVariable struct {
Name string `json:"name"`
Expand Down Expand Up @@ -31,3 +39,27 @@ type LagoonAPIConfiguration struct {
SSHHost string
SSHPort string
}

func K8UPVersions(ctx context.Context, c client.Client) (bool, bool, error) {
k8upv1alpha1Exists := false
k8upv1Exists := false
crdv1alpha1 := &apiextensionsv1.CustomResourceDefinition{}
if err := c.Get(context.TODO(), types.NamespacedName{Name: "restores.backup.appuio.ch"}, crdv1alpha1); err != nil {
if err := IgnoreNotFound(err); err != nil {
return k8upv1alpha1Exists, k8upv1Exists, err
}
}
if crdv1alpha1.ObjectMeta.Name == "restores.backup.appuio.ch" {
k8upv1alpha1Exists = true
}
crdv1 := &apiextensionsv1.CustomResourceDefinition{}
if err := c.Get(context.TODO(), types.NamespacedName{Name: "restores.k8up.io"}, crdv1); err != nil {
if err := IgnoreNotFound(err); err != nil {
return k8upv1alpha1Exists, k8upv1Exists, err
}
}
if crdv1.ObjectMeta.Name == "restores.k8up.io" {
k8upv1Exists = true
}
return k8upv1alpha1Exists, k8upv1Exists, nil
}
46 changes: 45 additions & 1 deletion internal/messenger/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,14 +406,25 @@ func (m *Messenger) Consumer(targetName string) { //error {
}
}
case "deploytarget:restic:backup:restore", "kubernetes:restic:backup:restore":
v1alpha1, v1, err := helpers.K8UPVersions(ctx, m.Client)
if err != nil {
//@TODO: send msg back to lagoon and update task to failed?
message.Ack(false) // ack to remove from queue
return
}
if !v1alpha1 && !v1 {
// k8up not installed
message.Ack(false) // ack to remove from queue
return
}
opLog.Info(
fmt.Sprintf(
"Received backup restoration for project %s, environment %s",
jobSpec.Project.Name,
jobSpec.Environment.Name,
),
)
err := m.ResticRestore(namespace, jobSpec)
err = m.ResticRestore(ctx, namespace, jobSpec, v1alpha1, v1, false)
if err != nil {
opLog.Error(err,
fmt.Sprintf(
Expand All @@ -426,6 +437,39 @@ func (m *Messenger) Consumer(targetName string) { //error {
message.Ack(false) // ack to remove from queue
return
}
case "deploytarget:restic:cancel:restore":
v1alpha1, v1, err := helpers.K8UPVersions(ctx, m.Client)
if err != nil {
//@TODO: send msg back to lagoon and update task to failed?
message.Ack(false) // ack to remove from queue
return
}
if !v1alpha1 && !v1 {
// k8up not installed
message.Ack(false) // ack to remove from queue
return
}
// if this is a request to cancel a restore attempt
opLog.Info(
fmt.Sprintf(
"Received restore cancellation for project %s, environment %s",
jobSpec.Project.Name,
jobSpec.Environment.Name,
),
)
err = m.ResticRestore(ctx, namespace, jobSpec, v1alpha1, v1, true)
if err != nil {
opLog.Error(err,
fmt.Sprintf(
"Cancel restore for project %s, environment %s failed",
jobSpec.Project.Name,
jobSpec.Environment.Name,
),
)
//@TODO: send msg back to lagoon and update task to failed?
message.Ack(false) // ack to remove from queue
return
}
case "deploytarget:route:migrate", "kubernetes:route:migrate", "openshift:route:migrate":
opLog.Info(
fmt.Sprintf(
Expand Down
164 changes: 122 additions & 42 deletions internal/messenger/tasks_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@ import (
"fmt"

"github.com/go-logr/logr"
"github.com/uselagoon/machinery/api/schema"
lagoonv1beta2 "github.com/uselagoon/remote-controller/api/lagoon/v1beta2"
"github.com/uselagoon/remote-controller/internal/helpers"
"k8s.io/apimachinery/pkg/types"

ctrl "sigs.k8s.io/controller-runtime"

k8upv1 "github.com/k8up-io/k8up/v2/api/v1"
k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/types"
)

type cancelRestore struct {
RestoreName string `json:"restoreName"`
BackupID string `json:"backupId"`
}

// ResticRestore handles creating the restic restore jobs.
func (m *Messenger) ResticRestore(namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
func (m *Messenger) ResticRestore(ctx context.Context, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec, v1alpha1, v1, cancel bool) error {
opLog := ctrl.Log.WithName("handlers").WithName("LagoonTasks")
vers, err := checkRestoreVersionFromCore(jobSpec.Misc.MiscResource)
if err != nil {
Expand All @@ -31,51 +37,41 @@ func (m *Messenger) ResticRestore(namespace string, jobSpec *lagoonv1beta2.Lagoo
return nil
}

// check if k8up crds exist in the cluster
k8upv1alpha1Exists := false
k8upv1Exists := false
crdv1alpha1 := &apiextensionsv1.CustomResourceDefinition{}
if err = m.Client.Get(context.TODO(), types.NamespacedName{Name: "restores.backup.appuio.ch"}, crdv1alpha1); err != nil {
if err := helpers.IgnoreNotFound(err); err != nil {
return err
}
}
if crdv1alpha1.ObjectMeta.Name == "restores.backup.appuio.ch" {
k8upv1alpha1Exists = true
}
crdv1 := &apiextensionsv1.CustomResourceDefinition{}
if err = m.Client.Get(context.TODO(), types.NamespacedName{Name: "restores.k8up.io"}, crdv1); err != nil {
if err := helpers.IgnoreNotFound(err); err != nil {
return err
}
}
if crdv1.ObjectMeta.Name == "restores.k8up.io" {
k8upv1Exists = true
}
handlev1alpha1 := false
handlev1 := false
// check the version, if there is no version in the payload, assume it is k8up v2
if m.SupportK8upV2 {
if vers == "backup.appuio.ch/v1alpha1" {
if k8upv1alpha1Exists {
return m.createv1alpha1Restore(opLog, namespace, jobSpec)
if v1alpha1 {
handlev1alpha1 = true
}
} else {
if k8upv1Exists {
if err := m.createv1Restore(opLog, namespace, jobSpec); err != nil {
return err
}
if v1 {
handlev1 = true
} else {
if k8upv1alpha1Exists {
if err := m.createv1alpha1Restore(opLog, namespace, jobSpec); err != nil {
return err
}
if v1alpha1 {
handlev1alpha1 = true
}
}
}
} else {
if k8upv1alpha1Exists {
if err := m.createv1alpha1Restore(opLog, namespace, jobSpec); err != nil {
return err
}
if v1alpha1 {
handlev1alpha1 = true
}
}

if handlev1alpha1 {
if cancel {
return m.cancelv1alpha1Restore(ctx, opLog, namespace, jobSpec)
} else {
return m.createv1alpha1Restore(ctx, opLog, namespace, jobSpec)
}
}
if handlev1 {
if cancel {
return m.cancelv1Restore(ctx, opLog, namespace, jobSpec)
} else {
return m.createv1Restore(ctx, opLog, namespace, jobSpec)
}
}
return nil
Expand All @@ -97,7 +93,7 @@ func checkRestoreVersionFromCore(resource []byte) (string, error) {
}

// createv1alpha1Restore will create a restore task using the restores.backup.appuio.ch v1alpha1 api (k8up v1)
func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
func (m *Messenger) createv1alpha1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
restorev1alpha1 := &k8upv1alpha1.Restore{}
if err := json.Unmarshal(jobSpec.Misc.MiscResource, restorev1alpha1); err != nil {
opLog.Error(err,
Expand All @@ -109,7 +105,7 @@ func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, j
return err
}
restorev1alpha1.SetNamespace(namespace)
if err := m.Client.Create(context.Background(), restorev1alpha1); err != nil {
if err := m.Client.Create(ctx, restorev1alpha1); err != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to create restore %s with k8up v1alpha1 api.",
Expand All @@ -122,7 +118,7 @@ func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, j
}

// createv1Restore will create a restore task using the restores.k8up.io v1 api (k8up v2)
func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
func (m *Messenger) createv1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
restorev1 := &k8upv1.Restore{}
if err := json.Unmarshal(jobSpec.Misc.MiscResource, restorev1); err != nil {
opLog.Error(err,
Expand All @@ -134,7 +130,7 @@ func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec
return err
}
restorev1.SetNamespace(namespace)
if err := m.Client.Create(context.Background(), restorev1); err != nil {
if err := m.Client.Create(ctx, restorev1); err != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to create restore %s with k8up v1 api.",
Expand All @@ -145,3 +141,87 @@ func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec
}
return nil
}

// cancelv1alpha1Restore will attempt to cancel a restore task using the restores.backup.appuio.ch v1alpha1 api (k8up v1)
func (m *Messenger) cancelv1alpha1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
restorev1alpha1 := &k8upv1alpha1.Restore{}
cr := &cancelRestore{}
if err := json.Unmarshal(jobSpec.Misc.MiscResource, &cr); err != nil {
return err
}
if err := m.Client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.RestoreName}, restorev1alpha1); helpers.IgnoreNotFound(err) != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to get restore %s with k8up v1alpha1 api.",
cr.RestoreName,
),
)
return err
}
if restorev1alpha1.Name != "" {
if err := m.Client.Delete(ctx, restorev1alpha1); err != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to delete restore %s with k8up v1alpha1 api.",
cr.RestoreName,
),
)
return err
}
}
// if no matching restore found, or the restore is deleted, send the cancellation message back to core
m.pubRestoreCancel(opLog, namespace, cr.RestoreName, jobSpec)
return nil
}

// cancelv1Restore will attempt to cancel a restore task using the restores.k8up.io v1 api (k8up v2)
func (m *Messenger) cancelv1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
restorev1 := &k8upv1.Restore{}
cr := &cancelRestore{}
if err := json.Unmarshal(jobSpec.Misc.MiscResource, &cr); err != nil {
return err
}
if err := m.Client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.RestoreName}, restorev1); helpers.IgnoreNotFound(err) != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to get restore %s with k8up v1 api.",
cr.RestoreName,
),
)
return err
}
if restorev1.Name != "" {
if err := m.Client.Delete(ctx, restorev1); err != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to delete restore %s with k8up v1alpha1 api.",
cr.RestoreName,
),
)
return err
}
}
// if no matching restore found, or the restore is deleted, send the cancellation message back to core
m.pubRestoreCancel(opLog, namespace, cr.RestoreName, jobSpec)
return nil
}

func (m *Messenger) pubRestoreCancel(opLog logr.Logger, namespace, restorename string, jobSpec *lagoonv1beta2.LagoonTaskSpec) {
msg := schema.LagoonMessage{
Type: "restore:cancel",
Namespace: namespace,
Meta: &schema.LagoonLogMeta{
Environment: jobSpec.Environment.Name,
Project: jobSpec.Project.Name,
JobName: restorename,
},
}
msgBytes, err := json.Marshal(msg)
if err != nil {
opLog.Error(err, "Unable to encode message as JSON")
}
// publish the cancellation result back to lagoon
if err := m.Publish("lagoon-tasks:controller", msgBytes); err != nil {
opLog.Error(err, "Unable to publish message.")
}
}
30 changes: 30 additions & 0 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,36 @@ var _ = Describe("controller", Ordered, func() {
Expect(strings.TrimSpace(string(result))).To(Equal(string(testResult)))
}

By("validating that restore cancellations are working")
By("creating a restore cancellation task via rabbitmq")
cmd = exec.Command(
"curl",
"-s",
"-u",
"guest:guest",
"-H",
"'Accept: application/json'",
"-H",
"'Content-Type:application/json'",
"-X",
"POST",
"-d",
"@test/e2e/testdata/cancel-restore.json",
"http://172.17.0.1:15672/api/exchanges/%2f/lagoon-tasks/publish",
)
_, err = utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

time.Sleep(10 * time.Second)

By("validating that the restore is deleted")
cmd = exec.Command("kubectl", "get",
"restores.k8up.io", "restore-bf072a0-uqxqo4",
"-n", "nginx-example-main",
)
_, err = utils.Run(cmd)
ExpectWithOffset(1, err).To(HaveOccurred())

By("validating that the harbor robot credentials get rotated successfully")
cmd = exec.Command("kubectl", "get",
"pods", "-l", "control-plane=controller-manager",
Expand Down
17 changes: 17 additions & 0 deletions test/e2e/testdata/cancel-restore.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{"properties":{"delivery_mode":2},"routing_key":"ci-local-controller-kubernetes:misc",
"payload":"{
\"misc\":{
\"miscResource\":\"eyJyZXN0b3JlTmFtZSI6InJlc3RvcmUtYmYwNzJhMC11cXhxbzQiLCJiYWNrdXBJZCI6ImJmMDcyYTA5ZTE3NzI2ZGE1NGFkYzc5OTM2ZWM4NzQ1NTIxOTkzNTk5ZDQxMjExZGZjOTQ2NmRmZDViYzMyYTUifQ==\"
},
\"key\":\"deploytarget:restic:cancel:restore\",
\"environment\":{
\"name\":\"main\",
\"openshiftProjectName\":\"nginx-example-main\"
},
\"project\":{
\"name\":\"nginx-example\"
},
\"advancedTask\":{}
}",
"payload_encoding":"string"
}

0 comments on commit 5596039

Please sign in to comment.