Skip to content

Commit

Permalink
Add tracing span data in cpu events (#384)
Browse files Browse the repository at this point in the history
Signed-off-by: huxiangyuan <huxiangyuan@harmonycloud.cn>
  • Loading branch information
hocktea214 authored Dec 7, 2022
1 parent 615b2c2 commit e904d6c
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
### New features
- Support the protocol RocketMQ.([#328](https://github.com/KindlingProject/kindling/pull/328))
- Add a new tool: A debug tool for Trace Profiling is provided for developers to troubleshoot problems.([#363](https://github.com/KindlingProject/kindling/pull/363))

- Add tracing span data in cpu events. ([#384](https://github.com/KindlingProject/kindling/pull/384))

### Enhancements
- Add the field `end_timestamp` to the trace data to make it easier for querying. ([#380](https://github.com/KindlingProject/kindling/pull/380))
Expand Down
21 changes: 20 additions & 1 deletion collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (ca *CpuAnalyzer) Type() analyzer.Type {
}

func (ca *CpuAnalyzer) ConsumableEvents() []string {
return []string{constnames.CpuEvent, constnames.JavaFutexInfo, constnames.TransactionIdEvent, constnames.ProcessExitEvent}
return []string{constnames.CpuEvent, constnames.JavaFutexInfo, constnames.TransactionIdEvent, constnames.ProcessExitEvent, constnames.SpanEvent}
}

func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consumers []consumer.Consumer) analyzer.Analyzer {
Expand Down Expand Up @@ -80,6 +80,8 @@ func (ca *CpuAnalyzer) ConsumeEvent(event *model.KindlingEvent) error {
pid := event.GetPid()
tid := event.Ctx.ThreadInfo.GetTid()
ca.trimExitedThread(pid, tid)
case constnames.SpanEvent:
ca.ConsumeSpanEvent(event)
}
return nil
}
Expand Down Expand Up @@ -111,6 +113,23 @@ func (ca *CpuAnalyzer) ConsumeJavaFutexEvent(event *model.KindlingEvent) {
ca.PutEventToSegments(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev)
}

func (ca *CpuAnalyzer) ConsumeSpanEvent(event *model.KindlingEvent) {
ev := new(ApmSpanEvent)
ev.StartTime = event.Timestamp
for i := 0; i < int(event.ParamsNumber); i++ {
userAttributes := event.UserAttributes[i]
switch {
case userAttributes.GetKey() == "end_time":
ev.EndTime, _ = strconv.ParseUint(string(userAttributes.GetValue()), 10, 64)
case userAttributes.GetKey() == "trace_id":
ev.TraceId = string(userAttributes.GetValue())
case userAttributes.GetKey() == "span":
ev.Name = string(userAttributes.GetValue())
}
}
ca.PutEventToSegments(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev)
}

func (ca *CpuAnalyzer) ConsumeCpuEvent(event *model.KindlingEvent) {
ev := new(CpuEvent)
for i := 0; i < int(event.ParamsNumber); i++ {
Expand Down
29 changes: 29 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ const (
TimedCpuEventKind TimedEventKind = iota
TimedJavaFutexEventKind
TimedTransactionIdEventKind
TimedApmSpanEventKind
)

const (
CpuEventLabel = "cpuEvents"
JavaFutexEventLabel = "javaFutexEvents"
TransactionIdEventLabel = "transactionIds"
SpanLabel = "spans"
)

type TimedEvent interface {
Expand All @@ -43,6 +45,7 @@ type Segment struct {
CpuEvents []TimedEvent `json:"cpuEvents"`
JavaFutexEvents []TimedEvent `json:"javaFutexEvents"`
TransactionIds []TimedEvent `json:"transactionIds"`
Spans []TimedEvent `json:"spans"`
IsSend int
IndexTimestamp string `json:"indexTimestamp"`
}
Expand All @@ -57,6 +60,7 @@ func newSegment(pid uint32, tid uint32, threadName string, startTime uint64, end
CpuEvents: make([]TimedEvent, 0),
JavaFutexEvents: make([]TimedEvent, 0),
TransactionIds: make([]TimedEvent, 0),
Spans: make([]TimedEvent, 0),
IsSend: 0,
IndexTimestamp: "",
}
Expand All @@ -69,6 +73,8 @@ func (s *Segment) putTimedEvent(event TimedEvent) {
s.JavaFutexEvents = append(s.JavaFutexEvents, event)
case TimedTransactionIdEventKind:
s.TransactionIds = append(s.TransactionIds, event)
case TimedApmSpanEventKind:
s.Spans = append(s.Spans, event)
}
}

Expand All @@ -92,6 +98,10 @@ func (s *Segment) toDataGroup() *model.DataGroup {
if err == nil {
labels.AddStringValue(TransactionIdEventLabel, string(transactionIdEventString))
}
spanEventString, err := json.Marshal(s.Spans)
if err == nil {
labels.AddStringValue(SpanLabel, string(spanEventString))
}
return model.NewDataGroup(constnames.CameraEventGroupName, labels, s.StartTime)
}

Expand Down Expand Up @@ -154,3 +164,22 @@ func (t *TransactionIdEvent) EndTimestamp() uint64 {
func (t *TransactionIdEvent) Kind() TimedEventKind {
return TimedTransactionIdEventKind
}

type ApmSpanEvent struct {
StartTime uint64 `json:"startTime"`
EndTime uint64 `json:"endTime"`
TraceId string `json:"traceId"`
Name string `json:"name"`
}

func (j *ApmSpanEvent) StartTimestamp() uint64 {
return j.StartTime
}

func (j *ApmSpanEvent) EndTimestamp() uint64 {
return j.EndTime
}

func (j *ApmSpanEvent) Kind() TimedEventKind {
return TimedApmSpanEventKind
}
1 change: 1 addition & 0 deletions collector/pkg/model/constnames/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
CpuEvent = "cpu_event"
JavaFutexInfo = "java_futex_info"
TransactionIdEvent = "apm_trace_id_event"
SpanEvent = "apm_span_event"
OtherEvent = "other"

ProcessExitEvent = "procexit"
Expand Down
70 changes: 69 additions & 1 deletion probe/src/cgo/kindling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ char* time_char = new char[32];
char* depth_char = new char[8];
char* finish_char = new char[4];
char* kd_stack = new char[1024];
char* duration_char = new char[32];
char* span_char = new char[1024];

int16_t event_filters[1024][16];

Expand Down Expand Up @@ -211,7 +213,10 @@ int getEvent(void** pp_kindling_event) {
parse_xtid(ev, data_val, *data_param, p_kindling_event, threadInfo, userAttNumber);
return 1;
}

if (data_param->m_len > 8 && memcmp(data_val, "kd-span@", 8) == 0) {
parse_span(ev, data_val, *data_param, p_kindling_event, threadInfo, userAttNumber);
return 1;
}
if (data_param->m_len > 6 && memcmp(data_val, "kd-tm@", 6) == 0) {
parse_tm(data_val, *data_param, threadInfo);
return -1;
Expand Down Expand Up @@ -483,6 +488,69 @@ void parse_xtid(sinsp_evt* s_evt, char* data_val, sinsp_evt_param data_param,
p_kindling_event->paramsNumber = userAttNumber;
}

void parse_span(sinsp_evt* s_evt, char* data_val, sinsp_evt_param data_param,
kindling_event_t_for_go* p_kindling_event, sinsp_threadinfo* threadInfo,
uint16_t& userAttNumber) {
int val_offset = 0;
int tmp_offset = 0;
int span_offset = 0;
int traceId_offset = 0;
for (int i = 8; i < data_param.m_len; i++) {
if (data_val[i] == '!') {
if (val_offset == 0) {
duration_char[tmp_offset] = '\0';
} else if (val_offset == 1) {
span_char[tmp_offset] = '\0';
span_offset = tmp_offset;
} else if (val_offset == 2) {
traceId[tmp_offset] = '\0';
traceId_offset = tmp_offset;
break;
}
tmp_offset = 0;
val_offset++;
continue;
}
if (val_offset == 0) {
duration_char[tmp_offset] = data_val[i];
} else if (val_offset == 1) {
span_char[tmp_offset] = data_val[i];
} else if (val_offset == 2) {
traceId[tmp_offset] = data_val[i];
}
tmp_offset++;
}
p_kindling_event->timestamp = s_evt->get_ts() - atol(duration_char);
strcpy(p_kindling_event->userAttributes[userAttNumber].key, "end_time");
memcpy(p_kindling_event->userAttributes[userAttNumber].value, to_string(s_evt->get_ts()).data(),
19);
p_kindling_event->userAttributes[userAttNumber].valueType = UINT64;
p_kindling_event->userAttributes[userAttNumber].len = 19;
userAttNumber++;

strcpy(p_kindling_event->userAttributes[userAttNumber].key, "trace_id");
memcpy(p_kindling_event->userAttributes[userAttNumber].value, traceId, traceId_offset);
p_kindling_event->userAttributes[userAttNumber].valueType = CHARBUF;
p_kindling_event->userAttributes[userAttNumber].len = traceId_offset;
userAttNumber++;

strcpy(p_kindling_event->userAttributes[userAttNumber].key, "span");
memcpy(p_kindling_event->userAttributes[userAttNumber].value, span_char, span_offset);
p_kindling_event->userAttributes[userAttNumber].valueType = CHARBUF;
p_kindling_event->userAttributes[userAttNumber].len = span_offset;
userAttNumber++;

strcpy(p_kindling_event->name, "apm_span_event");
p_kindling_event->context.tinfo.tid = threadInfo->m_tid;
map<uint64_t, char*>::iterator key =
ptid_comm.find(threadInfo->m_pid << 32 | (threadInfo->m_tid & 0xFFFFFFFF));
if (key != ptid_comm.end()) {
strcpy(p_kindling_event->context.tinfo.comm, key->second);
}
p_kindling_event->context.tinfo.pid = threadInfo->m_pid;
p_kindling_event->paramsNumber = userAttNumber;
}

void parse_tm(char* data_val, sinsp_evt_param data_param, sinsp_threadinfo* threadInfo) {
char* comm_char = new char[256];
int val_offset = 0;
Expand Down
4 changes: 4 additions & 0 deletions probe/src/cgo/kindling.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ void parse_xtid(sinsp_evt* s_evt, char* data_val, sinsp_evt_param data_param,
kindling_event_t_for_go* p_kindling_event, sinsp_threadinfo* threadInfo,
uint16_t& userAttNumber);

void parse_span(sinsp_evt *s_evt, char *data_val, sinsp_evt_param data_param,
kindling_event_t_for_go *p_kindling_event, sinsp_threadinfo* threadInfo,
uint16_t &userAttNumber);

void parse_tm(char* data_val, sinsp_evt_param data_param, sinsp_threadinfo* threadInfo);

void init_kindling_event(kindling_event_t_for_go* p_kindling_event, void** pp_kindling_event);
Expand Down

0 comments on commit e904d6c

Please sign in to comment.