Skip to content

Commit

Permalink
Adds support for syncing/suspending/inventorying generic resources
Browse files Browse the repository at this point in the history
  • Loading branch information
foot committed Oct 20, 2023
1 parent dad5ebe commit 86a9aaa
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 109 deletions.
64 changes: 59 additions & 5 deletions core/fluxsync/adapters.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package fluxsync

import (
"errors"

helmv2 "github.com/fluxcd/helm-controller/api/v2beta1"
imgautomationv1 "github.com/fluxcd/image-automation-controller/api/v1beta1"
reflectorv1 "github.com/fluxcd/image-reflector-controller/api/v1beta2"
kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1"
"github.com/fluxcd/pkg/apis/meta"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
sourcev1b2 "github.com/fluxcd/source-controller/api/v1beta2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -302,6 +303,49 @@ func (obj ImageUpdateAutomationAdapter) DeepCopyClientObject() client.Object {
return obj.DeepCopy()
}

type UnstructuredAdapter struct {
*unstructured.Unstructured
}

func (obj UnstructuredAdapter) GetLastHandledReconcileRequest() string {
if val, found, _ := unstructured.NestedString(obj.Object, "status", "lastHandledReconcileAt"); found {
return val
}
return ""
}

func (obj UnstructuredAdapter) GetConditions() []metav1.Condition {
conditionsSlice, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
if !found || err != nil {
return nil
}

var conditions []metav1.Condition
for _, c := range conditionsSlice {
var condition metav1.Condition
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(c.(map[string]interface{}), &condition); err != nil {
continue
}
conditions = append(conditions, condition)
}

return conditions
}

func (obj UnstructuredAdapter) AsClientObject() client.Object {
// Important for the client reflection stuff to work
// We can't return just `obj` here as it seems to break stuff.
return obj.Unstructured
}

func (obj UnstructuredAdapter) SetSuspended(suspend bool) {
unstructured.SetNestedField(obj.Object, suspend, "spec", "suspend")

Check failure on line 342 in core/fluxsync/adapters.go

View workflow job for this annotation

GitHub Actions / CI Check Static Checks (1.20.X, 16.X)

Error return value of `unstructured.SetNestedField` is not checked (errcheck)
}

func (obj UnstructuredAdapter) DeepCopyClientObject() client.Object {
return obj.DeepCopy()
}

type sRef struct {
apiVersion string
name string
Expand All @@ -325,8 +369,8 @@ func (s sRef) Kind() string {
return s.kind
}

func ToReconcileable(kind string) (client.ObjectList, Reconcilable, error) {
switch kind {
func ToReconcileable(gvk schema.GroupVersionKind) (client.ObjectList, Reconcilable, error) {
switch gvk.Kind {
case kustomizev1.KustomizationKind:
return &kustomizev1.KustomizationList{}, NewReconcileable(&kustomizev1.Kustomization{}), nil

Expand Down Expand Up @@ -355,5 +399,15 @@ func ToReconcileable(kind string) (client.ObjectList, Reconcilable, error) {
return &imgautomationv1.ImageUpdateAutomationList{}, NewReconcileable(&imgautomationv1.ImageUpdateAutomation{}), nil
}

return nil, nil, errors.New("could not find source type")
return ToUnstructuredReconcilable(gvk)
}

func ToUnstructuredReconcilable(gvk schema.GroupVersionKind) (client.ObjectList, Reconcilable, error) {
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)

objList := &unstructured.UnstructuredList{}
objList.SetGroupVersionKind(gvk)

return objList, UnstructuredAdapter{Unstructured: obj}, nil
}
131 changes: 69 additions & 62 deletions core/server/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -53,17 +54,17 @@ func (cs *coreServer) GetInventory(ctx context.Context, msg *pb.GetInventoryRequ
if err != nil {
return nil, fmt.Errorf("failed getting helm Release inventory: %w", err)
}
// case kustomizev1.KustomizationKind:
// inventoryRefs, err = cs.getKustomizationInventory(ctx, client, msg.Name, msg.Namespace)
// if err != nil {
// return nil, fmt.Errorf("failed getting kustomization inventory: %w", err)
// }
case kustomizev1.KustomizationKind:
inventoryRefs, err = cs.getKustomizationInventory(ctx, client, msg.Name, msg.Namespace)
if err != nil {
return nil, fmt.Errorf("failed getting kustomization inventory: %w", err)
}
default:
gvk, err := cs.primaryKinds.Lookup(msg.Kind)
if err != nil {
return nil, err
}
inventoryRefs, err = cs.getUnstructedInventory(ctx, client, msg.Name, msg.Namespace, *gvk)
inventoryRefs, err = GetFluxLikeInventory(ctx, client, msg.Name, msg.Namespace, *gvk)
if err != nil {
return nil, fmt.Errorf("failed getting %s inventory: %w", msg.Kind, err)
}
Expand Down Expand Up @@ -109,55 +110,7 @@ func (cs *coreServer) getKustomizationInventory(ctx context.Context, k8sClient c
for _, ref := range ks.Status.Inventory.Entries {
obj, err := ResourceRefToUnstructured(ref.ID, ref.Version)
if err != nil {
cs.logger.Error(err, "failed converting inventory entry", "entry", ref)
return nil, err
}
objects = append(objects, &obj)
}

return objects, nil
}

func (cs *coreServer) getUnstructedInventory(ctx context.Context, k8sClient client.Client, name, namespace string, gvk schema.GroupVersionKind) ([]*unstructured.Unstructured, error) {
// Create an unstructured object with the desired GVK (GroupVersionKind)
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
obj.SetName(name)
obj.SetNamespace(namespace)

// Get the object from the Kubernetes cluster
if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
return nil, fmt.Errorf("failed to get kustomization: %w", err)
}

content := obj.UnstructuredContent()

// Check if status.inventory is present
inventory, found, err := unstructured.NestedMap(content, "status", "inventory")
if err != nil || !found {
return nil, nil
}

// Check if status.inventory.entries is present
entries, found, err := unstructured.NestedSlice(inventory, "entries")
if err != nil || !found {
return nil, nil
}

objects := []*unstructured.Unstructured{}
for _, entryInterface := range entries {
entry, ok := entryInterface.(map[string]interface{})
if !ok {
// Handle error, the type is not as expected
continue
}

id, _, _ := unstructured.NestedString(entry, "id")
version, _, _ := unstructured.NestedString(entry, "v")
obj, err := ResourceRefToUnstructured(id, version)
if err != nil {
cs.logger.Error(err, "failed converting inventory entry", "entry", entry)
return nil, err
return nil, fmt.Errorf("failed converting inventory entry: %w", err)
}
objects = append(objects, &obj)
}
Expand All @@ -182,13 +135,6 @@ func (cs *coreServer) getHelmReleaseInventory(ctx context.Context, k8sClient cli
return nil, fmt.Errorf("failed to get helm release objects: %w", err)
}

// FIXME: do we need this?
for _, obj := range objects {
if obj.GetNamespace() == "" {
obj.SetNamespace(namespace)
}
}

return objects, nil
}

Expand Down Expand Up @@ -257,6 +203,67 @@ func getHelmReleaseObjects(ctx context.Context, k8sClient client.Client, helmRel
return nil, fmt.Errorf("failed to read the Helm storage object for HelmRelease '%s': %w", helmRelease.Name, err)
}

// FIXME: do we need this?
for _, obj := range objects {
if obj.GetNamespace() == "" {
obj.SetNamespace(helmRelease.Namespace)
}
}

return objects, nil
}

// GetFluxLikeInventory returns the inventory on a resource if
// it matches the structure of the flux inventory format (e.g. kustomizations)
// It returns an error if the inventory is not as expected
func GetFluxLikeInventory(ctx context.Context, k8sClient client.Client, name, namespace string, gvk schema.GroupVersionKind) ([]*unstructured.Unstructured, error) {
// Create an unstructured object with the desired GVK (GroupVersionKind)
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
obj.SetName(name)
obj.SetNamespace(namespace)

// Get the object from the Kubernetes cluster
if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
return nil, fmt.Errorf("failed to get kustomization: %w", err)
}

return ParseInventoryFromUnstructured(obj)
}

// Parse the inventory from an unstructured object
// It returns an error if the inventory is not as expected (should look like a kustomization's inventory)
func ParseInventoryFromUnstructured(obj *unstructured.Unstructured) ([]*unstructured.Unstructured, error) {
content := obj.UnstructuredContent()

// Check if status.inventory is present
inventory, found, err := unstructured.NestedMap(content, "status", "inventory")
if err != nil || !found {
return nil, errors.New("no status.inventory found on resource, it hasn't been synced yet or is not queryable from this endpoint")
}

// Check if status.inventory.entries is present
entries, found, err := unstructured.NestedSlice(inventory, "entries")
if err != nil || !found {
return nil, nil
}

objects := []*unstructured.Unstructured{}
for _, entryInterface := range entries {
entry, ok := entryInterface.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("failed converting inventory entry to map[string]interface{}: %+v", entry)
}

id, _, _ := unstructured.NestedString(entry, "id")
version, _, _ := unstructured.NestedString(entry, "v")
obj, err := ResourceRefToUnstructured(id, version)
if err != nil {
return nil, fmt.Errorf("failed converting inventory entry: %w", err)
}
objects = append(objects, &obj)
}

return objects, nil
}

Expand Down
Loading

0 comments on commit 86a9aaa

Please sign in to comment.