Skip to content

Commit

Permalink
Add channel based producer integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Sep 29, 2021
1 parent e7b4bd9 commit cf6e6d8
Showing 1 changed file with 72 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build cgo && (linux || darwin)
// +build cgo
// +build linux darwin
//go:build cgo && linux
// +build cgo,linux

package test

Expand Down Expand Up @@ -47,7 +46,54 @@ var (
testTopic = "gotest"
)

func TestSynchronous(t *testing.T) {
func TestChannelBasedProducer(t *testing.T) {
defer goleak.VerifyNone(t)

partition := int32(0)
sr, opts := newFixtures()
p := newProducer(t, opts...)

done := make(chan struct{})
var sent *kafka.Message
go func() {
defer close(done)
sent = requireEventIsMessage(t, <-p.Events())
}()
go func() {
p.ProduceChannel() <- &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &testTopic,
Partition: partition,
},
Key: key,
Value: val,
}
}()

// Wait for the delivery report goroutine to finish.
<-done
require.NoError(t, sent.TopicPartition.Error)

p.Close()

recv := consumeMessage(t, kafka.TopicPartition{
Topic: &testTopic,
Partition: partition,
Offset: sent.TopicPartition.Offset,
}, opts...)

assert.Equal(t, sent.String(), recv.String())

spans := sr.Ended()
require.Len(t, spans, 2)
pSpan, cSpan := spans[0], spans[1]
// The should be linked via propagated headers.
assert.Equal(t, pSpan.SpanContext().TraceID(), cSpan.SpanContext().TraceID())
assertProducerSpan(t, pSpan)
assertConsumerSpan(t, cSpan)
}

func TestFunctionBasedProducer(t *testing.T) {
defer goleak.VerifyNone(t)

partition := int32(0)
Expand All @@ -65,27 +111,15 @@ func TestSynchronous(t *testing.T) {
}, deliveryCh)
require.NoError(t, err)
sent := requireEventIsMessage(t, <-deliveryCh)
require.NoError(t, sent.TopicPartition.Error)

p.Close()

c := newConsumer(t, opts...)
require.NoError(t, c.Assign([]kafka.TopicPartition{{
recv := consumeMessage(t, kafka.TopicPartition{
Topic: &testTopic,
Partition: partition,
Offset: sent.TopicPartition.Offset,
}}))
recv := requireEventIsMessage(t, func() kafka.Event {
for {
if e := c.Poll(100); e != nil {
return e
}
}
}())
_, err = c.CommitMessage(recv)
assert.NoError(t, err)
assert.NoError(t, c.Unassign())

c.Close()
}, opts...)

assert.Equal(t, sent.String(), recv.String())

Expand Down Expand Up @@ -124,6 +158,25 @@ func newConsumer(t *testing.T, opts ...splunkkafka.Option) *splunkkafka.Consumer
return c
}

func consumeMessage(t *testing.T, tp kafka.TopicPartition, opts ...splunkkafka.Option) *kafka.Message {
c := newConsumer(t, opts...)
require.NoError(t, c.Assign([]kafka.TopicPartition{tp}))
recv := requireEventIsMessage(t, func() kafka.Event {
for {
if e := c.Poll(100); e != nil {
return e
}
}
}())
assert.NoError(t, recv.TopicPartition.Error)
_, err := c.CommitMessage(recv)
assert.NoError(t, err)
assert.NoError(t, c.Unassign())

c.Close()
return recv
}

func assertProducerSpan(t *testing.T, span trace.ReadOnlySpan) {
assert.Equal(t, fmt.Sprintf("%s send", testTopic), span.Name())
assert.Equal(t, traceapi.SpanKindProducer, span.SpanKind())
Expand Down

0 comments on commit cf6e6d8

Please sign in to comment.