Skip to content

Commit

Permalink
kafka(ticdc): Revert "kafka(ticdc): claim check support large message…
Browse files Browse the repository at this point in the history
… raw value for… (#11494)

close #11493
  • Loading branch information
3AceShowHand authored Aug 20, 2024
1 parent 05baaf4 commit 4825188
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 284 deletions.
3 changes: 0 additions & 3 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
LargeMessageHandleOption: oldConfig.LargeMessageHandleOption,
LargeMessageHandleCompression: oldConfig.LargeMessageHandleCompression,
ClaimCheckStorageURI: oldConfig.ClaimCheckStorageURI,
ClaimCheckRawValue: oldConfig.ClaimCheckRawValue,
}
}

Expand Down Expand Up @@ -623,7 +622,6 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
LargeMessageHandleOption: oldConfig.LargeMessageHandleOption,
LargeMessageHandleCompression: oldConfig.LargeMessageHandleCompression,
ClaimCheckStorageURI: oldConfig.ClaimCheckStorageURI,
ClaimCheckRawValue: oldConfig.ClaimCheckRawValue,
}
}

Expand Down Expand Up @@ -991,7 +989,6 @@ type LargeMessageHandleConfig struct {
LargeMessageHandleOption string `json:"large_message_handle_option"`
LargeMessageHandleCompression string `json:"large_message_handle_compression"`
ClaimCheckStorageURI string `json:"claim_check_storage_uri"`
ClaimCheckRawValue bool `json:"claim_check_raw_value"`
}

// DispatchRule represents partition rule for a table
Expand Down
3 changes: 1 addition & 2 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,7 @@ const (
"large-message-handle": {
"large-message-handle-option": "handle-key-only",
"large-message-handle-compression": "",
"claim-check-storage-uri": "",
"claim-check-raw-value": false
"claim-check-storage-uri": ""
},
"glue-schema-registry-config": {
"region":"region",
Expand Down
5 changes: 0 additions & 5 deletions pkg/config/large_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type LargeMessageHandleConfig struct {
LargeMessageHandleOption string `toml:"large-message-handle-option" json:"large-message-handle-option"`
LargeMessageHandleCompression string `toml:"large-message-handle-compression" json:"large-message-handle-compression"`
ClaimCheckStorageURI string `toml:"claim-check-storage-uri" json:"claim-check-storage-uri"`
ClaimCheckRawValue bool `toml:"claim-check-raw-value" json:"claim-check-raw-value"`
}

// NewDefaultLargeMessageHandleConfig return the default Config.
Expand Down Expand Up @@ -81,10 +80,6 @@ func (c *LargeMessageHandleConfig) AdjustAndValidate(protocol Protocol, enableTi
return cerror.ErrInvalidReplicaConfig.GenWithStack(
"large message handle is set to claim-check, but the claim-check-storage-uri is empty")
}
if c.ClaimCheckRawValue && protocol == ProtocolOpen {
return cerror.ErrInvalidReplicaConfig.GenWithStack(
"large message handle is set to claim-check, raw value is not supported for the open protocol")
}
}

return nil
Expand Down
139 changes: 28 additions & 111 deletions pkg/config/large_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestLargeMessageHandle4NotSupportedProtocol(t *testing.T) {
require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig)
}

func TestHandleKeyOnly4CanalJSON(t *testing.T) {
func TestLargeMessageHandle4CanalJSON(t *testing.T) {
t.Parallel()

// large-message-handle not set, always no error
Expand All @@ -68,45 +68,27 @@ func TestHandleKeyOnly4CanalJSON(t *testing.T) {
require.NoError(t, err)
require.True(t, largeMessageHandle.Disabled())

largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly

// `enable-tidb-extension` is false, return error
err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false)
require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig)

// `enable-tidb-extension` is true, no error
err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, true)
require.NoError(t, err)
require.Equal(t, LargeMessageHandleOptionHandleKeyOnly, largeMessageHandle.LargeMessageHandleOption)
}

func TestClaimCheck4CanalJSON(t *testing.T) {
t.Parallel()
for _, option := range []string{
LargeMessageHandleOptionHandleKeyOnly,
LargeMessageHandleOptionClaimCheck,
} {
largeMessageHandle.LargeMessageHandleOption = option
if option == LargeMessageHandleOptionClaimCheck {
largeMessageHandle.ClaimCheckStorageURI = "file:///tmp/claim-check"
}

// large-message-handle not set, always no error
largeMessageHandle := NewDefaultLargeMessageHandleConfig()

err := largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false)
require.NoError(t, err)
require.True(t, largeMessageHandle.Disabled())

largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionClaimCheck
largeMessageHandle.ClaimCheckStorageURI = "file:///tmp/claim-check"

for _, rawValue := range []bool{false, true} {
largeMessageHandle.ClaimCheckRawValue = rawValue
// `enable-tidb-extension` is false, return error
err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false)
err := largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false)
require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig)

// `enable-tidb-extension` is true, no error
err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, true)
require.NoError(t, err)
require.Equal(t, LargeMessageHandleOptionClaimCheck, largeMessageHandle.LargeMessageHandleOption)
require.Equal(t, option, largeMessageHandle.LargeMessageHandleOption)
}
}

func TestHandleKeyOnly4OpenProtocol(t *testing.T) {
func TestLargeMessageHandle4OpenProtocol(t *testing.T) {
t.Parallel()

// large-message-handle not set, always no error
Expand All @@ -116,88 +98,23 @@ func TestHandleKeyOnly4OpenProtocol(t *testing.T) {
require.NoError(t, err)
require.True(t, largeMessageHandle.Disabled())

largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly
// `enable-tidb-extension` is false, return error
err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, false)
require.NoError(t, err)

// `enable-tidb-extension` is true, no error
err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, true)
require.NoError(t, err)
require.Equal(t, LargeMessageHandleOptionHandleKeyOnly, largeMessageHandle.LargeMessageHandleOption)
}

func TestClaimCheck4OpenProtocol(t *testing.T) {
t.Parallel()

// large-message-handle not set, always no error
largeMessageHandle := NewDefaultLargeMessageHandleConfig()

err := largeMessageHandle.AdjustAndValidate(ProtocolOpen, false)
require.NoError(t, err)
require.True(t, largeMessageHandle.Disabled())

largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionClaimCheck
largeMessageHandle.ClaimCheckStorageURI = "file:///tmp/claim-check"

// `enable-tidb-extension` is false, return error
err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, false)
require.NoError(t, err)

// `enable-tidb-extension` is true, no error
err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, true)
require.NoError(t, err)
require.Equal(t, LargeMessageHandleOptionClaimCheck, largeMessageHandle.LargeMessageHandleOption)

largeMessageHandle.ClaimCheckRawValue = true
err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, true)
require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig)
}

func TestHandleKeyOnly4SimpleProtocol(t *testing.T) {
t.Parallel()

// large-message-handle not set, always no error
largeMessageHandle := NewDefaultLargeMessageHandleConfig()

err := largeMessageHandle.AdjustAndValidate(ProtocolSimple, false)
require.NoError(t, err)
require.True(t, largeMessageHandle.Disabled())

largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly
// `enable-tidb-extension` is false, return error
err = largeMessageHandle.AdjustAndValidate(ProtocolSimple, false)
require.NoError(t, err)

// `enable-tidb-extension` is true, no error
err = largeMessageHandle.AdjustAndValidate(ProtocolSimple, true)
require.NoError(t, err)
require.Equal(t, LargeMessageHandleOptionHandleKeyOnly, largeMessageHandle.LargeMessageHandleOption)
}
for _, o := range []string{
LargeMessageHandleOptionHandleKeyOnly,
LargeMessageHandleOptionClaimCheck,
} {
largeMessageHandle.LargeMessageHandleOption = o
if o == LargeMessageHandleOptionClaimCheck {
largeMessageHandle.ClaimCheckStorageURI = "file:///tmp/claim-check"
}

func TestClaimCheck4SimpleProtocol(t *testing.T) {
t.Parallel()

// large-message-handle not set, always no error
largeMessageHandle := NewDefaultLargeMessageHandleConfig()

err := largeMessageHandle.AdjustAndValidate(ProtocolSimple, false)
require.NoError(t, err)
require.True(t, largeMessageHandle.Disabled())

largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionClaimCheck
largeMessageHandle.ClaimCheckStorageURI = "file:///tmp/claim-check"

// `enable-tidb-extension` is false, return error
err = largeMessageHandle.AdjustAndValidate(ProtocolSimple, false)
require.NoError(t, err)
// `enable-tidb-extension` is false, return error
err := largeMessageHandle.AdjustAndValidate(ProtocolOpen, false)
require.NoError(t, err)

// `enable-tidb-extension` is true, no error
err = largeMessageHandle.AdjustAndValidate(ProtocolSimple, true)
require.NoError(t, err)
require.Equal(t, LargeMessageHandleOptionClaimCheck, largeMessageHandle.LargeMessageHandleOption)
// `enable-tidb-extension` is true, no error
err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, true)
require.NoError(t, err)
require.Equal(t, o, largeMessageHandle.LargeMessageHandleOption)

largeMessageHandle.ClaimCheckRawValue = true
err = largeMessageHandle.AdjustAndValidate(ProtocolSimple, true)
require.NoError(t, err)
}
}
12 changes: 4 additions & 8 deletions pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,12 @@ func (b *batchDecoder) assembleClaimCheckRowChangedEvent(ctx context.Context, cl
if err != nil {
return nil, err
}

if !b.config.LargeMessageHandle.ClaimCheckRawValue {
claimCheckM, err := common.UnmarshalClaimCheckMessage(data)
if err != nil {
return nil, err
}
data = claimCheckM.Value
claimCheckM, err := common.UnmarshalClaimCheckMessage(data)
if err != nil {
return nil, err
}

value, err := common.Decompress(b.config.LargeMessageHandle.LargeMessageHandleCompression, data)
value, err := common.Decompress(b.config.LargeMessageHandle.LargeMessageHandleCompression, claimCheckM.Value)
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/sink/codec/canal/canal_json_row_event_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,15 @@ type jsonRowEventEncoderBuilder struct {

// NewJSONRowEventEncoderBuilder creates a canal-json batchEncoderBuilder.
func NewJSONRowEventEncoderBuilder(ctx context.Context, config *common.Config) (codec.RowEventEncoderBuilder, error) {
claimCheck, err := claimcheck.New(ctx, config.LargeMessageHandle, config.ChangefeedID)
if err != nil {
return nil, errors.Trace(err)
var (
claimCheck *claimcheck.ClaimCheck
err error
)
if config.LargeMessageHandle.EnableClaimCheck() {
claimCheck, err = claimcheck.New(ctx, config.LargeMessageHandle.ClaimCheckStorageURI, config.ChangefeedID)
if err != nil {
return nil, errors.Trace(err)
}
}
return &jsonRowEventEncoderBuilder{
config: config,
Expand Down
68 changes: 32 additions & 36 deletions pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,50 +214,46 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) {
codecConfig.MaxMessageBytes = 500
ctx := context.Background()

for _, rawValue := range []bool{false, true} {
codecConfig.LargeMessageHandle.ClaimCheckRawValue = rawValue

builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()
builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

_, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig())
err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {})
require.NoError(t, err)
_, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig())
err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {})
require.NoError(t, err)

// this is a large message, should be delivered to the external storage.
claimCheckLocationMessage := encoder.Build()[0]
// this is a large message, should be delivered to the external storage.
claimCheckLocationMessage := encoder.Build()[0]

decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)
decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

err = decoder.AddKeyValue(claimCheckLocationMessage.Key, claimCheckLocationMessage.Value)
require.NoError(t, err)
err = decoder.AddKeyValue(claimCheckLocationMessage.Key, claimCheckLocationMessage.Value)
require.NoError(t, err)

messageType, ok, err := decoder.HasNext()
require.NoError(t, err)
require.Equal(t, messageType, model.MessageTypeRow)
require.True(t, ok)
messageType, ok, err := decoder.HasNext()
require.NoError(t, err)
require.Equal(t, messageType, model.MessageTypeRow)
require.True(t, ok)

decodedLargeEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err, rawValue)
decodedLargeEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)

require.Equal(t, insertEvent.CommitTs, decodedLargeEvent.CommitTs)
require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedLargeEvent.TableInfo.GetSchemaName())
require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedLargeEvent.TableInfo.GetTableName())
require.Nil(t, nil, decodedLargeEvent.PreColumns)
require.Equal(t, insertEvent.CommitTs, decodedLargeEvent.CommitTs)
require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedLargeEvent.TableInfo.GetSchemaName())
require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedLargeEvent.TableInfo.GetTableName())
require.Nil(t, nil, decodedLargeEvent.PreColumns)

decodedColumns := make(map[string]*model.ColumnData, len(decodedLargeEvent.Columns))
for _, column := range decodedLargeEvent.Columns {
colName := decodedLargeEvent.TableInfo.ForceGetColumnName(column.ColumnID)
decodedColumns[colName] = column
}
for _, col := range insertEvent.Columns {
colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID)
decoded, ok := decodedColumns[colName]
require.True(t, ok)
require.EqualValues(t, col.Value, decoded.Value)
}
decodedColumns := make(map[string]*model.ColumnData, len(decodedLargeEvent.Columns))
for _, column := range decodedLargeEvent.Columns {
colName := decodedLargeEvent.TableInfo.ForceGetColumnName(column.ColumnID)
decodedColumns[colName] = column
}
for _, col := range insertEvent.Columns {
colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID)
decoded, ok := decodedColumns[colName]
require.True(t, ok)
require.EqualValues(t, col.Value, decoded.Value)
}
}

Expand Down
12 changes: 9 additions & 3 deletions pkg/sink/codec/open/open_protocol_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,15 @@ func (b *batchEncoderBuilder) CleanMetrics() {
func NewBatchEncoderBuilder(
ctx context.Context, config *common.Config,
) (codec.RowEventEncoderBuilder, error) {
claimCheck, err := claimcheck.New(ctx, config.LargeMessageHandle, config.ChangefeedID)
if err != nil {
return nil, errors.Trace(err)
var (
claimCheck *claimcheck.ClaimCheck
err error
)
if config.LargeMessageHandle.EnableClaimCheck() {
claimCheck, err = claimcheck.New(ctx, config.LargeMessageHandle.ClaimCheckStorageURI, config.ChangefeedID)
if err != nil {
return nil, errors.Trace(err)
}
}
return &batchEncoderBuilder{
config: config,
Expand Down
12 changes: 4 additions & 8 deletions pkg/sink/codec/simple/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,12 @@ func (d *Decoder) assembleClaimCheckRowChangedEvent(claimCheckLocation string) (
if err != nil {
return nil, err
}

if !d.config.LargeMessageHandle.ClaimCheckRawValue {
claimCheckM, err := common.UnmarshalClaimCheckMessage(data)
if err != nil {
return nil, err
}
data = claimCheckM.Value
claimCheckM, err := common.UnmarshalClaimCheckMessage(data)
if err != nil {
return nil, err
}

value, err := common.Decompress(d.config.LargeMessageHandle.LargeMessageHandleCompression, data)
value, err := common.Decompress(d.config.LargeMessageHandle.LargeMessageHandleCompression, claimCheckM.Value)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 4825188

Please sign in to comment.