Skip to content

Commit

Permalink
Merge pull request #7027 from alvasw/persistence
Browse files Browse the repository at this point in the history
Implement AsyncFileWriter
  • Loading branch information
alejandrogarcia83 committed Feb 6, 2024
2 parents 588d57c + 6707af5 commit 7f33c03
Show file tree
Hide file tree
Showing 7 changed files with 348 additions and 0 deletions.
14 changes: 14 additions & 0 deletions persistence/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
plugins {
id 'bisq.java-conventions'
id 'java-library'
}

dependencies {
implementation enforcedPlatform(project(':platform'))
implementation project(':proto')
annotationProcessor libs.lombok
compileOnly libs.lombok
implementation libs.logback.classic
implementation libs.logback.core
implementation libs.slf4j.api
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.persistence;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;

import java.util.concurrent.CompletableFuture;

public class AsyncFileChannelWriter implements AsyncFileWriter {
private final AsynchronousFileChannel fileChannel;

public AsyncFileChannelWriter(AsynchronousFileChannel fileChannel) {
this.fileChannel = fileChannel;
}

@Override
public CompletableFuture<Integer> write(byte[] data, int offset) {
var byteBuffer = ByteBuffer.wrap(data);
var completableFuture = new CompletableFuture<Integer>();
var completionHandler = createCompletionHandler(completableFuture);
fileChannel.write(byteBuffer, offset, null, completionHandler);
return completableFuture;
}

private CompletionHandler<Integer, Object> createCompletionHandler(CompletableFuture<Integer> completableFuture) {
return new CompletionHandler<>() {
@Override
public void completed(Integer writtenData, Object o) {
completableFuture.complete(writtenData);
}

@Override
public void failed(Throwable throwable, Object o) {
completableFuture.completeExceptionally(throwable);
}
};
}
}
24 changes: 24 additions & 0 deletions persistence/src/main/java/bisq/persistence/AsyncFileWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.persistence;

import java.util.concurrent.CompletableFuture;

public interface AsyncFileWriter {
CompletableFuture<Integer> write(byte[] data, int offset);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.persistence;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

public class PersistenceFileWriter {
private final AsyncFileWriter asyncWriter;
private final ExecutorService writeRequestScheduler;

public PersistenceFileWriter(AsyncFileWriter asyncWriter, ExecutorService writeRequestScheduler) {
this.asyncWriter = asyncWriter;
this.writeRequestScheduler = writeRequestScheduler;
}

public CountDownLatch write(byte[] data) {
CountDownLatch writeFinished = new CountDownLatch(1);
scheduleAsyncWrite(data, 0, data.length, writeFinished);
return writeFinished;
}

private void scheduleAsyncWrite(byte[] data, int offset, int size, CountDownLatch writeFinished) {
asyncWriter.write(data, offset)
.thenAcceptAsync(writeUntilEndAsync(data, offset, size, writeFinished), writeRequestScheduler);
}

private Consumer<Integer> writeUntilEndAsync(byte[] data,
int offset,
int totalBytes,
CountDownLatch writeFinished) {
return writtenBytes -> {
if (writtenBytes == totalBytes) {
writeFinished.countDown();
return;
}

int remainingBytes = totalBytes - writtenBytes;
if (remainingBytes > 0) {
int newOffset = offset + writtenBytes;
scheduleAsyncWrite(data, newOffset, remainingBytes, writeFinished);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.persistence;

import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import java.io.IOException;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class AsyncFileChannelWriterTests {
private Path filePath;
private AsynchronousFileChannel fileChannel;
private AsyncFileChannelWriter asyncFileChannelWriter;

@BeforeEach
void setup(@TempDir Path tempDir) throws IOException {
filePath = tempDir.resolve("file");
fileChannel = AsynchronousFileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
asyncFileChannelWriter = new AsyncFileChannelWriter(fileChannel);
}

@Test
void writeData() throws IOException, InterruptedException, ExecutionException, TimeoutException {
byte[] expected = new byte[1050];
new Random().nextBytes(expected);

CompletableFuture<Integer> completableFuture = asyncFileChannelWriter.write(expected, 0);

int writtenBytes = completableFuture.get(30, TimeUnit.SECONDS);
while (writtenBytes < expected.length) {
completableFuture = asyncFileChannelWriter.write(expected, writtenBytes);
writtenBytes += completableFuture.get(30, TimeUnit.SECONDS);
}

assertThat(writtenBytes, is(expected.length));

fileChannel.close();
byte[] actual = Files.readAllBytes(filePath);

assertThat(expected, is(actual));
}

@Test
void writeDataAtOffset() throws IOException, InterruptedException, ExecutionException, TimeoutException {
final int startOffset = 100;
byte[] data = new byte[1024];
new Random().nextBytes(data);

CompletableFuture<Integer> completableFuture = asyncFileChannelWriter.write(data, startOffset);

int writtenBytes = startOffset + completableFuture.get(30, TimeUnit.SECONDS);
while (writtenBytes < data.length) {
completableFuture = asyncFileChannelWriter.write(data, writtenBytes);
writtenBytes += completableFuture.get(30, TimeUnit.SECONDS);
}

assertThat(writtenBytes - startOffset, is(data.length));

fileChannel.close();
byte[] readFromFile = Files.readAllBytes(filePath);
assertThat(readFromFile.length, is(startOffset + data.length));

byte[] readFromFileWithoutOffset = Arrays.copyOfRange(readFromFile, 100, readFromFile.length);
assertThat(readFromFileWithoutOffset, is(data));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.persistence;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class PersistenceFileWriterTests {
private static final ExecutorService writeRequestScheduler = Executors.newSingleThreadExecutor();
private final byte[] DATA = new byte[100];
private AsyncFileWriter asyncWriter;
private PersistenceFileWriter fileWriter;

@BeforeEach
void setup(@Mock AsyncFileWriter asyncWriter) {
this.asyncWriter = asyncWriter;
fileWriter = new PersistenceFileWriter(asyncWriter, writeRequestScheduler);
}

@AfterAll
static void teardown() {
writeRequestScheduler.shutdownNow();
}

@Test
void writeInOneGo() throws InterruptedException {
doReturn(completedFuture(DATA.length))
.when(asyncWriter).write(any(), anyInt());

boolean isSuccess = fileWriter.write(DATA)
.await(30, TimeUnit.SECONDS);

assertThat(isSuccess, is(true));
verify(asyncWriter, times(1)).write(any(), anyInt());
}

@Test
void writeInTwoPhases() throws InterruptedException {
doReturn(completedFuture(25), completedFuture(75))
.when(asyncWriter).write(any(), anyInt());

boolean isSuccess = fileWriter.write(DATA)
.await(30, TimeUnit.SECONDS);

assertThat(isSuccess, is(true));
verify(asyncWriter, times(2)).write(any(), anyInt());
}

@Test
void writeInFivePhases() throws InterruptedException {
doReturn(completedFuture(10), completedFuture(20),
completedFuture(30), completedFuture(15),
completedFuture(25))
.when(asyncWriter).write(any(), anyInt());

boolean isSuccess = fileWriter.write(DATA)
.await(30, TimeUnit.SECONDS);

assertThat(isSuccess, is(true));
verify(asyncWriter, times(5)).write(any(), anyInt());
}
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ include 'core'
include 'cli'
include 'daemon'
include 'desktop'
include 'persistence'
include 'seednode'
include 'statsnode'
include 'apitest'
Expand Down

0 comments on commit 7f33c03

Please sign in to comment.