Skip to content

Commit

Permalink
Merge branch 'main' into users/xingdong/deletetimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
msftcoderdjw authored May 27, 2024
2 parents 3b4a3ee + 6156e30 commit eff72ca
Show file tree
Hide file tree
Showing 21 changed files with 192 additions and 310 deletions.
9 changes: 6 additions & 3 deletions api/pkg/apis/v1alpha1/managers/jobs/jobs-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,19 @@ func (s *JobsManager) pollSchedules() []error {
if err != nil {
return []error{err}
}
if activationData.Schedule != nil {
if activationData.Schedule != "" {
var fire bool
fire, err = activationData.Schedule.ShouldFireNow()
fire, err = activationData.ShouldFireNow()
if err != nil {
return []error{err}
}
if fire {
activationData.Schedule = nil
activationData.Schedule = ""
err = s.StateProvider.Delete(context, states.DeleteRequest{
ID: entry.ID,
Metadata: map[string]interface{}{
"namespace": activationData.Namespace,
},
})
if err != nil {
return []error{err}
Expand Down
2 changes: 1 addition & 1 deletion api/pkg/apis/v1alpha1/managers/jobs/jobs-manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestPoll(t *testing.T) {
})
assert.Nil(t, err)
jobManager.HandleScheduleEvent(context.Background(), v1alpha2.Event{
Body: v1alpha2.ActivationData{Campaign: "campaign1", Activation: "activation1", Schedule: &v1alpha2.ScheduleSpec{Time: "03:04:05PM", Date: "2006-01-02"}},
Body: v1alpha2.ActivationData{Campaign: "campaign1", Activation: "activation1", Schedule: "2006-01-02T15:04:05Z"},
})
enabled := jobManager.Enabled()
assert.True(t, enabled)
Expand Down
9 changes: 4 additions & 5 deletions api/pkg/apis/v1alpha1/managers/stage/stage-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (s *StageManager) HandleDirectTriggerEvent(ctx context.Context, triggerData
provider.(*remote.RemoteStageProvider).SetOutputsContext(triggerData.Outputs)
}

if triggerData.Schedule != nil && !isRemote {
if triggerData.Schedule != "" && !isRemote {
s.Context.Publish("schedule", v1alpha2.Event{
Body: triggerData,
})
Expand Down Expand Up @@ -481,9 +481,8 @@ func (s *StageManager) HandleTriggerEvent(ctx context.Context, campaign model.Ca
inputs["__activationGeneration"] = triggerData.ActivationGeneration
inputs["__previousStage"] = triggerData.TriggeringStage
inputs["__site"] = s.VendorContext.SiteInfo.SiteId
if triggerData.Schedule != nil {
jSchedule, _ := json.Marshal(triggerData.Schedule)
inputs["__schedule"] = string(jSchedule)
if triggerData.Schedule != "" {
inputs["__schedule"] = triggerData.Schedule
}
for k, v := range inputs {
var val interface{}
Expand Down Expand Up @@ -573,7 +572,7 @@ func (s *StageManager) HandleTriggerEvent(ctx context.Context, campaign model.Ca
provider.(*remote.RemoteStageProvider).SetOutputsContext(triggerData.Outputs)
}

if triggerData.Schedule != nil {
if triggerData.Schedule != "" {
s.Context.Publish("schedule", v1alpha2.Event{
Body: triggerData,
})
Expand Down
12 changes: 2 additions & 10 deletions api/pkg/apis/v1alpha1/managers/stage/stage-manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,11 +1104,7 @@ func TestHandleDirectTriggerScheduleEvent(t *testing.T) {
},
Outputs: nil,
Provider: "providers.stage.counter",
Schedule: &v1alpha2.ScheduleSpec{
Date: "2020-01-01",
Time: "12:00:00PM",
Zone: "PST",
},
Schedule: "2020-01-01T12:00:00-08:00",
}
status := manager.HandleDirectTriggerEvent(context.Background(), activation)
assert.Equal(t, v1alpha2.Paused, status.Status)
Expand Down Expand Up @@ -1214,11 +1210,7 @@ func TestTriggerEventWithSchedule(t *testing.T) {
},
Outputs: nil,
Provider: "providers.stage.mock",
Schedule: &v1alpha2.ScheduleSpec{
Date: "2020-01-01",
Time: "12:00:00PM",
Zone: "PST",
},
Schedule: "2020-01-01T12:00:00-08:00",
}

status, _ := manager.HandleTriggerEvent(context.Background(), model.CampaignSpec{
Expand Down
40 changes: 39 additions & 1 deletion api/pkg/apis/v1alpha1/model/campaign.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
package model

import (
"encoding/json"
"errors"
"fmt"
"reflect"
"time"

"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2"
)
Expand All @@ -31,7 +34,42 @@ type StageSpec struct {
StageSelector string `json:"stageSelector,omitempty"`
Inputs map[string]interface{} `json:"inputs,omitempty"`
HandleErrors bool `json:"handleErrors,omitempty"`
Schedule *v1alpha2.ScheduleSpec `json:"schedule,omitempty"`
Schedule string `json:"schedule,omitempty"`
}

// UnmarshalJSON customizes the JSON unmarshalling for StageSpec
func (s *StageSpec) UnmarshalJSON(data []byte) error {
type Alias StageSpec
aux := &struct {
*Alias
}{
Alias: (*Alias)(s),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
// validate if Schedule meet RFC 3339
if s.Schedule != "" {
if _, err := time.Parse(time.RFC3339, s.Schedule); err != nil {
return fmt.Errorf("invalid timestamp format: %v", err)
}
}
return nil
}

// MarshalJSON customizes the JSON marshalling for StageSpec
func (s StageSpec) MarshalJSON() ([]byte, error) {
type Alias StageSpec
if s.Schedule != "" {
if _, err := time.Parse(time.RFC3339, s.Schedule); err != nil {
return nil, fmt.Errorf("invalid timestamp format: %v", err)
}
}
return json.Marshal(&struct {
*Alias
}{
Alias: (*Alias)(&s),
})
}

func (s StageSpec) DeepEquals(other IDeepEquals) (bool, error) {
Expand Down
13 changes: 2 additions & 11 deletions api/pkg/apis/v1alpha1/model/campaign_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package model
import (
"testing"

"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -217,16 +216,8 @@ func TestStageNotMatch(t *testing.T) {
"objectType": "sites",
"namesObject": true,
}
stage1.Schedule = &v1alpha2.ScheduleSpec{
Date: "2020-10-31",
Time: "12:00:00PM",
Zone: "PDT",
}
stage2.Schedule = &v1alpha2.ScheduleSpec{
Date: "2020-10-31",
Time: "12:00:00PM",
Zone: "PST",
}
stage1.Schedule = "2020-10-31T12:00:00-07:00"
stage2.Schedule = "2020-10-31T12:00:00-08:00"
equal, err = stage1.DeepEquals(stage2)
assert.Nil(t, err)
assert.False(t, equal)
Expand Down
18 changes: 8 additions & 10 deletions api/pkg/apis/v1alpha1/vendors/stage-vendor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/providers/stage/materialize"
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/providers/stage/mock"
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/providers/stage/wait"
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/utils"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/managers"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers"
Expand Down Expand Up @@ -248,24 +249,21 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa
}

// restore schedule
var schedule *v1alpha2.ScheduleSpec
var schedule = ""
if v, ok := dataPackage.Inputs["__schedule"]; ok {
err = json.Unmarshal([]byte(v.(string)), &schedule)
if err != nil {
return err
}
schedule = utils.FormatAsString(v)
}

triggerData := v1alpha2.ActivationData{
Activation: dataPackage.Inputs["__activation"].(string),
ActivationGeneration: dataPackage.Inputs["__activationGeneration"].(string),
Campaign: dataPackage.Inputs["__campaign"].(string),
Stage: dataPackage.Inputs["__stage"].(string),
Activation: utils.FormatAsString(dataPackage.Inputs["__activation"]),
ActivationGeneration: utils.FormatAsString(dataPackage.Inputs["__activationGeneration"]),
Campaign: utils.FormatAsString(dataPackage.Inputs["__campaign"]),
Stage: utils.FormatAsString(dataPackage.Inputs["__stage"]),
Inputs: dataPackage.Inputs,
Outputs: dataPackage.Outputs,
Schedule: schedule,
NeedsReport: true,
Namespace: dataPackage.Inputs["__namespace"].(string),
Namespace: utils.FormatAsString(dataPackage.Inputs["__namespace"]),
}

triggerData.Inputs["__origin"] = event.Metadata["origin"]
Expand Down
83 changes: 39 additions & 44 deletions coa/pkg/apis/v1alpha2/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package v1alpha2

import (
"encoding/json"
"fmt"
"time"
)

Expand Down Expand Up @@ -48,10 +49,45 @@ type ActivationData struct {
Provider string `json:"provider,omitempty"`
Config interface{} `json:"config,omitempty"`
TriggeringStage string `json:"triggeringStage,omitempty"`
Schedule *ScheduleSpec `json:"schedule,omitempty"`
Schedule string `json:"schedule,omitempty"`
NeedsReport bool `json:"needsReport,omitempty"`
}

// UnmarshalJSON customizes the JSON unmarshalling for ActivationData
func (s *ActivationData) UnmarshalJSON(data []byte) error {
type Alias ActivationData
aux := &struct {
*Alias
}{
Alias: (*Alias)(s),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
// validate if Schedule meet RFC 3339
if s.Schedule != "" {
if _, err := time.Parse(time.RFC3339, s.Schedule); err != nil {
return fmt.Errorf("invalid timestamp format: %v", err)
}
}
return nil
}

// MarshalJSON customizes the JSON marshalling for ActivationData
func (s ActivationData) MarshalJSON() ([]byte, error) {
type Alias ActivationData
if s.Schedule != "" {
if _, err := time.Parse(time.RFC3339, s.Schedule); err != nil {
return nil, fmt.Errorf("invalid timestamp format: %v", err)
}
}
return json.Marshal(&struct {
*Alias
}{
Alias: (*Alias)(&s),
})
}

type HeartBeatAction string

const (
Expand All @@ -65,57 +101,16 @@ type HeartBeatData struct {
Action HeartBeatAction `json:"action"`
Time time.Time `json:"time"`
}
type ScheduleSpec struct {
Date string `json:"date"`
Time string `json:"time"`
Zone string `json:"zone"`
}

func (s ScheduleSpec) ShouldFireNow() (bool, error) {
dt, err := s.GetTime()
func (s ActivationData) ShouldFireNow() (bool, error) {
dt, err := time.Parse(time.RFC3339, s.Schedule)
if err != nil {
return false, err
}
dtNow := time.Now().UTC()
dtUTC := dt.In(time.UTC)
return dtUTC.Before(dtNow), nil
}
func (s ScheduleSpec) GetTime() (time.Time, error) {
dt, err := parseTimeWithZone(s.Time, s.Date, s.Zone)
if err != nil {
return time.Time{}, err
}
return dt, nil
}

func parseTimeWithZone(timeStr string, dateStr string, zoneStr string) (time.Time, error) {
dtStr := dateStr + " " + timeStr

switch zoneStr {
case "LOCAL":
zoneStr = ""
case "PST", "PDT":
zoneStr = "America/Los_Angeles"
case "EST", "EDT":
zoneStr = "America/New_York"
case "CST", "CDT":
zoneStr = "America/Chicago"
case "MST", "MDT":
zoneStr = "America/Denver"
}

loc, err := time.LoadLocation(zoneStr)
if err != nil {
return time.Time{}, err
}

dt, err := time.ParseInLocation("2006-01-02 3:04:05PM", dtStr, loc)
if err != nil {
return time.Time{}, err
}

return dt, nil
}

type InputOutputData struct {
Inputs map[string]interface{} `json:"inputs,omitempty"`
Expand Down
Loading

0 comments on commit eff72ca

Please sign in to comment.