Skip to content

Commit

Permalink
Merge branch 'master' into savepoint-test
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 18, 2022
2 parents bd83f8d + 64847e7 commit c66124b
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 142 deletions.
20 changes: 20 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,26 @@ func (r *RowChangedEvent) IsUpdate() bool {
return len(r.PreColumns) != 0 && len(r.Columns) != 0
}

// PrimaryKeyColumnNames returns the names of all primary-key columns.
// For delete events the previous-image columns are consulted, since a
// delete event carries no post-image columns.
func (r *RowChangedEvent) PrimaryKeyColumnNames() []string {
	cols := r.Columns
	if r.IsDelete() {
		cols = r.PreColumns
	}

	// Always return a non-nil slice so callers (e.g. JSON encoders) emit
	// `[]` rather than `null` when there is no primary key.
	result := make([]string, 0)
	for _, col := range cols {
		if col != nil && col.Flag.IsPrimaryKey() {
			result = append(result, col.Name)
		}
	}
	return result
}

// PrimaryKeyColumns returns the column(s) corresponding to the handle key(s)
func (r *RowChangedEvent) PrimaryKeyColumns() []*Column {
pkeyCols := make([]*Column, 0)
Expand Down
3 changes: 1 addition & 2 deletions cdc/sink/codec/canal/canal_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ type canalEntryBuilder struct {

// newCanalEntryBuilder creates a new canalEntryBuilder.
// Canal transports byte values as ISO-8859-1, so a matching decoder is
// installed for turning raw bytes back into strings.
func newCanalEntryBuilder() *canalEntryBuilder {
	return &canalEntryBuilder{
		bytesDecoder: charmap.ISO8859_1.NewDecoder(),
	}
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/canal/canal_json_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
t.Parallel()
expectedDecodedValue := collectExpectedDecodedValue(testColumnsTable)
for _, encodeEnable := range []bool{false, true} {
encoder := &JSONBatchEncoder{builder: newCanalEntryBuilder(), enableTiDBExtension: encodeEnable}
encoder := newJSONBatchEncoder(encodeEnable)
require.NotNil(t, encoder)

err := encoder.AppendRowChangedEvent(context.Background(), "", testCaseInsert, nil)
Expand Down
250 changes: 127 additions & 123 deletions cdc/sink/codec/canal/canal_json_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,133 +24,148 @@ import (
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/pkg/config"
cerrors "github.com/pingcap/tiflow/pkg/errors"
canal "github.com/pingcap/tiflow/proto/canal"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

// JSONBatchEncoder encodes Canal json messages in JSON format
type JSONBatchEncoder struct {
builder *canalEntryBuilder
messageBuf []canalJSONMessageInterface
callbackBuf []func()
builder *canalEntryBuilder

// When it is true, canal-json would generate TiDB extension information
// which, at the moment, only includes `tidbWaterMarkType` and `_tidb` fields.
enableTiDBExtension bool

// messageHolder is used to hold each message and will be reset after each message is encoded.
messageHolder canalJSONMessageInterface
messages []*common.Message
}

// newJSONBatchEncoder creates a new JSONBatchEncoder
func newJSONBatchEncoder() codec.EventBatchEncoder {
return &JSONBatchEncoder{
builder: newCanalEntryBuilder(),
messageBuf: make([]canalJSONMessageInterface, 0),
callbackBuf: make([]func(), 0),
enableTiDBExtension: false,
func newJSONBatchEncoder(enableTiDBExtension bool) codec.EventBatchEncoder {
encoder := &JSONBatchEncoder{
builder: newCanalEntryBuilder(),
messageHolder: &canalJSONMessage{
// for Data field, no matter event type, always be filled with only one item.
Data: make([]map[string]interface{}, 1),
},
enableTiDBExtension: enableTiDBExtension,
messages: make([]*common.Message, 0, 1),
}
}

func (c *JSONBatchEncoder) newJSONMessageForDML(e *model.RowChangedEvent) (canalJSONMessageInterface, error) {
eventType := convertRowEventType(e)
header := c.builder.buildHeader(e.CommitTs, e.Table.Schema, e.Table.Table, eventType, 1)
rowData, err := c.builder.buildRowData(e)
if err != nil {
return nil, cerrors.WrapError(cerrors.ErrCanalEncodeFailed, err)
if enableTiDBExtension {
encoder.messageHolder = &canalJSONMessageWithTiDBExtension{
canalJSONMessage: encoder.messageHolder.(*canalJSONMessage),
Extensions: &tidbExtension{},
}
}

pkCols := e.PrimaryKeyColumns()
pkNames := make([]string, len(pkCols))
for i := range pkNames {
pkNames[i] = pkCols[i].Name
}
return encoder
}

var nonTrivialRow []*canal.Column
if e.IsDelete() {
nonTrivialRow = rowData.BeforeColumns
} else {
nonTrivialRow = rowData.AfterColumns
}
func (c *JSONBatchEncoder) newJSONMessageForDML(e *model.RowChangedEvent) error {
isDelete := e.IsDelete()
sqlTypeMap := make(map[string]int32, len(e.Columns))
mysqlTypeMap := make(map[string]string, len(e.Columns))

sqlType := make(map[string]int32, len(nonTrivialRow))
mysqlType := make(map[string]string, len(nonTrivialRow))
for i := range nonTrivialRow {
sqlType[nonTrivialRow[i].Name] = nonTrivialRow[i].SqlType
mysqlType[nonTrivialRow[i].Name] = nonTrivialRow[i].MysqlType
}
filling := func(columns []*model.Column, fillTypes bool) (map[string]interface{}, error) {
if len(columns) == 0 {
return nil, nil
}
data := make(map[string]interface{}, len(columns))
for _, col := range columns {
if col != nil {
mysqlType := getMySQLType(col)
javaType, err := getJavaSQLType(col, mysqlType)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}
value, err := c.builder.formatValue(col.Value, javaType)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}
if fillTypes {
sqlTypeMap[col.Name] = int32(javaType)
mysqlTypeMap[col.Name] = mysqlType
}

var (
data map[string]interface{}
oldData map[string]interface{}
)

if len(rowData.BeforeColumns) > 0 {
oldData = make(map[string]interface{}, len(rowData.BeforeColumns))
for i := range rowData.BeforeColumns {
if !rowData.BeforeColumns[i].GetIsNull() {
oldData[rowData.BeforeColumns[i].Name] = rowData.BeforeColumns[i].Value
} else {
oldData[rowData.BeforeColumns[i].Name] = nil
if col.Value == nil {
data[col.Name] = nil
} else {
data[col.Name] = value
}
}
}
return data, nil
}

if len(rowData.AfterColumns) > 0 {
data = make(map[string]interface{}, len(rowData.AfterColumns))
for i := range rowData.AfterColumns {
if !rowData.AfterColumns[i].GetIsNull() {
data[rowData.AfterColumns[i].Name] = rowData.AfterColumns[i].Value
} else {
data[rowData.AfterColumns[i].Name] = nil
}
}
oldData, err := filling(e.PreColumns, isDelete)
if err != nil {
return err
}

msg := &canalJSONMessage{
ID: 0, // ignored by both Canal Adapter and Flink
Schema: header.SchemaName,
Table: header.TableName,
PKNames: pkNames,
IsDDL: false,
EventType: header.GetEventType().String(),
ExecutionTime: header.ExecuteTime,
BuildTime: time.Now().UnixNano() / 1e6, // ignored by both Canal Adapter and Flink
Query: "",
SQLType: sqlType,
MySQLType: mysqlType,
Data: make([]map[string]interface{}, 0),
Old: nil,
tikvTs: e.CommitTs,
data, err := filling(e.Columns, !isDelete)
if err != nil {
return err
}

var baseMessage *canalJSONMessage
if !c.enableTiDBExtension {
baseMessage = c.messageHolder.(*canalJSONMessage)
} else {
baseMessage = c.messageHolder.(*canalJSONMessageWithTiDBExtension).canalJSONMessage
}

baseMessage.ID = 0 // ignored by both Canal Adapter and Flink
baseMessage.Schema = e.Table.Schema
baseMessage.Table = e.Table.Table
baseMessage.PKNames = e.PrimaryKeyColumnNames()
baseMessage.IsDDL = false
baseMessage.EventType = eventTypeString(e)
baseMessage.ExecutionTime = convertToCanalTs(e.CommitTs)
baseMessage.BuildTime = time.Now().UnixNano() / 1e6 // ignored by both Canal Adapter and Flink
baseMessage.Query = ""
baseMessage.SQLType = sqlTypeMap
baseMessage.MySQLType = mysqlTypeMap
baseMessage.Old = nil
baseMessage.tikvTs = e.CommitTs

if e.IsDelete() {
msg.Data = append(msg.Data, oldData)
baseMessage.Data[0] = oldData
} else if e.IsInsert() {
msg.Data = append(msg.Data, data)
baseMessage.Data[0] = data
} else if e.IsUpdate() {
msg.Old = []map[string]interface{}{oldData}
msg.Data = append(msg.Data, data)
baseMessage.Data[0] = data
baseMessage.Old = []map[string]interface{}{oldData}
} else {
log.Panic("unreachable event type", zap.Any("event", e))
}

if !c.enableTiDBExtension {
return msg, nil
if c.enableTiDBExtension {
c.messageHolder.(*canalJSONMessageWithTiDBExtension).Extensions.CommitTs = e.CommitTs
}

return &canalJSONMessageWithTiDBExtension{
canalJSONMessage: msg,
Extensions: &tidbExtension{CommitTs: e.CommitTs},
}, nil
return nil
}

// eventTypeString maps a row-changed event to its canal-json event type:
// DELETE when the event is a delete, INSERT when there is no pre-image,
// and UPDATE otherwise.
func eventTypeString(e *model.RowChangedEvent) string {
	switch {
	case e.IsDelete():
		return "DELETE"
	case len(e.PreColumns) == 0:
		return "INSERT"
	default:
		return "UPDATE"
	}
}

func (c *JSONBatchEncoder) newJSONMessageForDDL(e *model.DDLEvent) canalJSONMessageInterface {
header := c.builder.buildHeader(e.CommitTs, e.TableInfo.TableName.Schema, e.TableInfo.TableName.Table, convertDdlEventType(e), 1)
msg := &canalJSONMessage{
ID: 0, // ignored by both Canal Adapter and Flink
Schema: header.SchemaName,
Table: header.TableName,
Schema: e.TableInfo.TableName.Schema,
Table: e.TableInfo.TableName.Table,
IsDDL: true,
EventType: header.GetEventType().String(),
ExecutionTime: header.ExecuteTime,
EventType: convertDdlEventType(e).String(),
ExecutionTime: convertToCanalTs(e.CommitTs),
BuildTime: time.Now().UnixNano() / 1e6, // timestamp
Query: e.Query,
tikvTs: e.CommitTs,
Expand Down Expand Up @@ -188,7 +203,7 @@ func (c *JSONBatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, er
msg := c.newJSONMessage4CheckpointEvent(ts)
value, err := json.Marshal(msg)
if err != nil {
return nil, cerrors.WrapError(cerrors.ErrCanalEncodeFailed, err)
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}
return common.NewResolvedMsg(config.ProtocolCanalJSON, nil, value, ts), nil
}
Expand All @@ -200,53 +215,45 @@ func (c *JSONBatchEncoder) AppendRowChangedEvent(
e *model.RowChangedEvent,
callback func(),
) error {
message, err := c.newJSONMessageForDML(e)
if err != nil {
if err := c.newJSONMessageForDML(e); err != nil {
return errors.Trace(err)
}
c.messageBuf = append(c.messageBuf, message)
if callback != nil {
c.callbackBuf = append(c.callbackBuf, callback)

value, err := json.Marshal(c.messageHolder)
if err != nil {
log.Panic("JSONBatchEncoder", zap.Error(err))
return nil
}
m := common.NewMsg(config.ProtocolCanalJSON, nil, value, e.CommitTs,
model.MessageTypeRow, c.messageHolder.getSchema(), c.messageHolder.getTable())
m.IncRowsCount()
m.Callback = callback

c.messages = append(c.messages, m)
return nil
}

// Build implements the EventBatchEncoder interface.
// It returns the messages accumulated since the previous call and resets
// the internal buffer, or nil when nothing was encoded.
func (c *JSONBatchEncoder) Build() []*common.Message {
	if len(c.messages) == 0 {
		return nil
	}

	result := c.messages
	// Allocate a fresh buffer rather than reslicing with c.messages[:0]:
	// reslicing would keep the backing array shared with `result`, so
	// messages appended after this call would overwrite entries the
	// caller still holds.
	c.messages = make([]*common.Message, 0, len(result))
	return result
}

// EncodeDDLEvent encodes a DDL event into a single canal-json message.
// Returns ErrCanalEncodeFailed (wrapped) if JSON serialization fails.
func (c *JSONBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*common.Message, error) {
	message := c.newJSONMessageForDDL(e)
	value, err := json.Marshal(message)
	if err != nil {
		return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
	}
	return common.NewDDLMsg(config.ProtocolCanalJSON, nil, value, e), nil
}

// Build implements the EventJSONBatchEncoder interface
func (c *JSONBatchEncoder) Build() []*common.Message {
if len(c.messageBuf) == 0 {
return nil
}
ret := make([]*common.Message, len(c.messageBuf))
for i, msg := range c.messageBuf {
value, err := json.Marshal(msg)
if err != nil {
log.Panic("JSONBatchEncoder", zap.Error(err))
return nil
}
m := common.NewMsg(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MessageTypeRow, msg.getSchema(), msg.getTable())
m.IncRowsCount()
ret[i] = m
}
c.messageBuf = make([]canalJSONMessageInterface, 0)
if len(c.callbackBuf) != 0 && len(c.callbackBuf) == len(ret) {
for i, c := range c.callbackBuf {
ret[i].Callback = c
}
c.callbackBuf = make([]func(), 0)
}
return ret
}

type jsonBatchEncoderBuilder struct {
config *common.Config
}
Expand All @@ -258,8 +265,5 @@ func NewJSONBatchEncoderBuilder(config *common.Config) codec.EncoderBuilder {

// Build constructs a `JSONBatchEncoder`, propagating the TiDB-extension
// setting from the builder's config.
func (b *jsonBatchEncoderBuilder) Build() codec.EventBatchEncoder {
	return newJSONBatchEncoder(b.config.EnableTiDBExtension)
}
Loading

0 comments on commit c66124b

Please sign in to comment.