diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index 0bc6703..ae52e44 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -13,6 +13,20 @@ import ( ) // schema reference: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js +const ( + columnTimestampNs = "timestamp_ns" + columnType = "type" + columnServiceName = "service_name" + columnSampleTypesUnits = "sample_types_units" + columnPeriodType = "period_type" + columnPeriodUnit = "period_unit" + columnTags = "tags" + columnDurationNs = "duration_ns" + columnPayloadType = "payload_type" + columnPayloaf = "payload" + columnValuesAgg = "values_agg" +) + type clickhouseAccessNativeColumnar struct { conn driver.Conn @@ -92,10 +106,10 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { m = r.Attributes() timestamp_ns[i] = uint64(r.Timestamp()) - tmp, _ = m.Get("type") + tmp, _ = m.Get(columnType) typ[i] = tmp.AsString() - tmp, _ = m.Get("service_name") + tmp, _ = m.Get(columnServiceName) service_name[i] = tmp.AsString() sample_types, _ := m.Get("sample_types") @@ -112,7 +126,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { return err } - values_agg_raw, ok := m.Get("values_agg") + values_agg_raw, ok := m.Get(columnValuesAgg) if ok { values_agg_tuple, err := valueAggToTuple(&values_agg_raw) if err != nil { @@ -127,13 +141,13 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { sample_types_units_item[i] = tuple{v, sample_units_array[i]} } sample_types_units[i] = sample_types_units_item - tmp, _ = m.Get("period_type") + tmp, _ = m.Get(columnPeriodType) period_type[i] = tmp.AsString() - tmp, _ = m.Get("period_unit") + tmp, _ = m.Get(columnPeriodUnit) period_unit[i] = tmp.AsString() - tmp, _ = m.Get("tags") + tmp, _ = m.Get(columnTags) tm = tmp.Map().AsRaw() tag, j := make([]tuple, len(tm)), 0 for k, v := range tm { @@ -142,10 +156,10 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { } tags[i] = tag - tmp, _ = m.Get("duration_ns") + tmp, _ = m.Get(columnDurationNs) duration_ns[i], _ = strconv.ParseUint(tmp.Str(), 10, 64) - tmp, _ = m.Get("payload_type") + tmp, _ = m.Get(columnPeriodType) payload_type[i] = tmp.AsString() payload[i] = r.Body().Bytes().AsRaw() diff --git a/exporter/clickhouseprofileexporter/exporter.go b/exporter/clickhouseprofileexporter/exporter.go index 7258847..deae555 100644 --- a/exporter/clickhouseprofileexporter/exporter.go +++ b/exporter/clickhouseprofileexporter/exporter.go @@ -64,11 +64,11 @@ func newClickhouseProfileExporter(ctx context.Context, set *exporter.CreateSetti func (exp *clickhouseProfileExporter) send(ctx context.Context, logs plog.Logs) error { start := time.Now().UnixMilli() if err := exp.ch.InsertBatch(logs); err != nil { - otelcolExporterClickhouseProfileBatchInsertTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError))) + otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError))) exp.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err } - otelcolExporterClickhouseProfileBatchInsertTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess))) + otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess))) exp.logger.Info("inserted batch", zap.Int("size", logs.ResourceLogs().Len())) return nil } diff --git a/exporter/clickhouseprofileexporter/metrics.go b/exporter/clickhouseprofileexporter/metrics.go index d494573..5c6026a 100644 --- a/exporter/clickhouseprofileexporter/metrics.go +++ b/exporter/clickhouseprofileexporter/metrics.go @@ -9,14 +9,14 @@ import ( const prefix = "exporter_clickhouse_profile_" var ( - otelcolExporterClickhouseProfileBatchInsertTimeMillis metric.Int64Histogram + otelcolExporterClickhouseProfileBatchInsertDurationMillis metric.Int64Histogram ) func initMetrics(meter metric.Meter) error { var err error - if otelcolExporterClickhouseProfileBatchInsertTimeMillis, err = meter.Int64Histogram( - fmt.Sprint(prefix, "batch_insert_time_millis"), - metric.WithDescription("Clickhouse profile exporter batch insert time in millis"), + if otelcolExporterClickhouseProfileBatchInsertDurationMillis, err = meter.Int64Histogram( + fmt.Sprint(prefix, "batch_insert_duration_millis"), + metric.WithDescription("Clickhouse profile exporter batch insert duration in millis"), metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000), ); err != nil { return err diff --git a/receiver/pyroscopereceiver/jfrparser/parser.go b/receiver/pyroscopereceiver/jfrparser/parser.go index c956c45..1cde2ca 100644 --- a/receiver/pyroscopereceiver/jfrparser/parser.go +++ b/receiver/pyroscopereceiver/jfrparser/parser.go @@ -114,6 +114,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([ if nil == pr { continue } + // assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof pr.prof.Payload = new(bytes.Buffer) pr.pprof.WriteUncompressed(pr.prof.Payload) @@ -222,7 +223,7 @@ func calculateValuesAgg(samples *pprof_proto.Profile) []profile_types.SampleType // Loop through each sample type for j, st := range samples.SampleType { sum, count := calculateSumAndCount(samples, j) - valuesAgg = append(valuesAgg, profile_types.SampleType{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count}) + valuesAgg = append(valuesAgg, profile_types.SampleType{Key: fmt.Sprintf("%s:%s", st.Type, st.Unit), Sum: sum, Count: count}) } return valuesAgg diff --git a/receiver/pyroscopereceiver/metrics.go b/receiver/pyroscopereceiver/metrics.go index f77d1c9..c7dbc6f 100644 --- a/receiver/pyroscopereceiver/metrics.go +++ b/receiver/pyroscopereceiver/metrics.go @@ -12,7 +12,6 @@ var ( otelcolReceiverPyroscopeHttpRequestTotal metric.Int64Counter otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes metric.Int64Histogram otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes metric.Int64Histogram - otelcolReceiverPyroscopeHttpResponseTimeMillis metric.Int64Histogram ) func initMetrics(meter metric.Meter) error { @@ -37,12 +36,5 @@ func initMetrics(meter metric.Meter) error { ); err != nil { return err } - if otelcolReceiverPyroscopeHttpResponseTimeMillis, err = meter.Int64Histogram( - fmt.Sprint(prefix, "http_response_time_millis"), - metric.WithDescription("Pyroscope receiver http response time in millis"), - metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000), - ); err != nil { - return err - } return nil } diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 5065025..91ee8f0 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -12,7 +12,6 @@ import ( "strconv" "strings" "sync" - "time" "github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress" "github.com/metrico/otel-collector/receiver/pyroscopereceiver/jfrparser" @@ -95,7 +94,7 @@ func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.Cre // TODO: rate limit clients func (recv *pyroscopeReceiver) httpHandlerIngest(resp http.ResponseWriter, req *http.Request) { - ctx, cancel := context.WithTimeout(contextWithStart(req.Context(), time.Now().UnixMilli()), recv.cfg.Timeout) + ctx, cancel := context.WithTimeout(req.Context(), recv.cfg.Timeout) defer cancel() // all compute should be bounded by timeout, so dont add compute here @@ -108,14 +107,6 @@ func (recv *pyroscopeReceiver) httpHandlerIngest(resp http.ResponseWriter, req * } } -func startTimeFromContext(ctx context.Context) int64 { - return ctx.Value(keyStart).(int64) -} - -func contextWithStart(ctx context.Context, start int64) context.Context { - return context.WithValue(ctx, keyStart, start) -} - func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWriter, req *http.Request) <-chan struct{} { c := make(chan struct{}) go func() { @@ -139,6 +130,7 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri recv.handleError(ctx, resp, "text/plain", http.StatusBadRequest, err.Error(), pm.name, errorCodeError) return } + // if no profiles have been parsed, dont error but return if pl.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len() == 0 { writeResponseNoContent(resp) @@ -153,16 +145,14 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri return } - otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess))) - otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().UnixMilli()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess))) + otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess, http.StatusNoContent))) writeResponseNoContent(resp) }() return c } func (recv *pyroscopeReceiver) handleError(ctx context.Context, resp http.ResponseWriter, contentType string, statusCode int, msg string, service string, errorCode string) { - otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(service, errorCode))) - otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().Unix()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(service, errorCode))) + otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(service, errorCode, statusCode))) recv.logger.Error(msg) writeResponse(resp, "text/plain", statusCode, []byte(msg)) } @@ -219,8 +209,12 @@ func readParams(qs *url.Values) (params, error) { return p, nil } -func newOtelcolAttrSetHttp(service string, errorCode string) *attribute.Set { - s := attribute.NewSet(attribute.KeyValue{Key: keyService, Value: attribute.StringValue(service)}, attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)}) +func newOtelcolAttrSetHttp(service string, errorCode string, statusCode int) *attribute.Set { + s := attribute.NewSet( + attribute.KeyValue{Key: keyService, Value: attribute.StringValue(service)}, + attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)}, + attribute.KeyValue{Key: "status_code", Value: attribute.IntValue(statusCode)}, + ) return &s }