diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index 69d77339480e..4b29b10252d8 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -93,8 +93,8 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) { m.Gauge().DataPoints().At(0).SetDoubleValue(1.0) r1 := pcommon.NewResource() - r1.Attributes().PutStr("service.name", "my_service_name") r1.Attributes().PutStr("service.instance.id", "kek_x_2") + r1.Attributes().PutStr("service.name", "my_service_name") r1.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource()) standardMarshaler := metricsMarshalers()["otlp_json"] @@ -111,8 +111,8 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) { require.NoError(t, err, "Must have marshaled the data without error") require.Len(t, msgs, 2, "Expected number of messages in the message") - require.Equal(t, sarama.ByteEncoder("90e74a8334a89993bd3f6ad05f9ca02438032a78d4399fb6fecf6c94fcdb13ef"), msgs[0].Key) - require.Equal(t, sarama.ByteEncoder("55e1113a2eace57b91ef58911d811c28e936365f03ac068e8ce23090d9ea748f"), msgs[1].Key) + require.Equal(t, sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16}, msgs[0].Key) + require.Equal(t, sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44}, msgs[1].Key) } func TestOTLPTracesJsonMarshaling(t *testing.T) { diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 13a3e371e46d..415546e6b205 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -5,9 +5,9 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collect import ( "github.com/IBM/sarama" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/resourceutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -67,7 +67,7 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar for i := 0; i < metrics.Len(); i++ { resourceMetrics := metrics.At(i) - hash := resourceutil.CalculateResourceAttributesHash(resourceMetrics.Resource()) + var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) newMetrics := pmetric.NewMetrics() resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty()) @@ -79,7 +79,7 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar msgs = append(msgs, &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(bts), - Key: sarama.ByteEncoder(hash), + Key: sarama.ByteEncoder(hash[:]), }) } } else { diff --git a/internal/coreinternal/resourceutil/resourceutil.go b/internal/coreinternal/resourceutil/resourceutil.go deleted file mode 100644 index 9d0e2e9d191f..000000000000 --- a/internal/coreinternal/resourceutil/resourceutil.go +++ /dev/null @@ -1,32 +0,0 @@ -package resourceutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/resourceutil" - -import ( - "crypto/sha256" - "encoding/hex" - "go.opentelemetry.io/collector/pdata/pcommon" - "sort" -) - -type keyValueLabelPair struct { - Key string - Value string -} - -func CalculateResourceAttributesHash(resourceMetrics pcommon.Resource) string { - var pairs []keyValueLabelPair - resourceMetrics.Attributes().Range(func(k string, v pcommon.Value) bool { - pairs = append(pairs, keyValueLabelPair{Key: k, Value: v.AsString()}) - return true - }) - - sort.SliceStable(pairs, func(i, j int) bool { - return pairs[i].Key < pairs[j].Key - }) - - h := sha256.New() - for _, pair := range pairs { - h.Write([]byte(pair.Key)) - h.Write([]byte(pair.Value)) - } - return hex.EncodeToString(h.Sum(nil)) -} diff --git a/internal/coreinternal/resourceutil/resourceutil_test.go b/internal/coreinternal/resourceutil/resourceutil_test.go deleted file mode 100644 index 565285806030..000000000000 --- a/internal/coreinternal/resourceutil/resourceutil_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package resourceutil - -import ( - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/pdata/pcommon" - "testing" -) - -func TestHashEmptyResource(t *testing.T) { - r := pcommon.NewResource() - - assert.EqualValues(t, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", CalculateResourceAttributesHash(r)) -} - -func TestHashSimpleResource(t *testing.T) { - r := pcommon.NewResource() - r.Attributes().PutStr("k1", "v1") - r.Attributes().PutStr("k2", "v2") - - assert.EqualValues(t, "3590bbad8f8a328dbbd5d01c35d8a5fab92c3588cf7e468e995c31d45a51cbef", CalculateResourceAttributesHash(r)) -} - -func TestHashReorderedAttributes(t *testing.T) { - r1 := pcommon.NewResource() - r1.Attributes().PutStr("k1", "v1") - r1.Attributes().PutStr("k2", "v2") - - r2 := pcommon.NewResource() - r2.Attributes().PutStr("k2", "v2") - r2.Attributes().PutStr("k1", "v1") - - assert.EqualValues(t, CalculateResourceAttributesHash(r1), CalculateResourceAttributesHash(r2)) -} - -func TestHashDifferentAttributeValues(t *testing.T) { - r := pcommon.NewResource() - r.Attributes().PutBool("k1", false) - r.Attributes().PutDouble("k2", 1.0) - r.Attributes().PutEmpty("k3") - r.Attributes().PutEmptyBytes("k4") - r.Attributes().PutEmptyMap("k5") - r.Attributes().PutEmptySlice("k6") - r.Attributes().PutInt("k7", 1) - r.Attributes().PutStr("k8", "v8") - - assert.EqualValues(t, "46852adab1751045942d67dace7c88665ec0e68b7f4b81a33bb05e5b954a8e57", CalculateResourceAttributesHash(r)) -}