diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/ProtoBufFile.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/ProtoBufFile.java index 14a6fe289..2e48f155d 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/ProtoBufFile.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/ProtoBufFile.java @@ -115,11 +115,12 @@ public boolean save(final Message msg, final boolean sync) throws IOException { Bits.putInt(lenBytes, 0, msgLen); output.write(lenBytes); msg.writeTo(output); - if (sync) { - fOut.getFD().sync(); - } + output.flush(); + } + if (sync) { + Utils.fsync(file); } - return Utils.atomicMoveFile(file, new File(this.path)); + return Utils.atomicMoveFile(file, new File(this.path), sync); } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java index 102fa75fe..a266ce24a 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java @@ -21,21 +21,21 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.AtomicMoveNotSupportedException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; - import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RaftError; @@ -53,8 +53,9 @@ public final class Utils { private static final Logger LOG = LoggerFactory.getLogger(Utils.class); /** - * The configured number of available processors. The default is {@link Runtime#availableProcessors()}. - * This can be overridden by setting the system property "jraft.available_processors". + * The configured number of available processors. The default is + * {@link Runtime#availableProcessors()}. This can be overridden by setting the system property + * "jraft.available_processors". */ private static final int CPUS = SystemPropertyUtil.getInt( "jraft.available_processors", Runtime @@ -93,7 +94,8 @@ public final class Utils { 32768); /** - * Whether use {@link com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor}, true by default. + * Whether use {@link com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor}, true by + * default. */ public static final boolean USE_MPSC_SINGLE_THREAD_EXECUTOR = SystemPropertyUtil.getBoolean( "jraft.use.mpsc.single.thread.executor", @@ -158,15 +160,21 @@ public static Future runInThread(final Runnable runnable) { /** * Run closure with status in thread pool. */ + @SuppressWarnings("Convert2Lambda") public static Future runClosureInThread(final Closure done, final Status status) { if (done == null) { return null; } - return runInThread(() -> { - try { - done.run(status); - } catch (final Throwable t) { - LOG.error("Fail to run done closure", t); + + return runInThread(new Runnable() { + + @Override + public void run() { + try { + done.run(status); + } catch (final Throwable t) { + LOG.error("Fail to run done closure", t); + } } }); } @@ -286,8 +294,8 @@ public static long monotonicMs() { } /** - * Returns the current time in milliseconds, it's not monotonic, - * would be forwarded/backward by clock synchronous. + * Returns the current time in milliseconds, it's not monotonic, would be forwarded/backward by + * clock synchronous. */ public static long nowMs() { return System.currentTimeMillis(); @@ -312,15 +320,17 @@ public static T withLockObject(final T obj) { } @SuppressWarnings("ConstantConditions") - public static boolean atomicMoveFile(final File source, final File target) throws IOException { + public static boolean atomicMoveFile(final File source, final File target, final boolean sync) throws IOException { // Move temp file to target path atomically. - // The code comes from https://github.com/jenkinsci/jenkins/blob/master/core/src/main/java/hudson/util/AtomicFileWriter.java#L187 + // The code comes from + // https://github.com/jenkinsci/jenkins/blob/master/core/src/main/java/hudson/util/AtomicFileWriter.java#L187 Requires.requireNonNull(source, "source"); Requires.requireNonNull(target, "target"); final Path sourcePath = source.toPath(); final Path targetPath = target.toPath(); + boolean success; try { - return Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE) != null; + success = Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE) != null; } catch (final IOException e) { // If it falls here that can mean many things. Either that the atomic move is not supported, // or something wrong happened. Anyway, let's try to be over-diagnosing @@ -335,7 +345,7 @@ public static boolean atomicMoveFile(final File source, final File target) throw } try { - return Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING) != null; + success = Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING) != null; } catch (final IOException e1) { e1.addSuppressed(e); LOG.warn("Unable to move {} to {}. Attempting to delete {} and abandoning.", sourcePath, targetPath, @@ -351,6 +361,30 @@ public static boolean atomicMoveFile(final File source, final File target) throw throw e1; } } + if (success && sync) { + File dir = target.getParentFile(); + // fsync on target parent dir. + fsync(dir); + } + return success; + } + + /** + * Calls fsync on a file or directory. + * @param file file or directory + * @throws IOException if an I/O error occurs + */ + public static void fsync(final File file) throws IOException { + final boolean isDir = file.isDirectory(); + // can't fsync on windowns. + if (isDir && Platform.isWindows()) { + LOG.warn("Unable to fsync directory {} on windows.", file); + return; + } + try (final FileChannel fc = FileChannel.open(file.toPath(), isDir ? StandardOpenOption.READ + : StandardOpenOption.WRITE)) { + fc.force(true); + } } public static String getString(final byte[] bs, final int off, final int len) { diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index 88165dcc7..5dccd9c58 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -2641,6 +2641,7 @@ public void testShuttingDownLeaderTriggerTimeoutNow() throws Exception { Thread.sleep(100); leader = cluster.getLeader(); + cluster.waitLeader(); assertNotNull(leader); assertNotSame(leader, oldLeader);