diff --git a/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka/test/kafka_test.go b/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka/test/kafka_test.go index d2efb2eda..70193c31b 100644 --- a/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka/test/kafka_test.go +++ b/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka/test/kafka_test.go @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build cgo && (linux || darwin) -// +build cgo -// +build linux darwin +//go:build cgo && linux +// +build cgo,linux package test @@ -47,7 +46,54 @@ var ( testTopic = "gotest" ) -func TestSynchronous(t *testing.T) { +func TestChannelBasedProducer(t *testing.T) { + defer goleak.VerifyNone(t) + + partition := int32(0) + sr, opts := newFixtures() + p := newProducer(t, opts...) + + done := make(chan struct{}) + var sent *kafka.Message + go func() { + defer close(done) + sent = requireEventIsMessage(t, <-p.Events()) + }() + go func() { + p.ProduceChannel() <- &kafka.Message{ + TopicPartition: kafka.TopicPartition{ + Topic: &testTopic, + Partition: partition, + }, + Key: key, + Value: val, + } + }() + + // Wait for the delivery report goroutine to finish. + <-done + require.NoError(t, sent.TopicPartition.Error) + + p.Close() + + recv := consumeMessage(t, kafka.TopicPartition{ + Topic: &testTopic, + Partition: partition, + Offset: sent.TopicPartition.Offset, + }, opts...) + + assert.Equal(t, sent.String(), recv.String()) + + spans := sr.Ended() + require.Len(t, spans, 2) + pSpan, cSpan := spans[0], spans[1] + // The should be linked via propagated headers. + assert.Equal(t, pSpan.SpanContext().TraceID(), cSpan.SpanContext().TraceID()) + assertProducerSpan(t, pSpan) + assertConsumerSpan(t, cSpan) +} + +func TestFunctionBasedProducer(t *testing.T) { defer goleak.VerifyNone(t) partition := int32(0) @@ -65,27 +111,15 @@ func TestSynchronous(t *testing.T) { }, deliveryCh) require.NoError(t, err) sent := requireEventIsMessage(t, <-deliveryCh) + require.NoError(t, sent.TopicPartition.Error) p.Close() - c := newConsumer(t, opts...) - require.NoError(t, c.Assign([]kafka.TopicPartition{{ + recv := consumeMessage(t, kafka.TopicPartition{ Topic: &testTopic, Partition: partition, Offset: sent.TopicPartition.Offset, - }})) - recv := requireEventIsMessage(t, func() kafka.Event { - for { - if e := c.Poll(100); e != nil { - return e - } - } - }()) - _, err = c.CommitMessage(recv) - assert.NoError(t, err) - assert.NoError(t, c.Unassign()) - - c.Close() + }, opts...) assert.Equal(t, sent.String(), recv.String()) @@ -124,6 +158,25 @@ func newConsumer(t *testing.T, opts ...splunkkafka.Option) *splunkkafka.Consumer return c } +func consumeMessage(t *testing.T, tp kafka.TopicPartition, opts ...splunkkafka.Option) *kafka.Message { + c := newConsumer(t, opts...) + require.NoError(t, c.Assign([]kafka.TopicPartition{tp})) + recv := requireEventIsMessage(t, func() kafka.Event { + for { + if e := c.Poll(100); e != nil { + return e + } + } + }()) + assert.NoError(t, recv.TopicPartition.Error) + _, err := c.CommitMessage(recv) + assert.NoError(t, err) + assert.NoError(t, c.Unassign()) + + c.Close() + return recv +} + func assertProducerSpan(t *testing.T, span trace.ReadOnlySpan) { assert.Equal(t, fmt.Sprintf("%s send", testTopic), span.Name()) assert.Equal(t, traceapi.SpanKindProducer, span.SpanKind())