diff --git a/btm/pom.xml b/btm/pom.xml index 1322cd9e..2d5fddd6 100644 --- a/btm/pom.xml +++ b/btm/pom.xml @@ -12,7 +12,7 @@ bundle - + javax.transaction jta provided @@ -70,6 +70,24 @@ provided true + + org.apache.commons + commons-io + 1.3.2 + test + + + org.apache.commons + commons-lang3 + 3.4 + test + + + org.easytesting + fest-assert-core + 2.0M10 + test + diff --git a/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java b/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java index 0d334a3f..4d5a4a01 100644 --- a/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java +++ b/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java @@ -16,6 +16,7 @@ package bitronix.tm.gui; import bitronix.tm.journal.TransactionLogHeader; +import bitronix.tm.journal.InterruptibleLockedRandomAccessFile; import bitronix.tm.utils.Decoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,8 +75,8 @@ public void setPosition(long position) { } public void read(File logFile, boolean active) throws IOException { - RandomAccessFile raf = new RandomAccessFile(logFile, "r"); - TransactionLogHeader header = new TransactionLogHeader(raf.getChannel(), 0L); + InterruptibleLockedRandomAccessFile raf = new InterruptibleLockedRandomAccessFile(logFile, "r"); + TransactionLogHeader header = new TransactionLogHeader(raf, 0L); raf.close(); if (log.isDebugEnabled()) { log.debug("read header: " + header); } setLogFile(logFile); diff --git a/btm/src/main/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFile.java b/btm/src/main/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFile.java new file mode 100644 index 00000000..a10c967d --- /dev/null +++ b/btm/src/main/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFile.java @@ -0,0 +1,104 @@ +package bitronix.tm.journal; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +public class InterruptibleLockedRandomAccessFile { + + private final File file; + private final String mode; + private RandomAccessFile openedFile; + private FileChannel fileChannel; + private FileLock fileLock; + private long currentPosition = 0; + private boolean closed; + + public InterruptibleLockedRandomAccessFile(final File file, final String mode) + throws IOException { + this.file = file; + this.mode = mode; + open(); + } + + private synchronized void open() throws IOException { + openedFile = new RandomAccessFile(file, mode); + fileChannel = openedFile.getChannel(); + + final boolean shared = false; + this.fileLock = fileChannel + .tryLock(0, TransactionLogHeader.TIMESTAMP_HEADER, shared); + if (this.fileLock == null) { + throw new IOException("File " + file.getAbsolutePath() + + " is locked. Is another instance already running?"); + } + } + + public synchronized final void close() throws IOException { + try { + if (!fileLock.isValid()) { + checkState(!fileChannel.isOpen(), "invalid/unhandled state"); + return; + } + fileLock.release(); + fileChannel.close(); + openedFile.close(); + } finally { + closed = true; + } + } + + public synchronized void position(final long newPosition) throws IOException { + checkNotClosed(); + reopenFileChannelIfClosed(); + + fileChannel.position(newPosition); + currentPosition = newPosition; + } + + private void checkNotClosed() { + checkState(!closed, "File has been closed"); + } + + private static void checkState(final boolean expression, final String errorMessage) { + if (!expression) { + throw new IllegalStateException(errorMessage); + } + } + + public synchronized void force(final boolean metaData) throws IOException { + checkNotClosed(); + reopenFileChannelIfClosed(); + + fileChannel.force(metaData); + } + + public synchronized int write(final ByteBuffer src, final long position) + throws IOException { + checkNotClosed(); + reopenFileChannelIfClosed(); + + return fileChannel.write(src, position); + } + + public synchronized void read(final ByteBuffer buffer) throws IOException { + checkNotClosed(); + reopenFileChannelIfClosed(); + + fileChannel.read(buffer); + currentPosition = fileChannel.position(); + } + + private void reopenFileChannelIfClosed() throws IOException { + if (!fileChannel.isOpen()) { + open(); + } + + if (fileChannel.position() != currentPosition) { + fileChannel.position(currentPosition); + } + } +} \ No newline at end of file diff --git a/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java b/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java index 55338cc7..7a96b130 100644 --- a/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java +++ b/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java @@ -15,17 +15,9 @@ */ package bitronix.tm.journal; -import bitronix.tm.utils.Uid; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.transaction.Status; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -35,6 +27,13 @@ import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; +import javax.transaction.Status; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import bitronix.tm.utils.Uid; + /** * Used to write {@link TransactionLogRecord} objects to a log file. * @@ -53,9 +52,7 @@ public class TransactionLogAppender { public static final int END_RECORD = 0x786e7442; private final File file; - private final RandomAccessFile randomeAccessFile; - private final FileChannel fc; - private final FileLock lock; + private final InterruptibleLockedRandomAccessFile randomAccessFile; private final TransactionLogHeader header; private final long maxFileLength; private final AtomicInteger outstandingWrites; @@ -70,14 +67,10 @@ public class TransactionLogAppender { */ public TransactionLogAppender(File file, long maxFileLength) throws IOException { this.file = file; - this.randomeAccessFile = new RandomAccessFile(file, "rw"); - this.fc = randomeAccessFile.getChannel(); - this.header = new TransactionLogHeader(fc, maxFileLength); + this.randomAccessFile = new InterruptibleLockedRandomAccessFile(file, "rw"); + this.header = new TransactionLogHeader(randomAccessFile, maxFileLength); this.maxFileLength = maxFileLength; - this.lock = fc.tryLock(0, TransactionLogHeader.TIMESTAMP_HEADER, false); - if (this.lock == null) - throw new IOException("transaction log file " + file.getName() + " is locked. Is another instance already running?"); - + this.outstandingWrites = new AtomicInteger(); this.danglingRecords = new HashMap>(); @@ -144,7 +137,7 @@ protected void writeLog(TransactionLogRecord tlog) throws IOException { final long writePosition = tlog.getWritePosition(); while (buf.hasRemaining()) { - fc.write(buf, writePosition + buf.position()); + randomAccessFile.write(buf, writePosition + buf.position()); } trackOutstanding(status, gtrid, uniqueNames); @@ -273,18 +266,14 @@ public long getPosition() { return position; } - /** * Close the appender and the underlying file. * @throws IOException if an I/O error occurs. */ protected void close() throws IOException { header.setState(TransactionLogHeader.CLEAN_LOG_STATE); - fc.force(false); - if (lock != null) - lock.release(); - fc.close(); - randomeAccessFile.close(); + randomAccessFile.force(false); + randomAccessFile.close(); } /** @@ -304,7 +293,7 @@ protected TransactionLogCursor getCursor() throws IOException { */ protected void force() throws IOException { if (log.isDebugEnabled()) { log.debug("forcing log writing"); } - fc.force(false); + randomAccessFile.force(false); if (log.isDebugEnabled()) { log.debug("done forcing log"); } } diff --git a/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java b/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java index 748ab635..9f681aac 100644 --- a/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java +++ b/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java @@ -69,7 +69,7 @@ public class TransactionLogHeader { */ public final static byte UNCLEAN_LOG_STATE = -1; - private final FileChannel fc; + private final InterruptibleLockedRandomAccessFile file; private final long maxFileLength; private volatile int formatId; @@ -79,25 +79,25 @@ public class TransactionLogHeader { /** * TransactionLogHeader are used to control headers of the specified RandomAccessFile. - * @param fc the file channel to read from. + * @param randomAccessFile the file to read from. * @param maxFileLength the max file length. * @throws IOException if an I/O error occurs. */ - public TransactionLogHeader(FileChannel fc, long maxFileLength) throws IOException { - this.fc = fc; + public TransactionLogHeader(InterruptibleLockedRandomAccessFile randomAccessFile, long maxFileLength) throws IOException { + this.file = randomAccessFile; this.maxFileLength = maxFileLength; - fc.position(FORMAT_ID_HEADER); + randomAccessFile.position(FORMAT_ID_HEADER); ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 1 + 8); while (buf.hasRemaining()) { - this.fc.read(buf); + this.file.read(buf); } buf.flip(); formatId = buf.getInt(); timestamp = buf.getLong(); state = buf.get(); position = buf.getLong(); - fc.position(position); + randomAccessFile.position(position); if (log.isDebugEnabled()) { log.debug("read header " + this); } } @@ -149,7 +149,7 @@ public void setFormatId(int formatId) throws IOException { buf.putInt(formatId); buf.flip(); while (buf.hasRemaining()) { - fc.write(buf, FORMAT_ID_HEADER + buf.position()); + file.write(buf, FORMAT_ID_HEADER + buf.position()); } this.formatId = formatId; } @@ -165,7 +165,7 @@ public void setTimestamp(long timestamp) throws IOException { buf.putLong(timestamp); buf.flip(); while (buf.hasRemaining()) { - fc.write(buf, TIMESTAMP_HEADER + buf.position()); + file.write(buf, TIMESTAMP_HEADER + buf.position()); } this.timestamp = timestamp; } @@ -181,7 +181,7 @@ public void setState(byte state) throws IOException { buf.put(state); buf.flip(); while (buf.hasRemaining()) { - fc.write(buf, STATE_HEADER + buf.position()); + file.write(buf, STATE_HEADER + buf.position()); } this.state = state; } @@ -202,11 +202,11 @@ public void setPosition(long position) throws IOException { buf.putLong(position); buf.flip(); while (buf.hasRemaining()) { - fc.write(buf, CURRENT_POSITION_HEADER + buf.position()); + file.write(buf, CURRENT_POSITION_HEADER + buf.position()); } this.position = position; - fc.position(position); + file.position(position); } /** diff --git a/btm/src/test/java/bitronix/tm/journal/ByteBufferUtil.java b/btm/src/test/java/bitronix/tm/journal/ByteBufferUtil.java new file mode 100644 index 00000000..92f02eda --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/ByteBufferUtil.java @@ -0,0 +1,18 @@ +package bitronix.tm.journal; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +public final class ByteBufferUtil { + + private ByteBufferUtil() { + } + + public static ByteBuffer createByteBuffer(final String input) + throws UnsupportedEncodingException { + final byte[] inputArray = input.getBytes("UTF-8"); + final ByteBuffer byteBuffer = ByteBuffer.wrap(inputArray); + return byteBuffer; + } + +} diff --git a/btm/src/test/java/bitronix/tm/journal/DiskJournalInterruptTest.java b/btm/src/test/java/bitronix/tm/journal/DiskJournalInterruptTest.java new file mode 100644 index 00000000..d04b99a7 --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/DiskJournalInterruptTest.java @@ -0,0 +1,73 @@ +package bitronix.tm.journal; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.util.HashSet; +import java.util.Set; + +import javax.transaction.Status; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import bitronix.tm.utils.UidGenerator; + +/** + * Source: http://bitronix-transaction-manager.10986.n7.nabble.com/Fix-for-BTM-138-td1701.html + * + * @author Kazuya Uno + */ +public class DiskJournalInterruptTest { + + private DiskJournal diskJournal = new DiskJournal(); + private Set names = new HashSet(); + + @Before + public void setUp() throws IOException { + diskJournal.open(); + } + + @After + public void tearDown() throws IOException { + diskJournal.close(); + } + + @Test + public void testShouldInterruptOnAThreadDontCauseOtherThreadToFail() + throws Exception { + // given: a thread writing logs + Thread thread = new Thread() { + + @Override + public void run() { + try { + writeLog(); + } catch (IOException e) { + // normal + } + }; + }; + thread.start(); + + // when thread is interrupted + thread.interrupt(); + + // this detect closed channel and reopen logs + try { + writeLog(); + } catch (ClosedChannelException cce) { + // this is expected. + } + + // then writing logs should work + writeLog(); + + } + + private void writeLog() throws IOException { + diskJournal.log(Status.STATUS_COMMITTED, UidGenerator.generateUid(), + names); + } + +} \ No newline at end of file diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptService.java b/btm/src/test/java/bitronix/tm/journal/InterruptService.java new file mode 100644 index 00000000..0de8ea52 --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/InterruptService.java @@ -0,0 +1,53 @@ +package bitronix.tm.journal; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class InterruptService { + + private final InterruptableThreadList interruptableThreadList; + private final AtomicLong successfulInterrupts = new AtomicLong(); + private ExecutorService executorService; + private final AtomicBoolean run = new AtomicBoolean(true); + + public InterruptService(final InterruptableThreadList interruptableThreadList) { + if (interruptableThreadList == null) { + throw new NullPointerException("threadList cannot be null"); + } + this.interruptableThreadList = interruptableThreadList; + } + + public void start() { + executorService = Executors.newSingleThreadExecutor(); + final Runnable interrupter = new Runnable() { + @Override + public void run() { + while (run.get()) { + final boolean successfulInterrupt = interruptableThreadList + .interruptRandomThread(); + if (successfulInterrupt) { + successfulInterrupts.incrementAndGet(); + } + } + } + }; + executorService.submit(interrupter); + } + + public void stop() throws InterruptedException { + run.set(false); + executorService.shutdown(); + final boolean terminated = executorService.awaitTermination(2, + TimeUnit.SECONDS); + if (!terminated) { + throw new IllegalStateException("termination"); + } + } + + public long getSuccessfulInterrupts() { + return successfulInterrupts.get(); + } +} diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptableThreadList.java b/btm/src/test/java/bitronix/tm/journal/InterruptableThreadList.java new file mode 100644 index 00000000..3066942b --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/InterruptableThreadList.java @@ -0,0 +1,40 @@ +package bitronix.tm.journal; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class InterruptableThreadList { + + private final List threads = new ArrayList(); + + public InterruptableThreadList() { + } + + public synchronized void addCurrentThread() { + final Thread currentThread = Thread.currentThread(); + threads.add(currentThread); + Thread.yield(); + } + + public synchronized void removeCurrentThread() { + final Thread currentThread = Thread.currentThread(); + threads.remove(currentThread); + } + + /** + * + * @return true on successful interruption + */ + public synchronized boolean interruptRandomThread() { + if (threads.isEmpty()) { + return false; + } + final Random random = new Random(); + final int threadIndex = random.nextInt(threads.size()); + + final Thread thread = threads.get(threadIndex); + thread.interrupt(); + return true; + } +} diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileStressTest.java b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileStressTest.java new file mode 100644 index 00000000..4bf6c746 --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileStressTest.java @@ -0,0 +1,149 @@ +package bitronix.tm.journal; + +import static bitronix.tm.journal.ByteBufferUtil.createByteBuffer; +import static org.fest.assertions.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class InterruptibleLockedRandomAccessFileStressTest { + + @Rule + public final TemporaryFolder folder = new TemporaryFolder(); + + private File inputFile; + + private final InterruptableThreadList threadList = new InterruptableThreadList(); + + private final InterruptService interruptService = new InterruptService( + threadList); + + @Before + public void setUp() throws Exception { + inputFile = folder.newFile("bitronix-stresstest.log"); + interruptService.start(); + } + + @After + public void tearDown() throws Exception { + interruptService.stop(); + } + + @Test + public void stressTestWriteInterrupts() throws Exception { + final int recordLength = 15; + final int taskNumber = 10000; + initializeFileContent(recordLength, taskNumber); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + final ExecutorService executorService = Executors.newFixedThreadPool(4); + + final AtomicLong successfulWrites = new AtomicLong(); + final AtomicLong writeErrors = new AtomicLong(); + for (int i = 0; i < taskNumber; i++) { + final int taskId = i; + final String data = createRecord(taskId); + assertThat(data.length()).isLessThanOrEqualTo(recordLength); + executorService.submit(new Runnable() { + + @Override + public void run() { + threadList.addCurrentThread(); + try { + final int position = taskId * recordLength; + file.write(createByteBuffer(data), position); + successfulWrites.incrementAndGet(); + } catch (final Exception expected) { + writeErrors.incrementAndGet(); + } finally { + threadList.removeCurrentThread(); + } + } + }); + } + + shutdownExecutor(executorService, 30, TimeUnit.SECONDS); + + file.close(); + + assertThat(successfulWrites.get() + writeErrors.get()).isEqualTo( + taskNumber); + + final long writtenRecords = countWrittenRecords(taskNumber); + final long missingRecords = countMissingRecords(taskNumber); + assertThat(writtenRecords + missingRecords).isEqualTo(taskNumber); + + // System.out.println("written: " + writtenRecords); + // System.out.println("missing: " + missingRecords); + // System.out.println("successful writes: " + successfulWrites); + // System.out.println("write errors: " + writeErrors); + // System.out.println("interrupts: " + // + interruptService.getSuccessfulInterrupts()); + + assertThat(writtenRecords).isGreaterThanOrEqualTo( + successfulWrites.get()); + assertThat(missingRecords).isLessThanOrEqualTo(writeErrors.get()); + assertThat(interruptService.getSuccessfulInterrupts()) + .isGreaterThanOrEqualTo(missingRecords); + } + + private void initializeFileContent(final int recordLength, + final int taskNumber) throws Exception { + final String initialFileConent = StringUtils.repeat(".", recordLength + * taskNumber); + FileUtils.writeStringToFile(inputFile, initialFileConent); + } + + private String createRecord(final int recordId) { + return String.format("data%5dX", recordId); + } + + private long countMissingRecords(final int taskNumber) throws Exception { + final String writtenContent = FileUtils.readFileToString(inputFile, + "UTF-8"); + long missingRecords = 0; + for (int taskId = 0; taskId < taskNumber; taskId++) { + final String data = createRecord(taskId); + if (!writtenContent.contains(data)) { + missingRecords++; + } + } + return missingRecords; + } + + private long countWrittenRecords(final int taskNumber) throws Exception { + final String writtenContent = FileUtils.readFileToString(inputFile, + "UTF-8"); + long writtenRecords = 0; + for (int taskId = 0; taskId < taskNumber; taskId++) { + final String data = createRecord(taskId); + if (writtenContent.contains(data)) { + writtenRecords++; + } + } + return writtenRecords; + } + + private void shutdownExecutor(final ExecutorService executorService, + final long timeout, final TimeUnit timeoutUnit) + throws InterruptedException { + executorService.shutdown(); + final boolean terminated = executorService.awaitTermination(timeout, + timeoutUnit); + assertTrue("termination", terminated); + } +} diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileTest.java b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileTest.java new file mode 100644 index 00000000..e5f8a25f --- /dev/null +++ b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileTest.java @@ -0,0 +1,387 @@ +package bitronix.tm.journal; + +import static bitronix.tm.journal.ByteBufferUtil.createByteBuffer; +import static org.fest.assertions.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; + +import org.apache.commons.io.FileUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class InterruptibleLockedRandomAccessFileTest { + + @Rule + public final TemporaryFolder folder = new TemporaryFolder(); + + private File inputFile; + + @Before + public void setUp() throws Exception { + inputFile = folder.newFile("btmlog-test.log"); + } + + @Test + public void testOpenClose() throws Exception { + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.close(); + } + + @Test + public void testLockedOpen() throws Exception { + final RandomAccessFile firstFile = new RandomAccessFile(inputFile, "rw"); + final FileChannel fileChannel = firstFile.getChannel(); + final FileLock lock = fileChannel.tryLock(); + assertNotNull("null lock", lock); + + try { + new InterruptibleLockedRandomAccessFile(inputFile, "rw"); + fail("should not open a locked file"); + } catch (OverlappingFileLockException expected) { + } finally { + lock.release(); + fileChannel.close(); + firstFile.close(); + } + } + + @Test + public void testReadAfterClose() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.close(); + + try { + readFile(file, 1); + fail("should not read after close"); + } catch (final IllegalStateException expected) { + } + } + + @Test + public void testWriteAfterClose() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.close(); + + final ByteBuffer buffer = createByteBuffer("testdata"); + final long position = 0L; + try { + file.write(buffer, position); + fail("should not write after close"); + } catch (final IllegalStateException expected) { + } + } + + @Test + public void testForceAfterClose() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.close(); + + try { + final boolean metaData = true; + file.force(metaData); + fail("should not force after close"); + } catch (final IllegalStateException expected) { + } + } + + @Test + public void testPositionAfterClose() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.close(); + + try { + file.position(1L); + fail("should not position after close"); + } catch (final IllegalStateException expected) { + } + } + + @Test + public void testRead() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + verifyRead(data, file); + file.close(); + } + + @Test + public void testReadTwice() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + verifyRead(data, file); + file.position(0L); + verifyRead(data, file); + + file.close(); + } + + @Test + public void testReadAfterInterrupt() throws Exception { + final String data = "testdataTESTDATA"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + verifyRead(data, file); + file.position(0L); + interruptCurrentThread(); + try { + verifyRead(data, file); + fail("interrupt should close the FileChannel"); + } catch (final ClosedByInterruptException expected) { + } + + clearInterruptedFlag(); + + verifyRead(data, file); + file.close(); + } + + private void verifyRead(final String expectedData, + final InterruptibleLockedRandomAccessFile file) throws Exception { + final int bytesToRead = expectedData.getBytes("UTF-8").length; + final String readData = readFile(file, bytesToRead); + + assertThat(readData).isEqualTo(expectedData); + } + + private String readFile(final InterruptibleLockedRandomAccessFile file, + final int bytesToRead) throws IOException { + final ByteBuffer inputBuffer = ByteBuffer.allocate(bytesToRead); + file.read(inputBuffer); + return toString(inputBuffer); + } + + private String toString(ByteBuffer buffer) + throws UnsupportedEncodingException { + return new String(buffer.array(), "UTF-8"); + } + + @Test + public void testWrite() throws Exception { + final String data = "testdata"; + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + final ByteBuffer outputBuffer = createByteBuffer(data); + + final long position = 0L; + file.write(outputBuffer, position); + + file.close(); + + verifyFileContent(inputFile, data); + } + + @Test + public void testWriteAndForce() throws Exception { + final String data = "testdata"; + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + final ByteBuffer outputBuffer = createByteBuffer(data); + + final long position = 0L; + file.write(outputBuffer, position); + file.force(true); + + file.close(); + + verifyFileContent(inputFile, data); + } + + private void verifyFileContent(final File file, final String expectedData) + throws IOException { + final String fileContent = FileUtils.readFileToString(inputFile, + "UTF-8"); + assertEquals(expectedData, fileContent); + } + + @Test + public void testTwoWrites() throws Exception { + final String dataOne = "testdata"; + final String dataTwo = "TESTDATA"; + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + final ByteBuffer dataOneBuffer = createByteBuffer(dataOne); + file.write(dataOneBuffer, 0L); + file.write(createByteBuffer(dataTwo), dataOneBuffer.capacity()); + + file.close(); + + verifyFileContent(inputFile, dataOne + dataTwo); + } + + @Test + public void testWriteAfterInterrupt() throws Exception { + final String dataOne = "testdata"; + final String dataTwo = "TESTDATA"; + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.write(createByteBuffer(dataOne), 0L); + + interruptCurrentThread(); + + try { + file.write(createByteBuffer(dataTwo), dataOne.length()); + } catch (final ClosedByInterruptException expected) { + } + + clearInterruptedFlag(); + + final String dataThree = "__third__"; + file.write(createByteBuffer(dataThree), dataOne.length()); + + file.close(); + + verifyFileContent(inputFile, dataOne + dataThree); + } + + @Test + public void testForceAfterInterrupt() throws Exception { + final String dataOne = "testdata"; + final String dataTwo = "TESTDATA"; + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + file.write(createByteBuffer(dataOne), 0L); + + interruptCurrentThread(); + + try { + file.write(createByteBuffer(dataTwo), dataOne.length()); + } catch (final ClosedByInterruptException expected) { + } + + clearInterruptedFlag(); + + file.force(true); + + file.close(); + + verifyFileContent(inputFile, dataOne); + } + + @Test + public void testFilePositionSetOnWriteInterrupt() throws Exception { + final String dataOne = "testdata"; + FileUtils.writeStringToFile(inputFile, dataOne, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + final ByteBuffer outputBuffer = createByteBuffer(dataOne); + + long position = 2; + file.position(position); + + interruptCurrentThread(); + try { + file.write(outputBuffer, 1L); + fail("writing a FileChannel should fail on an interrupted thread"); + } catch (ClosedChannelException expected) { + } + clearInterruptedFlag(); + + final String readData = readFile(file, 5); + file.close(); + + assertEquals("read from file", "stdat", readData); + } + + @Test + public void testFilePositionSetOnReadInterrupt() throws Exception { + final String dataOne = "testdata"; + FileUtils.writeStringToFile(inputFile, dataOne, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + readFile(file, 4); + + interruptCurrentThread(); + try { + readFile(file, 2); + fail("reading a FileChannel should fail on an interrupted thread"); + } catch (ClosedChannelException expected) { + } + clearInterruptedFlag(); + + final String readData = readFile(file, 4); + file.close(); + + assertEquals("read from file", "data", readData); + } + + @Test + public void testCloseAfterInterrupt() throws Exception { + final String data = "testdata"; + FileUtils.writeStringToFile(inputFile, data, "UTF-8"); + + final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile( + inputFile, "rw"); + + interruptCurrentThread(); + try { + readFile(file, 1); + fail("should not read after interrupt"); + } catch (final ClosedChannelException expected) { + } + clearInterruptedFlag(); + + file.close(); + } + + private void interruptCurrentThread() { + Thread.currentThread().interrupt(); + } + + private void clearInterruptedFlag() { + Thread.interrupted(); + } + +}