Skip to content

Commit

Permalink
chore: batch event schema messages for faster processing
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed May 29, 2023
1 parent 4a94c15 commit ca6a0c2
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 87 deletions.
32 changes: 32 additions & 0 deletions proto/event-schema/types.helper.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package proto

import (
"crypto/md5"
"encoding/hex"
"fmt"
"sort"
"strings"

"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -30,3 +34,31 @@ func UnmarshalEventSchemaMessage(raw []byte) (*EventSchemaMessage, error) {
}
return p, nil
}

// SchemaHash returns a hash of the schema. Keys are sorted lexicographically during hashing.
func SchemaHash(schema map[string]string) string {
keys := make([]string, 0, len(schema))
for k := range schema {
keys = append(keys, k)
}
sort.Strings(keys)
var sb strings.Builder
for _, k := range keys {
sb.WriteString(k)
sb.WriteString(":")
sb.WriteString(schema[k])
sb.WriteString(",")
}
md5Sum := md5.Sum([]byte(sb.String()))
schemaHash := hex.EncodeToString(md5Sum[:])
return schemaHash
}

// Merge folds the other event schema message into this one:
//   - ObservedAt is replaced by other's timestamp only when that one is strictly later,
//   - Sample is always overwritten with other's sample,
//   - BatchCount grows by other's BatchCount plus one (for other itself).
func (sm *EventSchemaMessage) Merge(other *EventSchemaMessage) {
	if current, incoming := sm.ObservedAt.AsTime(), other.ObservedAt.AsTime(); incoming.After(current) {
		sm.ObservedAt = other.ObservedAt
	}
	sm.Sample = other.Sample
	sm.BatchCount = sm.BatchCount + other.BatchCount + 1
}
35 changes: 27 additions & 8 deletions proto/event-schema/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/event-schema/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ message EventSchemaMessage {
google.protobuf.Timestamp observedAt = 4;
bytes sample = 5;
bytes correlationID = 6;
string hash = 7;
int64 batchCount = 8;
}
72 changes: 72 additions & 0 deletions schema-forwarder/internal/batcher/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package batcher

import (
"github.com/rudderlabs/rudder-server/jobsdb"
proto "github.com/rudderlabs/rudder-server/proto/event-schema"
"github.com/rudderlabs/rudder-server/schema-forwarder/internal/transformer"
)

// EventSchemaMessageBatch is a batch of jobs that share the same schema.
type EventSchemaMessageBatch struct {
// Message is the batch's event schema message: the message of the first job
// added to the batch, with the messages of later jobs merged into it.
Message *proto.EventSchemaMessage
// Jobs holds the jobs that were added to this batch, in the order they were added.
Jobs []*jobsdb.JobT
}

// NewEventSchemaMessageBatcher returns an empty batcher that uses the given
// transformer to convert jobs into event schema messages.
func NewEventSchemaMessageBatcher(transformer transformer.Transformer) *EventSchemaMessageBatcher {
	b := new(EventSchemaMessageBatcher)
	b.transformer = transformer
	// The index must be initialized up front: writing to a nil map panics.
	b.batchIndex = map[batchKey]*EventSchemaMessageBatch{}
	return b
}

// EventSchemaMessageBatcher batches jobs by their schema.
type EventSchemaMessageBatcher struct {
// transformer converts every added job to an event schema message.
transformer transformer.Transformer

// batchOrder records the batch keys in the order they were first seen.
batchOrder []batchKey
// batchIndex maps every batch key to its batch.
batchIndex map[batchKey]*EventSchemaMessageBatch
}

// Add transforms the job to an [EventSchemaMessage] and places it in the batcher.
// Jobs whose messages share the same write key, event type, event identifier and
// schema hash end up in the same batch, where their messages are merged to one.
// It returns the transformer's error, if any, in which case the batcher is left untouched.
func (sb *EventSchemaMessageBatcher) Add(job *jobsdb.JobT) error {
	msg, err := sb.transformer.Transform(job)
	if err != nil {
		return err
	}

	key := batchKey{
		writeKey:        msg.Key.WriteKey,
		eventType:       msg.Key.EventType,
		eventIdentifier: msg.Key.EventIdentifier,
		hash:            msg.Hash,
	}

	// Known key: extend the existing batch and fold the new message into it.
	if existing, found := sb.batchIndex[key]; found {
		existing.Jobs = append(existing.Jobs, job)
		existing.Message.Merge(msg)
		return nil
	}

	// New key: remember its position and start a fresh batch for it.
	sb.batchOrder = append(sb.batchOrder, key)
	sb.batchIndex[key] = &EventSchemaMessageBatch{
		Message: msg,
		Jobs:    []*jobsdb.JobT{job},
	}
	return nil
}

// GetMessageBatches returns all batches, ordered by the moment their key was first added.
// The returned slice is freshly allocated, but its elements point to the batcher's own batches.
func (sb *EventSchemaMessageBatcher) GetMessageBatches() []*EventSchemaMessageBatch {
	ordered := make([]*EventSchemaMessageBatch, 0, len(sb.batchOrder))
	for idx := range sb.batchOrder {
		ordered = append(ordered, sb.batchIndex[sb.batchOrder[idx]])
	}
	return ordered
}

// batchKey is the key used for batching.
// Two messages belong to the same batch only if all four fields are equal.
type batchKey struct {
// writeKey, eventType and eventIdentifier are copied from the message's Key.
writeKey string
eventType string
eventIdentifier string
// hash is copied from the message's Hash field.
hash string
}
98 changes: 98 additions & 0 deletions schema-forwarder/internal/batcher/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package batcher_test

import (
"errors"
"testing"

"github.com/rudderlabs/rudder-server/jobsdb"
proto "github.com/rudderlabs/rudder-server/proto/event-schema"
"github.com/rudderlabs/rudder-server/schema-forwarder/internal/batcher"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestEventSchemaMessageBatcher(t *testing.T) {
t.Run("Add same message twice", func(t *testing.T) {
mockTransformer := &mockTransformer{}
mockTransformer.msg = &proto.EventSchemaMessage{
Key: &proto.EventSchemaKey{
WriteKey: "write-key",
EventType: "event-type",
EventIdentifier: "event-identifier",
},
Hash: "hash",
ObservedAt: timestamppb.Now(),
}

b := batcher.NewEventSchemaMessageBatcher(mockTransformer)

require.NoError(t, b.Add(&jobsdb.JobT{JobID: 1}))
require.NoError(t, b.Add(&jobsdb.JobT{JobID: 2}))

batches := b.GetMessageBatches()
require.Len(t, batches, 1)
require.Len(t, batches[0].Jobs, 2)
require.EqualValues(t, 1, batches[0].Message.BatchCount)
})

t.Run("Add different hash", func(t *testing.T) {
mockTransformer := &mockTransformer{}
mockTransformer.msg = &proto.EventSchemaMessage{
Key: &proto.EventSchemaKey{
WriteKey: "write-key",
EventType: "event-type",
EventIdentifier: "event-identifier",
},
Hash: "hash-1",
ObservedAt: timestamppb.Now(),
}

b := batcher.NewEventSchemaMessageBatcher(mockTransformer)

require.NoError(t, b.Add(&jobsdb.JobT{JobID: 1}))

mockTransformer.msg = &proto.EventSchemaMessage{
Key: &proto.EventSchemaKey{
WriteKey: "write-key",
EventType: "event-type",
EventIdentifier: "event-identifier",
},
Hash: "hash-2",
ObservedAt: timestamppb.Now(),
}

require.NoError(t, b.Add(&jobsdb.JobT{JobID: 2}))

batches := b.GetMessageBatches()
require.Len(t, batches, 2)

require.Len(t, batches[0].Jobs, 1)
require.EqualValues(t, 1, batches[0].Jobs[0].JobID)
require.EqualValues(t, 0, batches[0].Message.BatchCount)

require.Len(t, batches[1].Jobs, 1)
require.EqualValues(t, 2, batches[1].Jobs[0].JobID)
require.EqualValues(t, 0, batches[1].Message.BatchCount)
})

t.Run("Transformer error", func(t *testing.T) {
mockTransformer := &mockTransformer{fail: true}
b := batcher.NewEventSchemaMessageBatcher(mockTransformer)
require.Error(t, b.Add(&jobsdb.JobT{JobID: 2}))
})
}

// mockTransformer is a test double for the batcher's transformer.
type mockTransformer struct {
// fail makes Transform return an error instead of a message.
fail bool
// msg is the message Transform returns when fail is false.
msg *proto.EventSchemaMessage
}

// Start is a no-op.
func (*mockTransformer) Start() {}
// Stop is a no-op.
func (*mockTransformer) Stop() {}

// Transform ignores the job and returns the preconfigured message,
// or a "failed" error when the mock is set to fail.
func (mt *mockTransformer) Transform(job *jobsdb.JobT) (*proto.EventSchemaMessage, error) {
	if !mt.fail {
		return mt.msg, nil
	}
	return nil, errors.New("failed")
}
Loading

0 comments on commit ca6a0c2

Please sign in to comment.