Skip to content

Commit

Permalink
fix: Add missing contentType to new Publish APIs
Browse files Browse the repository at this point in the history
Also added missing UUID and refactored out duplicate code

Signed-off-by: Leonard Goodell <leonard.goodell@intel.com>
  • Loading branch information
Leonard Goodell committed Jul 31, 2023
1 parent ded0d21 commit 9b07666
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 79 deletions.
29 changes: 9 additions & 20 deletions internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
bootstrapHandlers "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/handlers"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
"github.com/google/uuid"

"github.com/edgexfoundry/app-functions-sdk-go/v3/internal"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/appfunction"
Expand All @@ -43,8 +44,6 @@ import (
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/webserver"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/util"
contractsCommon "github.com/edgexfoundry/go-mod-core-contracts/v3/common"

clientInterfaces "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/interfaces"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
coreCommon "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
Expand Down Expand Up @@ -742,29 +741,16 @@ func (svc *Service) BuildContext(correlationId string, contentType string) inter
}

// Publish pushes data to the MessageBus using configured topic
func (svc *Service) Publish(data any) error {
messageClient := bootstrapContainer.MessagingClientFrom(svc.dic.Get)
if messageClient == nil {
return fmt.Errorf(messageBusDisabledErr)
}

triggertopic := svc.config.Trigger.PublishTopic
baseTopic := svc.config.MessageBus.BaseTopicPrefix

payload, err := json.Marshal(data)
func (svc *Service) Publish(data any, contentType string) error {
err := svc.PublishWithTopic(svc.config.Trigger.PublishTopic, data, contentType)
if err != nil {
return fmt.Errorf("%v: %v", publishMarshalErr, err)
}
message := types.NewMessageEnvelope(payload, context.Background())
err = messageClient.Publish(message, contractsCommon.BuildTopic(baseTopic, triggertopic))
if err != nil {
return fmt.Errorf("%v: %v", publishDataErr, err)
return err
}
return nil
}

// Publish pushes data to the MessageBus using given topic
func (svc *Service) PublishWithTopic(topic string, data any) error {
// PublishWithTopic pushes data to the MessageBus using given topic
func (svc *Service) PublishWithTopic(topic string, data any, contentType string) error {
messageClient := bootstrapContainer.MessagingClientFrom(svc.dic.Get)
if messageClient == nil {
return fmt.Errorf(messageBusDisabledErr)
Expand All @@ -776,6 +762,9 @@ func (svc *Service) PublishWithTopic(topic string, data any) error {
}

message := types.NewMessageEnvelope(payload, context.Background())
message.CorrelationID = uuid.NewString()
message.ContentType = contentType

err = messageClient.Publish(message, coreCommon.BuildTopic(svc.config.MessageBus.BaseTopicPrefix, topic))
if err != nil {
return fmt.Errorf("%v: %v", publishDataErr, err)
Expand Down
5 changes: 3 additions & 2 deletions internal/app/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
clients "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/http"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
coreCommon "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
messageMocks "github.com/edgexfoundry/go-mod-messaging/v3/messaging/mocks"
"github.com/google/uuid"
Expand Down Expand Up @@ -1092,7 +1093,7 @@ func TestService_Publish(t *testing.T) {
config: tt.config,
}

err := svc.Publish(tt.message)
err := svc.Publish(tt.message, coreCommon.ContentTypeJSON)
require.Equal(t, tt.expectedError, err)
})
}
Expand Down Expand Up @@ -1177,7 +1178,7 @@ func TestService_PublishWithTopic(t *testing.T) {
config: tt.config,
}

err := svc.PublishWithTopic(tt.topic, tt.message)
err := svc.PublishWithTopic(tt.topic, tt.message, coreCommon.ContentTypeJSON)
require.Equal(t, tt.expectedError, err)

})
Expand Down
37 changes: 18 additions & 19 deletions internal/appfunction/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
bootstrapInterfaces "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/interfaces"
"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
"github.com/google/uuid"

"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/bootstrap/container"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
Expand All @@ -43,6 +44,7 @@ import (
const (
messageBusDisabledErr = "publish failed due to MessageBus disabled via configuration"
publishDataErr = "failed to publish data to messagebus"
topicFormatErr = "failed to format publish topic"
)

// NewContext creates, initializes and return a new Context with implements the interfaces.AppFunctionContext interface
Expand Down Expand Up @@ -280,44 +282,41 @@ func (appContext *Context) PipelineId() string {
}

// Publish pushes data to the MessageBus using configured topic
func (appContext *Context) Publish(data any) error {
messageClient := bootstrapContainer.MessagingClientFrom(appContext.Dic.Get)
if messageClient == nil {
return fmt.Errorf(messageBusDisabledErr)
}
func (appContext *Context) Publish(data any, contentType string) error {
topic := container.ConfigurationFrom(appContext.Dic.Get).Trigger.PublishTopic

triggerConfig := container.ConfigurationFrom(appContext.Dic.Get).Trigger
baseTopic := bootstrapContainer.ConfigurationFrom(appContext.Dic.Get).GetBootstrap().MessageBus.BaseTopicPrefix

payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal data for publishing using configured topic: %v", err)
}
message := types.NewMessageEnvelope(payload, context.Background())
err = messageClient.Publish(message, common.BuildTopic(baseTopic, triggerConfig.PublishTopic))
err := appContext.PublishWithTopic(topic, data, contentType)
if err != nil {
return fmt.Errorf("%v: %v", publishDataErr, err)
return err
}
return nil
}

// Publish pushes data to the MessageBus for a given topic
func (appContext *Context) PublishWithTopic(topic string, data any) error {
// PublishWithTopic pushes data to the MessageBus for a given topic
func (appContext *Context) PublishWithTopic(topic string, data any, contentType string) error {
messageClient := bootstrapContainer.MessagingClientFrom(appContext.Dic.Get)
if messageClient == nil {
return fmt.Errorf(messageBusDisabledErr)
}

config := bootstrapContainer.ConfigurationFrom(appContext.Dic.Get)

full_topic := common.BuildTopic(config.GetBootstrap().MessageBus.BaseTopicPrefix, topic)
formattedTopic, err := appContext.ApplyValues(topic)
if err != nil {
return fmt.Errorf("%v: %v", topicFormatErr, err)
}

fullTopic := common.BuildTopic(config.GetBootstrap().MessageBus.BaseTopicPrefix, formattedTopic)
payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal data for publishing using given topic: %v", err)
}

message := types.NewMessageEnvelope(payload, context.Background())
err = messageClient.Publish(message, full_topic)
message.ContentType = contentType
message.CorrelationID = uuid.NewString()

err = messageClient.Publish(message, fullTopic)
if err != nil {
return fmt.Errorf("%v: %v", publishDataErr, err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/appfunction/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ func TestContext_Publish(t *testing.T) {

appContext := NewContext("", publishDic, "")

err := appContext.Publish(tt.message)
err := appContext.Publish(tt.message, common.ContentTypeJSON)
require.Equal(t, tt.expectedError, err)
})
}
Expand Down Expand Up @@ -634,7 +634,7 @@ func TestService_PublishWithTopic(t *testing.T) {

appContext := NewContext("", publishDic, "")

err := appContext.PublishWithTopic(tt.topic, tt.message)
err := appContext.PublishWithTopic(tt.topic, tt.message, common.ContentTypeJSON)
require.Equal(t, tt.expectedError, err)

})
Expand Down
6 changes: 3 additions & 3 deletions pkg/interfaces/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type AppFunctionContext interface {
// PipelineId returns the ID of the pipeline that is executing
PipelineId() string
// Publish pushes data to the MessageBus using configured topic
Publish(data any) error
// Publish pushes data to the MessageBus for a given topic
PublishWithTopic(topic string, data any) error
Publish(data any, contentType string) error
// PublishWithTopic pushes data to the MessageBus using given topic
PublishWithTopic(topic string, data any, contentType string) error
}
31 changes: 16 additions & 15 deletions pkg/interfaces/mocks/AppFunctionContext.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 16 additions & 15 deletions pkg/interfaces/mocks/ApplicationService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/interfaces/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ type ApplicationService interface {
// BuildContext allows external callers that may need a context (eg background publishers) to easily create one
BuildContext(correlationId string, contentType string) AppFunctionContext
// Publish pushes data to the MessageBus using configured topic
Publish(data any) error
// Publish pushes data to the MessageBus using given topic
PublishWithTopic(topic string, data any) error
Publish(data any, contentType string) error
// PublishWithTopic pushes data to the MessageBus using given topic
PublishWithTopic(topic string, data any, contentType string) error
}

0 comments on commit 9b07666

Please sign in to comment.