Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ignore trace decision messages produced by the publishers #1437

Merged
merged 3 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,6 +1342,15 @@ func (i *InMemCollector) signalKeptTraceDecisions(ctx context.Context, msg strin
return
}

peerID, err := i.Peers.GetInstanceID()
if err != nil {
return
}

if isMyDecision(msg, peerID) {
return
}

select {
case <-i.done:
return
Expand All @@ -1357,6 +1366,15 @@ func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg st
return
}

peerID, err := i.Peers.GetInstanceID()
if err != nil {
return
}

if isMyDecision(msg, peerID) {
return
}

select {
case <-i.done:
return
Expand All @@ -1373,18 +1391,23 @@ func (i *InMemCollector) processTraceDecisions(msg string, decisionType decision
return
}

peerID, err := i.Peers.GetInstanceID()
if err != nil {
i.Logger.Error().Logf("Failed to get peer ID. %s", err)
return
}

// Deserialize the message into trace decisions
decisions := make([]TraceDecision, 0)
var err error
switch decisionType {
case keptDecision:
decisions, err = newKeptTraceDecision(msg)
decisions, err = newKeptTraceDecision(msg, peerID)
if err != nil {
i.Logger.Error().Logf("Failed to unmarshal kept trace decision message. %s", err)
return
}
case dropDecision:
decisions, err = newDroppedTraceDecision(msg)
decisions, err = newDroppedTraceDecision(msg, peerID)
if err != nil {
i.Logger.Error().Logf("Failed to unmarshal drop trace decision message. %s", err)
return
Expand Down Expand Up @@ -1573,6 +1596,11 @@ func (i *InMemCollector) sendDecisions(decisionChan <-chan TraceDecision, interv
ctx := context.Background()
var createDecisionMessage newDecisionMessage
var metricName, topic string
peerID, err := i.Peers.GetInstanceID()
if err != nil {
i.Logger.Error().Logf("Failed to get peer ID. %s", err)
return
}
switch decisionType {
case keptDecision:
metricName = "collector_kept_decisions_batch_size"
Expand Down Expand Up @@ -1620,7 +1648,7 @@ func (i *InMemCollector) sendDecisions(decisionChan <-chan TraceDecision, interv
case <-i.done:
return nil
default:
msg, err := createDecisionMessage(decisionsToProcess)
msg, err := createDecisionMessage(decisionsToProcess, peerID)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"error": err.Error(),
Expand Down
19 changes: 15 additions & 4 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission, pe
dropDecisionBuffer: make(chan TraceDecision, 5),
Peers: &peer.MockPeers{
Peers: []string{"api1", "api2"},
ID: "api1",
},
Sharder: &sharder.MockSharder{
Self: &sharder.TestShard{
Expand Down Expand Up @@ -2255,9 +2256,15 @@ func TestSendDropDecisions(t *testing.T) {
close(coll.dropDecisionBuffer)
droppedMessage := <-messages

decompressedData, err := decompressDropDecisions([]byte(droppedMessage))
peerID, err := coll.Peers.GetInstanceID()
require.NoError(t, err)
decompressedData, err := newDroppedTraceDecision(droppedMessage, peerID)
assert.NoError(t, err)
assert.Equal(t, "trace1", decompressedData)
droppedTraceID := make([]string, 0)
for _, td := range decompressedData {
droppedTraceID = append(droppedTraceID, td.TraceID)
}
assert.Equal(t, []string{"trace1"}, droppedTraceID)

<-closed

Expand All @@ -2282,9 +2289,13 @@ func TestSendDropDecisions(t *testing.T) {
close(coll.dropDecisionBuffer)
droppedMessage = <-messages

decompressedData, err = decompressDropDecisions([]byte(droppedMessage))
decompressedData, err = newDroppedTraceDecision(droppedMessage, peerID)
assert.NoError(t, err)
assert.Equal(t, "trace0,trace1,trace2,trace3,trace4", decompressedData)
droppedTraceID = make([]string, 0)
for _, td := range decompressedData {
droppedTraceID = append(droppedTraceID, td.TraceID)
}
assert.Equal(t, "trace0,trace1,trace2,trace3,trace4", strings.Join(droppedTraceID, ","))

<-closed
}
Expand Down
14 changes: 10 additions & 4 deletions collect/stressRelief.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ func (s *StressRelief) Start() error {
return nil
}

const stressReliefMessageSeparator = "|"

type stressReliefMessage struct {
peerID string
level uint
Expand All @@ -214,21 +216,25 @@ func newStressReliefMessage(level uint, peerID string) *stressReliefMessage {
}

func (msg *stressReliefMessage) String() string {
return msg.peerID + "|" + fmt.Sprint(msg.level)
return msg.peerID + stressReliefMessageSeparator + fmt.Sprint(msg.level)
}

func unmarshalStressReliefMessage(msg string) (*stressReliefMessage, error) {
if len(msg) < 2 {
return nil, fmt.Errorf("empty message")
}

parts := strings.SplitN(msg, "|", 2)
level, err := strconv.Atoi(parts[1])
separatorIdx := strings.IndexRune(msg, rune(stressReliefMessageSeparator[0]))
if separatorIdx == -1 {
return nil, fmt.Errorf("invalid stress relief message")
}

level, err := strconv.Atoi(msg[separatorIdx+1:])
if err != nil {
return nil, err
}

return newStressReliefMessage(uint(level), parts[0]), nil
return newStressReliefMessage(uint(level), msg[:separatorIdx]), nil
}

func (s *StressRelief) onStressLevelUpdate(ctx context.Context, msg string) {
Expand Down
68 changes: 55 additions & 13 deletions collect/trace_decision.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import (

type decisionType int

// decisionMessageSeparator is the separator used to separate the sender ID from the compressed decisions
// in the decision message.
// The pipe character should not be used in URLs or IP addresses because it's not a valid character in these
// contexts.
const decisionMessageSeparator = "|"

func (d decisionType) String() string {
switch d {
case keptDecision:
Expand All @@ -29,34 +35,47 @@ var (
dropDecision decisionType = 2
)

type newDecisionMessage func([]TraceDecision) (string, error)
type newDecisionMessage func(tds []TraceDecision, senderID string) (string, error)

func newDroppedDecisionMessage(tds []TraceDecision) (string, error) {
func newDroppedDecisionMessage(tds []TraceDecision, senderID string) (string, error) {
if len(tds) == 0 {
return "", fmt.Errorf("no dropped trace decisions provided")
}
if senderID == "" {
return "", fmt.Errorf("no sender ID provided")
}

traceIDs := make([]string, 0, len(tds))
payload := make([]string, 0, len(tds))
for _, td := range tds {
if td.TraceID != "" {
traceIDs = append(traceIDs, td.TraceID)
payload = append(payload, td.TraceID)
}
}

compressed, err := compress(strings.Join(traceIDs, ","))
compressed, err := compress(strings.Join(payload, ","))
if err != nil {
return "", err
}
return string(compressed), nil
return senderID + decisionMessageSeparator + string(compressed), nil
}

func newDroppedTraceDecision(msg string) ([]TraceDecision, error) {
data, err := decompressDropDecisions([]byte(msg))
func newDroppedTraceDecision(msg string, senderID string) ([]TraceDecision, error) {
// Use IndexRune here since it's faster than SplitN and requires less allocation
separatorIdx := strings.IndexRune(msg, rune(decisionMessageSeparator[0]))
if separatorIdx == -1 {
return nil, fmt.Errorf("invalid dropped decision message")
}

if msg[:separatorIdx] != senderID {
return nil, nil
}

ids, err := decompressDropDecisions([]byte(msg[separatorIdx+1:]))
if err != nil {
return nil, err
}

traceIDs := strings.Split(data, ",")
traceIDs := strings.Split(ids, ",")
decisions := make([]TraceDecision, 0, len(traceIDs))
for _, traceID := range traceIDs {
decisions = append(decisions, TraceDecision{
Expand All @@ -66,25 +85,48 @@ func newDroppedTraceDecision(msg string) ([]TraceDecision, error) {
return decisions, nil
}

func newKeptDecisionMessage(tds []TraceDecision) (string, error) {
func newKeptDecisionMessage(tds []TraceDecision, senderID string) (string, error) {
if len(tds) == 0 {
return "", fmt.Errorf("no kept trace decisions provided")
}

if senderID == "" {
return "", fmt.Errorf("no sender ID provided")
}

compressed, err := compress(tds)
if err != nil {
return "", err
}
return string(compressed), nil
return senderID + decisionMessageSeparator + string(compressed), nil
}

func newKeptTraceDecision(msg string) ([]TraceDecision, error) {
compressed, err := decompressKeptDecisions([]byte(msg))
func newKeptTraceDecision(msg string, senderID string) ([]TraceDecision, error) {
// Use IndexRune here since it's faster than SplitN and requires less allocation
separatorIdx := strings.IndexRune(msg, rune(decisionMessageSeparator[0]))
if separatorIdx == -1 {
return nil, fmt.Errorf("invalid dropped decision message")
}

if msg[:separatorIdx] != senderID {
return nil, nil
}

compressed, err := decompressKeptDecisions([]byte(msg[separatorIdx+1:]))
if err != nil {
return nil, err
}
return compressed, nil
}

func isMyDecision(msg string, senderID string) bool {
if senderID == "" {
return false
}

return strings.HasPrefix(msg, senderID+decisionMessageSeparator)
}

var _ cache.KeptTrace = &TraceDecision{}

type TraceDecision struct {
Expand Down
18 changes: 9 additions & 9 deletions collect/trace_decision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ func TestDropDecisionRoundTrip(t *testing.T) {
}

// Step 1: Create a dropped decision message
msg, err := newDroppedDecisionMessage(tds)
msg, err := newDroppedDecisionMessage(tds, "sender1")
assert.NoError(t, err, "expected no error for valid dropped decision message")
assert.NotEmpty(t, msg, "expected non-empty message")

// Step 2: Decompress the message back to TraceDecision using newDroppedTraceDecision
decompressedTds, err := newDroppedTraceDecision(msg)
decompressedTds, err := newDroppedTraceDecision(msg, "sender1")
assert.NoError(t, err, "expected no error during decompression of the dropped decision message")
assert.Len(t, decompressedTds, len(tds), "expected decompressed TraceDecision length to match original")

Expand Down Expand Up @@ -68,12 +68,12 @@ func TestKeptDecisionRoundTrip(t *testing.T) {
}

// Step 1: Create a kept decision message
msg, err := newKeptDecisionMessage(tds)
msg, err := newKeptDecisionMessage(tds, "sender1")
assert.NoError(t, err, "expected no error for valid kept decision message")
assert.NotEmpty(t, msg, "expected non-empty message")

// Step 2: Decompress the message back to TraceDecision using newKeptTraceDecision
decompressedTds, err := newKeptTraceDecision(msg)
decompressedTds, err := newKeptTraceDecision(msg, "sender1")
assert.NoError(t, err, "expected no error during decompression of the kept decision message")
assert.Len(t, decompressedTds, len(tds), "expected decompressed TraceDecision length to match original")

Expand Down Expand Up @@ -123,7 +123,7 @@ func BenchmarkDynamicCompressedEncoding(b *testing.B) {
// Run the benchmark
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := newKeptDecisionMessage(decisions)
_, err := newKeptDecisionMessage(decisions, "sender1")
if err != nil {
b.Fatal(err)
}
Expand All @@ -132,14 +132,14 @@ func BenchmarkDynamicCompressedEncoding(b *testing.B) {

func BenchmarkDynamicJSONDecoding(b *testing.B) {
decisions := generateRandomDecisions(1000)
jsonData, err := newKeptDecisionMessage(decisions)
jsonData, err := newKeptDecisionMessage(decisions, "sender1")
if err != nil {
b.Fatal(err)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := newKeptTraceDecision(jsonData)
_, err := newKeptTraceDecision(jsonData, "sender1")
if err != nil {
b.Fatal(err)
}
Expand All @@ -148,14 +148,14 @@ func BenchmarkDynamicJSONDecoding(b *testing.B) {

func BenchmarkDynamicCompressedDecoding(b *testing.B) {
decisions := generateRandomDecisions(1000)
compressedData, err := newKeptDecisionMessage(decisions)
compressedData, err := newKeptDecisionMessage(decisions, "sender1")
if err != nil {
b.Fatal(err)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := newKeptTraceDecision(compressedData)
_, err := newKeptTraceDecision(compressedData, "sender1")
if err != nil {
b.Fatal(err)
}
Expand Down