From 36bb8e9ecfa68f204065f3b4466ff1c78a37b33b Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Mon, 13 Mar 2023 17:44:39 +0800 Subject: [PATCH] sink(ticdc): add an index file in storage sink to quickly find the largest file number (#8406) close pingcap/tiflow#8256 --- .../cloudstorage/cloud_storage_dml_sink.go | 20 +- .../cloud_storage_dml_sink_test.go | 213 ++++++++++++++++-- .../dmlsink/cloudstorage/defragmenter_test.go | 3 +- cdc/sink/dmlsink/cloudstorage/dml_worker.go | 164 ++++++-------- .../dmlsink/cloudstorage/dml_worker_test.go | 83 +------ cdc/sink/dmlsink/cloudstorage/dml_writer.go | 14 +- .../cloudstorage/encoding_worker_test.go | 5 +- cmd/storage-consumer/main.go | 9 +- errors.toml | 17 +- pkg/errors/cdc_errors.go | 12 +- pkg/sink/cloudstorage/config.go | 14 +- pkg/sink/cloudstorage/path.go | 209 +++++++++++++++++ pkg/sink/cloudstorage/path_test.go | 209 +++++++++++++++++ 13 files changed, 743 insertions(+), 229 deletions(-) create mode 100644 pkg/sink/cloudstorage/path.go create mode 100644 pkg/sink/cloudstorage/path_test.go diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 540fea9005c..06496443c3a 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -45,12 +45,6 @@ const ( // Assert EventSink[E event.TableEvent] implementation var _ dmlsink.EventSink[*model.SingleTableTxn] = (*DMLSink)(nil) -// versionedTable is used to wrap TableName with a version -type versionedTable struct { - model.TableName - version uint64 -} - // eventFragment is used to attach a sequence number to TxnCallbackableEvent. // The sequence number is mainly useful for TxnCallbackableEvent defragmentation. // e.g. TxnCallbackableEvent 1~5 are dispatched to a group of encoding workers, but the @@ -59,9 +53,9 @@ type versionedTable struct { // at dmlWorker sequentially. type eventFragment struct { // event sequence number - seqNumber uint64 - versionedTable - event *dmlsink.TxnCallbackableEvent + seqNumber uint64 + versionedTable cloudstorage.VersionedTable + event *dmlsink.TxnCallbackableEvent // encodedMsgs denote the encoded messages after the event is handled in encodingWorker. encodedMsgs []*common.Message } @@ -121,7 +115,7 @@ func NewDMLSink(ctx context.Context, } encoderBuilder, err := builder.NewEventBatchEncoderBuilder(ctx, encoderConfig) if err != nil { - return nil, cerror.WrapError(cerror.ErrCloudStorageInvalidConfig, err) + return nil, cerror.WrapError(cerror.ErrStorageSinkInvalidConfig, err) } s.changefeedID = contextutil.ChangefeedIDFromCtx(ctx) @@ -169,7 +163,7 @@ func (s *DMLSink) run(ctx context.Context) error { // WriteEvents write events to cloud storage sink. func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTableTxn]) error { - var tbl versionedTable + var tbl cloudstorage.VersionedTable for _, txn := range txns { if txn.GetTableSinkState() != state.TableSinkSinking { @@ -179,9 +173,9 @@ func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa continue } - tbl = versionedTable{ + tbl = cloudstorage.VersionedTable{ TableName: txn.Event.TableInfo.TableName, - version: txn.Event.TableInfo.Version, + Version: txn.Event.TableInfo.Version, } seq := atomic.AddUint64(&s.lastSeqNum, 1) // emit a TxnCallbackableEvent encoupled with a sequence number starting from one. 
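The eventFragment comment in the hunk above describes sequence-number based defragmentation: fragments may finish encoding out of order, and they must be handed to the dml worker back in order. The following standalone Go sketch is illustrative only and is not part of this patch; it is far simpler than the real defragmenter in this package, and the fragment/defragment names are hypothetical. It shows the reordering idea: completions arriving as 1, 3, 2, 5, 4 are buffered by sequence number and released as 1, 2, 3, 4, 5.

package main

import "fmt"

type fragment struct {
	seq     uint64
	payload string
}

// defragment reads fragments from in (arbitrary completion order) and writes
// them to out in strictly ascending sequence order, starting from seq 1.
func defragment(in <-chan fragment, out chan<- fragment) {
	pending := make(map[uint64]fragment)
	next := uint64(1)
	for frag := range in {
		pending[frag.seq] = frag
		// release every fragment that is now contiguous with the last emitted one
		for {
			f, ok := pending[next]
			if !ok {
				break
			}
			out <- f
			delete(pending, next)
			next++
		}
	}
	close(out)
}

func main() {
	in := make(chan fragment, 5)
	out := make(chan fragment, 5)
	// completion order 1, 3, 2, 5, 4, as in the comment above
	for _, seq := range []uint64{1, 3, 2, 5, 4} {
		in <- fragment{seq: seq, payload: fmt.Sprintf("txn-%d", seq)}
	}
	close(in)
	go defragment(in, out)
	for f := range out {
		fmt.Println(f.seq, f.payload) // prints seq 1..5 in order
	}
}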
diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go index 0b8ef090993..b00cfd05926 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go @@ -28,29 +28,18 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/tablesink/state" + "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/config" "github.com/stretchr/testify/require" ) -func TestCloudStorageWriteEvents(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - parentDir := t.TempDir() - uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir) - sinkURI, err := url.Parse(uri) - require.Nil(t, err) - - replicaConfig := config.GetDefaultReplicaConfig() - replicaConfig.Sink.Protocol = config.ProtocolOpen.String() - - errCh := make(chan error, 5) - s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh) - require.Nil(t, err) - +func generateTxnEvents( + cnt *uint64, + batch int, + tableStatus *state.TableSinkState, +) []*dmlsink.TxnCallbackableEvent { // assume we have a large transaction and it is splitted into 10 small transactions txns := make([]*dmlsink.TxnCallbackableEvent, 0, 10) - var cnt uint64 = 0 - batch := 100 - tableStatus := state.TableSinkSinking for i := 0; i < 10; i++ { txn := &dmlsink.TxnCallbackableEvent{ @@ -71,9 +60,9 @@ func TestCloudStorageWriteEvents(t *testing.T) { }, }, Callback: func() { - atomic.AddUint64(&cnt, uint64(batch)) + atomic.AddUint64(cnt, uint64(batch)) }, - SinkState: &tableStatus, + SinkState: tableStatus, } for j := 0; j < batch; j++ { row := &model.RowChangedEvent{ @@ -89,11 +78,107 @@ func TestCloudStorageWriteEvents(t *testing.T) { } txns = append(txns, txn) } + + return txns +} + +func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir) + sinkURI, err := url.Parse(uri) + require.Nil(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.Protocol = config.ProtocolOpen.String() + + errCh := make(chan error, 5) + s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh) + require.Nil(t, err) + var cnt uint64 = 0 + batch := 100 + tableStatus := state.TableSinkSinking + + // generating one dml file. + txns := generateTxnEvents(&cnt, batch, &tableStatus) tableDir := path.Join(parentDir, "test/table1/33") - os.MkdirAll(tableDir, 0o755) err = s.WriteEvents(txns...) require.Nil(t, err) - time.Sleep(4 * time.Second) + time.Sleep(3 * time.Second) + + files, err := os.ReadDir(tableDir) + require.Nil(t, err) + require.Len(t, files, 3) + var fileNames []string + for _, f := range files { + fileNames = append(fileNames, f.Name()) + } + require.ElementsMatch(t, []string{"CDC000001.json", "schema.json", "CDC.index"}, fileNames) + content, err := os.ReadFile(path.Join(tableDir, "CDC000001.json")) + require.Nil(t, err) + require.Greater(t, len(content), 0) + + content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + require.Nil(t, err) + require.Equal(t, "CDC000001.json\n", string(content)) + require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt)) + + // generating another dml file. + err = s.WriteEvents(txns...) 
+ require.Nil(t, err) + time.Sleep(3 * time.Second) + + files, err = os.ReadDir(tableDir) + require.Nil(t, err) + require.Len(t, files, 4) + fileNames = nil + for _, f := range files { + fileNames = append(fileNames, f.Name()) + } + require.ElementsMatch(t, []string{ + "CDC000001.json", "CDC000002.json", + "schema.json", "CDC.index", + }, fileNames) + content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json")) + require.Nil(t, err) + require.Greater(t, len(content), 0) + + content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + require.Nil(t, err) + require.Equal(t, "CDC000002.json\n", string(content)) + require.Equal(t, uint64(2000), atomic.LoadUint64(&cnt)) + + cancel() + s.Close() +} + +func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir) + sinkURI, err := url.Parse(uri) + require.Nil(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.Protocol = config.ProtocolOpen.String() + replicaConfig.Sink.DateSeparator = config.DateSeparatorDay.String() + + errCh := make(chan error, 5) + s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh) + require.Nil(t, err) + mockClock := clock.NewMock() + s.writer.setClock(mockClock) + + var cnt uint64 = 0 + batch := 100 + tableStatus := state.TableSinkSinking + + mockClock.Set(time.Date(2023, 3, 8, 23, 59, 58, 0, time.UTC)) + txns := generateTxnEvents(&cnt, batch, &tableStatus) + tableDir := path.Join(parentDir, "test/table1/33/2023-03-08") + err = s.WriteEvents(txns...) + require.Nil(t, err) + time.Sleep(3 * time.Second) files, err := os.ReadDir(tableDir) require.Nil(t, err) @@ -102,12 +187,96 @@ func TestCloudStorageWriteEvents(t *testing.T) { for _, f := range files { fileNames = append(fileNames, f.Name()) } - require.ElementsMatch(t, []string{"CDC000001.json", "schema.json"}, fileNames) + require.ElementsMatch(t, []string{"CDC000001.json", "CDC.index"}, fileNames) content, err := os.ReadFile(path.Join(tableDir, "CDC000001.json")) require.Nil(t, err) require.Greater(t, len(content), 0) + content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + require.Nil(t, err) + require.Equal(t, "CDC000001.json\n", string(content)) require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt)) + + // test date (day) is NOT changed. + mockClock.Set(time.Date(2023, 3, 8, 23, 59, 59, 0, time.UTC)) + s.writer.setClock(mockClock) + err = s.WriteEvents(txns...) + require.Nil(t, err) + time.Sleep(3 * time.Second) + + files, err = os.ReadDir(tableDir) + require.Nil(t, err) + require.Len(t, files, 3) + fileNames = nil + for _, f := range files { + fileNames = append(fileNames, f.Name()) + } + require.ElementsMatch(t, []string{"CDC000001.json", "CDC000002.json", "CDC.index"}, fileNames) + content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json")) + require.Nil(t, err) + require.Greater(t, len(content), 0) + + content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + require.Nil(t, err) + require.Equal(t, "CDC000002.json\n", string(content)) + require.Equal(t, uint64(2000), atomic.LoadUint64(&cnt)) + + // test date (day) is changed. + mockClock.Set(time.Date(2023, 3, 9, 0, 0, 10, 0, time.UTC)) + s.writer.setClock(mockClock) + err = s.WriteEvents(txns...) 
+ require.Nil(t, err) + time.Sleep(3 * time.Second) + + tableDir = path.Join(parentDir, "test/table1/33/2023-03-09") + files, err = os.ReadDir(tableDir) + require.Nil(t, err) + require.Len(t, files, 2) + fileNames = nil + for _, f := range files { + fileNames = append(fileNames, f.Name()) + } + require.ElementsMatch(t, []string{"CDC000001.json", "CDC.index"}, fileNames) + content, err = os.ReadFile(path.Join(tableDir, "CDC000001.json")) + require.Nil(t, err) + require.Greater(t, len(content), 0) + + content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + require.Nil(t, err) + require.Equal(t, "CDC000001.json\n", string(content)) + require.Equal(t, uint64(3000), atomic.LoadUint64(&cnt)) + cancel() + s.Close() + + // test table is scheduled from one node to another + cnt = 0 + ctx, cancel = context.WithCancel(context.Background()) + s, err = NewDMLSink(ctx, sinkURI, replicaConfig, errCh) + require.Nil(t, err) + mockClock = clock.NewMock() + mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC)) + s.writer.setClock(mockClock) + err = s.WriteEvents(txns...) + require.Nil(t, err) + time.Sleep(3 * time.Second) + + files, err = os.ReadDir(tableDir) + require.Nil(t, err) + require.Len(t, files, 3) + fileNames = nil + for _, f := range files { + fileNames = append(fileNames, f.Name()) + } + require.ElementsMatch(t, []string{"CDC000001.json", "CDC000002.json", "CDC.index"}, fileNames) + content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json")) + require.Nil(t, err) + require.Greater(t, len(content), 0) + + content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + require.Nil(t, err) + require.Equal(t, "CDC000002.json\n", string(content)) + require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt)) + cancel() s.Close() } diff --git a/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go b/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go index 3b10461526c..d63640dabcf 100644 --- a/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go +++ b/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/util" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/builder" "github.com/stretchr/testify/require" ) @@ -54,7 +55,7 @@ func TestDeframenter(t *testing.T) { go func(seq uint64) { encoder := encoderBuilder.Build() frag := eventFragment{ - versionedTable: versionedTable{ + versionedTable: cloudstorage.VersionedTable{ TableName: model.TableName{ Schema: "test", Table: "table1", diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index 43d79a331b5..66279404f68 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -16,8 +16,7 @@ import ( "bytes" "context" "encoding/json" - "fmt" - "strings" + "path" "sync" "sync/atomic" "time" @@ -30,7 +29,6 @@ import ( mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" - "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -48,27 +46,24 @@ type dmlWorker struct { flushNotifyCh chan flushTask // tableEvents maintains a mapping of . tableEvents *tableEventsMap - // fileIndex maintains a mapping of . 
- fileIndex map[versionedTable]*indexWithDate // fileSize maintains a mapping of . - fileSize map[versionedTable]uint64 - isClosed uint64 - extension string - statistics *metrics.Statistics - clock clock.Clock - bufferPool sync.Pool - metricWriteBytes prometheus.Gauge - metricFileCount prometheus.Gauge + fileSize map[cloudstorage.VersionedTable]uint64 + isClosed uint64 + statistics *metrics.Statistics + filePathGenerator *cloudstorage.FilePathGenerator + bufferPool sync.Pool + metricWriteBytes prometheus.Gauge + metricFileCount prometheus.Gauge } type tableEventsMap struct { mu sync.Mutex - fragments map[versionedTable][]eventFragment + fragments map[cloudstorage.VersionedTable][]eventFragment } func newTableEventsMap() *tableEventsMap { return &tableEventsMap{ - fragments: make(map[versionedTable][]eventFragment), + fragments: make(map[cloudstorage.VersionedTable][]eventFragment), } } @@ -82,31 +77,25 @@ type flushTask struct { targetTables []wrappedTable } -type indexWithDate struct { - index uint64 - currDate, prevDate string -} - func newDMLWorker( id int, changefeedID model.ChangeFeedID, storage storage.ExternalStorage, config *cloudstorage.Config, extension string, + clock clock.Clock, statistics *metrics.Statistics, ) *dmlWorker { d := &dmlWorker{ - id: id, - changeFeedID: changefeedID, - storage: storage, - config: config, - tableEvents: newTableEventsMap(), - flushNotifyCh: make(chan flushTask, 1), - fileIndex: make(map[versionedTable]*indexWithDate), - fileSize: make(map[versionedTable]uint64), - extension: extension, - statistics: statistics, - clock: clock.New(), + id: id, + changeFeedID: changefeedID, + storage: storage, + config: config, + tableEvents: newTableEventsMap(), + flushNotifyCh: make(chan flushTask, 1), + fileSize: make(map[cloudstorage.VersionedTable]uint64), + statistics: statistics, + filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, clock), bufferPool: sync.Pool{ New: func() interface{} { return new(bytes.Buffer) @@ -119,6 +108,11 @@ func newDMLWorker( return d } +// setClock is used for unit test +func (d *dmlWorker) setClock(clock clock.Clock) { + d.filePathGenerator.SetClock(clock) +} + // run creates a set of background goroutines. func (d *dmlWorker) run(ctx context.Context, ch *chann.DrainableChann[eventFragment]) error { log.Debug("dml worker started", zap.Int("workerID", d.id), @@ -149,9 +143,9 @@ func (d *dmlWorker) flushMessages(ctx context.Context) error { return nil } for _, tbl := range task.targetTables { - table := versionedTable{ + table := cloudstorage.VersionedTable{ TableName: tbl.tableName, - version: tbl.tableInfo.Version, + Version: tbl.tableInfo.Version, } d.tableEvents.mu.Lock() events := make([]eventFragment, len(d.tableEvents.fragments[table])) @@ -173,13 +167,46 @@ func (d *dmlWorker) flushMessages(ctx context.Context) error { return errors.Trace(err) } - path := d.generateDataFilePath(table) - err = d.writeDataFile(ctx, path, events) + // make sure that `generateDateStr()` is invoked ONLY once before + // generating data file path and index file path. Because we don't expect the index + // file is written to a different dir if date change happens between + // generating data and index file. 
+ date := d.filePathGenerator.GenerateDateStr() + dataFilePath, err := d.filePathGenerator.GenerateDataFilePath(ctx, table, date) + if err != nil { + log.Error("failed to generate data file path", + zap.Int("workerID", d.id), + zap.String("namespace", d.changeFeedID.Namespace), + zap.String("changefeed", d.changeFeedID.ID), + zap.Error(err)) + return errors.Trace(err) + } + indexFilePath := d.filePathGenerator.GenerateIndexFilePath(table, date) + + // first write the index file to external storage. + // the file content is simply the last elemement of the data file path + err = d.writeIndexFile(ctx, indexFilePath, path.Base(dataFilePath)+"\n") + if err != nil { + log.Error("failed to write index file to external storage", + zap.Int("workerID", d.id), + zap.String("namespace", d.changeFeedID.Namespace), + zap.String("changefeed", d.changeFeedID.ID), + zap.String("path", indexFilePath), + zap.Error(err)) + } + + // then write the data file to external storage. + // TODO: if system crashes when writing date file CDC000002.csv + // (file is not generated at all), then after TiCDC recovers from the crash, + // storage sink will generate a new file named CDC000003.csv, + // we will optimize this issue later. + err = d.writeDataFile(ctx, dataFilePath, events) if err != nil { log.Error("failed to write data file to external storage", zap.Int("workerID", d.id), zap.String("namespace", d.changeFeedID.Namespace), zap.String("changefeed", d.changeFeedID.ID), + zap.String("path", dataFilePath), zap.Error(err)) return errors.Trace(err) } @@ -189,7 +216,7 @@ func (d *dmlWorker) flushMessages(ctx context.Context) error { zap.String("changefeed", d.changeFeedID.ID), zap.String("schema", table.Schema), zap.String("table", table.Table), - zap.String("path", path), + zap.String("path", dataFilePath), ) } } @@ -202,13 +229,13 @@ func (d *dmlWorker) flushMessages(ctx context.Context) error { // if it hasn't been created when a DDL event was executed. func (d *dmlWorker) writeSchemaFile( ctx context.Context, - table versionedTable, + table cloudstorage.VersionedTable, tableInfo *model.TableInfo, ) error { - if _, ok := d.fileIndex[table]; !ok { + if ok := d.filePathGenerator.Contains(table); !ok { var tableDetail cloudstorage.TableDefinition tableDetail.FromTableInfo(tableInfo) - path := generateSchemaFilePath(tableDetail) + path := cloudstorage.GenerateSchemaFilePath(tableDetail) // the file may have been created when a DDL event was executed. exist, err := d.storage.FileExists(ctx, path) if err != nil { @@ -232,6 +259,11 @@ func (d *dmlWorker) writeSchemaFile( return nil } +func (d *dmlWorker) writeIndexFile(ctx context.Context, path, content string) error { + err := d.storage.WriteFile(ctx, path, []byte(content)) + return err +} + func (d *dmlWorker) writeDataFile(ctx context.Context, path string, events []eventFragment) error { var callbacks []func() @@ -309,9 +341,9 @@ func (d *dmlWorker) dispatchFlushTasks(ctx context.Context, // the physical table id (useful for partition table) // recorded in mounter while the later one does not. // TODO: handle TableID of model.TableInfo.TableName properly. 
- tbl := versionedTable{ + tbl := cloudstorage.VersionedTable{ TableName: elem.tableName, - version: elem.tableInfo.Version, + Version: elem.tableInfo.Version, } d.fileSize[tbl] = 0 } @@ -328,7 +360,7 @@ func (d *dmlWorker) dispatchFlushTasks(ctx context.Context, d.tableEvents.mu.Unlock() key := wrappedTable{ - tableName: frag.TableName, + tableName: frag.versionedTable.TableName, tableInfo: frag.event.Event.TableInfo, } @@ -358,54 +390,6 @@ func (d *dmlWorker) dispatchFlushTasks(ctx context.Context, } } -func generateSchemaFilePath(def cloudstorage.TableDefinition) string { - return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion) -} - -func (d *dmlWorker) generateDataFilePath(tbl versionedTable) string { - var elems []string - var dateStr string - - elems = append(elems, tbl.Schema) - elems = append(elems, tbl.Table) - elems = append(elems, fmt.Sprintf("%d", tbl.version)) - - if d.config.EnablePartitionSeparator && tbl.TableName.IsPartition { - elems = append(elems, fmt.Sprintf("%d", tbl.TableID)) - } - currTime := d.clock.Now() - switch d.config.DateSeparator { - case config.DateSeparatorYear.String(): - dateStr = currTime.Format("2006") - elems = append(elems, dateStr) - case config.DateSeparatorMonth.String(): - dateStr = currTime.Format("2006-01") - elems = append(elems, dateStr) - case config.DateSeparatorDay.String(): - dateStr = currTime.Format("2006-01-02") - elems = append(elems, dateStr) - default: - } - - if idx, ok := d.fileIndex[tbl]; !ok { - d.fileIndex[tbl] = &indexWithDate{ - currDate: dateStr, - } - } else { - idx.currDate = dateStr - } - - // if date changed, reset the counter - if d.fileIndex[tbl].prevDate != d.fileIndex[tbl].currDate { - d.fileIndex[tbl].prevDate = d.fileIndex[tbl].currDate - d.fileIndex[tbl].index = 0 - } - d.fileIndex[tbl].index++ - elems = append(elems, fmt.Sprintf("CDC%06d%s", d.fileIndex[tbl].index, d.extension)) - - return strings.Join(elems, "/") -} - func (d *dmlWorker) close() { if !atomic.CompareAndSwapUint64(&d.isClosed, 0, 1) { return diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go index e6cade7068a..d64a69f62f3 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go @@ -50,83 +50,10 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker { statistics := metrics.NewStatistics(ctx, sink.TxnSink) d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage, - cfg, ".json", statistics) + cfg, ".json", clock.New(), statistics) return d } -func TestGenerateDataFilePath(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - dir := t.TempDir() - w := testDMLWorker(ctx, t, dir) - table := versionedTable{ - TableName: model.TableName{ - Schema: "test", - Table: "table1", - }, - version: 5, - } - - // date-separator: none - path := w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/CDC000001.json", path) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/CDC000002.json", path) - - // date-separator: year - mockClock := clock.NewMock() - w = testDMLWorker(ctx, t, dir) - w.config.DateSeparator = config.DateSeparatorYear.String() - w.clock = mockClock - mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/2022/CDC000001.json", path) - path = w.generateDataFilePath(table) - require.Equal(t, 
"test/table1/5/2022/CDC000002.json", path) - // year changed - mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC)) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/2023/CDC000001.json", path) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/2023/CDC000002.json", path) - - // date-separator: month - mockClock = clock.NewMock() - w = testDMLWorker(ctx, t, dir) - w.config.DateSeparator = config.DateSeparatorMonth.String() - w.clock = mockClock - mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/2022-12/CDC000001.json", path) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/2022-12/CDC000002.json", path) - // month changed - mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC)) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/2023-01/CDC000001.json", path) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/2023-01/CDC000002.json", path) - - // date-separator: day - mockClock = clock.NewMock() - w = testDMLWorker(ctx, t, dir) - w.config.DateSeparator = config.DateSeparatorDay.String() - w.clock = mockClock - mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/2022-12-31/CDC000001.json", path) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/2022-12-31/CDC000002.json", path) - // day changed - mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC)) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/2023-01-01/CDC000001.json", path) - path = w.generateDataFilePath(table) - require.Equal(t, "test/table1/5/2023-01-01/CDC000002.json", path) - - w.close() -} - func TestDMLWorkerRun(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) parentDir := t.TempDir() @@ -155,9 +82,9 @@ func TestDMLWorkerRun(t *testing.T) { for i := 0; i < 5; i++ { frag := eventFragment{ seqNumber: uint64(i), - versionedTable: versionedTable{ + versionedTable: cloudstorage.VersionedTable{ TableName: table1, - version: 99, + Version: 99, }, event: &dmlsink.TxnCallbackableEvent{ Event: &model.SingleTableTxn{ @@ -199,12 +126,12 @@ func TestDMLWorkerRun(t *testing.T) { // check whether files for table1 has been generated files, err := os.ReadDir(table1Dir) require.Nil(t, err) - require.Len(t, files, 2) + require.Len(t, files, 3) var fileNames []string for _, f := range files { fileNames = append(fileNames, f.Name()) } - require.ElementsMatch(t, []string{"CDC000001.json", "schema.json"}, fileNames) + require.ElementsMatch(t, []string{"CDC000001.json", "schema.json", "CDC.index"}, fileNames) cancel() d.close() wg.Wait() diff --git a/cdc/sink/dmlsink/cloudstorage/dml_writer.go b/cdc/sink/dmlsink/cloudstorage/dml_writer.go index 300b00c339d..173b9df8c43 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_writer.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_writer.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/metrics" + "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/hash" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" @@ -35,6 +36,7 @@ type dmlWriter struct { storage storage.ExternalStorage config *cloudstorage.Config extension string + clock clock.Clock statistics *metrics.Statistics inputCh <-chan 
eventFragment errCh chan<- error @@ -55,6 +57,7 @@ func newDMLWriter( workerChannels: make([]*chann.DrainableChann[eventFragment], config.WorkerCount), workers: make([]*dmlWorker, config.WorkerCount), hasher: hash.NewPositionInertia(), + clock: clock.New(), config: config, extension: extension, statistics: statistics, @@ -63,7 +66,7 @@ func newDMLWriter( } for i := 0; i < config.WorkerCount; i++ { - worker := newDMLWorker(i, changefeedID, storage, config, extension, statistics) + worker := newDMLWorker(i, changefeedID, storage, config, extension, d.clock, statistics) d.workers[i] = worker d.workerChannels[i] = chann.NewAutoDrainChann[eventFragment]() } @@ -71,6 +74,13 @@ func newDMLWriter( return d } +// setClock is used for unit test. +func (d *dmlWriter) setClock(clock clock.Clock) { + for i := 0; i < d.config.WorkerCount; i++ { + d.workers[i].setClock(clock) + } +} + func (d *dmlWriter) run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { @@ -97,7 +107,7 @@ func (d *dmlWriter) dispatchFragToDMLWorker(ctx context.Context) error { if !ok { return nil } - tableName := frag.TableName + tableName := frag.versionedTable.TableName d.hasher.Reset() d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table)) workerID := d.hasher.Sum32() % uint32(d.config.WorkerCount) diff --git a/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go b/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go index 7698126f78b..0c0b7af5371 100644 --- a/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go +++ b/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/util" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/builder" "github.com/stretchr/testify/require" ) @@ -52,7 +53,7 @@ func TestEncodeEvents(t *testing.T) { worker, fn := testEncodingWorker(ctx, t) defer fn() err := worker.encodeEvents(ctx, eventFragment{ - versionedTable: versionedTable{ + versionedTable: cloudstorage.VersionedTable{ TableName: model.TableName{ Schema: "test", Table: "table1", @@ -134,7 +135,7 @@ func TestEncodingWorkerRun(t *testing.T) { for i := 0; i < 3; i++ { frag := eventFragment{ - versionedTable: versionedTable{ + versionedTable: cloudstorage.VersionedTable{ TableName: table, }, seqNumber: uint64(i + 1), diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 08382c9410f..6bbf4d894f5 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -360,10 +360,6 @@ func (c *consumer) getNewFiles(ctx context.Context) (map[dmlPathKey]fileIndexRan var fileIdx uint64 var err error - if strings.HasSuffix(path, "metadata") { - return nil - } - if strings.HasSuffix(path, "schema.json") { err = schemaKey.parseSchemaFilePath(path) if err != nil { @@ -389,13 +385,16 @@ func (c *consumer) getNewFiles(ctx context.Context) (map[dmlPathKey]fileIndexRan dmlkey.schemaPathKey = schemaKey dmlkey.partitionNum = fakePartitionNumForSchemaFile dmlkey.date = "" - } else { + } else if strings.HasSuffix(path, c.fileExtension) { fileIdx, err = dmlkey.parseDMLFilePath(c.replicationCfg.Sink.DateSeparator, path) if err != nil { log.Error("failed to parse dml file path", zap.Error(err)) // skip handling this file return nil } + } else { + log.Debug("ignore handling file", zap.String("path", path)) + return nil } if _, ok := c.tableDMLIdxMap[dmlkey]; !ok || fileIdx >= 
c.tableDMLIdxMap[dmlkey] { diff --git a/errors.toml b/errors.toml index 03928351ebb..739ac4b6fda 100755 --- a/errors.toml +++ b/errors.toml @@ -156,11 +156,6 @@ error = ''' invalid overwrite-checkpoint-ts %s, overwrite-checkpoint-ts only accept 'now' or a valid timestamp in integer ''' -["CDC:ErrCloudStorageInvalidConfig"] -error = ''' -cloud storage config invalid -''' - ["CDC:ErrClusterIDMismatch"] error = ''' cluster ID mismatch, tikv cluster ID is %d and request cluster ID is %d @@ -846,9 +841,19 @@ error = ''' fail to open storage for redo log ''' +["CDC:ErrStorageSinkInvalidConfig"] +error = ''' +storage sink config invalid +''' + ["CDC:ErrStorageSinkInvalidDateSeparator"] error = ''' -date separator in cloud storage sink is invalid +date separator in storage sink is invalid +''' + +["CDC:ErrStorageSinkInvalidFileName"] +error = ''' +filename in storage sink is invalid ''' ["CDC:ErrSyncRenameTableFailed"] diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index bc528c93827..f488ab47a92 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -333,7 +333,7 @@ var ( errors.RFCCodeText("CDC:ErrMessageTooLarge"), ) ErrStorageSinkInvalidDateSeparator = errors.Normalize( - "date separator in cloud storage sink is invalid", + "date separator in storage sink is invalid", errors.RFCCodeText("CDC:ErrStorageSinkInvalidDateSeparator"), ) ErrCSVEncodeFailed = errors.Normalize( @@ -344,9 +344,13 @@ var ( "csv decode failed", errors.RFCCodeText("CDC:ErrCSVDecodeFailed"), ) - ErrCloudStorageInvalidConfig = errors.Normalize( - "cloud storage config invalid", - errors.RFCCodeText("CDC:ErrCloudStorageInvalidConfig"), + ErrStorageSinkInvalidConfig = errors.Normalize( + "storage sink config invalid", + errors.RFCCodeText("CDC:ErrStorageSinkInvalidConfig"), + ) + ErrStorageSinkInvalidFileName = errors.Normalize( + "filename in storage sink is invalid", + errors.RFCCodeText("CDC:ErrStorageSinkInvalidFileName"), ) // utilities related errors diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 88b644a34bf..e566a8d7eb1 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -72,12 +72,14 @@ func (c *Config) Apply( replicaConfig *config.ReplicaConfig, ) (err error) { if sinkURI == nil { - return cerror.ErrCloudStorageInvalidConfig.GenWithStack("failed to open cloud storage sink, empty SinkURI") + return cerror.ErrStorageSinkInvalidConfig.GenWithStack( + "failed to open cloud storage sink, empty SinkURI") } scheme := strings.ToLower(sinkURI.Scheme) if !psink.IsStorageScheme(scheme) { - return cerror.ErrCloudStorageInvalidConfig.GenWithStack("can't create cloud storage sink with unsupported scheme: %s", scheme) + return cerror.ErrStorageSinkInvalidConfig.GenWithStack( + "can't create cloud storage sink with unsupported scheme: %s", scheme) } query := sinkURI.Query() if err = getWorkerCount(query, &c.WorkerCount); err != nil { @@ -106,10 +108,10 @@ func getWorkerCount(values url.Values, workerCount *int) error { c, err := strconv.Atoi(s) if err != nil { - return cerror.WrapError(cerror.ErrCloudStorageInvalidConfig, err) + return cerror.WrapError(cerror.ErrStorageSinkInvalidConfig, err) } if c <= 0 { - return cerror.WrapError(cerror.ErrCloudStorageInvalidConfig, + return cerror.WrapError(cerror.ErrStorageSinkInvalidConfig, fmt.Errorf("invalid worker-count %d, it must be greater than 0", c)) } if c > maxWorkerCount { @@ -130,7 +132,7 @@ func getFlushInterval(values url.Values, flushInterval *time.Duration) error { d, err := 
time.ParseDuration(s) if err != nil { - return cerror.WrapError(cerror.ErrCloudStorageInvalidConfig, err) + return cerror.WrapError(cerror.ErrStorageSinkInvalidConfig, err) } if d > maxFlushInterval { @@ -156,7 +158,7 @@ func getFileSize(values url.Values, fileSize *int) error { sz, err := strconv.Atoi(s) if err != nil { - return cerror.WrapError(cerror.ErrCloudStorageInvalidConfig, err) + return cerror.WrapError(cerror.ErrStorageSinkInvalidConfig, err) } if sz > maxFileSize { log.Warn("file-size is too large", diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go new file mode 100644 index 00000000000..a048ea19d7d --- /dev/null +++ b/pkg/sink/cloudstorage/path.go @@ -0,0 +1,209 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloudstorage + +import ( + "context" + "fmt" + "strconv" + "strings" + + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/engine/pkg/clock" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" +) + +const ( + // 3 is the length of "CDC", and the file number contains + // at least 6 digits (e.g. CDC000001.csv). + minFileNamePrefixLen = 9 + defaultIndexFileName = "CDC.index" +) + +// GenerateSchemaFilePath generates schema file path based on the table definition. +func GenerateSchemaFilePath(def TableDefinition) string { + return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion) +} + +type indexWithDate struct { + index uint64 + currDate, prevDate string +} + +// VersionedTable is used to wrap TableName with a version. +type VersionedTable struct { + model.TableName + // Version is consistent with the version of TableInfo recorded in + // schema storage. It can either be finished ts of a DDL event, + // or be the checkpoint ts when processor is restarted. + Version uint64 +} + +// FilePathGenerator is used to generate data file path and index file path. +type FilePathGenerator struct { + extension string + config *Config + clock clock.Clock + storage storage.ExternalStorage + fileIndex map[VersionedTable]*indexWithDate +} + +// NewFilePathGenerator creates a FilePathGenerator. +func NewFilePathGenerator( + config *Config, + storage storage.ExternalStorage, + extension string, + clock clock.Clock, +) *FilePathGenerator { + return &FilePathGenerator{ + config: config, + extension: extension, + storage: storage, + clock: clock, + fileIndex: make(map[VersionedTable]*indexWithDate), + } +} + +// SetClock is used for unit test +func (f *FilePathGenerator) SetClock(clock clock.Clock) { + f.clock = clock +} + +// Contains checks if a VersionedTable is cached by FilePathGenerator before. +func (f *FilePathGenerator) Contains(tbl VersionedTable) bool { + _, ok := f.fileIndex[tbl] + return ok +} + +// GenerateDateStr generates a date string base on current time +// and the date-separator configuration item. 
+func (f *FilePathGenerator) GenerateDateStr() string { + var dateStr string + + currTime := f.clock.Now() + switch f.config.DateSeparator { + case config.DateSeparatorYear.String(): + dateStr = currTime.Format("2006") + case config.DateSeparatorMonth.String(): + dateStr = currTime.Format("2006-01") + case config.DateSeparatorDay.String(): + dateStr = currTime.Format("2006-01-02") + default: + } + + return dateStr +} + +func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTable, date string) string { + var elems []string + + elems = append(elems, tbl.Schema) + elems = append(elems, tbl.Table) + elems = append(elems, fmt.Sprintf("%d", tbl.Version)) + + if f.config.EnablePartitionSeparator && tbl.TableName.IsPartition { + elems = append(elems, fmt.Sprintf("%d", tbl.TableID)) + } + + if len(date) != 0 { + elems = append(elems, date) + } + + return strings.Join(elems, "/") +} + +func (f *FilePathGenerator) fetchIndexFromFileName(fileName string) (uint64, error) { + var fileIdx uint64 + var err error + + if len(fileName) < minFileNamePrefixLen+len(f.extension) || + !strings.HasPrefix(fileName, "CDC") || + !strings.HasSuffix(fileName, f.extension) { + return 0, cerror.WrapError(cerror.ErrStorageSinkInvalidFileName, + fmt.Errorf("'%s' is a invalid file name", fileName)) + } + + extIdx := strings.Index(fileName, f.extension) + fileIdxStr := fileName[3:extIdx] + if fileIdx, err = strconv.ParseUint(fileIdxStr, 10, 64); err != nil { + return 0, cerror.WrapError(cerror.ErrStorageSinkInvalidFileName, err) + } + + return fileIdx, nil +} + +// GenerateDataFilePath generates a canonical path for data file. +func (f *FilePathGenerator) GenerateDataFilePath( + ctx context.Context, + tbl VersionedTable, + date string, +) (string, error) { + var elems []string + + elems = append(elems, f.generateDataDirPath(tbl, date)) + if idx, ok := f.fileIndex[tbl]; !ok { + var fileIdx uint64 + + indexFile := f.GenerateIndexFilePath(tbl, date) + exist, err := f.storage.FileExists(ctx, indexFile) + if err != nil { + return "", err + } + if exist { + data, err := f.storage.ReadFile(ctx, indexFile) + if err != nil { + return "", err + } + fileName := strings.TrimSuffix(string(data), "\n") + maxFileIdx, err := f.fetchIndexFromFileName(fileName) + if err != nil { + return "", err + } + + // TODO: if the file with maxFileIdx does not exist or is empty, + // we can reuse the old index number. + fileIdx = maxFileIdx + } + + f.fileIndex[tbl] = &indexWithDate{ + prevDate: date, + currDate: date, + index: fileIdx, + } + } else { + idx.currDate = date + } + + // if date changed, reset the counter + if f.fileIndex[tbl].prevDate != f.fileIndex[tbl].currDate { + f.fileIndex[tbl].prevDate = f.fileIndex[tbl].currDate + f.fileIndex[tbl].index = 0 + } + f.fileIndex[tbl].index++ + elems = append(elems, fmt.Sprintf("CDC%06d%s", f.fileIndex[tbl].index, f.extension)) + + return strings.Join(elems, "/"), nil +} + +// GenerateIndexFilePath generates a canonical path for index file. +func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTable, date string) string { + var elems []string + + elems = append(elems, f.generateDataDirPath(tbl, date)) + elems = append(elems, defaultIndexFileName) + + return strings.Join(elems, "/") +} diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go new file mode 100644 index 00000000000..e1b4bd1d3e0 --- /dev/null +++ b/pkg/sink/cloudstorage/path_test.go @@ -0,0 +1,209 @@ +// Copyright 2023 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package cloudstorage + +import ( + "context" + "fmt" + "net/url" + "testing" + "time" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/engine/pkg/clock" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/util" + "github.com/stretchr/testify/require" +) + +func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FilePathGenerator { + uri := fmt.Sprintf("file:///%s?flush-interval=2s", dir) + storage, err := util.GetExternalStorageFromURI(ctx, uri) + require.NoError(t, err) + + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.Protocol = config.ProtocolOpen.String() + cfg := NewConfig() + err = cfg.Apply(ctx, sinkURI, replicaConfig) + require.NoError(t, err) + + f := NewFilePathGenerator(cfg, storage, ".json", clock.New()) + return f +} + +func TestGenerateDataFilePath(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + table := VersionedTable{ + TableName: model.TableName{ + Schema: "test", + Table: "table1", + }, + Version: 5, + } + + dir := t.TempDir() + f := testFilePathGenerator(ctx, t, dir) + date := f.GenerateDateStr() + // date-separator: none + path, err := f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/CDC000001.json", path) + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/CDC000002.json", path) + + // date-separator: year + mockClock := clock.NewMock() + f = testFilePathGenerator(ctx, t, dir) + f.config.DateSeparator = config.DateSeparatorYear.String() + f.clock = mockClock + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) + date = f.GenerateDateStr() + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2022/CDC000001.json", path) + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2022/CDC000002.json", path) + // year changed + mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC)) + date = f.GenerateDateStr() + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2023/CDC000001.json", path) + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2023/CDC000002.json", path) + + // date-separator: month + mockClock = clock.NewMock() + f = testFilePathGenerator(ctx, t, dir) + f.config.DateSeparator = config.DateSeparatorMonth.String() + f.clock = mockClock + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) + date = f.GenerateDateStr() + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2022-12/CDC000001.json", path) + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2022-12/CDC000002.json", path) + // month 
changed + mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC)) + date = f.GenerateDateStr() + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2023-01/CDC000001.json", path) + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2023-01/CDC000002.json", path) + + // date-separator: day + mockClock = clock.NewMock() + f = testFilePathGenerator(ctx, t, dir) + f.config.DateSeparator = config.DateSeparatorDay.String() + f.clock = mockClock + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) + date = f.GenerateDateStr() + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2022-12-31/CDC000001.json", path) + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2022-12-31/CDC000002.json", path) + // day changed + mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC)) + date = f.GenerateDateStr() + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2023-01-01/CDC000001.json", path) + path, err = f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2023-01-01/CDC000002.json", path) +} + +func TestFetchIndexFromFileName(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + dir := t.TempDir() + f := testFilePathGenerator(ctx, t, dir) + testCases := []struct { + fileName string + wantErr string + }{ + { + fileName: "CDC000011.json", + wantErr: "", + }, + { + fileName: "CDC1000000.json", + wantErr: "", + }, + { + fileName: "CDC1.json", + wantErr: "filename in storage sink is invalid", + }, + { + fileName: "cdc000001.json", + wantErr: "filename in storage sink is invalid", + }, + { + fileName: "CDC000005.xxx", + wantErr: "filename in storage sink is invalid", + }, + { + fileName: "CDChello.json", + wantErr: "filename in storage sink is invalid", + }, + } + + for _, tc := range testCases { + _, err := f.fetchIndexFromFileName(tc.fileName) + if len(tc.wantErr) != 0 { + require.Contains(t, err.Error(), tc.wantErr) + } else { + require.NoError(t, err) + } + } +} + +func TestGenerateDataFilePathWithIndexFile(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + dir := t.TempDir() + f := testFilePathGenerator(ctx, t, dir) + mockClock := clock.NewMock() + f.config.DateSeparator = config.DateSeparatorDay.String() + f.clock = mockClock + mockClock.Set(time.Date(2023, 3, 9, 23, 59, 59, 0, time.UTC)) + table := VersionedTable{ + TableName: model.TableName{ + Schema: "test", + Table: "table1", + }, + Version: 5, + } + date := f.GenerateDateStr() + indexFilePath := f.GenerateIndexFilePath(table, date) + err := f.storage.WriteFile(ctx, indexFilePath, []byte("CDC000005.json\n")) + require.NoError(t, err) + dataFilePath, err := f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, "test/table1/5/2023-03-09/CDC000006.json", dataFilePath) +}
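For reference, the CDC.index file introduced by this change holds the base name of the last data file flushed for a table directory (for example "CDC000002.json\n"), which is how the largest file number can be found quickly. Below is a minimal standalone sketch, illustrative only and not part of this patch, of how a downstream reader might parse that index file; the local directory layout and the ".json" extension are assumptions taken from the tests above, and largestFileIndex is a hypothetical helper.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// largestFileIndex reads <tableDir>/CDC.index, whose content is the name of
// the last data file written (e.g. "CDC000002.json\n"), and returns the
// numeric suffix encoded in that name.
func largestFileIndex(tableDir, extension string) (uint64, error) {
	data, err := os.ReadFile(filepath.Join(tableDir, "CDC.index"))
	if err != nil {
		return 0, err
	}
	fileName := strings.TrimSuffix(string(data), "\n")
	if !strings.HasPrefix(fileName, "CDC") || !strings.HasSuffix(fileName, extension) {
		return 0, fmt.Errorf("%q is an invalid data file name", fileName)
	}
	// strip the "CDC" prefix and the extension, leaving the zero-padded index
	idxStr := fileName[len("CDC") : len(fileName)-len(extension)]
	return strconv.ParseUint(idxStr, 10, 64)
}

func main() {
	// Example: after CDC000001.json and CDC000002.json have been flushed,
	// CDC.index contains "CDC000002.json\n" and this prints 2.
	idx, err := largestFileIndex("test/table1/33", ".json")
	if err != nil {
		fmt.Println("read index:", err)
		return
	}
	fmt.Println("largest file number:", idx)
}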