Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pyroscope monitoring #54

Merged
merged 8 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions exporter/clickhouseprofileexporter/ch/access_native_columnar.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ import (
)

// schema reference: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js
const (
columnTimestampNs = "timestamp_ns"
columnType = "type"
columnServiceName = "service_name"
columnSampleTypesUnits = "sample_types_units"
columnPeriodType = "period_type"
columnPeriodUnit = "period_unit"
columnTags = "tags"
columnDurationNs = "duration_ns"
columnPayloadType = "payload_type"
columnPayloaf = "payload"
columnValuesAgg = "values_agg"
)

type clickhouseAccessNativeColumnar struct {
conn driver.Conn

Expand Down Expand Up @@ -92,10 +106,10 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
m = r.Attributes()
timestamp_ns[i] = uint64(r.Timestamp())

tmp, _ = m.Get("type")
tmp, _ = m.Get(columnType)
typ[i] = tmp.AsString()

tmp, _ = m.Get("service_name")
tmp, _ = m.Get(columnServiceName)
service_name[i] = tmp.AsString()

sample_types, _ := m.Get("sample_types")
Expand All @@ -112,7 +126,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
return err
}

values_agg_raw, ok := m.Get("values_agg")
values_agg_raw, ok := m.Get(columnValuesAgg)
if ok {
values_agg_tuple, err := valueAggToTuple(&values_agg_raw)
if err != nil {
Expand All @@ -127,13 +141,13 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
sample_types_units_item[i] = tuple{v, sample_units_array[i]}
}
sample_types_units[i] = sample_types_units_item
tmp, _ = m.Get("period_type")
tmp, _ = m.Get(columnPeriodType)
period_type[i] = tmp.AsString()

tmp, _ = m.Get("period_unit")
tmp, _ = m.Get(columnPeriodUnit)
period_unit[i] = tmp.AsString()

tmp, _ = m.Get("tags")
tmp, _ = m.Get(columnTags)
tm = tmp.Map().AsRaw()
tag, j := make([]tuple, len(tm)), 0
for k, v := range tm {
Expand All @@ -142,10 +156,10 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
}
tags[i] = tag

tmp, _ = m.Get("duration_ns")
tmp, _ = m.Get(columnDurationNs)
duration_ns[i], _ = strconv.ParseUint(tmp.Str(), 10, 64)

tmp, _ = m.Get("payload_type")
tmp, _ = m.Get(columnPeriodType)
payload_type[i] = tmp.AsString()

payload[i] = r.Body().Bytes().AsRaw()
Expand Down
4 changes: 2 additions & 2 deletions exporter/clickhouseprofileexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func newClickhouseProfileExporter(ctx context.Context, set *exporter.CreateSetti
func (exp *clickhouseProfileExporter) send(ctx context.Context, logs plog.Logs) error {
start := time.Now().UnixMilli()
if err := exp.ch.InsertBatch(logs); err != nil {
otelcolExporterClickhouseProfileBatchInsertTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError)))
otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError)))
exp.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
}
otelcolExporterClickhouseProfileBatchInsertTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess)))
otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess)))
exp.logger.Info("inserted batch", zap.Int("size", logs.ResourceLogs().Len()))
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions exporter/clickhouseprofileexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
const prefix = "exporter_clickhouse_profile_"

var (
otelcolExporterClickhouseProfileBatchInsertTimeMillis metric.Int64Histogram
otelcolExporterClickhouseProfileBatchInsertDurationMillis metric.Int64Histogram
)

func initMetrics(meter metric.Meter) error {
var err error
if otelcolExporterClickhouseProfileBatchInsertTimeMillis, err = meter.Int64Histogram(
fmt.Sprint(prefix, "batch_insert_time_millis"),
metric.WithDescription("Clickhouse profile exporter batch insert time in millis"),
if otelcolExporterClickhouseProfileBatchInsertDurationMillis, err = meter.Int64Histogram(
fmt.Sprint(prefix, "batch_insert_duration_millis"),
metric.WithDescription("Clickhouse profile exporter batch insert duration in millis"),
metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000),
); err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion receiver/pyroscopereceiver/jfrparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([
if nil == pr {
continue
}

// assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof
pr.prof.Payload = new(bytes.Buffer)
pr.pprof.WriteUncompressed(pr.prof.Payload)
Expand Down Expand Up @@ -222,7 +223,7 @@ func calculateValuesAgg(samples *pprof_proto.Profile) []profile_types.SampleType
// Loop through each sample type
for j, st := range samples.SampleType {
sum, count := calculateSumAndCount(samples, j)
valuesAgg = append(valuesAgg, profile_types.SampleType{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count})
valuesAgg = append(valuesAgg, profile_types.SampleType{Key: fmt.Sprintf("%s:%s", st.Type, st.Unit), Sum: sum, Count: count})
}

return valuesAgg
Expand Down
8 changes: 0 additions & 8 deletions receiver/pyroscopereceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ var (
otelcolReceiverPyroscopeHttpRequestTotal metric.Int64Counter
otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes metric.Int64Histogram
otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes metric.Int64Histogram
otelcolReceiverPyroscopeHttpResponseTimeMillis metric.Int64Histogram
)

func initMetrics(meter metric.Meter) error {
Expand All @@ -37,12 +36,5 @@ func initMetrics(meter metric.Meter) error {
); err != nil {
return err
}
if otelcolReceiverPyroscopeHttpResponseTimeMillis, err = meter.Int64Histogram(
fmt.Sprint(prefix, "http_response_time_millis"),
metric.WithDescription("Pyroscope receiver http response time in millis"),
metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000),
); err != nil {
return err
}
return nil
}
26 changes: 10 additions & 16 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/jfrparser"
Expand Down Expand Up @@ -95,7 +94,7 @@ func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.Cre

// TODO: rate limit clients
func (recv *pyroscopeReceiver) httpHandlerIngest(resp http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(contextWithStart(req.Context(), time.Now().UnixMilli()), recv.cfg.Timeout)
ctx, cancel := context.WithTimeout(req.Context(), recv.cfg.Timeout)
defer cancel()

// all compute should be bounded by timeout, so dont add compute here
Expand All @@ -108,14 +107,6 @@ func (recv *pyroscopeReceiver) httpHandlerIngest(resp http.ResponseWriter, req *
}
}

func startTimeFromContext(ctx context.Context) int64 {
return ctx.Value(keyStart).(int64)
}

func contextWithStart(ctx context.Context, start int64) context.Context {
return context.WithValue(ctx, keyStart, start)
}

func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWriter, req *http.Request) <-chan struct{} {
c := make(chan struct{})
go func() {
Expand All @@ -139,6 +130,7 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri
recv.handleError(ctx, resp, "text/plain", http.StatusBadRequest, err.Error(), pm.name, errorCodeError)
return
}

// if no profiles have been parsed, dont error but return
if pl.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len() == 0 {
writeResponseNoContent(resp)
Expand All @@ -153,16 +145,14 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri
return
}

otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess)))
otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().UnixMilli()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess)))
otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess, http.StatusNoContent)))
writeResponseNoContent(resp)
}()
return c
}

func (recv *pyroscopeReceiver) handleError(ctx context.Context, resp http.ResponseWriter, contentType string, statusCode int, msg string, service string, errorCode string) {
otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(service, errorCode)))
otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().Unix()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(service, errorCode)))
otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(service, errorCode, statusCode)))
recv.logger.Error(msg)
writeResponse(resp, "text/plain", statusCode, []byte(msg))
}
Expand Down Expand Up @@ -219,8 +209,12 @@ func readParams(qs *url.Values) (params, error) {
return p, nil
}

func newOtelcolAttrSetHttp(service string, errorCode string) *attribute.Set {
s := attribute.NewSet(attribute.KeyValue{Key: keyService, Value: attribute.StringValue(service)}, attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)})
func newOtelcolAttrSetHttp(service string, errorCode string, statusCode int) *attribute.Set {
s := attribute.NewSet(
attribute.KeyValue{Key: keyService, Value: attribute.StringValue(service)},
attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)},
attribute.KeyValue{Key: "status_code", Value: attribute.IntValue(statusCode)},
)
return &s
}

Expand Down
Loading