Skip to content

Commit

Permalink
Fix stream writer flushing to be thread safe.
Browse files Browse the repository at this point in the history
This change fixes the raft streams so that Flush() is not called
asynchronously while the snapshot is being written.
  • Loading branch information
benbjohnson committed Apr 15, 2015
1 parent 817c2e7 commit d2bcc7a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 15 deletions.
4 changes: 3 additions & 1 deletion messaging/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ func CopyFlush(dst io.Writer, src io.Reader) (written int64, err error) {
}

// Flush after write.
if dst, ok := dst.(http.Flusher); ok {
if dst, ok := dst.(interface {
Flush()
}); ok {
dst.Flush()
}

Expand Down
25 changes: 11 additions & 14 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,9 +1357,6 @@ func (l *Log) appendToWriters(buf []byte) {
i--
continue
}

// Flush, if possible.
flushWriter(w.Writer)
}
}

Expand Down Expand Up @@ -1808,15 +1805,6 @@ func (l *Log) removeWriter(writer *logWriter) {
return
}

// Flush pushes out buffered data for all open writers.
func (l *Log) Flush() {
l.lock()
defer l.unlock()
for _, w := range l.writers {
flushWriter(w.Writer)
}
}

// ReadFrom continually reads log entries from a reader.
func (l *Log) ReadFrom(r io.ReadCloser) error {
l.tracef("ReadFrom")
Expand Down Expand Up @@ -1961,12 +1949,21 @@ type logWriter struct {
}

// Write writes bytes to the underlying writer.
// The write is ignored if the writer is currently snapshotting.
func (w *logWriter) Write(p []byte) (int, error) {
// Ignore if the writer is currently snapshotting.
if w.snapshotIndex != 0 {
return 0, nil
}
return w.Writer.Write(p)

// Write to underlying writer.
n, err := w.Writer.Write(p)
if err != nil {
return n, err
}

// Flush writer if successful.
flushWriter(w.Writer)
return n, err
}

func (w *logWriter) Close() error {
Expand Down

0 comments on commit d2bcc7a

Please sign in to comment.