Skip to content

Commit

Permalink
(fix) Flush output stream before calling fd.sync, #480 (#481)
Browse files Browse the repository at this point in the history
* (fix) Flush output stream before calling fd.sync, #480

* (fix) we should call fsync on the containing directory after move

* code format

* code format

* (fix) wait leader in testShuttingDownLeaderTriggerTimeoutNow

Co-authored-by: jiachun.fjc <jiachun_fjc@163.com>
  • Loading branch information
killme2008 and fengjiachun authored Jul 8, 2020
1 parent b6338f5 commit 8dc7729
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,12 @@ public boolean save(final Message msg, final boolean sync) throws IOException {
Bits.putInt(lenBytes, 0, msgLen);
output.write(lenBytes);
msg.writeTo(output);
if (sync) {
fOut.getFD().sync();
}
output.flush();
}
if (sync) {
Utils.fsync(file);
}

return Utils.atomicMoveFile(file, new File(this.path));
return Utils.atomicMoveFile(file, new File(this.path), sync);
}
}
66 changes: 50 additions & 16 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
Expand All @@ -53,8 +53,9 @@ public final class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);

/**
* The configured number of available processors. The default is {@link Runtime#availableProcessors()}.
* This can be overridden by setting the system property "jraft.available_processors".
* The configured number of available processors. The default is
* {@link Runtime#availableProcessors()}. This can be overridden by setting the system property
* "jraft.available_processors".
*/
private static final int CPUS = SystemPropertyUtil.getInt(
"jraft.available_processors", Runtime
Expand Down Expand Up @@ -93,7 +94,8 @@ public final class Utils {
32768);

/**
* Whether use {@link com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor}, true by default.
* Whether use {@link com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor}, true by
* default.
*/
public static final boolean USE_MPSC_SINGLE_THREAD_EXECUTOR = SystemPropertyUtil.getBoolean(
"jraft.use.mpsc.single.thread.executor",
Expand Down Expand Up @@ -158,15 +160,21 @@ public static Future<?> runInThread(final Runnable runnable) {
/**
* Run closure with status in thread pool.
*/
@SuppressWarnings("Convert2Lambda")
public static Future<?> runClosureInThread(final Closure done, final Status status) {
if (done == null) {
return null;
}
return runInThread(() -> {
try {
done.run(status);
} catch (final Throwable t) {
LOG.error("Fail to run done closure", t);

return runInThread(new Runnable() {

@Override
public void run() {
try {
done.run(status);
} catch (final Throwable t) {
LOG.error("Fail to run done closure", t);
}
}
});
}
Expand Down Expand Up @@ -286,8 +294,8 @@ public static long monotonicMs() {
}

/**
* Returns the current time in milliseconds, it's not monotonic,
* would be forwarded/backward by clock synchronous.
* Returns the current time in milliseconds, it's not monotonic, would be forwarded/backward by
* clock synchronous.
*/
public static long nowMs() {
return System.currentTimeMillis();
Expand All @@ -312,15 +320,17 @@ public static <T> T withLockObject(final T obj) {
}

@SuppressWarnings("ConstantConditions")
public static boolean atomicMoveFile(final File source, final File target) throws IOException {
public static boolean atomicMoveFile(final File source, final File target, final boolean sync) throws IOException {
// Move temp file to target path atomically.
// The code comes from https://github.com/jenkinsci/jenkins/blob/master/core/src/main/java/hudson/util/AtomicFileWriter.java#L187
// The code comes from
// https://github.com/jenkinsci/jenkins/blob/master/core/src/main/java/hudson/util/AtomicFileWriter.java#L187
Requires.requireNonNull(source, "source");
Requires.requireNonNull(target, "target");
final Path sourcePath = source.toPath();
final Path targetPath = target.toPath();
boolean success;
try {
return Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE) != null;
success = Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE) != null;
} catch (final IOException e) {
// If it falls here that can mean many things. Either that the atomic move is not supported,
// or something wrong happened. Anyway, let's try to be over-diagnosing
Expand All @@ -335,7 +345,7 @@ public static boolean atomicMoveFile(final File source, final File target) throw
}

try {
return Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING) != null;
success = Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING) != null;
} catch (final IOException e1) {
e1.addSuppressed(e);
LOG.warn("Unable to move {} to {}. Attempting to delete {} and abandoning.", sourcePath, targetPath,
Expand All @@ -351,6 +361,30 @@ public static boolean atomicMoveFile(final File source, final File target) throw
throw e1;
}
}
if (success && sync) {
File dir = target.getParentFile();
// fsync on target parent dir.
fsync(dir);
}
return success;
}

/**
* Calls fsync on a file or directory.
* @param file file or directory
* @throws IOException if an I/O error occurs
*/
public static void fsync(final File file) throws IOException {
final boolean isDir = file.isDirectory();
// can't fsync on windowns.
if (isDir && Platform.isWindows()) {
LOG.warn("Unable to fsync directory {} on windows.", file);
return;
}
try (final FileChannel fc = FileChannel.open(file.toPath(), isDir ? StandardOpenOption.READ
: StandardOpenOption.WRITE)) {
fc.force(true);
}
}

public static String getString(final byte[] bs, final int off, final int len) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2641,6 +2641,7 @@ public void testShuttingDownLeaderTriggerTimeoutNow() throws Exception {

Thread.sleep(100);
leader = cluster.getLeader();
cluster.waitLeader();
assertNotNull(leader);
assertNotSame(leader, oldLeader);

Expand Down

0 comments on commit 8dc7729

Please sign in to comment.