From e904d6ccc0c63c16833db10b65d2d04e69572f46 Mon Sep 17 00:00:00 2001 From: zheng Date: Wed, 7 Dec 2022 11:05:56 +0800 Subject: [PATCH] Add tracing span data in cpu events (#384) Signed-off-by: huxiangyuan --- CHANGELOG.md | 2 +- .../analyzer/cpuanalyzer/cpu_analyzer.go | 21 +++++- .../component/analyzer/cpuanalyzer/model.go | 29 ++++++++ collector/pkg/model/constnames/const.go | 1 + probe/src/cgo/kindling.cpp | 70 ++++++++++++++++++- probe/src/cgo/kindling.h | 4 ++ 6 files changed, 124 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d0d41edf9..8e2e2154f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ ### New features - Support the protocol RocketMQ.([#328](https://github.com/KindlingProject/kindling/pull/328)) - Add a new tool: A debug tool for Trace Profiling is provided for developers to troubleshoot problems.([#363](https://github.com/KindlingProject/kindling/pull/363)) - +- Add tracing span data in cpu events. ([#384](https://github.com/KindlingProject/kindling/pull/384)) ### Enhancements - Add the field `end_timestamp` to the trace data to make it easier for querying. ([#380](https://github.com/KindlingProject/kindling/pull/380)) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go index 13e059688..d99998c80 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go @@ -37,7 +37,7 @@ func (ca *CpuAnalyzer) Type() analyzer.Type { } func (ca *CpuAnalyzer) ConsumableEvents() []string { - return []string{constnames.CpuEvent, constnames.JavaFutexInfo, constnames.TransactionIdEvent, constnames.ProcessExitEvent} + return []string{constnames.CpuEvent, constnames.JavaFutexInfo, constnames.TransactionIdEvent, constnames.ProcessExitEvent, constnames.SpanEvent} } func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consumers []consumer.Consumer) analyzer.Analyzer { @@ -80,6 +80,8 @@ func (ca *CpuAnalyzer) ConsumeEvent(event *model.KindlingEvent) error { pid := event.GetPid() tid := event.Ctx.ThreadInfo.GetTid() ca.trimExitedThread(pid, tid) + case constnames.SpanEvent: + ca.ConsumeSpanEvent(event) } return nil } @@ -111,6 +113,23 @@ func (ca *CpuAnalyzer) ConsumeJavaFutexEvent(event *model.KindlingEvent) { ca.PutEventToSegments(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev) } +func (ca *CpuAnalyzer) ConsumeSpanEvent(event *model.KindlingEvent) { + ev := new(ApmSpanEvent) + ev.StartTime = event.Timestamp + for i := 0; i < int(event.ParamsNumber); i++ { + userAttributes := event.UserAttributes[i] + switch { + case userAttributes.GetKey() == "end_time": + ev.EndTime, _ = strconv.ParseUint(string(userAttributes.GetValue()), 10, 64) + case userAttributes.GetKey() == "trace_id": + ev.TraceId = string(userAttributes.GetValue()) + case userAttributes.GetKey() == "span": + ev.Name = string(userAttributes.GetValue()) + } + } + ca.PutEventToSegments(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev) +} + func (ca *CpuAnalyzer) ConsumeCpuEvent(event *model.KindlingEvent) { ev := new(CpuEvent) for i := 0; i < int(event.ParamsNumber); i++ { diff --git a/collector/pkg/component/analyzer/cpuanalyzer/model.go b/collector/pkg/component/analyzer/cpuanalyzer/model.go index 712350034..132a2f896 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/model.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/model.go @@ -13,12 +13,14 @@ const ( TimedCpuEventKind TimedEventKind = iota TimedJavaFutexEventKind TimedTransactionIdEventKind + TimedApmSpanEventKind ) const ( CpuEventLabel = "cpuEvents" JavaFutexEventLabel = "javaFutexEvents" TransactionIdEventLabel = "transactionIds" + SpanLabel = "spans" ) type TimedEvent interface { @@ -43,6 +45,7 @@ type Segment struct { CpuEvents []TimedEvent `json:"cpuEvents"` JavaFutexEvents []TimedEvent `json:"javaFutexEvents"` TransactionIds []TimedEvent `json:"transactionIds"` + Spans []TimedEvent `json:"spans"` IsSend int IndexTimestamp string `json:"indexTimestamp"` } @@ -57,6 +60,7 @@ func newSegment(pid uint32, tid uint32, threadName string, startTime uint64, end CpuEvents: make([]TimedEvent, 0), JavaFutexEvents: make([]TimedEvent, 0), TransactionIds: make([]TimedEvent, 0), + Spans: make([]TimedEvent, 0), IsSend: 0, IndexTimestamp: "", } @@ -69,6 +73,8 @@ func (s *Segment) putTimedEvent(event TimedEvent) { s.JavaFutexEvents = append(s.JavaFutexEvents, event) case TimedTransactionIdEventKind: s.TransactionIds = append(s.TransactionIds, event) + case TimedApmSpanEventKind: + s.Spans = append(s.Spans, event) } } @@ -92,6 +98,10 @@ func (s *Segment) toDataGroup() *model.DataGroup { if err == nil { labels.AddStringValue(TransactionIdEventLabel, string(transactionIdEventString)) } + spanEventString, err := json.Marshal(s.Spans) + if err == nil { + labels.AddStringValue(SpanLabel, string(spanEventString)) + } return model.NewDataGroup(constnames.CameraEventGroupName, labels, s.StartTime) } @@ -154,3 +164,22 @@ func (t *TransactionIdEvent) EndTimestamp() uint64 { func (t *TransactionIdEvent) Kind() TimedEventKind { return TimedTransactionIdEventKind } + +type ApmSpanEvent struct { + StartTime uint64 `json:"startTime"` + EndTime uint64 `json:"endTime"` + TraceId string `json:"traceId"` + Name string `json:"name"` +} + +func (j *ApmSpanEvent) StartTimestamp() uint64 { + return j.StartTime +} + +func (j *ApmSpanEvent) EndTimestamp() uint64 { + return j.EndTime +} + +func (j *ApmSpanEvent) Kind() TimedEventKind { + return TimedApmSpanEventKind +} \ No newline at end of file diff --git a/collector/pkg/model/constnames/const.go b/collector/pkg/model/constnames/const.go index 19d6aa285..eb990febb 100644 --- a/collector/pkg/model/constnames/const.go +++ b/collector/pkg/model/constnames/const.go @@ -21,6 +21,7 @@ const ( CpuEvent = "cpu_event" JavaFutexInfo = "java_futex_info" TransactionIdEvent = "apm_trace_id_event" + SpanEvent = "apm_span_event" OtherEvent = "other" ProcessExitEvent = "procexit" diff --git a/probe/src/cgo/kindling.cpp b/probe/src/cgo/kindling.cpp index 77e457b62..fa998707e 100644 --- a/probe/src/cgo/kindling.cpp +++ b/probe/src/cgo/kindling.cpp @@ -38,6 +38,8 @@ char* time_char = new char[32]; char* depth_char = new char[8]; char* finish_char = new char[4]; char* kd_stack = new char[1024]; +char* duration_char = new char[32]; +char* span_char = new char[1024]; int16_t event_filters[1024][16]; @@ -211,7 +213,10 @@ int getEvent(void** pp_kindling_event) { parse_xtid(ev, data_val, *data_param, p_kindling_event, threadInfo, userAttNumber); return 1; } - + if (data_param->m_len > 8 && memcmp(data_val, "kd-span@", 8) == 0) { + parse_span(ev, data_val, *data_param, p_kindling_event, threadInfo, userAttNumber); + return 1; + } if (data_param->m_len > 6 && memcmp(data_val, "kd-tm@", 6) == 0) { parse_tm(data_val, *data_param, threadInfo); return -1; @@ -483,6 +488,69 @@ void parse_xtid(sinsp_evt* s_evt, char* data_val, sinsp_evt_param data_param, p_kindling_event->paramsNumber = userAttNumber; } +void parse_span(sinsp_evt* s_evt, char* data_val, sinsp_evt_param data_param, + kindling_event_t_for_go* p_kindling_event, sinsp_threadinfo* threadInfo, + uint16_t& userAttNumber) { + int val_offset = 0; + int tmp_offset = 0; + int span_offset = 0; + int traceId_offset = 0; + for (int i = 8; i < data_param.m_len; i++) { + if (data_val[i] == '!') { + if (val_offset == 0) { + duration_char[tmp_offset] = '\0'; + } else if (val_offset == 1) { + span_char[tmp_offset] = '\0'; + span_offset = tmp_offset; + } else if (val_offset == 2) { + traceId[tmp_offset] = '\0'; + traceId_offset = tmp_offset; + break; + } + tmp_offset = 0; + val_offset++; + continue; + } + if (val_offset == 0) { + duration_char[tmp_offset] = data_val[i]; + } else if (val_offset == 1) { + span_char[tmp_offset] = data_val[i]; + } else if (val_offset == 2) { + traceId[tmp_offset] = data_val[i]; + } + tmp_offset++; + } + p_kindling_event->timestamp = s_evt->get_ts() - atol(duration_char); + strcpy(p_kindling_event->userAttributes[userAttNumber].key, "end_time"); + memcpy(p_kindling_event->userAttributes[userAttNumber].value, to_string(s_evt->get_ts()).data(), + 19); + p_kindling_event->userAttributes[userAttNumber].valueType = UINT64; + p_kindling_event->userAttributes[userAttNumber].len = 19; + userAttNumber++; + + strcpy(p_kindling_event->userAttributes[userAttNumber].key, "trace_id"); + memcpy(p_kindling_event->userAttributes[userAttNumber].value, traceId, traceId_offset); + p_kindling_event->userAttributes[userAttNumber].valueType = CHARBUF; + p_kindling_event->userAttributes[userAttNumber].len = traceId_offset; + userAttNumber++; + + strcpy(p_kindling_event->userAttributes[userAttNumber].key, "span"); + memcpy(p_kindling_event->userAttributes[userAttNumber].value, span_char, span_offset); + p_kindling_event->userAttributes[userAttNumber].valueType = CHARBUF; + p_kindling_event->userAttributes[userAttNumber].len = span_offset; + userAttNumber++; + + strcpy(p_kindling_event->name, "apm_span_event"); + p_kindling_event->context.tinfo.tid = threadInfo->m_tid; + map::iterator key = + ptid_comm.find(threadInfo->m_pid << 32 | (threadInfo->m_tid & 0xFFFFFFFF)); + if (key != ptid_comm.end()) { + strcpy(p_kindling_event->context.tinfo.comm, key->second); + } + p_kindling_event->context.tinfo.pid = threadInfo->m_pid; + p_kindling_event->paramsNumber = userAttNumber; +} + void parse_tm(char* data_val, sinsp_evt_param data_param, sinsp_threadinfo* threadInfo) { char* comm_char = new char[256]; int val_offset = 0; diff --git a/probe/src/cgo/kindling.h b/probe/src/cgo/kindling.h index 04d140994..1d84900e0 100644 --- a/probe/src/cgo/kindling.h +++ b/probe/src/cgo/kindling.h @@ -99,6 +99,10 @@ void parse_xtid(sinsp_evt* s_evt, char* data_val, sinsp_evt_param data_param, kindling_event_t_for_go* p_kindling_event, sinsp_threadinfo* threadInfo, uint16_t& userAttNumber); +void parse_span(sinsp_evt *s_evt, char *data_val, sinsp_evt_param data_param, + kindling_event_t_for_go *p_kindling_event, sinsp_threadinfo* threadInfo, + uint16_t &userAttNumber); + void parse_tm(char* data_val, sinsp_evt_param data_param, sinsp_threadinfo* threadInfo); void init_kindling_event(kindling_event_t_for_go* p_kindling_event, void** pp_kindling_event);