Skip to content

Commit

Permalink
Coalesce multiple WAL fsyncs
Browse files Browse the repository at this point in the history
Fsyncs to the WAL can cause higher IO with lots of small writes or
slower disks.  This reworks the previous wal fsyncing to remove the
extra goroutine and remove the hard-coded 100ms delay.  Writes to
the wal still maintain the invariant that they do not return to the
caller until the write is fsync'd.

This also adds a new config option wal-fsync-delay (default 0s)
which can be increased if a delay is desired.  This is somewhat useful
for systems with slower disks, but the current default works well as
is.
  • Loading branch information
jwilder committed Mar 15, 2017
1 parent 7bd1bd8 commit e9eb925
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features

- [#8143](https://github.com/influxdata/influxdb/pull/8143): Add WAL sync delay
- [#7977](https://github.com/influxdata/influxdb/issues/7977): Add chunked request processing back into the Go client v2
- [#7974](https://github.com/influxdata/influxdb/pull/7974): Allow non-admin users to execute SHOW DATABASES.
- [#7948](https://github.com/influxdata/influxdb/pull/7948): Reduce memory allocations by reusing gzip.Writers across requests
Expand Down
6 changes: 6 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@
# The directory where the TSM storage engine stores WAL files.
wal-dir = "/var/lib/influxdb/wal"

# The amount of time that a write will wait before fsyncing. A duration
# greater than 0 can be used to batch up multiple fsync calls. This is useful for slower
# disks or when WAL write contention is seen. A value of 0s fsyncs every write to the WAL.
# Values in the range of 0-100ms are recommended for non-SSD disks.
# wal-fsync-delay = "0s"

# Trace logging provides more verbose output around the tsm engine. Turning
# this on can provide more useful output for debugging tsm engine issues.
# trace-logging-enabled = false
Expand Down
5 changes: 5 additions & 0 deletions tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ type Config struct {
// General WAL configuration options
WALDir string `toml:"wal-dir"`

// WALFsyncDelay is the amount of time that a write will wait before fsyncing. A duration
// greater than 0 can be used to batch up multiple fsync calls. This is useful for slower
// disks or when WAL write contention is seen. A value of 0 fsyncs every write to the WAL.
WALFsyncDelay toml.Duration `toml:"wal-fsync-delay"`

// Query logging
QueryLogEnabled bool `toml:"query-log-enabled"`

Expand Down
6 changes: 6 additions & 0 deletions tsdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsdb_test

import (
"testing"
"time"

"github.com/BurntSushi/toml"
"github.com/influxdata/influxdb/tsdb"
Expand All @@ -13,6 +14,7 @@ func TestConfig_Parse(t *testing.T) {
if _, err := toml.Decode(`
dir = "/var/lib/influxdb/data"
wal-dir = "/var/lib/influxdb/wal"
wal-fsync-delay = "10s"
`, &c); err != nil {
t.Fatal(err)
}
Expand All @@ -27,6 +29,10 @@ wal-dir = "/var/lib/influxdb/wal"
if got, exp := c.WALDir, "/var/lib/influxdb/wal"; got != exp {
t.Errorf("unexpected wal-dir:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
}
if got, exp := c.WALFsyncDelay, time.Duration(10*time.Second); time.Duration(got).Nanoseconds() != exp.Nanoseconds() {
t.Errorf("unexpected wal-fsync-delay:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
}

}

func TestConfig_Validate_Error(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ type Engine struct {
// NewEngine returns a new instance of Engine.
func NewEngine(id uint64, path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
w := NewWAL(walPath)
w.syncDelay = time.Duration(opt.Config.WALFsyncDelay)

fs := NewFileStore(path)
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path)

Expand Down
33 changes: 25 additions & 8 deletions tsdb/engine/tsm1/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ type WAL struct {
closing chan struct{}
// goroutines waiting for the next fsync
syncWaiters chan chan error
syncCount uint64

// syncDelay sets the duration to wait before fsyncing writes. A value of 0 (default)
// will cause every write to be fsync'd. This must be set before the WAL
// is opened if a non-default value is required.
syncDelay time.Duration

// WALOutput is the writer used by the logger.
logger zap.Logger // Logger to be used for important messages
Expand Down Expand Up @@ -221,30 +227,39 @@ func (l *WAL) Open() error {
atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize)

l.closing = make(chan struct{})
go l.syncPeriodically()

return nil
}

func (l *WAL) syncPeriodically() {
t := time.NewTicker(100 * time.Millisecond)
defer t.Stop()
for {
// sync will schedule an fsync to the current wal segment and notify any
// waiting goroutines. If an fsync is already scheduled, subsequent calls will
// not schedule a new fsync and will be handled by the existing scheduled fsync.
func (l *WAL) sync() {
// If we're not the first to sync, then another goroutine is fsyncing the wal for us.
if !atomic.CompareAndSwapUint64(&l.syncCount, 0, 1) {
return
}

// Fsync the wal and notify all pending waiters
go func() {
t := time.NewTimer(l.syncDelay)
select {
case <-t.C:
if len(l.syncWaiters) > 0 {
l.mu.Lock()
err := l.currentSegmentWriter.sync()
for i := 0; i < len(l.syncWaiters); i++ {
for len(l.syncWaiters) > 0 {
errC := <-l.syncWaiters
errC <- err
}
l.mu.Unlock()
}
case <-l.closing:
return
t.Stop()
}
}

atomic.StoreUint64(&l.syncCount, 0)
}()
}

// WritePoints writes the given points to the WAL. It returns the WAL segment ID to
Expand Down Expand Up @@ -389,6 +404,8 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
return segID, err
}

// schedule an fsync and wait for it to complete
l.sync()
return segID, <-syncErr
}

Expand Down

0 comments on commit e9eb925

Please sign in to comment.