Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing span data in cpu events #384

Merged
merged 3 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (ca *CpuAnalyzer) Type() analyzer.Type {
}

func (ca *CpuAnalyzer) ConsumableEvents() []string {
return []string{constnames.CpuEvent, constnames.JavaFutexInfo, constnames.TransactionIdEvent, constnames.ProcessExitEvent}
return []string{constnames.CpuEvent, constnames.JavaFutexInfo, constnames.TransactionIdEvent, constnames.ProcessExitEvent, constnames.SpanEvent}
}

func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consumers []consumer.Consumer) analyzer.Analyzer {
Expand Down Expand Up @@ -75,6 +75,8 @@ func (ca *CpuAnalyzer) ConsumeEvent(event *model.KindlingEvent) error {
pid := event.GetPid()
tid := event.Ctx.ThreadInfo.GetTid()
ca.trimExitedThread(pid, tid)
case constnames.SpanEvent:
ca.ConsumeSpanEvent(event)
}
return nil
}
Expand Down Expand Up @@ -106,6 +108,23 @@ func (ca *CpuAnalyzer) ConsumeJavaFutexEvent(event *model.KindlingEvent) {
ca.PutEventToSegments(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev)
}

func (ca *CpuAnalyzer) ConsumeSpanEvent(event *model.KindlingEvent) {
ev := new(ApmSpanEvent)
ev.StartTime = event.Timestamp
for i := 0; i < int(event.ParamsNumber); i++ {
userAttributes := event.UserAttributes[i]
switch {
case userAttributes.GetKey() == "end_time":
ev.EndTime, _ = strconv.ParseUint(string(userAttributes.GetValue()), 10, 64)
case userAttributes.GetKey() == "trace_id":
ev.TraceId = string(userAttributes.GetValue())
case userAttributes.GetKey() == "span":
ev.Name = string(userAttributes.GetValue())
}
}
ca.PutEventToSegments(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev)
}

func (ca *CpuAnalyzer) ConsumeCpuEvent(event *model.KindlingEvent) {
ev := new(CpuEvent)
for i := 0; i < int(event.ParamsNumber); i++ {
Expand Down
29 changes: 29 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ const (
TimedCpuEventKind TimedEventKind = iota
TimedJavaFutexEventKind
TimedTransactionIdEventKind
TimedApmSpanEventKind
)

const (
CpuEventLabel = "cpuEvents"
JavaFutexEventLabel = "javaFutexEvents"
TransactionIdEventLabel = "transactionIds"
SpanLabel = "spans"
)

type TimedEvent interface {
Expand All @@ -43,6 +45,7 @@ type Segment struct {
CpuEvents []TimedEvent `json:"cpuEvents"`
JavaFutexEvents []TimedEvent `json:"javaFutexEvents"`
TransactionIds []TimedEvent `json:"transactionIds"`
Spans []TimedEvent `json:"spans"`
IsSend int
IndexTimestamp string `json:"indexTimestamp"`
}
Expand All @@ -57,6 +60,7 @@ func newSegment(pid uint32, tid uint32, threadName string, startTime uint64, end
CpuEvents: make([]TimedEvent, 0),
JavaFutexEvents: make([]TimedEvent, 0),
TransactionIds: make([]TimedEvent, 0),
Spans: make([]TimedEvent, 0),
IsSend: 0,
IndexTimestamp: "",
}
Expand All @@ -69,6 +73,8 @@ func (s *Segment) putTimedEvent(event TimedEvent) {
s.JavaFutexEvents = append(s.JavaFutexEvents, event)
case TimedTransactionIdEventKind:
s.TransactionIds = append(s.TransactionIds, event)
case TimedApmSpanEventKind:
s.Spans = append(s.Spans, event)
}
}

Expand All @@ -92,6 +98,10 @@ func (s *Segment) toDataGroup() *model.DataGroup {
if err == nil {
labels.AddStringValue(TransactionIdEventLabel, string(transactionIdEventString))
}
spanEventString, err := json.Marshal(s.Spans)
if err == nil {
labels.AddStringValue(SpanLabel, string(spanEventString))
}
return model.NewDataGroup(constnames.CameraEventGroupName, labels, s.StartTime)
}

Expand Down Expand Up @@ -154,3 +164,22 @@ func (t *TransactionIdEvent) EndTimestamp() uint64 {
func (t *TransactionIdEvent) Kind() TimedEventKind {
return TimedTransactionIdEventKind
}

type ApmSpanEvent struct {
StartTime uint64 `json:"startTime"`
EndTime uint64 `json:"endTime"`
TraceId string `json:"traceId"`
Name string `json:"name"`
}

func (j *ApmSpanEvent) StartTimestamp() uint64 {
return j.StartTime
}

func (j *ApmSpanEvent) EndTimestamp() uint64 {
return j.EndTime
}

func (j *ApmSpanEvent) Kind() TimedEventKind {
return TimedApmSpanEventKind
}
1 change: 1 addition & 0 deletions collector/pkg/model/constnames/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
CpuEvent = "cpu_event"
JavaFutexInfo = "java_futex_info"
TransactionIdEvent = "apm_trace_id_event"
SpanEvent = "apm_span_event"
OtherEvent = "other"

ProcessExitEvent = "procexit"
Expand Down
70 changes: 69 additions & 1 deletion probe/src/cgo/kindling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ char* time_char = new char[32];
char* depth_char = new char[8];
char* finish_char = new char[4];
char* kd_stack = new char[1024];
char* duration_char = new char[32];
char* span_char = new char[1024];

int16_t event_filters[1024][16];

Expand Down Expand Up @@ -211,7 +213,10 @@ int getEvent(void** pp_kindling_event) {
parse_xtid(ev, data_val, *data_param, p_kindling_event, threadInfo, userAttNumber);
return 1;
}

if (data_param->m_len > 8 && memcmp(data_val, "kd-span@", 8) == 0) {
parse_span(ev, data_val, *data_param, p_kindling_event, threadInfo, userAttNumber);
return 1;
}
if (data_param->m_len > 6 && memcmp(data_val, "kd-tm@", 6) == 0) {
parse_tm(data_val, *data_param, threadInfo);
return -1;
Expand Down Expand Up @@ -483,6 +488,69 @@ void parse_xtid(sinsp_evt* s_evt, char* data_val, sinsp_evt_param data_param,
p_kindling_event->paramsNumber = userAttNumber;
}

void parse_span(sinsp_evt* s_evt, char* data_val, sinsp_evt_param data_param,
kindling_event_t_for_go* p_kindling_event, sinsp_threadinfo* threadInfo,
uint16_t& userAttNumber) {
int val_offset = 0;
int tmp_offset = 0;
int span_offset = 0;
int traceId_offset = 0;
for (int i = 8; i < data_param.m_len; i++) {
if (data_val[i] == '!') {
if (val_offset == 0) {
duration_char[tmp_offset] = '\0';
} else if (val_offset == 1) {
span_char[tmp_offset] = '\0';
span_offset = tmp_offset;
} else if (val_offset == 2) {
traceId[tmp_offset] = '\0';
traceId_offset = tmp_offset;
break;
}
tmp_offset = 0;
val_offset++;
continue;
}
if (val_offset == 0) {
duration_char[tmp_offset] = data_val[i];
} else if (val_offset == 1) {
span_char[tmp_offset] = data_val[i];
} else if (val_offset == 2) {
traceId[tmp_offset] = data_val[i];
}
tmp_offset++;
}
p_kindling_event->timestamp = s_evt->get_ts() - atol(duration_char);
strcpy(p_kindling_event->userAttributes[userAttNumber].key, "end_time");
memcpy(p_kindling_event->userAttributes[userAttNumber].value, to_string(s_evt->get_ts()).data(),
19);
p_kindling_event->userAttributes[userAttNumber].valueType = UINT64;
p_kindling_event->userAttributes[userAttNumber].len = 19;
userAttNumber++;

strcpy(p_kindling_event->userAttributes[userAttNumber].key, "trace_id");
memcpy(p_kindling_event->userAttributes[userAttNumber].value, traceId, traceId_offset);
p_kindling_event->userAttributes[userAttNumber].valueType = CHARBUF;
p_kindling_event->userAttributes[userAttNumber].len = traceId_offset;
userAttNumber++;

strcpy(p_kindling_event->userAttributes[userAttNumber].key, "span");
memcpy(p_kindling_event->userAttributes[userAttNumber].value, span_char, span_offset);
p_kindling_event->userAttributes[userAttNumber].valueType = CHARBUF;
p_kindling_event->userAttributes[userAttNumber].len = span_offset;
userAttNumber++;

strcpy(p_kindling_event->name, "apm_span_event");
p_kindling_event->context.tinfo.tid = threadInfo->m_tid;
map<uint64_t, char*>::iterator key =
ptid_comm.find(threadInfo->m_pid << 32 | (threadInfo->m_tid & 0xFFFFFFFF));
if (key != ptid_comm.end()) {
strcpy(p_kindling_event->context.tinfo.comm, key->second);
}
p_kindling_event->context.tinfo.pid = threadInfo->m_pid;
p_kindling_event->paramsNumber = userAttNumber;
}

void parse_tm(char* data_val, sinsp_evt_param data_param, sinsp_threadinfo* threadInfo) {
char* comm_char = new char[256];
int val_offset = 0;
Expand Down
4 changes: 4 additions & 0 deletions probe/src/cgo/kindling.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ void parse_xtid(sinsp_evt* s_evt, char* data_val, sinsp_evt_param data_param,
kindling_event_t_for_go* p_kindling_event, sinsp_threadinfo* threadInfo,
uint16_t& userAttNumber);

void parse_span(sinsp_evt *s_evt, char *data_val, sinsp_evt_param data_param,
kindling_event_t_for_go *p_kindling_event, sinsp_threadinfo* threadInfo,
uint16_t &userAttNumber);

void parse_tm(char* data_val, sinsp_evt_param data_param, sinsp_threadinfo* threadInfo);

void init_kindling_event(kindling_event_t_for_go* p_kindling_event, void** pp_kindling_event);
Expand Down