Skip to content

Commit be9be01

Browse files
authored
log-backup: restore meta kv with batch method (#37100) (#37139)
close #37111
1 parent 9d6a61d commit be9be01

File tree

2 files changed

+396
-45
lines changed

2 files changed

+396
-45
lines changed

br/pkg/restore/client.go

+182-45
Original file line numberDiff line numberDiff line change
@@ -1771,14 +1771,6 @@ func (rc *Client) ReadStreamDataFiles(
17711771
}
17721772
}
17731773

1774-
// sort files firstly.
1775-
slices.SortFunc(mFiles, func(i, j *backuppb.DataFileInfo) bool {
1776-
if i.ResolvedTs > 0 && j.ResolvedTs > 0 {
1777-
return i.ResolvedTs < j.ResolvedTs
1778-
} else {
1779-
return i.MaxTs < j.MaxTs
1780-
}
1781-
})
17821774
return dFiles, mFiles, nil
17831775
}
17841776

@@ -2000,6 +1992,31 @@ func (rc *Client) InitSchemasReplaceForDDL(
20001992
return stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs, rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex), nil
20011993
}
20021994

1995+
func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo {
1996+
slices.SortFunc(files, func(i, j *backuppb.DataFileInfo) bool {
1997+
if i.GetMinTs() < j.GetMinTs() {
1998+
return true
1999+
} else if i.GetMinTs() > j.GetMinTs() {
2000+
return false
2001+
}
2002+
2003+
if i.GetMaxTs() < j.GetMaxTs() {
2004+
return true
2005+
} else if i.GetMaxTs() > j.GetMaxTs() {
2006+
return false
2007+
}
2008+
2009+
if i.GetResolvedTs() < j.GetResolvedTs() {
2010+
return true
2011+
} else if i.GetResolvedTs() > j.GetResolvedTs() {
2012+
return false
2013+
}
2014+
2015+
return true
2016+
})
2017+
return files
2018+
}
2019+
20032020
// RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.
20042021
func (rc *Client) RestoreMetaKVFiles(
20052022
ctx context.Context,
@@ -2008,7 +2025,10 @@ func (rc *Client) RestoreMetaKVFiles(
20082025
updateStats func(kvCount uint64, size uint64),
20092026
progressInc func(),
20102027
) error {
2028+
// sort files firstly.
2029+
files = SortMetaKVFiles(files)
20112030
filesInWriteCF := make([]*backuppb.DataFileInfo, 0, len(files))
2031+
filesInDefaultCF := make([]*backuppb.DataFileInfo, 0, len(files))
20122032

20132033
// The k-v events in default CF should be restored firstly. The reason is that:
20142034
// The error of transactions of meta could happen if restore write CF events successfully,
@@ -2018,30 +2038,39 @@ func (rc *Client) RestoreMetaKVFiles(
20182038
filesInWriteCF = append(filesInWriteCF, f)
20192039
continue
20202040
}
2021-
20222041
if f.Type == backuppb.FileType_Delete {
20232042
// this should happen abnormally.
20242043
// only do some preventive checks here.
20252044
log.Warn("detected delete file of meta key, skip it", zap.Any("file", f))
20262045
continue
20272046
}
2028-
2029-
kvCount, size, err := rc.RestoreMetaKVFile(ctx, f, schemasReplace)
2030-
if err != nil {
2031-
return errors.Trace(err)
2047+
if f.Cf == stream.DefaultCF {
2048+
filesInDefaultCF = append(filesInDefaultCF, f)
20322049
}
2033-
updateStats(kvCount, size)
2034-
progressInc()
2050+
}
2051+
2052+
// Restore files in default CF.
2053+
if err := rc.RestoreMetaKVFilesWithBatchMethod(
2054+
ctx,
2055+
filesInDefaultCF,
2056+
schemasReplace,
2057+
updateStats,
2058+
progressInc,
2059+
rc.RestoreBatchMetaKVFiles,
2060+
); err != nil {
2061+
return errors.Trace(err)
20352062
}
20362063

20372064
// Restore files in write CF.
2038-
for _, f := range filesInWriteCF {
2039-
kvCount, size, err := rc.RestoreMetaKVFile(ctx, f, schemasReplace)
2040-
if err != nil {
2041-
return errors.Trace(err)
2042-
}
2043-
updateStats(kvCount, size)
2044-
progressInc()
2065+
if err := rc.RestoreMetaKVFilesWithBatchMethod(
2066+
ctx,
2067+
filesInWriteCF,
2068+
schemasReplace,
2069+
updateStats,
2070+
progressInc,
2071+
rc.RestoreBatchMetaKVFiles,
2072+
); err != nil {
2073+
return errors.Trace(err)
20452074
}
20462075

20472076
// Update global schema version and report all of TiDBs.
@@ -2051,41 +2080,128 @@ func (rc *Client) RestoreMetaKVFiles(
20512080
return nil
20522081
}
20532082

2054-
// RestoreMetaKVFile tries to restore a file about meta kv-event from stream-backup.
2055-
func (rc *Client) RestoreMetaKVFile(
2083+
func (rc *Client) RestoreMetaKVFilesWithBatchMethod(
20562084
ctx context.Context,
2057-
file *backuppb.DataFileInfo,
2058-
sr *stream.SchemasReplace,
2059-
) (uint64, uint64, error) {
2085+
files []*backuppb.DataFileInfo,
2086+
schemasReplace *stream.SchemasReplace,
2087+
updateStats func(kvCount uint64, size uint64),
2088+
progressInc func(),
2089+
restoreBatch func(
2090+
ctx context.Context,
2091+
files []*backuppb.DataFileInfo,
2092+
schemasReplace *stream.SchemasReplace,
2093+
updateStats func(kvCount uint64, size uint64),
2094+
progressInc func(),
2095+
) error,
2096+
) error {
20602097
var (
2061-
kvCount uint64
2062-
size uint64
2098+
rangeMin uint64
2099+
rangeMax uint64
2100+
idx int
20632101
)
2064-
log.Info("restore meta kv events", zap.String("file", file.Path),
2065-
zap.String("cf", file.Cf), zap.Int64("kv-count", file.NumberOfEntries),
2066-
zap.Uint64("min-ts", file.MinTs), zap.Uint64("max-ts", file.MaxTs))
2102+
for i, f := range files {
2103+
if i == 0 {
2104+
idx = i
2105+
rangeMax = f.MaxTs
2106+
rangeMin = f.MinTs
2107+
} else {
2108+
if f.MinTs <= rangeMax {
2109+
rangeMin = mathutil.Min(rangeMin, f.MinTs)
2110+
rangeMax = mathutil.Max(rangeMax, f.MaxTs)
2111+
} else {
2112+
err := restoreBatch(ctx, files[idx:i], schemasReplace, updateStats, progressInc)
2113+
if err != nil {
2114+
return errors.Trace(err)
2115+
}
2116+
idx = i
2117+
rangeMin = f.MinTs
2118+
rangeMax = f.MaxTs
2119+
}
2120+
}
2121+
2122+
if i == len(files)-1 {
2123+
err := restoreBatch(ctx, files[idx:], schemasReplace, updateStats, progressInc)
2124+
if err != nil {
2125+
return errors.Trace(err)
2126+
}
2127+
}
2128+
}
2129+
return nil
2130+
}
2131+
2132+
// the kv entry with ts, the ts is decoded from entry.
2133+
type kvEntryWithTS struct {
2134+
e kv.Entry
2135+
ts uint64
2136+
}
2137+
2138+
func (rc *Client) RestoreBatchMetaKVFiles(
2139+
ctx context.Context,
2140+
files []*backuppb.DataFileInfo,
2141+
schemasReplace *stream.SchemasReplace,
2142+
updateStats func(kvCount uint64, size uint64),
2143+
progressInc func(),
2144+
) error {
2145+
if len(files) == 0 {
2146+
return nil
2147+
}
2148+
2149+
// read all of entries from files.
2150+
kvEntries := make([]*kvEntryWithTS, 0)
2151+
for _, f := range files {
2152+
es, err := rc.readAllEntries(ctx, f)
2153+
if err != nil {
2154+
return errors.Trace(err)
2155+
}
2156+
2157+
kvEntries = append(kvEntries, es...)
2158+
}
2159+
2160+
// sort these entries.
2161+
slices.SortFunc(kvEntries, func(i, j *kvEntryWithTS) bool {
2162+
return i.ts < j.ts
2163+
})
2164+
2165+
// restore these entries with rawPut() method.
2166+
kvCount, size, err := rc.restoreMetaKvEntries(ctx, schemasReplace, kvEntries, files[0].GetCf())
2167+
if err != nil {
2168+
return errors.Trace(err)
2169+
}
2170+
2171+
updateStats(kvCount, size)
2172+
for i := 0; i < len(files); i++ {
2173+
progressInc()
2174+
}
2175+
return nil
2176+
}
2177+
2178+
func (rc *Client) readAllEntries(
2179+
ctx context.Context,
2180+
file *backuppb.DataFileInfo,
2181+
) ([]*kvEntryWithTS, error) {
2182+
kvEntries := make([]*kvEntryWithTS, 0)
20672183

2068-
rc.rawKVClient.SetColumnFamily(file.GetCf())
20692184
buff, err := rc.storage.ReadFile(ctx, file.Path)
20702185
if err != nil {
2071-
return 0, 0, errors.Trace(err)
2186+
return nil, errors.Trace(err)
20722187
}
2188+
20732189
if checksum := sha256.Sum256(buff); !bytes.Equal(checksum[:], file.GetSha256()) {
2074-
return 0, 0, errors.Annotatef(berrors.ErrInvalidMetaFile,
2190+
return nil, errors.Annotatef(berrors.ErrInvalidMetaFile,
20752191
"checksum mismatch expect %x, got %x", file.GetSha256(), checksum[:])
20762192
}
20772193

20782194
iter := stream.NewEventIterator(buff)
20792195
for iter.Valid() {
20802196
iter.Next()
20812197
if iter.GetError() != nil {
2082-
return 0, 0, errors.Trace(iter.GetError())
2198+
return nil, errors.Trace(iter.GetError())
20832199
}
20842200

20852201
txnEntry := kv.Entry{Key: iter.Key(), Value: iter.Value()}
20862202
ts, err := GetKeyTS(txnEntry.Key)
20872203
if err != nil {
2088-
return 0, 0, errors.Trace(err)
2204+
return nil, errors.Trace(err)
20892205
}
20902206

20912207
// The commitTs in write CF need be limited on [startTs, restoreTs].
@@ -2105,20 +2221,41 @@ func (rc *Client) RestoreMetaKVFile(
21052221
log.Warn("txn entry is null", zap.Uint64("key-ts", ts), zap.ByteString("tnxKey", txnEntry.Key))
21062222
continue
21072223
}
2108-
log.Debug("txn entry", zap.Uint64("key-ts", ts), zap.Int("txnKey-len", len(txnEntry.Key)),
2109-
zap.Int("txnValue-len", len(txnEntry.Value)), zap.ByteString("txnKey", txnEntry.Key))
2110-
newEntry, err := sr.RewriteKvEntry(&txnEntry, file.Cf)
2224+
kvEntries = append(kvEntries, &kvEntryWithTS{e: txnEntry, ts: ts})
2225+
}
2226+
2227+
return kvEntries, nil
2228+
}
2229+
2230+
func (rc *Client) restoreMetaKvEntries(
2231+
ctx context.Context,
2232+
sr *stream.SchemasReplace,
2233+
entries []*kvEntryWithTS,
2234+
columnFamily string,
2235+
) (uint64, uint64, error) {
2236+
var (
2237+
kvCount uint64
2238+
size uint64
2239+
)
2240+
2241+
rc.rawKVClient.SetColumnFamily(columnFamily)
2242+
2243+
for _, entry := range entries {
2244+
log.Debug("before rewrte entry", zap.Uint64("key-ts", entry.ts), zap.Int("key-len", len(entry.e.Key)),
2245+
zap.Int("value-len", len(entry.e.Value)), zap.ByteString("key", entry.e.Key))
2246+
2247+
newEntry, err := sr.RewriteKvEntry(&entry.e, columnFamily)
21112248
if err != nil {
2112-
log.Error("rewrite txn entry failed", zap.Int("klen", len(txnEntry.Key)),
2113-
logutil.Key("txn-key", txnEntry.Key))
2249+
log.Error("rewrite txn entry failed", zap.Int("klen", len(entry.e.Key)),
2250+
logutil.Key("txn-key", entry.e.Key))
21142251
return 0, 0, errors.Trace(err)
21152252
} else if newEntry == nil {
21162253
continue
21172254
}
2118-
log.Debug("rewrite txn entry", zap.Int("newKey-len", len(newEntry.Key)),
2119-
zap.Int("newValue-len", len(txnEntry.Value)), zap.ByteString("newkey", newEntry.Key))
2255+
log.Debug("after rewrite entry", zap.Int("new-key-len", len(newEntry.Key)),
2256+
zap.Int("new-value-len", len(entry.e.Value)), zap.ByteString("new-key", newEntry.Key))
21202257

2121-
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, ts); err != nil {
2258+
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil {
21222259
return 0, 0, errors.Trace(err)
21232260
}
21242261

0 commit comments

Comments
 (0)