From d2bcc7ae87a34cb4746e03384149a88e9df391e3 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 15 Apr 2015 11:58:47 -0600 Subject: [PATCH] Fix stream writer flushing to be thread safe. This change fixes the raft streams so that Flush() is not called asynchronously while the snapshot is being written. --- messaging/handler.go | 4 +++- raft/log.go | 25 +++++++++++-------------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/messaging/handler.go b/messaging/handler.go index c5c2390a81f..bbba9141301 100644 --- a/messaging/handler.go +++ b/messaging/handler.go @@ -231,7 +231,9 @@ func CopyFlush(dst io.Writer, src io.Reader) (written int64, err error) { } // Flush after write. - if dst, ok := dst.(http.Flusher); ok { + if dst, ok := dst.(interface { + Flush() + }); ok { dst.Flush() } diff --git a/raft/log.go b/raft/log.go index 26be8cec0d1..08d4b24d162 100644 --- a/raft/log.go +++ b/raft/log.go @@ -1357,9 +1357,6 @@ func (l *Log) appendToWriters(buf []byte) { i-- continue } - - // Flush, if possible. - flushWriter(w.Writer) } } @@ -1808,15 +1805,6 @@ func (l *Log) removeWriter(writer *logWriter) { return } -// Flush pushes out buffered data for all open writers. -func (l *Log) Flush() { - l.lock() - defer l.unlock() - for _, w := range l.writers { - flushWriter(w.Writer) - } -} - // ReadFrom continually reads log entries from a reader. func (l *Log) ReadFrom(r io.ReadCloser) error { l.tracef("ReadFrom") @@ -1961,12 +1949,21 @@ type logWriter struct { } // Write writes bytes to the underlying writer. -// The write is ignored if the writer is currently snapshotting. func (w *logWriter) Write(p []byte) (int, error) { + // Ignore if the writer is currently snapshotting. if w.snapshotIndex != 0 { return 0, nil } - return w.Writer.Write(p) + + // Write to underlying writer. + n, err := w.Writer.Write(p) + if err != nil { + return n, err + } + + // Flush writer if successful. + flushWriter(w.Writer) + return n, err } func (w *logWriter) Close() error {