sink(ticdc): add an index file in storage sink to quickly find the largest file number (#8406)

close #8256
zhaoxinyu authored Mar 13, 2023
1 parent 92e3215 commit 36bb8e9
Showing 13 changed files with 743 additions and 229 deletions.
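
The tests in this commit show the on-disk contract of the new index file: each table directory gains a CDC.index file whose single line is the name of the most recently written data file (for example "CDC000002.json\n"). As a rough illustration of how a reader could use that index to recover the largest file number, here is a minimal Go sketch; largestFileNumber and the directory path are hypothetical and not part of this commit.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// largestFileNumber reads <tableDir>/CDC.index and extracts the numeric part
// of the recorded data file name, e.g. 2 for "CDC000002.json".
func largestFileNumber(tableDir string) (uint64, error) {
	data, err := os.ReadFile(filepath.Join(tableDir, "CDC.index"))
	if err != nil {
		return 0, err
	}
	name := strings.TrimSpace(string(data)) // e.g. "CDC000002.json"
	num := strings.TrimSuffix(strings.TrimPrefix(name, "CDC"), ".json")
	return strconv.ParseUint(num, 10, 64)
}

func main() {
	if n, err := largestFileNumber("test/table1/33"); err == nil {
		fmt.Println("largest data file number:", n) // the next data file would be n+1
	}
}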
20 changes: 7 additions & 13 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -45,12 +45,6 @@ const (
// Assert EventSink[E event.TableEvent] implementation
var _ dmlsink.EventSink[*model.SingleTableTxn] = (*DMLSink)(nil)

// versionedTable is used to wrap TableName with a version
type versionedTable struct {
model.TableName
version uint64
}

// eventFragment is used to attach a sequence number to TxnCallbackableEvent.
// The sequence number is mainly useful for TxnCallbackableEvent defragmentation.
// e.g. TxnCallbackableEvent 1~5 are dispatched to a group of encoding workers, but the
@@ -59,9 +53,9 @@ type versionedTable struct {
// at dmlWorker sequentially.
type eventFragment struct {
// event sequence number
seqNumber uint64
versionedTable
event *dmlsink.TxnCallbackableEvent
seqNumber uint64
versionedTable cloudstorage.VersionedTable
event *dmlsink.TxnCallbackableEvent
// encodedMsgs denote the encoded messages after the event is handled in encodingWorker.
encodedMsgs []*common.Message
}
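
The comment above explains why each fragment carries a sequence number: encoded messages produced by parallel encoding workers can reach the dmlWorker out of order. Below is a minimal sketch of that reordering idea, assuming sequence numbers are consecutive and start at one; reorderBuffer is illustrative and is not the package's actual defragmenter.

// reorderBuffer holds fragments that arrived early and releases them strictly
// in sequence-number order.
type reorderBuffer struct {
	next    uint64                   // sequence number expected next
	pending map[uint64]eventFragment // fragments that arrived ahead of their turn
	out     chan<- eventFragment     // fragments emitted in order
}

func newReorderBuffer(out chan<- eventFragment) *reorderBuffer {
	return &reorderBuffer{next: 1, pending: make(map[uint64]eventFragment), out: out}
}

// push accepts a fragment in arbitrary arrival order and forwards every
// buffered fragment whose turn has come.
func (b *reorderBuffer) push(frag eventFragment) {
	b.pending[frag.seqNumber] = frag
	for {
		f, ok := b.pending[b.next]
		if !ok {
			return
		}
		delete(b.pending, b.next)
		b.out <- f
		b.next++
	}
}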
@@ -121,7 +115,7 @@ func NewDMLSink(ctx context.Context,
}
encoderBuilder, err := builder.NewEventBatchEncoderBuilder(ctx, encoderConfig)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCloudStorageInvalidConfig, err)
return nil, cerror.WrapError(cerror.ErrStorageSinkInvalidConfig, err)
}

s.changefeedID = contextutil.ChangefeedIDFromCtx(ctx)
@@ -169,7 +163,7 @@ func (s *DMLSink) run(ctx context.Context) error {

// WriteEvents write events to cloud storage sink.
func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTableTxn]) error {
var tbl versionedTable
var tbl cloudstorage.VersionedTable

for _, txn := range txns {
if txn.GetTableSinkState() != state.TableSinkSinking {
@@ -179,9 +173,9 @@ func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
continue
}

tbl = versionedTable{
tbl = cloudstorage.VersionedTable{
TableName: txn.Event.TableInfo.TableName,
version: txn.Event.TableInfo.Version,
Version: txn.Event.TableInfo.Version,
}
seq := atomic.AddUint64(&s.lastSeqNum, 1)
// emit a TxnCallbackableEvent coupled with a sequence number starting from one.
213 changes: 191 additions & 22 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
@@ -28,29 +28,18 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
"github.com/stretchr/testify/require"
)

func TestCloudStorageWriteEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir)
sinkURI, err := url.Parse(uri)
require.Nil(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolOpen.String()

errCh := make(chan error, 5)
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)

func generateTxnEvents(
cnt *uint64,
batch int,
tableStatus *state.TableSinkState,
) []*dmlsink.TxnCallbackableEvent {
// assume we have a large transaction and it is split into 10 small transactions
txns := make([]*dmlsink.TxnCallbackableEvent, 0, 10)
var cnt uint64 = 0
batch := 100
tableStatus := state.TableSinkSinking

for i := 0; i < 10; i++ {
txn := &dmlsink.TxnCallbackableEvent{
@@ -71,9 +60,9 @@ func TestCloudStorageWriteEvents(t *testing.T) {
},
},
Callback: func() {
atomic.AddUint64(&cnt, uint64(batch))
atomic.AddUint64(cnt, uint64(batch))
},
SinkState: &tableStatus,
SinkState: tableStatus,
}
for j := 0; j < batch; j++ {
row := &model.RowChangedEvent{
@@ -89,11 +78,107 @@ func TestCloudStorageWriteEvents(t *testing.T) {
}
txns = append(txns, txn)
}

return txns
}

func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir)
sinkURI, err := url.Parse(uri)
require.Nil(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolOpen.String()

errCh := make(chan error, 5)
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
var cnt uint64 = 0
batch := 100
tableStatus := state.TableSinkSinking

// generating one dml file.
txns := generateTxnEvents(&cnt, batch, &tableStatus)
tableDir := path.Join(parentDir, "test/table1/33")
os.MkdirAll(tableDir, 0o755)
err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(4 * time.Second)
time.Sleep(3 * time.Second)

files, err := os.ReadDir(tableDir)
require.Nil(t, err)
require.Len(t, files, 3)
var fileNames []string
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
require.ElementsMatch(t, []string{"CDC000001.json", "schema.json", "CDC.index"}, fileNames)
content, err := os.ReadFile(path.Join(tableDir, "CDC000001.json"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000001.json\n", string(content))
require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt))

// generating another dml file.
err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)

files, err = os.ReadDir(tableDir)
require.Nil(t, err)
require.Len(t, files, 4)
fileNames = nil
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
require.ElementsMatch(t, []string{
"CDC000001.json", "CDC000002.json",
"schema.json", "CDC.index",
}, fileNames)
content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000002.json\n", string(content))
require.Equal(t, uint64(2000), atomic.LoadUint64(&cnt))

cancel()
s.Close()
}

func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir)
sinkURI, err := url.Parse(uri)
require.Nil(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolOpen.String()
replicaConfig.Sink.DateSeparator = config.DateSeparatorDay.String()

errCh := make(chan error, 5)
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
mockClock := clock.NewMock()
s.writer.setClock(mockClock)

var cnt uint64 = 0
batch := 100
tableStatus := state.TableSinkSinking

mockClock.Set(time.Date(2023, 3, 8, 23, 59, 58, 0, time.UTC))
txns := generateTxnEvents(&cnt, batch, &tableStatus)
tableDir := path.Join(parentDir, "test/table1/33/2023-03-08")
err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)

files, err := os.ReadDir(tableDir)
require.Nil(t, err)
@@ -102,12 +187,96 @@ func TestCloudStorageWriteEvents(t *testing.T) {
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
require.ElementsMatch(t, []string{"CDC000001.json", "schema.json"}, fileNames)
require.ElementsMatch(t, []string{"CDC000001.json", "CDC.index"}, fileNames)
content, err := os.ReadFile(path.Join(tableDir, "CDC000001.json"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000001.json\n", string(content))
require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt))

// test date (day) is NOT changed.
mockClock.Set(time.Date(2023, 3, 8, 23, 59, 59, 0, time.UTC))
s.writer.setClock(mockClock)
err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)

files, err = os.ReadDir(tableDir)
require.Nil(t, err)
require.Len(t, files, 3)
fileNames = nil
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
require.ElementsMatch(t, []string{"CDC000001.json", "CDC000002.json", "CDC.index"}, fileNames)
content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000002.json\n", string(content))
require.Equal(t, uint64(2000), atomic.LoadUint64(&cnt))

// test date (day) is changed.
mockClock.Set(time.Date(2023, 3, 9, 0, 0, 10, 0, time.UTC))
s.writer.setClock(mockClock)
err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)

tableDir = path.Join(parentDir, "test/table1/33/2023-03-09")
files, err = os.ReadDir(tableDir)
require.Nil(t, err)
require.Len(t, files, 2)
fileNames = nil
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
require.ElementsMatch(t, []string{"CDC000001.json", "CDC.index"}, fileNames)
content, err = os.ReadFile(path.Join(tableDir, "CDC000001.json"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000001.json\n", string(content))
require.Equal(t, uint64(3000), atomic.LoadUint64(&cnt))
cancel()
s.Close()

// test table is scheduled from one node to another
cnt = 0
ctx, cancel = context.WithCancel(context.Background())
s, err = NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
mockClock = clock.NewMock()
mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC))
s.writer.setClock(mockClock)
err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)

files, err = os.ReadDir(tableDir)
require.Nil(t, err)
require.Len(t, files, 3)
fileNames = nil
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
require.ElementsMatch(t, []string{"CDC000001.json", "CDC000002.json", "CDC.index"}, fileNames)
content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000002.json\n", string(content))
require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt))

cancel()
s.Close()
}
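
The last scenario above (a table scheduled from one node to another) is where the index file pays off: a freshly started writer can seed its file counter from CDC.index instead of overwriting CDC000001.json or rescanning the directory. A hypothetical continuation of the earlier sketch (same illustrative package, reusing largestFileNumber); nextFileName is not code from this commit.

// nextFileName returns the name the writer should use for its next data file,
// starting from CDC000001.json when no index exists yet.
func nextFileName(tableDir string) (string, error) {
	n, err := largestFileNumber(tableDir)
	if err != nil {
		if os.IsNotExist(err) {
			n = 0 // no index file yet: this is a brand-new table directory
		} else {
			return "", err
		}
	}
	return fmt.Sprintf("CDC%06d.json", n+1), nil
}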
3 changes: 2 additions & 1 deletion cdc/sink/dmlsink/cloudstorage/defragmenter_test.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/builder"
"github.com/stretchr/testify/require"
)
@@ -54,7 +55,7 @@ func TestDeframenter(t *testing.T) {
go func(seq uint64) {
encoder := encoderBuilder.Build()
frag := eventFragment{
versionedTable: versionedTable{
versionedTable: cloudstorage.VersionedTable{
TableName: model.TableName{
Schema: "test",
Table: "table1",
