
Commit

kafka(ticdc): producer accept message struct as parameter to simplify the signature (#11252)

close #11264
3AceShowHand authored Jun 6, 2024
1 parent 971bfcd commit 277dd7d
Showing 6 changed files with 50 additions and 72 deletions.
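The change in a nutshell: every producer method that previously took key []byte, value []byte (and, on the async path, callback func()) now takes the encoded *common.Message directly, and the producer unpacks the fields it needs. A minimal caller-side sketch of the new interfaces follows; the topic name, partition, and payloads are illustrative only and not part of this commit.

package example

import (
	"context"

	"github.com/pingcap/tiflow/pkg/sink/codec/common"
	pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
)

// sendExample sends one message synchronously and one asynchronously using
// the simplified signatures introduced by this commit.
func sendExample(ctx context.Context, syncp pkafka.SyncProducer, asyncp pkafka.AsyncProducer) error {
	msg := &common.Message{
		Key:      []byte("example-key"),   // illustrative payload
		Value:    []byte("example-value"), // illustrative payload
		Callback: func() { /* e.g. release resources tied to this message */ },
	}
	// Synchronous path: returns once the message is produced to partition 0,
	// or an error if producing failed.
	if err := syncp.SendMessage(ctx, "example-topic", 0, msg); err != nil {
		return err
	}
	// Asynchronous path: the callback stored on the message runs later,
	// once the producer confirms delivery.
	return asyncp.AsyncSend(ctx, "example-topic", 0, msg)
}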
6 changes: 2 additions & 4 deletions cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go
@@ -69,8 +69,7 @@ func (k *kafkaDDLProducer) SyncBroadcastMessage(ctx context.Context, topic strin
case <-ctx.Done():
return ctx.Err()
default:
- err := k.syncProducer.SendMessages(ctx, topic,
- 	totalPartitionsNum, message.Key, message.Value)
+ err := k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
}
}
@@ -89,8 +88,7 @@ func (k *kafkaDDLProducer) SyncSendMessage(ctx context.Context, topic string,
case <-ctx.Done():
return errors.Trace(ctx.Err())
default:
- err := k.syncProducer.SendMessage(ctx, topic,
- 	partitionNum, message.Key, message.Value)
+ err := k.syncProducer.SendMessage(ctx, topic, partitionNum, message)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
}
}
3 changes: 1 addition & 2 deletions cdc/sink/dmlsink/mq/dmlproducer/kafka_dml_producer.go
@@ -119,8 +119,7 @@ func (k *kafkaDMLProducer) AsyncSendMessage(
k.failpointCh <- errors.New("kafka sink injected error")
failpoint.Return(nil)
})
- return k.asyncProducer.AsyncSend(ctx, topic, partition,
- 	message.Key, message.Value, message.Callback)
+ return k.asyncProducer.AsyncSend(ctx, topic, partition, message)
}

func (k *kafkaDMLProducer) Close() {
42 changes: 15 additions & 27 deletions pkg/sink/kafka/factory.go
@@ -22,6 +22,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
+ "github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)
@@ -48,15 +49,13 @@ type SyncProducer interface {
// of the produced message, or an error if the message failed to produce.
SendMessage(ctx context.Context,
topic string, partitionNum int32,
- key []byte, value []byte) error
+ message *common.Message) error

// SendMessages produces a given set of messages, and returns only when all
// messages in the set have either succeeded or failed. Note that messages
// can succeed and fail individually; if some succeed and some fail,
// SendMessages will return an error.
- SendMessages(ctx context.Context,
- 	topic string, partitionNum int32,
- 	key []byte, value []byte) error
+ SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error

// Close shuts down the producer; you must call this function before a producer
// object passes out of scope, as it may otherwise leak memory.
@@ -75,9 +74,7 @@ type AsyncProducer interface {

// AsyncSend is the input channel for the user to write messages to that they
// wish to send.
- AsyncSend(ctx context.Context, topic string,
- 	partition int32, key []byte, value []byte,
- 	callback func()) error
+ AsyncSend(ctx context.Context, topic string, partition int32, message *common.Message) error

// AsyncRunCallback process the messages that has sent to kafka,
// and run tha attached callback. the caller should call this
@@ -92,29 +89,26 @@ type saramaSyncProducer struct {
}

func (p *saramaSyncProducer) SendMessage(
- ctx context.Context,
+ _ context.Context,
topic string, partitionNum int32,
- key []byte, value []byte,
+ message *common.Message,
) error {
_, _, err := p.producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
- Key: sarama.ByteEncoder(key),
- Value: sarama.ByteEncoder(value),
+ Key: sarama.ByteEncoder(message.Key),
+ Value: sarama.ByteEncoder(message.Value),
Partition: partitionNum,
})
return err
}

- func (p *saramaSyncProducer) SendMessages(ctx context.Context,
- 	topic string, partitionNum int32,
- 	key []byte, value []byte,
- ) error {
+ func (p *saramaSyncProducer) SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error {
msgs := make([]*sarama.ProducerMessage, partitionNum)
for i := 0; i < int(partitionNum); i++ {
msgs[i] = &sarama.ProducerMessage{
Topic: topic,
- Key: sarama.ByteEncoder(key),
- Value: sarama.ByteEncoder(value),
+ Key: sarama.ByteEncoder(message.Key),
+ Value: sarama.ByteEncoder(message.Value),
Partition: int32(i),
}
}
@@ -259,19 +253,13 @@ func (p *saramaAsyncProducer) AsyncRunCallback(

// AsyncSend is the input channel for the user to write messages to that they
// wish to send.
- func (p *saramaAsyncProducer) AsyncSend(ctx context.Context,
- 	topic string,
- 	partition int32,
- 	key []byte,
- 	value []byte,
- 	callback func(),
- ) error {
+ func (p *saramaAsyncProducer) AsyncSend(ctx context.Context, topic string, partition int32, message *common.Message) error {
msg := &sarama.ProducerMessage{
Topic: topic,
Partition: partition,
- Key: sarama.StringEncoder(key),
- Value: sarama.ByteEncoder(value),
- Metadata: callback,
+ Key: sarama.StringEncoder(message.Key),
+ Value: sarama.ByteEncoder(message.Value),
+ Metadata: message.Callback,
}
select {
case <-ctx.Done():
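A side note on the sarama async path above: message.Callback is stashed in ProducerMessage.Metadata, which sarama hands back unchanged once the message is acknowledged. A minimal sketch of recovering and running it is below; the repository's own AsyncRunCallback is the authoritative version (it also has to watch the context and the producer's error channel), and the sarama import path should match whatever module the repo's go.mod pins.

package example

import (
	"github.com/IBM/sarama" // assumed; use the sarama module pinned in go.mod
)

// drainSuccesses shows why the callback travels in Metadata: once sarama
// reports a message on Successes() (requires Producer.Return.Successes to be
// enabled), the stored func() can be type-asserted back out and invoked.
func drainSuccesses(p sarama.AsyncProducer) {
	for pm := range p.Successes() {
		if cb, ok := pm.Metadata.(func()); ok {
			cb()
		}
	}
}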
29 changes: 12 additions & 17 deletions pkg/sink/kafka/mock_factory.go
@@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
+ "github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/util"
)

@@ -91,30 +92,27 @@ type MockSaramaSyncProducer struct {

// SendMessage implement the SyncProducer interface.
func (m *MockSaramaSyncProducer) SendMessage(
- ctx context.Context,
+ _ context.Context,
topic string, partitionNum int32,
- key []byte, value []byte,
+ message *common.Message,
) error {
_, _, err := m.Producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
- Key: sarama.ByteEncoder(key),
- Value: sarama.ByteEncoder(value),
+ Key: sarama.ByteEncoder(message.Key),
+ Value: sarama.ByteEncoder(message.Value),
Partition: partitionNum,
})
return err
}

// SendMessages implement the SyncProducer interface.
- func (m *MockSaramaSyncProducer) SendMessages(ctx context.Context,
- 	topic string, partitionNum int32,
- 	key []byte, value []byte,
- ) error {
+ func (m *MockSaramaSyncProducer) SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error {
msgs := make([]*sarama.ProducerMessage, partitionNum)
for i := 0; i < int(partitionNum); i++ {
msgs[i] = &sarama.ProducerMessage{
Topic: topic,
- Key: sarama.ByteEncoder(key),
- Value: sarama.ByteEncoder(value),
+ Key: sarama.ByteEncoder(message.Key),
+ Value: sarama.ByteEncoder(message.Value),
Partition: int32(i),
}
}
@@ -166,16 +164,13 @@ func (p *MockSaramaAsyncProducer) AsyncRunCallback(
}

// AsyncSend implement the AsyncProducer interface.
- func (p *MockSaramaAsyncProducer) AsyncSend(ctx context.Context, topic string,
- 	partition int32, key []byte, value []byte,
- 	callback func(),
- ) error {
+ func (p *MockSaramaAsyncProducer) AsyncSend(ctx context.Context, topic string, partition int32, message *common.Message) error {
msg := &sarama.ProducerMessage{
Topic: topic,
Partition: partition,
- Key: sarama.StringEncoder(key),
- Value: sarama.ByteEncoder(value),
- Metadata: callback,
+ Key: sarama.StringEncoder(message.Key),
+ Value: sarama.ByteEncoder(message.Value),
+ Metadata: message.Callback,
}
select {
case <-ctx.Done():
28 changes: 11 additions & 17 deletions pkg/sink/kafka/v2/factory.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/security"
+ "github.com/pingcap/tiflow/pkg/sink/codec/common"
pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/pingcap/tiflow/pkg/util"
"github.com/segmentio/kafka-go"
@@ -275,31 +276,27 @@ type syncWriter struct {
func (s *syncWriter) SendMessage(
ctx context.Context,
topic string, partitionNum int32,
- key []byte, value []byte,
+ message *common.Message,
) error {
return s.w.WriteMessages(ctx, kafka.Message{
Topic: topic,
Partition: int(partitionNum),
- Key: key,
- Value: value,
+ Key: message.Key,
+ Value: message.Value,
})
}

// SendMessages produces a given set of messages, and returns only when all
// messages in the set have either succeeded or failed. Note that messages
// can succeed and fail individually; if some succeed and some fail,
// SendMessages will return an error.
- func (s *syncWriter) SendMessages(
- 	ctx context.Context,
- 	topic string, partitionNum int32,
- 	key []byte, value []byte,
- ) error {
+ func (s *syncWriter) SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error {
msgs := make([]kafka.Message, int(partitionNum))
for i := 0; i < int(partitionNum); i++ {
msgs[i] = kafka.Message{
Topic: topic,
- Key: key,
- Value: value,
+ Key: message.Key,
+ Value: message.Value,
Partition: i,
}
}
@@ -363,10 +360,7 @@ func (a *asyncWriter) Close() {

// AsyncSend is the input channel for the user to write messages to that they
// wish to send.
- func (a *asyncWriter) AsyncSend(ctx context.Context, topic string,
- 	partition int32, key []byte, value []byte,
- 	callback func(),
- ) error {
+ func (a *asyncWriter) AsyncSend(ctx context.Context, topic string, partition int32, message *common.Message) error {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
@@ -375,9 +369,9 @@ func (a *asyncWriter) AsyncSend(ctx context.Context, topic string,
return a.w.WriteMessages(ctx, kafka.Message{
Topic: topic,
Partition: int(partition),
- Key: key,
- Value: value,
- WriterData: callback,
+ Key: message.Key,
+ Value: message.Value,
+ WriterData: message.Callback,
})
}

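The v2 (kafka-go) writer plays the same trick with kafka.Message.WriterData instead of sarama's Metadata. Whether tiflow recovers the callback exactly this way is not visible in this diff, but kafka-go's Writer.Completion hook is the natural place to do it; a sketch under that assumption:

package example

import (
	"github.com/segmentio/kafka-go"
)

// newCallbackWriter builds a kafka-go writer whose Completion hook runs the
// func() stored in each message's WriterData once delivery is reported.
// This is an illustrative assumption, not the repository's actual wiring.
func newCallbackWriter(addr string) *kafka.Writer {
	return &kafka.Writer{
		Addr: kafka.TCP(addr),
		Completion: func(messages []kafka.Message, err error) {
			if err != nil {
				return // real code would surface the error to the sink
			}
			for _, m := range messages {
				if cb, ok := m.WriterData.(func()); ok {
					cb()
				}
			}
		},
	}
}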
14 changes: 9 additions & 5 deletions pkg/sink/kafka/v2/factory_test.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/security"
+ "github.com/pingcap/tiflow/pkg/sink/codec/common"
pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
v2mock "github.com/pingcap/tiflow/pkg/sink/kafka/v2/mock"
"github.com/pingcap/tiflow/pkg/util"
@@ -213,7 +214,9 @@ func TestSyncWriterSendMessage(t *testing.T) {
require.Equal(t, 3, msgs[0].Partition)
return errors.New("fake")
})
- require.NotNil(t, w.SendMessage(context.Background(), "topic", 3, []byte{'1'}, []byte{}))
+
+ message := &common.Message{Key: []byte{'1'}, Value: []byte{}}
+ require.NotNil(t, w.SendMessage(context.Background(), "topic", 3, message))
}

func TestSyncWriterSendMessages(t *testing.T) {
@@ -224,7 +227,8 @@ func TestSyncWriterSendMessages(t *testing.T) {
require.Equal(t, 3, len(msgs))
return errors.New("fake")
})
- require.NotNil(t, w.SendMessages(context.Background(), "topic", 3, []byte{'1'}, []byte{}))
+ message := &common.Message{Key: []byte{'1'}, Value: []byte{}}
+ require.NotNil(t, w.SendMessages(context.Background(), "topic", 3, message))
}

func TestSyncWriterClose(t *testing.T) {
@@ -244,14 +248,14 @@ func TestAsyncWriterAsyncSend(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

- callback := func() {}
+ message := &common.Message{Key: []byte{'1'}, Value: []byte{}, Callback: func() {}}
mw.EXPECT().WriteMessages(gomock.Any(), gomock.Any()).Return(nil)
- err := w.AsyncSend(ctx, "topic", 1, []byte{'1'}, []byte{}, callback)
+ err := w.AsyncSend(ctx, "topic", 1, message)
require.NoError(t, err)

cancel()

- err = w.AsyncSend(ctx, "topic", 1, []byte{'1'}, []byte{}, callback)
+ err = w.AsyncSend(ctx, "topic", 1, nil)
require.ErrorIs(t, err, context.Canceled)
}

