Skip to content

Commit

Permalink
fix comments: add ut, change logging and activation status update
Browse files Browse the repository at this point in the history
  • Loading branch information
RemindD committed Jun 11, 2024
1 parent 700d9b5 commit aa59063
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 24 deletions.
45 changes: 29 additions & 16 deletions api/pkg/apis/v1alpha1/vendors/stage-vendor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,29 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa
return v1alpha2.NewCOAError(nil, "event body is not an activation job", v1alpha2.BadRequest)
}
campaignName := api_utils.ReplaceSeperator(actData.Campaign)

campaign, err := s.CampaignsManager.GetState(context.TODO(), campaignName, actData.Namespace)
if err != nil {
log.Error("V (Stage): unable to find campaign: %+v", err)
err = s.reportActivationStatusWithBadRequest(actData.Activation, actData.Namespace, err)
// If report status succeeded, return an empty err so the subscribe function will not be retried
// The actual error will be stored in Activation cr
return err
}
activation, err := s.ActivationsManager.GetState(context.TODO(), actData.Activation, actData.Namespace)
if err != nil {
log.Error("V (Stage): unable to find activation: %+v", err)
err = s.reportActivationStatusWithBadRequest(actData.Activation, actData.Namespace, err)
// If report status succeeded, return an empty err so the subscribe function will not be retried
// The actual error will be stored in Activation cr
return err
}

evt, err := s.StageManager.HandleActivationEvent(context.TODO(), actData, *campaign.Spec, activation)
if err != nil {
err = s.reportActivationStatusWithBadRequest(actData.Activation, actData.Namespace, err)
// If report status succeeded, return an empty err so the subscribe function will not be retried
// The actual error will be stored in Activation cr
return err
}

Expand All @@ -122,15 +132,8 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa
err := json.Unmarshal(jData, &triggerData)
if err != nil {
err = v1alpha2.NewCOAError(nil, "event body is not an activation job", v1alpha2.BadRequest)
status.Status = v1alpha2.BadRequest
status.StatusMessage = v1alpha2.BadRequest.String()
status.ErrorMessage = err.Error()
status.IsActive = false
sLog.Errorf("V (Stage): failed to deserialize activation data: %v", err)
err = s.ActivationsManager.ReportStatus(context.TODO(), triggerData.Activation, triggerData.Namespace, status)
if err != nil {
sLog.Errorf("V (Stage): failed to report error status: %v (%v)", status.ErrorMessage, err)
}
err = s.reportActivationStatusWithBadRequest(triggerData.Activation, triggerData.Namespace, err)
// If report status succeeded, return an empty err so the subscribe function will not be retried
// The actual error will be stored in Activation cr
return err
Expand All @@ -140,15 +143,8 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa
campaignName := api_utils.ReplaceSeperator(triggerData.Campaign)
campaign, err := s.CampaignsManager.GetState(context.TODO(), campaignName, triggerData.Namespace)
if err != nil {
status.Status = v1alpha2.BadRequest
status.StatusMessage = v1alpha2.BadRequest.String()
status.ErrorMessage = err.Error()
status.IsActive = false
sLog.Errorf("V (Stage): failed to get campaign spec: %v", err)
err = s.ActivationsManager.ReportStatus(context.TODO(), triggerData.Activation, triggerData.Namespace, status)
if err != nil {
sLog.Errorf("V (Stage): failed to report error status: %v (%v)", status.ErrorMessage, err)
}
err = s.reportActivationStatusWithBadRequest(triggerData.Activation, triggerData.Namespace, err)
// If report status succeeded, return an empty err so the subscribe function will not be retried
// The actual error will be stored in Activation cr
return err
Expand Down Expand Up @@ -312,3 +308,20 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa
})
return nil
}

func (s *StageVendor) reportActivationStatusWithBadRequest(activation string, namespace string, err error) error {
status := model.ActivationStatus{
Stage: "",
NextStage: "",
Outputs: map[string]interface{}{},
Status: v1alpha2.BadRequest,
StatusMessage: v1alpha2.BadRequest.String(),
ErrorMessage: err.Error(),
IsActive: true,
}
err = s.ActivationsManager.ReportStatus(context.TODO(), activation, namespace, status)
if err != nil {
sLog.Errorf("V (Stage): failed to report error status on activtion %s/%s: %v (%v)", namespace, activation, status.ErrorMessage, err)
}
return err
}
12 changes: 6 additions & 6 deletions coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *RedisPubSubProvider) SetContext(ctx *contexts.ManagerContext) {
func (i *RedisPubSubProvider) InitWithMap(properties map[string]string) error {
config, err := RedisPubSubProviderConfigFromMap(properties)
if err != nil {
mLog.Debugf(" P (Redis PubSub) : failed to initialize provider %v", err)
mLog.Errorf(" P (Redis PubSub) : failed to initialize provider %v", err)
return err
}
return i.Init(config)
Expand All @@ -153,7 +153,7 @@ func (i *RedisPubSubProvider) InitWithMap(properties map[string]string) error {
func (i *RedisPubSubProvider) Init(config providers.IProviderConfig) error {
vConfig, err := toRedisPubSubProviderConfig(config)
if err != nil {
mLog.Debugf(" P (Redis PubSub): failed to parse provider config %+v", err)
mLog.Errorf(" P (Redis PubSub): failed to parse provider config %+v", err)
return v1alpha2.NewCOAError(nil, "provided config is not a valid redis pub-sub provider config", v1alpha2.BadConfig)
}
i.Config = vConfig
Expand All @@ -176,7 +176,7 @@ func (i *RedisPubSubProvider) Init(config providers.IProviderConfig) error {
}
client := redis.NewClient(options)
if _, err := client.Ping().Result(); err != nil {
mLog.Debugf(" P (Redis PubSub): failed to connect to redis %+v", err)
mLog.Errorf(" P (Redis PubSub): failed to connect to redis %+v", err)
return v1alpha2.NewCOAError(err, fmt.Sprintf("redis stream: error connecting to redis at %s", i.Config.Host), v1alpha2.InternalError)
}
i.Client = client
Expand Down Expand Up @@ -383,7 +383,7 @@ func toRedisPubSubProviderConfig(config providers.IProviderConfig) (RedisPubSubP
var configs map[string]interface{}
err = json.Unmarshal(data, &configs)
if err != nil {
mLog.Info(" P (Redis PubSub): failed to parse to map[string]interface{} %+v", err)
mLog.Errorf(" P (Redis PubSub): failed to parse to map[string]interface{} %+v", err)
return ret, err
}
configStrings := map[string]string{}
Expand All @@ -393,7 +393,7 @@ func toRedisPubSubProviderConfig(config providers.IProviderConfig) (RedisPubSubP

ret, err = RedisPubSubProviderConfigFromMap(configStrings)
if err != nil {
mLog.Info(" P (Redis PubSub): failed to parse to RedisPubSubProviderConfig %+v", err)
mLog.Errorf(" P (Redis PubSub): failed to parse to RedisPubSubProviderConfig %+v", err)
return ret, err
}
return ret, err
Expand All @@ -404,7 +404,7 @@ func parseDuration(val string) (time.Duration, error) {
if err != nil {
n, err = time.ParseDuration(val)
if err != nil {
return time.Duration(1), err
return time.Duration(1000), err
} else {
return n, nil
}
Expand Down
27 changes: 25 additions & 2 deletions coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func TestRedisPubSubProviderConfigFromMap(t *testing.T) {
"numberOfWorkers": "1",
"queueDepth": "10",
"consumerID": "test-consumer",
"processingTimeout": "10s",
"processingTimeout": "10",
"redeliverInterval": "10",
}
config, err := RedisPubSubProviderConfigFromMap(configMap)
Expand All @@ -309,6 +309,29 @@ func TestRedisPubSubProviderConfigFromMap(t *testing.T) {
assert.Equal(t, 1, config.NumberOfWorkers)
assert.Equal(t, 10, config.QueueDepth)
assert.Equal(t, "test-consumer", config.ConsumerID)
assert.Equal(t, time.Duration(10*time.Second), config.ProcessingTimeout)
assert.Equal(t, time.Duration(10), config.ProcessingTimeout)
assert.Equal(t, time.Duration(10), config.RedeliverInterval)

configMap = map[string]string{
"name": "test",
"host": "localhost:6379",
"password": "123",
"requiresTLS": "true",
"numberOfWorkers": "1",
"queueDepth": "10",
"consumerID": "test-consumer",
"processingTimeout": "10s",
"redeliverInterval": "10s",
}
config, err = RedisPubSubProviderConfigFromMap(configMap)
assert.Nil(t, err)
assert.Equal(t, "test", config.Name)
assert.Equal(t, "localhost:6379", config.Host)
assert.Equal(t, "123", config.Password)
assert.Equal(t, true, config.RequiresTLS)
assert.Equal(t, 1, config.NumberOfWorkers)
assert.Equal(t, 10, config.QueueDepth)
assert.Equal(t, "test-consumer", config.ConsumerID)
assert.Equal(t, time.Duration(10*time.Second), config.ProcessingTimeout)
assert.Equal(t, time.Duration(10*time.Second), config.RedeliverInterval)
}

0 comments on commit aa59063

Please sign in to comment.