Skip to content

Commit

Permalink
Merge branch 'main' into doc/fixMarkdownLinks
Browse files Browse the repository at this point in the history
  • Loading branch information
lirenjie95 authored Sep 14, 2024
2 parents af058a2 + bea988d commit 5d3e96d
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 272 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2"
vendorCtx "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/contexts"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/managers"
observability "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability"
observ_utils "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability/utils"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers"
)

Expand Down Expand Up @@ -53,8 +55,15 @@ func (s *ActivationsCleanupManager) Enabled() bool {
}

func (s *ActivationsCleanupManager) Poll() []error {
log.Info("M (Activation Cleanup): Polling activations")
activations, err := s.ActivationsManager.ListState(context.Background(), "")
// TODO: initialize the context with id correctly
ctx, span := observability.StartSpan("Activations Cleanup Manager", context.Background(), &map[string]string{
"method": "Poll",
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)

log.InfoCtx(ctx, "M (Activation Cleanup): Polling activations")
activations, err := s.ActivationsManager.ListState(ctx, "")
if err != nil {
return []error{err}
}
Expand All @@ -66,11 +75,11 @@ func (s *ActivationsCleanupManager) Poll() []error {
if activation.Status.UpdateTime == "" {
// Ugrade scenario: update time is not set for activations created before. Set it to now and the activation will be deleted later.
// UpdateTime will be set in ReportStatus function
err = s.ActivationsManager.ReportStatus(context.Background(), activation.ObjectMeta.Name, activation.ObjectMeta.Namespace, *activation.Status)
err = s.ActivationsManager.ReportStatus(ctx, activation.ObjectMeta.Name, activation.ObjectMeta.Namespace, *activation.Status)
if err != nil {
// Delete activation immediately if update time cannot be set? Cx may be confused why activations disappeared
// Just leave those activations as it is and let Cx delete them manually
log.Error("M (Activation Cleanup): Cannot set update time for activation "+activation.ObjectMeta.Name+" since update time cannot be set: %+v", err)
log.ErrorfCtx(ctx, "M (Activation Cleanup): Cannot set update time for activation %s since update time cannot be set: %+v", activation.ObjectMeta.Name, err)
ret = append(ret, err)
}
continue
Expand All @@ -80,13 +89,13 @@ func (s *ActivationsCleanupManager) Poll() []error {
updateTime, err := time.Parse(time.RFC3339, activation.Status.UpdateTime)
if err != nil {
// TODO: should not happen, force update time to Time.Now() ?
log.Info("M (Activation Cleanup): Cannot parse update time of " + activation.ObjectMeta.Name)
log.InfofCtx(ctx, "M (Activation Cleanup): Cannot parse update time of %s", activation.ObjectMeta.Name)
ret = append(ret, err)
}
duration := time.Since(updateTime)
if duration > s.RetentionDuration {
log.Info("M (Activation Cleanup): Deleting activation " + activation.ObjectMeta.Name + " since it has completed for " + duration.String())
err = s.ActivationsManager.DeleteState(context.Background(), activation.ObjectMeta.Name, activation.ObjectMeta.Namespace)
log.InfofCtx(ctx, "M (Activation Cleanup): Deleting activation %s since it has completed for %s", activation.ObjectMeta.Name, duration.String())
err = s.ActivationsManager.DeleteState(ctx, activation.ObjectMeta.Name, activation.ObjectMeta.Namespace)
if err != nil {
ret = append(ret, err)
}
Expand Down
17 changes: 17 additions & 0 deletions api/pkg/apis/v1alpha1/managers/activations/activations-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (m *ActivationsManager) GetState(ctx context.Context, name string, namespac
defer observ_utils.CloseSpanWithError(span, &err)
defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err)

log.InfofCtx(ctx, "Get activation state %s in namespace %s", name, namespace)

getRequest := states.GetRequest{
ID: name,
Metadata: map[string]interface{}{
Expand All @@ -85,6 +87,7 @@ func (m *ActivationsManager) GetState(ctx context.Context, name string, namespac
var ret model.ActivationState
ret, err = getActivationState(entry.Body, entry.ETag)
if err != nil {
log.ErrorfCtx(ctx, "Failed to convert to activation state for %s in namespace %s: %v", name, namespace, err)
return model.ActivationState{}, err
}
return ret, nil
Expand Down Expand Up @@ -115,7 +118,10 @@ func (m *ActivationsManager) UpsertState(ctx context.Context, name string, state
defer observ_utils.CloseSpanWithError(span, &err)
defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err)

log.InfofCtx(ctx, "Upsert activation state %s in namespace %s", name, state.ObjectMeta.Namespace)

if state.ObjectMeta.Name != "" && state.ObjectMeta.Name != name {
log.ErrorfCtx(ctx, "Name in metadata (%s) does not match name in request (%s)", state.ObjectMeta.Name, name)
return v1alpha2.NewCOAError(nil, fmt.Sprintf("Name in metadata (%s) does not match name in request (%s)", state.ObjectMeta.Name, name), v1alpha2.BadRequest)
}
state.ObjectMeta.FixNames(name)
Expand Down Expand Up @@ -166,6 +172,7 @@ func (m *ActivationsManager) DeleteState(ctx context.Context, name string, names
defer observ_utils.CloseSpanWithError(span, &err)
defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err)

log.InfofCtx(ctx, "Delete activation state %s in namespace %s", name, namespace)
err = m.StateProvider.Delete(ctx, states.DeleteRequest{
ID: name,
Metadata: map[string]interface{}{
Expand All @@ -187,6 +194,8 @@ func (t *ActivationsManager) ListState(ctx context.Context, namespace string) ([
defer observ_utils.CloseSpanWithError(span, &err)
defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err)

log.InfofCtx(ctx, "List activation state for namespace %s", namespace)

listRequest := states.ListRequest{
Metadata: map[string]interface{}{
"version": "v1",
Expand All @@ -210,6 +219,7 @@ func (t *ActivationsManager) ListState(ctx context.Context, namespace string) ([
}
ret = append(ret, rt)
}
log.InfofCtx(ctx, "List activation state for namespace %s get total count %d", namespace, len(ret))
return ret, nil
}
func (t *ActivationsManager) ReportStatus(ctx context.Context, name string, namespace string, current model.ActivationStatus) error {
Expand All @@ -222,6 +232,8 @@ func (t *ActivationsManager) ReportStatus(ctx context.Context, name string, name
lock.Lock()
defer lock.Unlock()

log.InfofCtx(ctx, "ReportStatus for activation %s in namespace %s as %s", name, namespace, current.StatusMessage)

var activationState model.ActivationState
activationState, err = t.GetState(ctx, name, namespace)
if err != nil {
Expand Down Expand Up @@ -266,16 +278,20 @@ func (t *ActivationsManager) ReportStageStatus(ctx context.Context, name string,
lock.Lock()
defer lock.Unlock()

log.InfofCtx(ctx, "ReportStageStatus for activation %s stage %s in namespace %s as %s", name, current.Stage, namespace, current.StatusMessage)

var activationState model.ActivationState
activationState, err = t.GetState(ctx, name, namespace)
if err != nil {
log.ErrorfCtx(ctx, "Failed to get activation %s in namespace %s: %v", name, namespace, err)
return err
}

activationState.Status.UpdateTime = time.Now().Format(time.RFC3339) // TODO: is this correct? Shouldn't it be reported?

err = mergeStageStatus(&activationState, current)
if err != nil {
log.ErrorfCtx(ctx, "Failed to merge stage status for activation %s in namespace %s: %v", name, namespace, err)
return err
}

Expand All @@ -301,6 +317,7 @@ func (t *ActivationsManager) ReportStageStatus(ctx context.Context, name string,
}
_, err = t.StateProvider.Upsert(ctx, upsertRequest)
if err != nil {
log.ErrorfCtx(ctx, "Failed to update status in state store for activation %s in namespace %s: %v", name, namespace, err)
return err
}
return nil
Expand Down
11 changes: 10 additions & 1 deletion api/pkg/apis/v1alpha1/managers/campaigns/campaigns-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
observ_utils "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability/utils"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states"
"github.com/eclipse-symphony/symphony/coa/pkg/logger"
)

var log = logger.NewLogger("coa.runtime")

type CampaignsManager struct {
managers.Manager
StateProvider states.IStateProvider
Expand Down Expand Up @@ -59,6 +62,8 @@ func (m *CampaignsManager) GetState(ctx context.Context, name string, namespace
defer observ_utils.CloseSpanWithError(span, &err)
defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err)

log.InfofCtx(ctx, "Get campaign state %s in namespace", name, namespace)

getRequest := states.GetRequest{
ID: name,
Metadata: map[string]interface{}{
Expand All @@ -77,6 +82,7 @@ func (m *CampaignsManager) GetState(ctx context.Context, name string, namespace
var ret model.CampaignState
ret, err = getCampaignState(entry.Body)
if err != nil {
log.ErrorfCtx(ctx, "Failed to convert to campaign state for %s in namespace %s: %v", name, namespace, err)
return model.CampaignState{}, err
}
return ret, nil
Expand All @@ -103,6 +109,7 @@ func (m *CampaignsManager) UpsertState(ctx context.Context, name string, state m
defer observ_utils.CloseSpanWithError(span, &err)
defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err)

log.InfofCtx(ctx, "Upsert campaign state %s in namespace %s", name, state.ObjectMeta.Namespace)
if state.ObjectMeta.Name != "" && state.ObjectMeta.Name != name {
return v1alpha2.NewCOAError(nil, fmt.Sprintf("Name in metadata (%s) does not match name in request (%s)", state.ObjectMeta.Name, name), v1alpha2.BadRequest)
}
Expand Down Expand Up @@ -151,6 +158,7 @@ func (m *CampaignsManager) DeleteState(ctx context.Context, name string, namespa
defer observ_utils.CloseSpanWithError(span, &err)
defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err)

log.InfofCtx(ctx, "Delete campaign state %s in namespace %s", name, namespace)
if m.needValidate {
if err = m.ValidateDelete(ctx, name, namespace); err != nil {
return err
Expand All @@ -177,7 +185,7 @@ func (t *CampaignsManager) ListState(ctx context.Context, namespace string) ([]m
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err)

log.InfofCtx(ctx, "List campaign state for namespace %s", namespace)
listRequest := states.ListRequest{
Metadata: map[string]interface{}{
"version": "v1",
Expand All @@ -201,6 +209,7 @@ func (t *CampaignsManager) ListState(ctx context.Context, namespace string) ([]m
}
ret = append(ret, rt)
}
log.InfofCtx(ctx, "List campaign state for namespace %s get total count %d", namespace, len(ret))
return ret, nil
}

Expand Down
48 changes: 33 additions & 15 deletions api/pkg/apis/v1alpha1/managers/jobs/jobs-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,17 +226,30 @@ func (s *JobsManager) pollSchedules() []error {
entryData, _ := json.Marshal(entry.Body)
err = json.Unmarshal(entryData, &activationData)
if err != nil {
return []error{err}
// suppress the unmarshal error and proceed with other entries
// Maybe emit a metrics counter for this to indicate bad activationData?
log.ErrorfCtx(ctx, " M (Job): get bad ActivationData from state store")
continue
}
if activationData.Schedule != "" {
var fire bool
fire, err = activationData.ShouldFireNow()
if err != nil {
return []error{err}
// TODO: Remove the event from the state store directly?
log.ErrorfCtx(ctx, " M (Job): Unable to determine if schedule should fire for activation: %s", activationData.Activation)
continue
}
if fire {
// TODO: check if the activation is in paused state
// if not paused, skip trigger event and delete scheduled event directly
log.InfofCtx(ctx, " M (Job): firing schedule %s", activationData.Activation)
activationData.Schedule = ""
err = s.PersistentStateProvider.Delete(ctx, states.DeleteRequest{
// trigger the activation first and then delete the schedule events in state store
s.Context.Publish("trigger", v1alpha2.Event{
Body: activationData,
Context: ctx,
})
s.PersistentStateProvider.Delete(ctx, states.DeleteRequest{
ID: entry.ID,
Metadata: map[string]interface{}{
"namespace": activationData.Namespace,
Expand All @@ -245,13 +258,8 @@ func (s *JobsManager) pollSchedules() []error {
"resource": Scheduled,
},
})
if err != nil {
return []error{err}
}
s.Context.Publish("trigger", v1alpha2.Event{
Body: activationData,
Context: ctx,
})
} else {
log.DebugfCtx(ctx, " M (Job): activation %s is not firing", activationData.Activation)
}
}
}
Expand Down Expand Up @@ -282,7 +290,7 @@ func (s *JobsManager) HandleHeartBeatEvent(ctx context.Context, event v1alpha2.E
namespace = "default"
}
// TODO: the heart beat data should contain a "finished" field so data can be cleared
log.DebugfCtx(ctx, " M (Job): handling heartbeat h_%s, %v, %v", heartbeat.JobId, heartbeat.Action, heartbeat.JobAction)
log.DebugfCtx(ctx, " M (Job): handling heartbeat h_%s, %v, %v in namespace %s", heartbeat.JobId, heartbeat.Action, heartbeat.JobAction, namespace)
if heartbeat.JobAction == v1alpha2.JobUpdate {
log.DebugfCtx(ctx, " M (Job): update heartbeat h_%s", heartbeat.JobId)
_, err = s.VolatileStateProvider.Upsert(ctx, states.UpsertRequest{
Expand Down Expand Up @@ -335,24 +343,28 @@ func (s *JobsManager) DelayOrSkipJob(ctx context.Context, namespace string, obje
log.ErrorfCtx(ctx, " M (Job): error getting heartbeat %s: %s", key, err.Error())
return err
}
log.DebugfCtx(ctx, " M (Job): heartbeat %s is not found, entry: %+v", key, entry)
log.DebugfCtx(ctx, " M (Job): heartbeat %s is not found", key)
return nil // no heartbeat
}
var heartbeat v1alpha2.HeartBeatData
jData, _ := json.Marshal(entry.Body)
err = json.Unmarshal(jData, &heartbeat)
if err != nil {
return err
// heartbeat in the state store cannot be parsed. Log error and act as no hearbeat
log.ErrorfCtx(ctx, " M (Job): error parsing heartbeat %s: %v", key, entry.Body)
return nil
}
if time.Since(heartbeat.Time) > time.Duration(60)*time.Second { //TODO: make this configurable
// heartbeat is too old
return nil
}

if job.Action == v1alpha2.JobDelete && heartbeat.Action == v1alpha2.HeartBeatUpdate {
log.InfofCtx(ctx, " M (Job): delete job is delayed for %s", job.Id)
err = v1alpha2.NewCOAError(nil, "delete job is delayed", v1alpha2.Delayed)
return err
}
log.InfofCtx(ctx, " M (Job): skip job %s as existing job in progress", job.Id)
err = v1alpha2.NewCOAError(nil, "existing job in progress", v1alpha2.Untouched)
return err
}
Expand All @@ -368,7 +380,8 @@ func (s *JobsManager) HandleScheduleEvent(ctx context.Context, event v1alpha2.Ev
jData, _ := json.Marshal(event.Body)
err = json.Unmarshal(jData, &activationData)
if err != nil {
return v1alpha2.NewCOAError(nil, "event body is not a activation data", v1alpha2.BadRequest)
log.ErrorfCtx(ctx, " M (Job): schedule event body is not an activation data: %v", event.Body)
return v1alpha2.NewCOAError(nil, "event body is not an activation data", v1alpha2.BadRequest)
}
key := fmt.Sprintf("sch_%s-%s", activationData.Campaign, activationData.Activation)
_, err = s.PersistentStateProvider.Upsert(ctx, states.UpsertRequest{
Expand All @@ -383,6 +396,9 @@ func (s *JobsManager) HandleScheduleEvent(ctx context.Context, event v1alpha2.Ev
"resource": Scheduled,
},
})
if err != nil {
log.ErrorfCtx(ctx, " M (Job): error upserting schedule %s: %s", key, err.Error())
}
return err
}
func (s *JobsManager) HandleJobEvent(ctx context.Context, event v1alpha2.Event) error {
Expand Down Expand Up @@ -486,7 +502,7 @@ func (s *JobsManager) HandleJobEvent(ctx context.Context, event v1alpha2.Event)
} else {
err = s.apiClient.DeleteInstance(ctx, deployment.Instance.ObjectMeta.Name, namespace, s.user, s.password)
if err != nil {
log.Errorf(" M (Job): failed to delete instance %s: %s", instanceName, err.Error())
log.ErrorfCtx(ctx, " M (Job): failed to delete instance %s: %s", instanceName, err.Error())
return err
}
log.DebugfCtx(ctx, " M (Job): delete instance success state entry, instance: %s", instance.ObjectMeta.Name)
Expand Down Expand Up @@ -601,6 +617,8 @@ func (s *JobsManager) HandleJobEvent(ctx context.Context, event v1alpha2.Event)
}
}
}
} else {
log.ErrorfCtx(ctx, " M (Job): handleJobEvent objectType not found in metadata: %v", event.Metadata)
}
return nil
}
Expand Down
Loading

0 comments on commit 5d3e96d

Please sign in to comment.