Skip to content

Commit

Permalink
Merge branch 'master' into savepoint-test
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 18, 2022
2 parents db013c6 + b3d1782 commit bd83f8d
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 51 deletions.
102 changes: 102 additions & 0 deletions cdc/sink/codec/csv/csv_decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package csv

import (
"context"

"github.com/pingcap/errors"
lconfig "github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
)

const defaultIOConcurrency = 1

type batchDecoder struct {
csvConfig *config.CSVConfig
parser *mydump.CSVParser
data []byte
msg *csvMessage
closed bool
}

// NewBatchDecoder creates a new BatchDecoder
func NewBatchDecoder(ctx context.Context, csvConfig *config.CSVConfig, value []byte) (codec.EventBatchDecoder, error) {
var backslashEscape bool

// if quote is not set in config, we should unespace backslash
// when parsing csv columns.
if len(csvConfig.Quote) == 0 {
backslashEscape = true
}
cfg := &lconfig.CSVConfig{
Separator: csvConfig.Delimiter,
Delimiter: csvConfig.Quote,
Terminator: csvConfig.Terminator,
Null: csvConfig.NullString,
BackslashEscape: backslashEscape,
}
csvParser, err := mydump.NewCSVParser(ctx, cfg,
mydump.NewStringReader(string(value)),
int64(lconfig.ReadBlockSize),
worker.NewPool(ctx, defaultIOConcurrency, "io"), false, nil)
if err != nil {
return nil, err
}
return &batchDecoder{
csvConfig: csvConfig,
data: value,
msg: newCSVMessage(csvConfig),
parser: csvParser,
}, nil
}

// HasNext implements the EventBatchDecoder interface.
func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {
err := b.parser.ReadRow()
if err != nil {
b.closed = true
return model.MessageTypeUnknown, false, err
}

row := b.parser.LastRow()
if err = b.msg.decode(row.Row); err != nil {
return model.MessageTypeUnknown, false, errors.Trace(err)
}

return model.MessageTypeRow, true, nil
}

// NextResolvedEvent implements the EventBatchDecoder interface.
func (b *batchDecoder) NextResolvedEvent() (uint64, error) {
return 0, nil
}

// NextRowChangedEvent implements the EventBatchDecoder interface.
func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
if b.closed {
return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("no csv row can be found"))
}
return csvMsg2RowChangedEvent(b.msg), nil
}

// NextDDLEvent implements the EventBatchDecoder interface.
func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
return nil, nil
}
53 changes: 53 additions & 0 deletions cdc/sink/codec/csv/csv_decoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package csv

import (
"context"
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/stretchr/testify/require"
)

func TestCSVBatchDecoder(t *testing.T) {
csvData := `"I","employee","hr",433305438660591626,101,"Smith","Bob","2014-06-04","New York"
"U","employee","hr",433305438660591627,101,"Smith","Bob","2015-10-08","Los Angeles"
"D","employee","hr",433305438660591629,101,"Smith","Bob","2017-03-13","Dallas"
"I","employee","hr",433305438660591630,102,"Alex","Alice","2017-03-14","Shanghai"
"U","employee","hr",433305438660591630,102,"Alex","Alice","2018-06-15","Beijing"
`
ctx := context.Background()
decoder, err := NewBatchDecoder(ctx, &config.CSVConfig{
Delimiter: ",",
Quote: "\"",
Terminator: "\n",
NullString: "\\N",
IncludeCommitTs: true,
}, []byte(csvData))
require.Nil(t, err)

for i := 0; i < 5; i++ {
tp, hasNext, err := decoder.HasNext()
require.Nil(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, tp)
event, err := decoder.NextRowChangedEvent()
require.Nil(t, err)
require.NotNil(t, event)
}

_, hasNext, _ := decoder.HasNext()
require.False(t, hasNext)
}
4 changes: 2 additions & 2 deletions cdc/sink/codec/csv/csv_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func (b *BatchEncoder) AppendRowChangedEvent(
errors.New("no csv config provided"))
}

row, err := buildRowData(b.csvConfig, e)
row, err := rowChangedEvent2CSVMsg(b.csvConfig, e)
if err != nil {
return err
}
b.valueBuf.Write(row)
b.valueBuf.Write(row.encode())
b.batchSize++
if callback != nil {
b.callbackBuf = append(b.callbackBuf, callback)
Expand Down
Loading

0 comments on commit bd83f8d

Please sign in to comment.