Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: wjhuang2016 <huangwenjun1997@gmail.com>
  • Loading branch information
wjhuang2016 committed Jun 27, 2023
1 parent 7e7461c commit 3cdecf4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
12 changes: 6 additions & 6 deletions br/pkg/lightning/backend/sharedisk/fileformat.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ type KeyValueStore struct {
func Create(ctx context.Context, dataWriter, statWriter storage.ExternalFileWriter) (*KeyValueStore, error) {
kvStore := &KeyValueStore{dataWriter: dataWriter, statWriter: statWriter, ctx: ctx}
kvStore.writeBuffer = make([]byte, writeBufferSize)
kvStore.rc = &RangePropertiesCollector{}
return kvStore, nil
}

func (s KeyValueStore) AddKeyValue(key, value []byte) error {
func (s *KeyValueStore) AddKeyValue(key, value []byte) error {
kvLen := len(key) + len(value) + 16

_, err := s.dataWriter.Write(s.ctx, binary.BigEndian.AppendUint64(nil, uint64(len(key))))
Expand All @@ -62,12 +61,12 @@ func (s KeyValueStore) AddKeyValue(key, value []byte) error {
return err
}

if len(s.rc.lastKey) == 0 || s.offset >= s.rc.propSizeIdxDistance ||
s.keyCnt >= s.rc.propKeysIdxDistance {
if len(s.rc.lastKey) == 0 || s.rc.currProp.Size >= s.rc.propSizeIdxDistance ||
s.rc.currProp.Keys >= s.rc.propKeysIdxDistance {
if len(s.rc.lastKey) != 0 {
s.rc.props = append(s.rc.props, s.rc.currProp)
}
s.rc.currProp = RangeProperty{
s.rc.currProp = &RangeProperty{
Key: key,
offset: s.offset,
rangeOffsets: rangeOffsets{
Expand All @@ -77,6 +76,7 @@ func (s KeyValueStore) AddKeyValue(key, value []byte) error {
}
}

s.rc.lastKey = key
s.bufferOffset += kvLen
s.offset += uint64(kvLen)
s.keyCnt++
Expand All @@ -87,7 +87,7 @@ func (s KeyValueStore) AddKeyValue(key, value []byte) error {
return nil
}

func (s KeyValueStore) Finish() error {
func (s *KeyValueStore) Finish() error {
if s.rc.currProp.Keys > 0 {
s.rc.props = append(s.rc.props, s.rc.currProp)
}
Expand Down
14 changes: 9 additions & 5 deletions br/pkg/lightning/backend/sharedisk/sharedisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type RangeProperty struct {

// RangePropertiesCollector collects range properties for each range.
type RangePropertiesCollector struct {
props []RangeProperty
currProp RangeProperty
props []*RangeProperty
currProp *RangeProperty
lastOffsets rangeOffsets
lastKey []byte
currentOffsets rangeOffsets
Expand Down Expand Up @@ -96,7 +96,7 @@ func Decode2RangeProperty(data []byte) (*RangeProperty, error) {
}

type Engine struct {
rc RangePropertiesCollector
rc *RangePropertiesCollector
}

// Writer is used to write data into external storage.
Expand Down Expand Up @@ -267,7 +267,7 @@ func (sr *statFileReader) GetNextProp() (*RangeProperty, error) {
}
sr.init = true
}
if len(sr.readBuffer)-sr.currBufferOffset < 4 {
if sr.bufferMaxOffset-sr.currBufferOffset < 4 {
get, err := sr.getMoreDataFromStorage()
if err != nil {
return nil, err
Expand All @@ -279,7 +279,7 @@ func (sr *statFileReader) GetNextProp() (*RangeProperty, error) {
propLen := binary.BigEndian.Uint32(sr.readBuffer[sr.currBufferOffset:])
sr.currBufferOffset += 4

if len(sr.readBuffer)-sr.currBufferOffset < int(propLen) {
if sr.bufferMaxOffset-sr.currBufferOffset < int(propLen) {
get, err := sr.getMoreDataFromStorage()
if err != nil {
return nil, err
Expand Down Expand Up @@ -359,6 +359,9 @@ func (w *Writer) AppendRows(ctx context.Context, columnNames []string, rows enco
}

func (w *Writer) flushKVs(ctx context.Context) error {
if w.batchCount == 0 {
return nil
}
dataWriter, statWriter, err := w.createStorageWriter()
if err != nil {
return err
Expand All @@ -373,6 +376,7 @@ func (w *Writer) flushKVs(ctx context.Context) error {
})

w.kvStore, err = Create(w.ctx, dataWriter, statWriter)
w.kvStore.rc = w.engine.rc

for i := 0; i < w.batchCount; i++ {
err = w.kvStore.AddKeyValue(w.writeBatch[i].Key, w.writeBatch[i].Val)
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/lightning/backend/sharedisk/sharedisk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ func TestWriter(t *testing.T) {
storage, err := storage2.New(context.Background(), backend, &storage2.ExternalStorageOptions{})
require.NoError(t, err)

eg := &Engine{}
writer := &Writer{ctx: context.Background(), engine: eg, memtableSizeLimit: 1024 * 1024, keyAdapter: &local.NoopKeyAdapter{}}
eg := &Engine{rc: &RangePropertiesCollector{}}
eg.rc.propSizeIdxDistance = 2048
eg.rc.propKeysIdxDistance = 256
writer := &Writer{ctx: context.Background(), engine: eg, memtableSizeLimit: 8 * 1024, keyAdapter: &local.NoopKeyAdapter{}}
writer.tikvCodec = keyspace.CodecV1
writer.exStorage = storage
writer.filenamePrefix = "test"
Expand Down

0 comments on commit 3cdecf4

Please sign in to comment.