diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index e19ec6b05a03..c9fe2cbef6a0 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -193,6 +193,8 @@ type SSTBatcher struct { asyncAddSSTs ctxgroup.Group + valueScratch []byte + mu struct { syncutil.Mutex @@ -332,16 +334,17 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) { func (b *SSTBatcher) AddMVCCKeyWithImportEpoch( ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32, ) error { + mvccVal, err := storage.DecodeMVCCValue(value) if err != nil { return err } mvccVal.MVCCValueHeader.ImportEpoch = importEpoch - encVal, err := storage.EncodeMVCCValue(mvccVal) + b.valueScratch, err = storage.EncodeMVCCValueToBuf(mvccVal, b.valueScratch[:0]) if err != nil { return err } - return b.AddMVCCKey(ctx, key, encVal) + return b.AddMVCCKey(ctx, key, b.valueScratch) } // AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed). @@ -427,6 +430,7 @@ func (b *SSTBatcher) Reset(ctx context.Context) { b.batchEndTimestamp = hlc.Timestamp{} b.flushKey = nil b.flushKeyChecked = false + b.valueScratch = b.valueScratch[:0] b.ms.Reset() if b.writeAtBatchTS { diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index b272d8bdea6b..107a1f80e1a6 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -218,6 +218,25 @@ func BenchmarkMVCCExportToSST(b *testing.B) { runMVCCExportToSST(b, opts) }) } + withImportEpochs := []bool{false, true} + for _, ie := range withImportEpochs { + numKey := numKeys[len(numKeys)-1] + numRevision := numRevisions[len(numRevisions)-1] + numRangeKey := numRangeKeys[len(numRangeKeys)-1] + exportAllRevisionVal := exportAllRevisions[len(exportAllRevisions)-1] + b.Run(fmt.Sprintf("importEpochs=%t/numKeys=%d/numRevisions=%d/exportAllRevisions=%t", + ie, numKey, numRevision, exportAllRevisionVal, + ), func(b *testing.B) { + opts := mvccExportToSSTOpts{ + numKeys: numKey, + numRevisions: numRevision, + numRangeKeys: numRangeKey, + exportAllRevisions: exportAllRevisionVal, + importEpochs: ie, + } + runMVCCExportToSST(b, opts) + }) + } } const numIntentKeys = 1000 @@ -1678,8 +1697,8 @@ func runMVCCAcquireLockCommon( } type mvccExportToSSTOpts struct { - numKeys, numRevisions, numRangeKeys int - exportAllRevisions, useElasticCPUHandle bool + numKeys, numRevisions, numRangeKeys int + importEpochs, exportAllRevisions, useElasticCPUHandle bool // percentage specifies the share of the dataset to export. 100 will be a full // export, disabling the TBI optimization. <100 will be an incremental export @@ -1741,6 +1760,9 @@ func runMVCCExportToSST(b *testing.B, opts mvccExportToSSTOpts) { for j := 0; j < opts.numRevisions; j++ { mvccKey := MVCCKey{Key: key, Timestamp: hlc.Timestamp{WallTime: mkWall(j), Logical: 0}} mvccValue := MVCCValue{Value: roachpb.MakeValueFromString("foobar")} + if opts.importEpochs { + mvccValue.ImportEpoch = 1 + } err := batch.PutMVCC(mvccKey, mvccValue) if err != nil { b.Fatal(err) @@ -1792,14 +1814,15 @@ func runMVCCExportToSST(b *testing.B, opts mvccExportToSSTOpts) { startTS := hlc.Timestamp{WallTime: startWall} endTS := hlc.Timestamp{WallTime: endWall} _, _, err := MVCCExportToSST(ctx, st, engine, MVCCExportOptions{ - StartKey: MVCCKey{Key: keys.LocalMax}, - EndKey: roachpb.KeyMax, - StartTS: startTS, - EndTS: endTS, - ExportAllRevisions: opts.exportAllRevisions, - TargetSize: 0, - MaxSize: 0, - StopMidKey: false, + StartKey: MVCCKey{Key: keys.LocalMax}, + EndKey: roachpb.KeyMax, + StartTS: startTS, + EndTS: endTS, + ExportAllRevisions: opts.exportAllRevisions, + TargetSize: 0, + MaxSize: 0, + StopMidKey: false, + IncludeMVCCValueHeader: opts.importEpochs, }, &buf) if err != nil { b.Fatal(err) diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index ab63da1d7116..79386bbc5a0b 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -7751,6 +7751,7 @@ func mvccExportToWriter( return maxSize } + var valueScratch []byte iter.SeekGE(opts.StartKey) for { if ok, err := iter.Valid(); err != nil { @@ -7884,11 +7885,12 @@ func mvccExportToWriter( return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "decoding mvcc value %s", unsafeKey) } - if opts.IncludeMVCCValueHeader { - unsafeValue, err = EncodeMVCCValueForExport(mvccValue) + if !ok && opts.IncludeMVCCValueHeader { + valueScratch, err = EncodeMVCCValueForExport(mvccValue, valueScratch[:0]) if err != nil { return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "repackaging imported mvcc value %s", unsafeKey) } + unsafeValue = valueScratch } else { unsafeValue = mvccValue.Value.RawBytes } diff --git a/pkg/storage/mvcc_value.go b/pkg/storage/mvcc_value.go index 2b8ea7f426f4..e3a8548dad48 100644 --- a/pkg/storage/mvcc_value.go +++ b/pkg/storage/mvcc_value.go @@ -136,16 +136,14 @@ func (v MVCCValue) SafeFormat(w redact.SafePrinter, _ rune) { // EncodeMVCCValueForExport encodes fields from the MVCCValueHeader // that are appropriate for export out of the cluster. -func EncodeMVCCValueForExport(mvccValue MVCCValue) ([]byte, error) { - if mvccValue.ImportEpoch == 0 { +// +// NB: Must be updated with ExtendedEncodingSizeForExport. +func EncodeMVCCValueForExport(mvccValue MVCCValue, b []byte) ([]byte, error) { + mvccValue.MVCCValueHeader.LocalTimestamp = hlc.ClockTimestamp{} + if mvccValue.MVCCValueHeader.IsEmpty() { return mvccValue.Value.RawBytes, nil } - - // We only export ImportEpoch. - mvccValue.MVCCValueHeader = enginepb.MVCCValueHeader{ - ImportEpoch: mvccValue.ImportEpoch, - } - return EncodeMVCCValue(mvccValue) + return EncodeMVCCValueToBuf(mvccValue, b) } // When running a metamorphic build, disable the simple MVCC value encoding to @@ -179,13 +177,24 @@ func encodedMVCCValueSize(v MVCCValue) int { // EncodeMVCCValue encodes an MVCCValue into its Pebble representation. See the // comment on MVCCValue for a description of the encoding scheme. +func EncodeMVCCValue(v MVCCValue) ([]byte, error) { + return EncodeMVCCValueToBuf(v, nil) +} + +// EncodeMVCCValueToBuf encodes an MVCCValue into its Pebble +// representation. See the comment on MVCCValue for a description of +// the encoding scheme. +// +// If extended encoding is required, the given buffer will be used if +// it is large enough. If the provided buffer is not large enough a +// new buffer is allocated. // // TODO(erikgrinaker): This could mid-stack inline when we compared // v.MVCCValueHeader == enginepb.MVCCValueHeader{} instead of IsEmpty(), but // struct comparisons have a significant performance regression in Go 1.19 which // negates the inlining gain. Reconsider this with Go 1.20. See: // https://github.com/cockroachdb/cockroach/issues/88818 -func EncodeMVCCValue(v MVCCValue) ([]byte, error) { +func EncodeMVCCValueToBuf(v MVCCValue, buf []byte) ([]byte, error) { if v.MVCCValueHeader.IsEmpty() && !disableSimpleValueEncoding { // Simple encoding. Use the roachpb.Value encoding directly with no // modification. No need to re-allocate or copy. @@ -198,7 +207,13 @@ func EncodeMVCCValue(v MVCCValue) ([]byte, error) { headerSize := extendedPreludeSize + headerLen valueSize := headerSize + len(v.Value.RawBytes) - buf := make([]byte, valueSize) + if valueSize > cap(buf) { + buf = make([]byte, valueSize) + } else { + buf = buf[:valueSize] + } + // Extended encoding. Wrap the roachpb.Value encoding with a header containing + // MVCC-level metadata. Requires a copy. // 4-byte-header-len binary.BigEndian.PutUint32(buf, uint32(headerLen)) // 1-byte-sentinel diff --git a/pkg/storage/mvcc_value_test.go b/pkg/storage/mvcc_value_test.go index b5bfd6e4151a..9ba0c00e1250 100644 --- a/pkg/storage/mvcc_value_test.go +++ b/pkg/storage/mvcc_value_test.go @@ -105,7 +105,7 @@ func TestEncodeMVCCValueForExport(t *testing.T) { } for name, tc := range testcases { t.Run(name, func(t *testing.T) { - encodedVal, err := EncodeMVCCValueForExport(tc.val) + encodedVal, err := EncodeMVCCValueForExport(tc.val, nil) require.NoError(t, err) strippedMVCCVal, err := DecodeMVCCValue(encodedVal) require.NoError(t, err) @@ -316,13 +316,37 @@ func BenchmarkEncodeMVCCValueForExport(b *testing.B) { for vDesc, v := range values { name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc) mvccValue := MVCCValue{MVCCValueHeader: h, Value: v} + var buf []byte b.Run(name, func(b *testing.B) { for i := 0; i < b.N; i++ { - res, err := EncodeMVCCValueForExport(mvccValue) + var err error + buf, err = EncodeMVCCValueForExport(mvccValue, buf[:0]) if err != nil { // for performance require.NoError(b, err) } - _ = res + _ = buf + } + }) + } + } +} + +func BenchmarkEncodeMVCCValueWithAllocator(b *testing.B) { + DisableMetamorphicSimpleValueEncoding(b) + headers, values := mvccValueBenchmarkConfigs() + for hDesc, h := range headers { + for vDesc, v := range values { + name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc) + mvccValue := MVCCValue{MVCCValueHeader: h, Value: v} + var buf []byte + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + var err error + buf, err = EncodeMVCCValueToBuf(mvccValue, buf[:0]) + if err != nil { // for performance + require.NoError(b, err) + } + _ = buf } }) }