Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backward compatible metadata keys #4738

Merged
merged 5 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions pkg/lib/envelope/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (

// Metadata keys
const (
KeyMessageType = "Bacalhau-Type"
KeyPayloadEncoding = "Bacalhau-PayloadEncoding"
HeaderPrefix = "Bacalhau-"
KeyMessageType = "Type"
KeyPayloadEncoding = "PayloadEncoding"
)

// Metadata contains metadata about the message
Expand All @@ -23,11 +24,22 @@ func (m Metadata) ToMap() map[string]string {
func (m Metadata) ToHeaders() map[string][]string {
headers := make(map[string][]string, len(m))
for k, v := range m {
headers[k] = []string{v}
headers[HeaderPrefix+k] = []string{v}
}
return headers
}

// FromHeaders creates a new Metadata object from a map[string][]string
func FromHeaders(headers map[string][]string) *Metadata {
metadata := make(Metadata, len(headers))
for k, v := range headers {
if len(v) > 0 {
metadata[k[len(HeaderPrefix):]] = v[0]
}
}
return &metadata
}
wdbaruni marked this conversation as resolved.
Show resolved Hide resolved

// NewMetadataFromMap creates a new shallow copy Metadata object from a map.
// Changes to the map will be reflected in the Metadata object, but more efficient than NewMetadataFromMapCopy
func NewMetadataFromMap(m map[string]string) *Metadata {
Expand Down
10 changes: 5 additions & 5 deletions pkg/lib/ncl/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package ncl

// Metadata keys
const (
KeySource = "Bacalhau-Source"
KeyEventTime = "Bacalhau-EventTime"
KeyMessageUUID = "Bacalhau-MessageUUID"
KeyMessageID = "Bacalhau-MessageID"
KeySubject = "Bacalhau-Subject"
KeySource = "Source"
KeyEventTime = "EventTime"
KeyMessageUUID = "MessageUUID"
KeyMessageID = "MessageID"
KeySubject = "Subject"
)
2 changes: 1 addition & 1 deletion pkg/lib/ncl/error_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const (
ErrorMessageType = "ncl.ErrorResponse"

// KeyStatusCode is the key for the status code
KeyStatusCode = "Bacalhau-StatusCode"
KeyStatusCode = "StatusCode"

// StatusBadRequest is the status code for a bad request
StatusBadRequest = 400
Expand Down
2 changes: 1 addition & 1 deletion pkg/lib/ncl/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *subscriber) Subscribe(ctx context.Context, subjects ...string) error {
// messageHandler is the callback function for message processing
func (s *subscriber) handleNatsMessage(m *nats.Msg) {
if err := s.processMessage(m); err != nil {
log.Error().Err(err).Msg("failed to process message")
log.Error().Err(err).Str("handler", s.config.Name).Msg("failed to process message")

s.consecutiveFailures += 1
delay := s.config.Backoff.BackoffDuration(s.consecutiveFailures)
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/nclprotocol/dispatcher/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package dispatcher

const (
KeySeqNum = "Bacalhau-SeqNum"
KeySeqNum = "SeqNum"
)
2 changes: 1 addition & 1 deletion pkg/transport/nclprotocol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type ConnectionHealth struct {
}

const (
KeySeqNum = "Bacalhau-SeqNum"
KeySeqNum = "SeqNum"
)

// MessageCreator defines how events from the watcher are converted into
Expand Down
Loading