Skip to content

Commit

Permalink
feat: properly handle keep state and timestamp for state transition (#…
Browse files Browse the repository at this point in the history
…1014)

<!--
Thank you for contributing to the project! 💜
Please make sure to:
- Chat with us first if this is a big change
  - Open a new issue (or comment on an existing one)
- We want to make sure you don't spend time implementing something we
might have to say No to
- Add unit tests
- Mention any relevant issues in the PR description (e.g. "Fixes #123")

Please see our [OSS process
document](https://github.com/honeycombio/home/blob/main/honeycomb-oss-lifecycle-and-practices.md#)
to get an idea of how we operate.
-->

## Which problem is this PR solving?
- Keep state
    - this PR makes sure that the keep decision is stored as part of the
      state transition for a given trace
    - it also adds logic to remove span information for a given trace once
      a decision is made
- Timestamp
    - we need to record the current time whenever a state change happens
    - the precision used before was not sufficient to determine the order of
      state change events
    - this PR changes the timestamp precision from seconds to nanoseconds

## Short description of the changes

- Modified `KeepTraces`
    - return the traceIDs that have successfully changed their state from
      the `toNextState` function
- added more tests
  • Loading branch information
VinozzZ authored and kentquirk committed Mar 15, 2024
1 parent af28d07 commit fa83b2b
Show file tree
Hide file tree
Showing 5 changed files with 470 additions and 167 deletions.
125 changes: 82 additions & 43 deletions centralstore/redis_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,39 @@ type RedisBasicStoreOptions struct {
MaxTraceRetention time.Duration
}

var _ BasicStorer = (*RedisBasicStore)(nil)

func NewRedisBasicStore(opt RedisBasicStoreOptions) *RedisBasicStore {
var host string
func EnsureRedisBasicStoreOptions(opt *RedisBasicStoreOptions) {
if opt == nil {
opt = &RedisBasicStoreOptions{}
}
if opt.Host == "" {
host = "localhost:6379"
opt.Host = "localhost:6379"
}

if opt.MaxTraceRetention == 0 {
opt.MaxTraceRetention = 24 * time.Hour
}

if opt.Cache.DroppedSize == 0 {
opt.Cache.DroppedSize = 10000
}

if opt.Cache.KeptSize == 0 {
opt.Cache.KeptSize = 100
}

if opt.Cache.SizeCheckInterval == 0 {
opt.Cache.SizeCheckInterval = config.Duration(10 * time.Second)
}

}

var _ BasicStorer = (*RedisBasicStore)(nil)

func NewRedisBasicStore(opt *RedisBasicStoreOptions) *RedisBasicStore {
EnsureRedisBasicStoreOptions(opt)

redisClient := redis.NewClient(&redis.Config{
Addr: host,
Addr: opt.Host,
})

decisionCache, err := cache.NewCuckooSentCache(opt.Cache, &metrics.NullMetrics{})
Expand Down Expand Up @@ -77,7 +96,6 @@ func NewRedisBasicStore(opt RedisBasicStoreOptions) *RedisBasicStore {
decisionCache: decisionCache,
traces: newTraceStatusStore(clock),
states: stateProcessor,
errs: make(chan error, defaultPendingWorkCapacity),
}
}

Expand All @@ -87,15 +105,13 @@ type RedisBasicStore struct {
decisionCache cache.TraceSentCache
traces *tracesStore
states *traceStateProcessor
errs chan error
}

func (r *RedisBasicStore) Stop() error {
r.states.Stop()
if err := r.client.Stop(context.TODO()); err != nil {
r.errs <- err
return err
}
close(r.errs)
return nil
}

Expand All @@ -113,36 +129,41 @@ func (r *RedisBasicStore) WriteSpan(span *CentralSpan) error {
case DecisionDrop:
return nil
case DecisionKeep, AwaitingDecision:
_ = r.traces.incrementSpanCounts(conn, span.TraceID, span.Type)
err := r.traces.incrementSpanCounts(conn, span.TraceID, span.Type)
if err != nil {
return err
}
case Collecting:
if span.ParentID == "" {
err := r.states.toNextState(conn, newTraceStateChangeEvent(Collecting, DecisionDelay), span.TraceID)
_, err := r.states.toNextState(conn, newTraceStateChangeEvent(Collecting, DecisionDelay), span.TraceID)
if err != nil {
r.errs <- err
return err
}
}
case DecisionDelay, ReadyToDecide:
case Unknown:
err := r.states.addNewTrace(conn, span.TraceID)
if err != nil {
r.errs <- err
return err
}
err = r.traces.addStatus(conn, span)
if err != nil {
r.errs <- err
return err
}
}

err := r.traces.storeSpan(conn, span)
if err != nil {
r.errs <- err
return err
}

return nil
}

// GetTrace returns a CentralTrace with the given traceID.
// if a decision has been made about the trace, the returned value
// will not contain span data.
func (r *RedisBasicStore) GetTrace(traceID string) (*CentralTrace, error) {
// TODO: what if this trace is already in the decision cache?
conn := r.client.Get()
defer conn.Close()

Expand Down Expand Up @@ -237,20 +258,19 @@ func (r *RedisBasicStore) GetTracesNeedingDecision(n int) ([]string, error) {

traceIDs, err := r.states.allTraceIDs(conn, ReadyToDecide, n)
if err != nil {
r.errs <- err
return nil, err
}

if len(traceIDs) == 0 {
return nil, nil
}
err = r.changeTraceStatus(conn, traceIDs, ReadyToDecide, AwaitingDecision)

succeed, err := r.states.toNextState(conn, newTraceStateChangeEvent(ReadyToDecide, AwaitingDecision), traceIDs...)
if err != nil {
r.errs <- err
return nil, err
}

return traceIDs, nil
return succeed, nil

}

Expand All @@ -271,7 +291,7 @@ func (r *RedisBasicStore) ChangeTraceStatus(traceIDs []string, fromState, toStat
return r.removeTraces(traceIDs)
}

err := r.changeTraceStatus(conn, traceIDs, fromState, toState)
_, err := r.states.toNextState(conn, newTraceStateChangeEvent(fromState, toState), traceIDs...)
if err != nil {
return err
}
Expand All @@ -286,8 +306,21 @@ func (r *RedisBasicStore) KeepTraces(statuses []*CentralTraceStatus) error {
traceIDs := make([]string, 0, len(statuses))
for _, status := range statuses {
traceIDs = append(traceIDs, status.TraceID)
r.decisionCache.Record(status, true, status.KeepReason)
}
err := r.changeTraceStatus(conn, traceIDs, AwaitingDecision, DecisionKeep)

_, err := r.states.toNextState(conn, newTraceStateChangeEvent(AwaitingDecision, DecisionKeep), traceIDs...)
if err != nil {
return err
}

// remove span list
spanListKeys := make([]string, 0, len(traceIDs))
for _, traceID := range traceIDs {
spanListKeys = append(spanListKeys, spansHashByTraceIDKey(traceID))
}

_, err = conn.Del(spanListKeys...)
if err != nil {
return err
}
Expand Down Expand Up @@ -336,15 +369,6 @@ func (r *RedisBasicStore) removeTraces(traceIDs []string) error {
return r.states.remove(conn, AwaitingDecision, traceIDs...)
}

func (r *RedisBasicStore) changeTraceStatus(conn redis.Conn, traceIDs []string, fromState, toState CentralTraceState) error {
err := r.states.toNextState(conn, newTraceStateChangeEvent(fromState, toState), traceIDs...)
if err != nil {
return err
}

return nil
}

// TraceStore stores trace state status and spans.
// trace state statuses are stored in a redis hash with the key being the trace ID
// and each field being a status field.
Expand Down Expand Up @@ -474,11 +498,11 @@ func (t *tracesStore) incrementSpanCountsCMD(traceID string, spanType SpanType)
var field string
switch spanType {
case SpanTypeEvent:
field = "span_count_event"
field = "EventCount"
case SpanTypeLink:
field = "span_count_link"
field = "LinkCount"
default:
field = "span_count"
field = "Count"
}

return redis.NewIncrByHashCommand(t.traceStatusKey(traceID), field, 1), nil
Expand Down Expand Up @@ -571,6 +595,7 @@ func newTraceStateProcessor(cfg traceStateProcessorConfig, clock clockwork.Clock
DecisionDelay,
ReadyToDecide,
AwaitingDecision,
DecisionKeep,
},
config: cfg,
clock: clock,
Expand Down Expand Up @@ -611,7 +636,7 @@ func (t *traceStateProcessor) addNewTrace(conn redis.Conn, traceID string) error
return nil
}

return conn.ZAdd(t.stateByTraceIDsKey(Collecting), []any{t.clock.Now().Unix(), traceID})
return conn.ZAdd(t.stateByTraceIDsKey(Collecting), []any{t.clock.Now().UnixNano(), traceID})
}

func (t *traceStateProcessor) stateByTraceIDsKey(state CentralTraceState) string {
Expand Down Expand Up @@ -646,7 +671,7 @@ func (t *traceStateProcessor) traceIDsWithTimestamp(conn redis.Conn, state Centr
traces[traceIDs[i]] = time.Time{}
continue
}
traces[traceIDs[i]] = time.Unix(value, 0)
traces[traceIDs[i]] = time.Unix(0, value)
}

return traces, nil
Expand All @@ -665,28 +690,37 @@ func (t *traceStateProcessor) remove(conn redis.Conn, state CentralTraceState, t
return conn.ZRemove(t.stateByTraceIDsKey(state), traceIDs)
}

func (t *traceStateProcessor) toNextState(conn redis.Conn, changeEvent stateChangeEvent, traceIDs ...string) error {
eligible := make([]any, 0, len(traceIDs))
func (t *traceStateProcessor) toNextState(conn redis.Conn, changeEvent stateChangeEvent, traceIDs ...string) ([]string, error) {
eligible := make([]string, 0, len(traceIDs))
for _, traceID := range traceIDs {
if t.isValidStateChangeThroughLua(conn, changeEvent, traceID) {
eligible = append(eligible, traceID)
}
}

if len(eligible) == 0 {
return errors.New("invalid state change event")
return nil, errors.New("invalid state change event")
}

// store the timestamp as a part of the entry in the trace state set
// make it into a sorted set
// the timestamp should be a fixed length unix timestamp
timestamps := make([]int64, len(eligible))
for i := range eligible {
timestamps[i] = t.clock.Now().Unix()
timestamps[i] = t.clock.Now().UnixNano()
}

// only add traceIDs to the destination if they don't already exist
return conn.ZMove(t.stateByTraceIDsKey(changeEvent.current), t.stateByTraceIDsKey(changeEvent.next), timestamps, eligible)
err := conn.ZMove(t.stateByTraceIDsKey(changeEvent.current), t.stateByTraceIDsKey(changeEvent.next), timestamps, eligible)
if err != nil {
return nil, err
}

if len(eligible) != len(traceIDs) {
return eligible, fmt.Errorf("some traceIDs were not moved to the next state")
}

return eligible, nil
}

func (t *traceStateProcessor) isValidStateChange(conn redis.Conn, stateChange stateChangeEvent, traceID string) bool {
Expand Down Expand Up @@ -735,7 +769,7 @@ func (t *traceStateProcessor) removeExpiredTraces(client redis.Client) {

// get the traceIDs that have been in the state for longer than the expiration time
for _, state := range t.states {
traceIDs, err := conn.ZRangeByScoreString(t.stateByTraceIDsKey(state), t.clock.Now().Add(-t.config.maxTraceRetention).Unix())
traceIDs, err := conn.ZRangeByScoreString(t.stateByTraceIDsKey(state), t.clock.Now().Add(-t.config.maxTraceRetention).UnixNano())
if err != nil {
return
}
Expand Down Expand Up @@ -784,6 +818,10 @@ const stateChangeScript = `
currentState = previousState
end
if (currentState ~= previousState) then
do return -1 end
end
local stateChangeEvent = string.format("%s-%s", currentState, nextState)
local changeEventIsValid = redis.call('SISMEMBER', possibleStateChangeEvents, stateChangeEvent)
if (changeEventIsValid == 0) then
Expand All @@ -808,6 +846,7 @@ func ensureValidStateChangeEvents(client redis.Client) error {
newTraceStateChangeEvent(DecisionDelay, ReadyToDecide).string(),
newTraceStateChangeEvent(ReadyToDecide, AwaitingDecision).string(),
newTraceStateChangeEvent(AwaitingDecision, ReadyToDecide).string(),
newTraceStateChangeEvent(AwaitingDecision, DecisionKeep).string(),
)
}

Expand Down
Loading

0 comments on commit fa83b2b

Please sign in to comment.