Skip to content

Commit

Permalink
unify CloudEvent payload for single and bundle resources.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <lcao@redhat.com>
  • Loading branch information
morvencao committed Jan 20, 2025
1 parent 4c139da commit a78c98c
Show file tree
Hide file tree
Showing 25 changed files with 345 additions and 724 deletions.
2 changes: 1 addition & 1 deletion cmd/maestro/agent/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const maxJSONRawLength int32 = 1024 * 1024

func NewAgentCommand() *cobra.Command {
agentOption.MaxJSONRawLength = maxJSONRawLength
agentOption.CloudEventsClientCodecs = []string{"manifest", "manifestbundle"}
agentOption.CloudEventsClientCodecs = []string{"manifestbundle"}
cfg := spoke.NewWorkAgentConfig(commonOptions, agentOption)
cmdConfig := commonOptions.CommonOpts.
NewControllerCommandConfig("maestro-agent", version.Get(), cfg.RunWorkloadAgent)
Expand Down
3 changes: 1 addition & 2 deletions cmd/maestro/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ func handleStatusUpdate(ctx context.Context, resource *api.Resource, resourceSer

// set the resource source and type back for broadcast
resource.Source = found.Source
resource.Type = found.Type

// convert the resource status to cloudevent
statusEvent, err := api.JSONMAPToCloudEvent(resource.Status)
Expand All @@ -203,7 +202,7 @@ func handleStatusUpdate(ctx context.Context, resource *api.Resource, resourceSer
}

// decode the cloudevent data as manifest status
statusPayload := &workpayload.ManifestStatus{}
statusPayload := &workpayload.ManifestBundleStatus{}
if err := statusEvent.DataAs(statusPayload); err != nil {
return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err)
}
Expand Down
36 changes: 8 additions & 28 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ func (bkr *GRPCBroker) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)

// handler resync request
if eventType.Action == types.ResyncRequestAction {
err := bkr.respondResyncSpecRequest(ctx, eventType.CloudEventsDataType, evt)
err := bkr.respondResyncSpecRequest(ctx, evt)
if err != nil {
return nil, fmt.Errorf("failed to respond resync spec request: %v", err)
}
return &emptypb.Empty{}, nil
}

// decode the cloudevent data as resource with status
resource, err := decodeResourceStatus(eventType.CloudEventsDataType, evt)
resource, err := decodeResourceStatus(evt)
if err != nil {
return nil, fmt.Errorf("failed to decode cloudevent: %v", err)
}
Expand Down Expand Up @@ -278,7 +278,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
}

// decodeResourceStatus translates a CloudEvent into a resource containing the status JSON map.
func decodeResourceStatus(eventDataType types.CloudEventsDataType, evt *ce.Event) (*api.Resource, error) {
func decodeResourceStatus(evt *ce.Event) (*api.Resource, error) {
evtExtensions := evt.Context.GetExtensions()

clusterName, err := cetypes.ToString(evtExtensions[types.ExtensionClusterName])
Expand Down Expand Up @@ -311,15 +311,6 @@ func decodeResourceStatus(eventDataType types.CloudEventsDataType, evt *ce.Event
Status: status,
}

switch eventDataType {
case workpayload.ManifestEventDataType:
resource.Type = api.ResourceTypeSingle
case workpayload.ManifestBundleEventDataType:
resource.Type = api.ResourceTypeBundle
default:
return nil, fmt.Errorf("unsupported cloudevents data type %s", eventDataType)
}

return resource, nil
}

Expand All @@ -331,13 +322,10 @@ func encodeResourceSpec(resource *api.Resource) (*ce.Event, error) {
}

eventType := types.CloudEventsType{
CloudEventsDataType: workpayload.ManifestEventDataType,
CloudEventsDataType: workpayload.ManifestBundleEventDataType,
SubResource: types.SubResourceSpec,
Action: types.EventAction("create_request"),
}
if resource.Type == api.ResourceTypeBundle {
eventType.CloudEventsDataType = workpayload.ManifestBundleEventDataType
}
evt.SetType(eventType.String())
evt.SetSource("maestro")
// TODO set resource.Source with a new extension attribute if the agent needs
Expand All @@ -360,14 +348,11 @@ func encodeResourceSpec(resource *api.Resource) (*ce.Event, error) {
// resend the resource.
// - If the requested resource version is older than the source's current maintained resource version, the source
// sends the resource.
func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataType types.CloudEventsDataType, evt *ce.Event) error {
// - If the resource does not exist on the source, but exists on the agent, the source sends a delete event for the
// resource.
func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, evt *ce.Event) error {
log := logger.NewOCMLogger(ctx)

resyncType := api.ResourceTypeSingle
if eventDataType == workpayload.ManifestBundleEventDataType {
resyncType = api.ResourceTypeBundle
}

resourceVersions, err := payload.DecodeSpecResyncRequest(*evt)
if err != nil {
return err
Expand All @@ -379,7 +364,7 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
}
clusterName := fmt.Sprintf("%s", clusterNameValue)

objs, err := bkr.resourceService.List(types.ListOptions{ClusterName: clusterName, CloudEventsDataType: eventDataType})
objs, err := bkr.resourceService.List(types.ListOptions{ClusterName: clusterName})
if err != nil {
return err
}
Expand All @@ -390,10 +375,6 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
}

for _, obj := range objs {
// only respond with the resource of the resync type
if obj.Type != resyncType {
continue
}
// respond with the deleting resource regardless of the resource version
if !obj.GetDeletionTimestamp().IsZero() {
bkr.handleRes(obj)
Expand Down Expand Up @@ -427,7 +408,6 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
},
Version: int32(rv.ResourceVersion),
ConsumerName: clusterName,
Type: resyncType,
}
// mark the resource as deleting
obj.Meta.DeletedAt.Time = time.Now()
Expand Down
76 changes: 17 additions & 59 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ import (
"fmt"
"net"
"os"
"time"

ce "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
cetypes "github.com/cloudevents/sdk-go/v2/types"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
Expand All @@ -24,7 +22,6 @@ import (
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/client/cloudevents"
Expand Down Expand Up @@ -184,14 +181,14 @@ func (svr *GRPCServer) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)

// handler resync request
if eventType.Action == types.ResyncRequestAction {
err := svr.respondResyncStatusRequest(ctx, eventType.CloudEventsDataType, evt)
err := svr.respondResyncStatusRequest(ctx, evt)
if err != nil {
return nil, fmt.Errorf("failed to respond resync status request: %v", err)
}
return &emptypb.Empty{}, nil
}

res, err := decodeResourceSpec(eventType.CloudEventsDataType, evt)
res, err := decodeResourceSpec(evt)
if err != nil {
return nil, fmt.Errorf("failed to decode cloudevent: %v", err)
}
Expand All @@ -203,20 +200,17 @@ func (svr *GRPCServer) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)
return nil, fmt.Errorf("failed to create resource: %v", err)
}
case common.UpdateRequestAction:
if res.Type == api.ResourceTypeBundle {
found, err := svr.resourceService.Get(ctx, res.ID)
if err != nil {
return nil, fmt.Errorf("failed to get resource: %v", err)
}
found, err := svr.resourceService.Get(ctx, res.ID)
if err != nil {
return nil, fmt.Errorf("failed to get resource: %v", err)
}

if res.Version == 0 {
// the resource version is not guaranteed to be increased by source client,
// using the latest resource version.
res.Version = found.Version
}
if res.Version == 0 {
// the resource version is not guaranteed to be increased by source client,
// using the latest resource version.
res.Version = found.Version
}
_, err := svr.resourceService.Update(ctx, res)
if err != nil {
if _, err = svr.resourceService.Update(ctx, res); err != nil {
return nil, fmt.Errorf("failed to update resource: %v", err)
}
case common.DeleteRequestAction:
Expand Down Expand Up @@ -282,7 +276,7 @@ func (svr *GRPCServer) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
}

// decodeResourceSpec translates a CloudEvent into a resource containing the spec JSON map.
func decodeResourceSpec(eventDataType types.CloudEventsDataType, evt *ce.Event) (*api.Resource, error) {
func decodeResourceSpec(evt *ce.Event) (*api.Resource, error) {
evtExtensions := evt.Context.GetExtensions()

clusterName, err := cetypes.ToString(evtExtensions[types.ExtensionClusterName])
Expand Down Expand Up @@ -322,46 +316,19 @@ func decodeResourceSpec(eventDataType types.CloudEventsDataType, evt *ce.Event)
return nil, fmt.Errorf("failed to convert cloudevent to resource payload: %v", err)
}
resource.Payload = payload

switch eventDataType {
case workpayload.ManifestEventDataType:
resource.Type = api.ResourceTypeSingle
case workpayload.ManifestBundleEventDataType:
resource.Type = api.ResourceTypeBundle
default:
return nil, fmt.Errorf("unsupported cloudevents data type %s", eventDataType)
}
// set the resource type to bundle from grpc source
resource.Type = api.ResourceTypeBundle

return resource, nil
}

// encodeResourceStatus translates a resource status JSON map into a CloudEvent.
func encodeResourceStatus(resource *api.Resource) (*ce.Event, error) {
if resource.Type == api.ResourceTypeSingle {
// single resource, return the status directly
return api.JSONMAPToCloudEvent(resource.Status)
}

statusEvt, err := api.JSONMAPToCloudEvent(resource.Status)
if err != nil {
return nil, err
}

// set basic fields
evt := ce.NewEvent()
evt.SetID(uuid.New().String())
evt.SetTime(time.Now())
evt.SetType(statusEvt.Type())
evt.SetSource(statusEvt.Source())
for key, val := range statusEvt.Extensions() {
evt.SetExtension(key, val)
}

// set work meta back from status event
if workMeta, ok := statusEvt.Extensions()[codec.ExtensionWorkMeta]; ok {
evt.SetExtension(codec.ExtensionWorkMeta, workMeta)
}

// manifest bundle status from the resource status
manifestBundleStatus := &workpayload.ManifestBundleStatus{}
if err := statusEvt.DataAs(manifestBundleStatus); err != nil {
Expand All @@ -382,16 +349,16 @@ func encodeResourceStatus(resource *api.Resource) (*ce.Event, error) {
manifestBundleStatus.ManifestBundle = manifestBundle
}

if err := evt.SetData(ce.ApplicationJSON, manifestBundleStatus); err != nil {
if err := statusEvt.SetData(ce.ApplicationJSON, manifestBundleStatus); err != nil {
return nil, err
}

return &evt, nil
return statusEvt, nil
}

// respondResyncStatusRequest responds to the status resync request by comparing the status hash of the resources
// from the database and the status hash in the request, and then respond the resources whose status is changed.
func (svr *GRPCServer) respondResyncStatusRequest(ctx context.Context, eventDataType types.CloudEventsDataType, evt *ce.Event) error {
func (svr *GRPCServer) respondResyncStatusRequest(ctx context.Context, evt *ce.Event) error {
objs, serviceErr := svr.resourceService.FindBySource(ctx, evt.Source())
if serviceErr != nil {
return fmt.Errorf("failed to list resources: %s", serviceErr)
Expand All @@ -411,16 +378,7 @@ func (svr *GRPCServer) respondResyncStatusRequest(ctx context.Context, eventData
return nil
}

resyncType := api.ResourceTypeSingle
if eventDataType == workpayload.ManifestBundleEventDataType {
resyncType = api.ResourceTypeBundle
}

for _, obj := range objs {
if obj.Type != resyncType {
continue
}

lastHash, ok := findStatusHash(string(obj.GetUID()), statusHashes.Hashes)
if !ok {
// ignore the resource that is not on the source, but exists on the maestro, wait for the source deleting it
Expand Down
79 changes: 0 additions & 79 deletions pkg/api/presenters/resource.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package presenters

import (
"encoding/json"
"fmt"

"gorm.io/datatypes"

"github.com/openshift-online/maestro/pkg/api"
Expand Down Expand Up @@ -70,79 +67,3 @@ func PresentResource(resource *api.Resource) (*openapi.Resource, error) {

return res, nil
}

// PresentResourceBundle converts a resource from the API to the openapi representation.
func PresentResourceBundle(resource *api.Resource) (*openapi.ResourceBundle, error) {
metadata, manifestBundle, err := api.DecodeManifestBundle(resource.Payload)
if err != nil {
return nil, err
}
status, err := api.DecodeBundleStatus(resource.Status)
if err != nil {
return nil, err
}

reference := openapi.ObjectReference{
Id: openapi.PtrString(resource.ID),
Kind: openapi.PtrString("ResourceBundle"),
Href: openapi.PtrString(fmt.Sprintf("%s/%s/%s", BasePath, "resource-bundles", resource.ID)),
}

manifests := make([]map[string]interface{}, 0, len(manifestBundle.Manifests))
for _, manifest := range manifestBundle.Manifests {
mbytes, err := json.Marshal(manifest)
if err != nil {
return nil, err
}
m := map[string]interface{}{}
if err := json.Unmarshal(mbytes, &m); err != nil {
return nil, err
}
manifests = append(manifests, m)
}
deleteOption := map[string]interface{}{}
if manifestBundle.DeleteOption != nil {
dbytes, err := json.Marshal(manifestBundle.DeleteOption)
if err != nil {
return nil, err
}
if err := json.Unmarshal(dbytes, &deleteOption); err != nil {
return nil, err
}
}
manifestConfigs := make([]map[string]interface{}, 0, len(manifestBundle.ManifestConfigs))
for _, manifestConfig := range manifestBundle.ManifestConfigs {
mbytes, err := json.Marshal(manifestConfig)
if err != nil {
return nil, err
}
m := map[string]interface{}{}
if err := json.Unmarshal(mbytes, &m); err != nil {
return nil, err
}
manifestConfigs = append(manifestConfigs, m)
}

res := &openapi.ResourceBundle{
Id: reference.Id,
Kind: reference.Kind,
Href: reference.Href,
Name: openapi.PtrString(resource.Name),
ConsumerName: openapi.PtrString(resource.ConsumerName),
Version: openapi.PtrInt32(resource.Version),
CreatedAt: openapi.PtrTime(resource.CreatedAt),
UpdatedAt: openapi.PtrTime(resource.UpdatedAt),
Metadata: metadata,
Manifests: manifests,
DeleteOption: deleteOption,
ManifestConfigs: manifestConfigs,
Status: status,
}

// set the deletedAt field if the resource has been marked as deleted
if !resource.DeletedAt.Time.IsZero() {
res.DeletedAt = openapi.PtrTime(resource.DeletedAt.Time)
}

return res, nil
}
Loading

0 comments on commit a78c98c

Please sign in to comment.