Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: store trace kept record with pipelining instead of lua script #1188

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 61 additions & 86 deletions centralstore/redis_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (r *RedisBasicStore) Start() error {
return err
}

r.traces = newTraceStatusStore(r.Clock, r.Tracer, r.RedisClient.NewScript(keepTraceKey, keepTraceScript), r.Config)
r.traces = newTraceStatusStore(r.Clock, r.Tracer, r.Config)
r.states = stateProcessor

// register metrics for each state
Expand Down Expand Up @@ -274,8 +274,7 @@ func (r *RedisBasicStore) GetTraces(ctx context.Context, traceIDs ...string) ([]
defer conn.Close()

for _, traceID := range traceIDs {
cmd := redis.NewGetAllValuesHashCommand(spansHashByTraceIDKey(traceID))
if err := cmd.Send(conn); err != nil {
if err := redis.NewGetAllValuesHashCommand(spansHashByTraceIDKey(traceID), conn).Send(); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -547,12 +546,11 @@ type tracesStore struct {
config config.Config
}

// newTraceStatusStore builds a tracesStore that holds the supplied clock,
// tracer and config.
// NOTE(review): this span is a rendered diff, so both variants of the
// signature and of the struct literal appear together; presumably the variant
// taking and storing keepTraceScript is the removed one — confirm against the
// real file.
func newTraceStatusStore(clock clockwork.Clock, tracer trace.Tracer, keepTraceScript redis.Script, cfg config.Config) *tracesStore {
func newTraceStatusStore(clock clockwork.Clock, tracer trace.Tracer, cfg config.Config) *tracesStore {
return &tracesStore{
clock: clock,
tracer: tracer,
keepTraceScript: keepTraceScript,
config: cfg,
clock: clock,
tracer: tracer,
config: cfg,
}
}

Expand All @@ -565,47 +563,47 @@ type centralTraceStatusInit struct {
}

// centralTraceStatusReason carries the keep decision recorded for a trace:
// the reason it was kept, its sample rate and any attached metadata.
// keepTrace JSON-encodes a value of this type and writes it to the
// "KeepRecord" field of the trace status hash; normalizeCentralTraceStatusRedis
// decodes it back from that field.
// NOTE(review): this span is a rendered diff, so two variants of the field
// list appear together (Metadata as []byte with ReasonIndex, and Metadata as a
// map) — confirm which one the real file contains.
type centralTraceStatusReason struct {
KeepReason string
Rate uint
ReasonIndex uint // this is the cache ID for the reason
Metadata []byte
KeepReason string
Rate uint
Metadata map[string]interface{}
}

// centralTraceStatusRedis is the Redis-side representation of a trace status.
// normalizeCentralTraceStatusRedis converts it into a CentralTraceStatus.
// NOTE(review): this span is a rendered diff, so two variants of the field
// list appear together (one with Rate, Metadata, KeepReason and ReasonIndex as
// separate fields, one with a single KeepRecord blob) — confirm which one the
// real file contains.
type centralTraceStatusRedis struct {
TraceID string
State string
Rate uint
Metadata []byte
Count uint32
EventCount uint32
LinkCount uint32
KeepReason string
SamplerKey string
ReasonIndex uint
Timestamp int64
TraceID string
State string
Count uint32
EventCount uint32
LinkCount uint32
SamplerKey string
// Timestamp is read back with time.UnixMicro, i.e. microseconds since the Unix epoch.
Timestamp int64

// KeepRecord holds a JSON-encoded centralTraceStatusReason.
KeepRecord []byte
}

// normalizeCentralTraceStatusRedis converts the Redis-side representation of a
// trace status into a CentralTraceStatus. It returns the JSON decoding error,
// with a nil status, when the stored payload cannot be unmarshaled.
// NOTE(review): this span is a rendered diff, so two variants of the body
// appear together (one decoding status.Metadata into a map, one decoding
// status.KeepRecord into a centralTraceStatusReason) — confirm which one the
// real file contains.
func normalizeCentralTraceStatusRedis(status *centralTraceStatusRedis) (*CentralTraceStatus, error) {
metadata := make(map[string]any, 0)
if status.Metadata != nil {
err := json.Unmarshal(status.Metadata, &metadata)
var reason centralTraceStatusReason
if status.KeepRecord != nil {
err := json.Unmarshal(status.KeepRecord, &reason)
if err != nil {
return nil, err
}
}

// Hand callers a non-nil map even when no metadata was decoded.
if reason.Metadata == nil {
reason.Metadata = make(map[string]interface{})
}

return &CentralTraceStatus{
TraceID: status.TraceID,
State: CentralTraceState(status.State),
Rate: status.Rate,
SamplerSelector: status.SamplerKey,
reasonIndex: status.ReasonIndex,
Metadata: metadata,
KeepReason: status.KeepReason,
Count: status.Count,
EventCount: status.EventCount,
LinkCount: status.LinkCount,
// The stored timestamp is interpreted as microseconds since the Unix epoch.
Timestamp: time.UnixMicro(status.Timestamp),
KeepReason: reason.KeepReason,
Rate: reason.Rate,
Metadata: reason.Metadata,
}, nil
}

Expand Down Expand Up @@ -648,9 +646,9 @@ func (t *tracesStore) addStatuses(ctx context.Context, conn redis.Conn, traces m
traceStatusKey := t.traceStatusKey(traceID)
args := redis.Args().AddFlat(trace)

commands = append(commands, redis.NewMultiSetHashCommand(traceStatusKey, args))
commands = append(commands, redis.NewExpireCommand(traceStatusKey, t.traceExpirationDuration().Seconds()))
commands = append(commands, redis.NewINCRCommand(traceStatusCountKey))
commands = append(commands, redis.NewMultiSetHashCommand(traceStatusKey, args, conn))
commands = append(commands, redis.NewExpireCommand(traceStatusKey, t.traceExpirationDuration().Seconds(), conn))
commands = append(commands, redis.NewINCRCommand(traceStatusCountKey, conn))
}

err := conn.Exec(commands...)
Expand All @@ -671,9 +669,7 @@ func (t *tracesStore) getTraceStates(ctx context.Context, conn redis.Conn, trace

states := make(map[string]CentralTraceState, len(traceIDs))
for _, id := range traceIDs {
cmd := redis.NewGetHashCommand(t.traceStatusKey(id), "State")
err := cmd.Send(conn)
if err != nil {
if err := redis.NewGetHashCommand(t.traceStatusKey(id), "State", conn).Send(); err != nil {
span.RecordError(err)
return nil, err
}
Expand Down Expand Up @@ -708,36 +704,34 @@ func (t *tracesStore) keepTrace(ctx context.Context, conn redis.Conn, status []*
defer keepspan.End()

otelutil.AddSpanField(keepspan, "num_traces", len(status))
traces := make([]interface{}, 0, len(status)+1)
traces = append(traces, "key")
args := redis.Args()
var err error
metadata := make([]byte, 0)
sentQueries := 0
for _, s := range status {
if s.Metadata != nil {
metadata, err = json.Marshal(s.Metadata)
if err != nil {
keepspan.RecordError(err)
return err
}
}
trace := &centralTraceStatusReason{
KeepReason: s.KeepReason,
Rate: s.Rate,
Metadata: metadata,
Metadata: s.Metadata,
}

data, marshalErr := json.Marshal(trace)
if marshalErr != nil {
err = errors.Join(err, marshalErr)
continue
}
key := map[string]any{"key": t.traceStatusKey(s.TraceID)}

args = args.AddFlat(key)
args = args.AddFlat(trace)
traces = append(traces, args...)
if err := conn.SetNXHash(t.traceStatusKey(s.TraceID), "KeepRecord", data).Send(); err != nil {
err = errors.Join(err, err)
continue
}
sentQueries++
}

if sentQueries == 0 {
return errors.Join(errors.New("failed to store keep reason for any trace"), err)
}

_, err = t.keepTraceScript.Do(ctx, conn, traces...)
if err != nil {
keepspan.RecordError(err)
return err
if _, err := conn.ReceiveInt64s(sentQueries); err != nil {
return errors.Join(errors.New("failed to store keep reason for any trace"), err)
}

return nil
Expand All @@ -762,11 +756,10 @@ func (t *tracesStore) getTraceStatuses(ctx context.Context, client redis.Client,
defer chunkSpan.End()

for _, traceID := range traceIDs {
err := redis.NewGetAllHashCommand(t.traceStatusKey(traceID)).Send(conn)
if err != nil {
if err := redis.NewGetAllHashCommand(t.traceStatusKey(traceID), conn).Send(); err != nil {
chunkSpan.RecordError(err)
continue
}

}
statuses := make([]*centralTraceStatusRedis, 0, len(traceIDs))
err := conn.ReceiveStructs(len(traceIDs), func(reply []any, err error) error {
Expand Down Expand Up @@ -866,13 +859,13 @@ func (t *tracesStore) storeSpans(ctx context.Context, conn redis.Conn, spans []*
for i := 0; i < len(commands); i += 3 {
var err error
span := spans[i/3]
commands[i], err = addToSpanHash(span)
commands[i], err = addToSpanHash(span, conn)
if err != nil {
return err
}

commands[i+1] = redis.NewExpireCommand(spansHashByTraceIDKey(span.TraceID), t.traceExpirationDuration().Seconds())
commands[i+2] = t.incrementSpanCountsCMD(span.TraceID, span.Type)
commands[i+1] = redis.NewExpireCommand(spansHashByTraceIDKey(span.TraceID), t.traceExpirationDuration().Seconds(), conn)
commands[i+2] = t.incrementSpanCountsCMD(span.TraceID, span.Type, conn)
}

err := conn.Exec(commands...)
Expand All @@ -896,8 +889,8 @@ func (t *tracesStore) incrementSpanCounts(ctx context.Context, conn redis.Conn,
commands := make([]redis.Command, 2*len(spans))
for i := 0; i < len(commands); i += 2 {
span := spans[i/2]
commands[i] = t.incrementSpanCountsCMD(span.TraceID, span.Type)
commands[i+1] = redis.NewExpireCommand(t.traceStatusKey(span.TraceID), t.traceExpirationDuration().Seconds())
commands[i] = t.incrementSpanCountsCMD(span.TraceID, span.Type, conn)
commands[i+1] = redis.NewExpireCommand(t.traceStatusKey(span.TraceID), t.traceExpirationDuration().Seconds(), conn)
}

err := conn.Exec(commands...)
Expand All @@ -907,7 +900,7 @@ func (t *tracesStore) incrementSpanCounts(ctx context.Context, conn redis.Conn,
return err
}

func (t *tracesStore) incrementSpanCountsCMD(traceID string, spanType types.SpanType) redis.Command {
func (t *tracesStore) incrementSpanCountsCMD(traceID string, spanType types.SpanType, conn redis.Conn) redis.Command {

var field string
switch spanType {
Expand All @@ -919,22 +912,22 @@ func (t *tracesStore) incrementSpanCountsCMD(traceID string, spanType types.Span
field = "Count"
}

return redis.NewIncrByHashCommand(t.traceStatusKey(traceID), field, 1)
return redis.NewIncrByHashCommand(t.traceStatusKey(traceID), field, 1, conn)
}

// spansHashByTraceIDKey derives the key of the hash that stores the spans of
// the given trace: the trace ID followed by the ":spans" suffix.
func spansHashByTraceIDKey(traceID string) string {
	const spansSuffix = ":spans"
	key := traceID + spansSuffix
	return key
}

// addToSpanHash JSON-encodes span and returns a set-hash command that stores
// the encoded bytes under the span's SpanID in the spans hash of the span's
// trace (see spansHashByTraceIDKey). If encoding fails it returns a nil
// command and the marshal error.
// NOTE(review): this span is a rendered diff, so two variants of the signature
// and of the return statement appear together (with and without the conn
// argument) — confirm which one the real file contains.
func addToSpanHash(span *CentralSpan) (redis.Command, error) {
func addToSpanHash(span *CentralSpan, conn redis.Conn) (redis.Command, error) {
data, err := json.Marshal(span)
if err != nil {
return nil, err
}

// overwrite the span data if it already exists
return redis.NewSetHashCommand(spansHashByTraceIDKey(span.TraceID), map[string]any{span.SpanID: data}), nil
return redis.NewSetHashCommand(spansHashByTraceIDKey(span.TraceID), map[string]any{span.SpanID: data}, conn), nil
}

// TraceStateProcessor is a map of trace IDs to their state.
Expand Down Expand Up @@ -1355,24 +1348,6 @@ func (s stateChangeEvent) string() string {
return s.current.String() + "-" + s.next.String()
}

// keepTraceKey is passed together with keepTraceScript to RedisClient.NewScript
// when the script is registered.
// NOTE(review): presumably the number of KEYS the script receives — confirm
// against the redis.Script implementation.
const keepTraceKey = 1

// keepTraceScript is a Lua script that walks ARGV two entries at a time. A pair
// whose first entry is the literal "key" selects the hash to write to; every
// other pair is written to that hash as a field/value with HSETNX. The script
// always returns 1.
const keepTraceScript = `
local totalArgs = table.getn(ARGV)
local traceStatusKey = ""

for i=1, totalArgs, 2 do
-- If trace status does not exist, a new entry is created.
-- If KeepReason already exists, this operation has no effect.
if ARGV[i] == "key" then
traceStatusKey = ARGV[i+1]
else
redis.call("HSETNX", traceStatusKey, ARGV[i], ARGV[i+1])
end
end

return 1
`

const removeExpiredTracesKey = 1
const removeExpiredTracesScript = `
local stateKey = KEYS[1]
Expand Down
Loading
Loading