Skip to content

Commit

Permalink
open with directio at best effort (#274)
Browse files Browse the repository at this point in the history
Signed-off-by: YangKeao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao authored Dec 29, 2022
1 parent 36227a4 commit ddffaa0
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 9 deletions.
2 changes: 1 addition & 1 deletion blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"sync/atomic"
"unsafe"

"github.com/ncw/directio"
"github.com/pingcap/badger/directio"
"github.com/pingcap/badger/epoch"
"github.com/pingcap/badger/fileutil"
"github.com/pingcap/badger/y"
Expand Down
12 changes: 7 additions & 5 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"time"

"github.com/dgryski/go-farm"
"github.com/ncw/directio"
"github.com/pingcap/badger/cache"
"github.com/pingcap/badger/directio"
"github.com/pingcap/badger/epoch"
"github.com/pingcap/badger/options"
"github.com/pingcap/badger/protos"
Expand Down Expand Up @@ -780,7 +780,8 @@ func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {

// batchSet applies a list of badger.Entry. If a request level error occurs it
// will be returned.
// Check(kv.BatchSet(entries))
//
// Check(kv.BatchSet(entries))
func (db *DB) batchSet(entries []*Entry) error {
sort.Slice(entries, func(i, j int) bool {
return entries[i].Key.Compare(entries[j].Key) < 0
Expand All @@ -796,9 +797,10 @@ func (db *DB) batchSet(entries []*Entry) error {
// batchSetAsync is the asynchronous version of batchSet. It accepts a callback
// function which is called when all the sets are complete. If a request level
// error occurs, it will be passed back via the callback.
// err := kv.BatchSetAsync(entries, func(err error)) {
// Check(err)
// }
//
// err := kv.BatchSetAsync(entries, func(err error)) {
// Check(err)
// }
func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
req, err := db.sendToWriteCh(entries)
if err != nil {
Expand Down
29 changes: 29 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1865,3 +1865,32 @@ func TestRemoteCompaction(t *testing.T) {
return nil
})
}

func TestNonDirectIO(t *testing.T) {
// if the dir is a tmpfs (or other file system which doesn't support directio), badger
// should still work
dir, err := ioutil.TempDir("", "badger")
require.NoError(t, err)
defer os.RemoveAll(dir)
remoteAddr := "127.0.0.1:4080"
compactionServer, err := NewCompactionServer(remoteAddr)
require.NoError(t, err)
go compactionServer.Run()
defer compactionServer.Close()
opts := getTestOptions(dir)
db, err := Open(opts)
require.NoError(t, err)
defer db.Close()

for i := 0; i < 512; i++ {
err = db.Update(func(txn *Txn) error {
key := []byte(fmt.Sprintf("key%03d", rand.Intn(128)))
val := make([]byte, 1024*4)
copy(val, key)
return txn.Set(key, val)
})
require.NoError(t, err)
}
// flushing memTable shouldn't hang forever even on file system which doesn't support directio
db.flushMemTable().Wait()
}
46 changes: 46 additions & 0 deletions directio/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// directio is a proxy package for github.com/pingcap/badger/directio
package directio

import (
"errors"
"os"

"github.com/ncw/directio"
"golang.org/x/sys/unix"
)

const (
// AlignSize is the size to align the buffer to
AlignSize = directio.AlignSize
// BlockSize is the minimum block size
BlockSize = directio.BlockSize
)

// AlignedBlock returns []byte of size BlockSize aligned to a multiple
// of AlignSize in memory (must be power of two)
var AlignedBlock = directio.AlignedBlock

// OpenFile tries to open file in directio mode. If the file system doesn't support directio,
// it will fallback to open the file directly (without O_DIRECT)
func OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
file, err := directio.OpenFile(name, flag, perm)
if errors.Is(err, unix.EINVAL) {
return os.OpenFile(name, flag, perm)
}

return file, err
}
2 changes: 1 addition & 1 deletion fileutil/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"os"

"github.com/ncw/directio"
"github.com/pingcap/badger/directio"
"golang.org/x/time/rate"
)

Expand Down
2 changes: 1 addition & 1 deletion fileutil/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"testing"

"github.com/ncw/directio"
"github.com/pingcap/badger/directio"
"github.com/stretchr/testify/require"
)

Expand Down
2 changes: 1 addition & 1 deletion levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"sort"
"time"

"github.com/ncw/directio"
"github.com/pingcap/badger/directio"
"github.com/pingcap/badger/epoch"
"github.com/pingcap/badger/options"
"github.com/pingcap/badger/protos"
Expand Down

0 comments on commit ddffaa0

Please sign in to comment.