chore: batch event schema messages for faster processing
atzoum committed May 29, 2023
1 parent 3f88f50 commit 7414469
Showing 7 changed files with 212 additions and 87 deletions.
31 changes: 31 additions & 0 deletions proto/event-schema/types.helper.go
@@ -1,7 +1,11 @@
package proto

import (
"crypto/md5"
"encoding/hex"
"fmt"
"sort"
"strings"

"google.golang.org/protobuf/proto"
)
@@ -30,3 +34,30 @@ func UnmarshalEventSchemaMessage(raw []byte) (*EventSchemaMessage, error) {
}
return p, nil
}

// SchemaHash returns an MD5 hash of the schema. Keys are sorted lexicographically so that equal schemas always produce the same hash.
func SchemaHash(schema map[string]string) string {
keys := make([]string, 0, len(schema))
for k := range schema {
keys = append(keys, k)
}
sort.Strings(keys)
var sb strings.Builder
for _, k := range keys {
sb.WriteString(k)
sb.WriteString(":")
sb.WriteString(schema[k])
sb.WriteString(",")
}
md5Sum := md5.Sum([]byte(sb.String()))
schemaHash := hex.EncodeToString(md5Sum[:])
return schemaHash
}
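
As an illustration (not part of the commit): because the keys are sorted before hashing, the result does not depend on Go's randomized map iteration order. A minimal sketch using only the exported SchemaHash above:

package main

import (
	"fmt"

	proto "github.com/rudderlabs/rudder-server/proto/event-schema"
)

func main() {
	// Equal schemas hash identically regardless of how the maps were built,
	// because SchemaHash sorts the keys before feeding them to MD5.
	a := map[string]string{"userId": "string", "properties.price": "float"}
	b := map[string]string{"properties.price": "float", "userId": "string"}
	fmt.Println(proto.SchemaHash(a) == proto.SchemaHash(b)) // true
}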

// Merge folds another message into this one: the batch count grows by the
// other message's count plus one, the other message's sample replaces the
// current one, and the most recent observedAt is kept.
func (sm *EventSchemaMessage) Merge(other *EventSchemaMessage) {
sm.BatchCount += other.BatchCount + 1
sm.Sample = other.Sample
if other.ObservedAt.AsTime().After(sm.ObservedAt.AsTime()) {
sm.ObservedAt = other.ObservedAt
}
}
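
A hedged sketch of the merge accounting (assuming, as the batcher introduced below implies, that a freshly transformed message starts with BatchCount == 0):

package main

import (
	"fmt"

	proto "github.com/rudderlabs/rudder-server/proto/event-schema"
)

func main() {
	m := &proto.EventSchemaMessage{} // BatchCount starts at 0
	m.Merge(&proto.EventSchemaMessage{})
	m.Merge(&proto.EventSchemaMessage{})
	fmt.Println(m.BatchCount) // 2, i.e. two other messages were folded into m
}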
35 changes: 27 additions & 8 deletions proto/event-schema/types.pb.go

Generated file; diff not rendered by default.

2 changes: 2 additions & 0 deletions proto/event-schema/types.proto
@@ -16,4 +16,6 @@ message EventSchemaMessage {
google.protobuf.Timestamp observedAt = 4;
bytes sample = 5;
bytes correlationID = 6;
string hash = 7;
int64 batchCount = 8;
}
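
For illustration only (not taken from the diff): a producer of these messages could populate the two new fields as below. The schema map and sample payload are invented, and whether the transformer sets the fields exactly this way is an assumption; the field names come from the generated Go code for this proto.

package main

import (
	"fmt"

	proto "github.com/rudderlabs/rudder-server/proto/event-schema"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
	schema := map[string]string{"userId": "string"} // hypothetical flattened schema
	msg := &proto.EventSchemaMessage{
		ObservedAt: timestamppb.Now(),
		Sample:     []byte(`{"userId":"u1"}`),
		Hash:       proto.SchemaHash(schema), // lets the batcher merge messages with equal schemas
		BatchCount: 0,                        // nothing has been merged into this message yet
	}
	fmt.Println(msg.Hash)
}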
68 changes: 68 additions & 0 deletions schema-forwarder/internal/batcher/batcher.go
@@ -0,0 +1,68 @@
package batcher

import (
"github.com/rudderlabs/rudder-server/jobsdb"
proto "github.com/rudderlabs/rudder-server/proto/event-schema"
"github.com/rudderlabs/rudder-server/schema-forwarder/internal/transformer"
)

type EventSchemaMessageBatch struct {
Message *proto.EventSchemaMessage
Jobs []*jobsdb.JobT
}

func NewEventSchemaMessageBatcher(transformer transformer.Transformer) *EventSchemaMessageBatcher {
return &EventSchemaMessageBatcher{
transformer: transformer,
batchIndex: make(map[batchKey]*EventSchemaMessageBatch),
}
}

type EventSchemaMessageBatcher struct {
transformer transformer.Transformer

batchOrder []batchKey
batchIndex map[batchKey]*EventSchemaMessageBatch
}

// Add adds a job to the batcher after transforming it to an [EventSchemaMessage].
// If an equivalent message is already in the batcher, the two messages are merged into one.
func (sb *EventSchemaMessageBatcher) Add(job *jobsdb.JobT) error {
msg, err := sb.transformer.Transform(job)
if err != nil {
return err
}
key := batchKey{
writeKey: msg.Key.WriteKey,
eventType: msg.Key.EventType,
eventIdentifier: msg.Key.EventIdentifier,
hash: msg.Hash,
}
if _, ok := sb.batchIndex[key]; !ok {
sb.batchOrder = append(sb.batchOrder, key)
sb.batchIndex[key] = &EventSchemaMessageBatch{
Message: msg,
Jobs: []*jobsdb.JobT{job},
}
} else {
sb.batchIndex[key].Jobs = append(sb.batchIndex[key].Jobs, job)
sb.batchIndex[key].Message.Merge(msg)
}
return nil
}

// GetMessageBatches returns the message batches in the order they were added.
func (sb *EventSchemaMessageBatcher) GetMessageBatches() []*EventSchemaMessageBatch {
batches := make([]*EventSchemaMessageBatch, len(sb.batchOrder))
for i, key := range sb.batchOrder {
batches[i] = sb.batchIndex[key]
}
return batches
}

type batchKey struct {
writeKey string
eventType string
eventIdentifier string
hash string
}
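
A sketch of the intended call pattern (the helper below is hypothetical; the real caller in jobsforwarder.go marks jobs whose transformation fails as aborted instead of returning early):

package forwarder

import (
	"github.com/rudderlabs/rudder-server/jobsdb"
	"github.com/rudderlabs/rudder-server/schema-forwarder/internal/batcher"
	"github.com/rudderlabs/rudder-server/schema-forwarder/internal/transformer"
)

// batchJobs feeds every job into the batcher, so that jobs sharing the same
// (writeKey, eventType, eventIdentifier, hash) collapse into a single message,
// and returns the resulting batches in insertion order.
func batchJobs(tr transformer.Transformer, jobs []*jobsdb.JobT) ([]*batcher.EventSchemaMessageBatch, error) {
	sb := batcher.NewEventSchemaMessageBatcher(tr)
	for _, job := range jobs {
		if err := sb.Add(job); err != nil {
			return nil, err
		}
	}
	return sb.GetMessageBatches(), nil
}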
143 changes: 74 additions & 69 deletions schema-forwarder/internal/forwarder/jobsforwarder.go
Expand Up @@ -19,10 +19,10 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/internal/pulsar"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/schema-forwarder/internal/batcher"
"github.com/rudderlabs/rudder-server/schema-forwarder/internal/transformer"
"github.com/rudderlabs/rudder-server/utils/bytesize"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/samber/lo"
)

// JobsForwarder is a forwarder that transforms and forwards jobs to a pulsar topic
@@ -92,101 +92,106 @@ func (jf *JobsForwarder) Start() error {
jf.terminalErrFn(err) // we are signaling to shutdown the app
return err
}
var mu sync.Mutex // protects statuses and retryJobs
var mu sync.Mutex // protects statuses and toRetry
var statuses []*jobsdb.JobStatusT
toRetry := append([]*jobsdb.JobT{}, jobs...)

jobDone := func(job *jobsdb.JobT) {
_, idx, _ := lo.FindIndexOf(toRetry, func(j *jobsdb.JobT) bool {
return j.JobID == job.JobID
})
toRetry = slices.Delete(toRetry, idx, idx+1)
schemaBatcher := batcher.NewEventSchemaMessageBatcher(jf.transformer)
for _, job := range jobs {
if err := schemaBatcher.Add(job); err != nil { // mark job as aborted
errorResponse, _ := json.Marshal(map[string]string{"transform_error": err.Error()})
statuses = append(statuses, &jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: job.LastJobStatus.AttemptNum + 1,
JobState: jobsdb.Aborted.State,
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "400",
Parameters: []byte(`{}`),
JobParameters: job.Parameters,
ErrorResponse: errorResponse,
})
jf.stat.NewTaggedStat("schema_forwarder_jobs", stats.CountType, stats.Tags{"state": "invalid"}).Increment()
continue
}
}
messageBatches := schemaBatcher.GetMessageBatches()
for _, messageBatch := range messageBatches {
if !bytes.Equal(messageBatch.Message.Sample, []byte("{}")) && // if the sample is not an empty json object (redacted) and
(len(messageBatch.Message.Sample) > int(jf.maxSampleSize) || // sample is too big or
!jf.sampler.Sample(messageBatch.Message.Key.String())) { // sample should be skipped
messageBatch.Message.Sample = nil // by setting the sample to nil we instruct the schema worker to keep the previous sample
}
}
jf.stat.NewStat("schema_forwarder_batch_size", stats.CountType).Count(len(messageBatches))

// try to forward toRetry jobs to pulsar. Succeeded or invalid jobs are removed from toRetry
tryForwardJobs := func() error {
// try to forward messageBatches to pulsar. Succeeded jobs are removed from messageBatches
tryForward := func() error {
if ctx.Err() != nil { // we are shutting down
return nil //nolint:nilerr
}
toForward := append([]*jobsdb.JobT{}, toRetry...)
for _, job := range toForward {
job := job // job can be used in a goroutine
msg, orderKey, err := jf.transformer.Transform(job)
if err != nil { // mark job as aborted and remove from toRetry
mu.Lock()
jobDone(job)
errorResponse, _ := json.Marshal(map[string]string{"transform_error": err.Error()})
statuses = append(statuses, &jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: job.LastJobStatus.AttemptNum + 1,
JobState: jobsdb.Aborted.State,
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "400",
Parameters: []byte(`{}`),
JobParameters: job.Parameters,
ErrorResponse: errorResponse,
})
jf.stat.NewTaggedStat("schema_forwarder_jobs", stats.CountType, stats.Tags{"state": "invalid"}).Increment()
mu.Unlock()
continue
}
if !bytes.Equal(msg.Sample, []byte("{}")) && // if the sample is not an empty json object (redacted) and
(len(msg.Sample) > int(jf.maxSampleSize) || // sample is too big or
!jf.sampler.Sample(msg.Key.String())) { // sample should be skipped
msg.Sample = nil // by setting the sample to nil we instruct the schema worker to keep the previous sample
}
toForward := append([]*batcher.EventSchemaMessageBatch{}, messageBatches...)
for idx, batch := range toForward {
batch := batch // can be used in a goroutine
idx := idx // can be used in a goroutine
msg := batch.Message
orderKey := msg.Key.WriteKey
jf.pulsarProducer.SendMessageAsync(ctx, orderKey, orderKey, msg.MustMarshal(),
func(_ pulsarType.MessageID, _ *pulsarType.ProducerMessage, err error) {
if err == nil { // mark job as succeeded and remove from toRetry
mu.Lock()
defer mu.Unlock()
jobDone(job)
statuses = append(statuses, &jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: job.LastJobStatus.AttemptNum + 1,
JobState: jobsdb.Succeeded.State,
ExecTime: time.Now(),
Parameters: []byte(`{}`),
JobParameters: job.Parameters,
})
jf.stat.NewTaggedStat("schema_forwarder_processed_jobs", stats.CountType, stats.Tags{"state": "succeeded"}).Increment()
messageBatches = slices.Delete(messageBatches, idx, idx+1)
for _, job := range batch.Jobs {
statuses = append(statuses, &jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: job.LastJobStatus.AttemptNum + 1,
JobState: jobsdb.Succeeded.State,
ExecTime: time.Now(),
Parameters: []byte(`{}`),
JobParameters: job.Parameters,
})
}

jf.stat.NewTaggedStat("schema_forwarder_processed_jobs", stats.CountType, stats.Tags{"state": "succeeded"}).Count(len(batch.Jobs))
} else {
jf.stat.NewTaggedStat("schema_forwarder_processed_jobs", stats.CountType, stats.Tags{"state": "failed"}).Increment()
jf.log.Errorf("failed to forward job %s: %v", job.JobID, err)
jf.stat.NewTaggedStat("schema_forwarder_processed_jobs", stats.CountType, stats.Tags{"state": "failed"}).Count(len(batch.Jobs))
jf.log.Errorf("failed to forward %d jobs : %v", len(batch.Jobs), err)
}
})
}
if err := jf.pulsarProducer.Flush(); err != nil {
return fmt.Errorf("failed to flush pulsar producer: %w", err)
}
if len(toRetry) > 0 {
return fmt.Errorf("failed to forward %d jobs", len(toRetry))
if len(messageBatches) > 0 {
return fmt.Errorf("failed to forward %d jobs", len(messageBatches))
}
return nil
}

// Retry to forward the jobs batch to pulsar until there are no more jobs to retry or until maxRetryElapsedTime is reached
// Retry forwarding the batches to pulsar until no messageBatches are left to retry or until maxRetryElapsedTime is reached
expB := backoff.NewExponentialBackOff()
expB.InitialInterval = jf.initialRetryInterval
expB.MaxInterval = jf.maxRetryInterval
expB.MaxElapsedTime = jf.maxRetryElapsedTime
if err = backoff.Retry(tryForwardJobs, backoff.WithContext(expB, ctx)); err != nil {
for _, job := range toRetry { // mark all to retry jobs as aborted
errorResponse, _ := json.Marshal(map[string]string{"error": err.Error()})
statuses = append(statuses, &jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: job.LastJobStatus.AttemptNum + 1,
JobState: jobsdb.Aborted.State,
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "400",
Parameters: []byte(`{}`),
JobParameters: job.Parameters,
ErrorResponse: errorResponse,
})
if err = backoff.Retry(tryForward, backoff.WithContext(expB, ctx)); err != nil {
errorResponse, _ := json.Marshal(map[string]string{"error": err.Error()})
var abortedCount int
for _, schemaBatch := range messageBatches { // mark all messageBatches left over as aborted
for _, job := range schemaBatch.Jobs {
statuses = append(statuses, &jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: job.LastJobStatus.AttemptNum + 1,
JobState: jobsdb.Aborted.State,
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "400",
Parameters: []byte(`{}`),
JobParameters: job.Parameters,
ErrorResponse: errorResponse,
})
abortedCount++
}
}
jf.stat.NewTaggedStat("schema_forwarder_processed_jobs", stats.CountType, stats.Tags{"state": "abort"}).Count(len(toRetry))
jf.stat.NewTaggedStat("schema_forwarder_jobs", stats.CountType, stats.Tags{"state": "abort"}).Count(abortedCount)
}
if err := jf.MarkJobStatuses(ctx, statuses); err != nil {
if ctx.Err() != nil { // we are shutting down
(remaining diff truncated)