Skip to content

Commit

Permalink
Adds date label to benthos metrics (#2698)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei authored Sep 16, 2024
1 parent dad3f03 commit b4e24cb
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
7 changes: 7 additions & 0 deletions backend/pkg/metrics/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ const (
TableNameLabel = "tableName"
JobTypeLabel = "jobType"
IsUpdateConfigLabel = "isUpdateConfig"

NeosyncDateLabel = "date"
NeosyncDateFormat = "2006-01-02"

TemporalWorkflowIdEnvKey = "TEMPORAL_WORKFLOW_ID"
TemporalRunIdEnvKey = "TEMPORAL_ENV_ID"
NeosyncDateEnvKey = "NEOSYNC_DATE"
)

func NewEqLabel(key, value string) MetricLabel {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,9 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
labels := metrics.MetricLabels{
metrics.NewEqLabel(metrics.AccountIdLabel, job.AccountId),
metrics.NewEqLabel(metrics.JobIdLabel, job.Id),
metrics.NewEqLabel(metrics.TemporalWorkflowId, "${TEMPORAL_WORKFLOW_ID}"),
metrics.NewEqLabel(metrics.TemporalRunId, "${TEMPORAL_RUN_ID}"),
metrics.NewEqLabel(metrics.TemporalWorkflowId, withEnvInterpolation(metrics.TemporalWorkflowIdEnvKey)),
metrics.NewEqLabel(metrics.TemporalRunId, withEnvInterpolation(metrics.TemporalRunIdEnvKey)),
metrics.NewEqLabel(metrics.NeosyncDateLabel, withEnvInterpolation(metrics.NeosyncDateEnvKey)),
}
for _, resp := range responses {
joinedLabels := append(labels, resp.metriclabels...) //nolint:gocritic
Expand Down Expand Up @@ -292,6 +293,10 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
}, nil
}

func withEnvInterpolation(input string) string {
return fmt.Sprintf("${%s}", input)
}

// tries to get destination schema column info map
// if not uses source destination schema column info map
func (b *benthosBuilder) GetSqlSchemaColumnMap(
Expand Down
6 changes: 4 additions & 2 deletions worker/pkg/workflows/datasync/activities/sync/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
_ "github.com/warpstreamlabs/bento/public/components/sql"

neosynclogger "github.com/nucleuscloud/neosync/backend/pkg/logger"
"github.com/nucleuscloud/neosync/backend/pkg/metrics"
connectiontunnelmanager "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager"
"github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager/providers"
"github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager/providers/mongoprovider"
Expand Down Expand Up @@ -279,8 +280,9 @@ func (a *Activity) Sync(ctx context.Context, req *SyncRequest, metadata *SyncMet
}

envKeyMap := syncMapToStringMap(&envKeyDsnSyncMap)
envKeyMap["TEMPORAL_WORKFLOW_ID"] = info.WorkflowExecution.ID
envKeyMap["TEMPORAL_RUN_ID"] = info.WorkflowExecution.RunID
envKeyMap[metrics.TemporalWorkflowIdEnvKey] = info.WorkflowExecution.ID
envKeyMap[metrics.TemporalRunIdEnvKey] = info.WorkflowExecution.RunID
envKeyMap[metrics.NeosyncDateEnvKey] = time.Now().UTC().Format(metrics.NeosyncDateFormat)

streamBuilderMu.Lock()
streambldr := benthosenv.NewStreamBuilder()
Expand Down

0 comments on commit b4e24cb

Please sign in to comment.