Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #189] DefaultMmapFile memory usage Optimize #190

Merged
merged 1 commit into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,10 +40,23 @@ public class DefaultMmapFile extends ReferenceResource implements MmapFile {
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);

final AtomicInteger startPosition = new AtomicInteger(0);
final AtomicInteger wrotePosition = new AtomicInteger(0);
final AtomicInteger committedPosition = new AtomicInteger(0);
final AtomicInteger flushedPosition = new AtomicInteger(0);
private static final AtomicIntegerFieldUpdater<DefaultMmapFile> START_POSITION_UPDATER;
private static final AtomicIntegerFieldUpdater<DefaultMmapFile> WROTE_POSITION_UPDATER;
private static final AtomicIntegerFieldUpdater<DefaultMmapFile> COMMITTED_POSITION_UPDATER;
private static final AtomicIntegerFieldUpdater<DefaultMmapFile> FLUSHED_POSITION_UPDATER;

static {
START_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMmapFile.class, "startPosition");
WROTE_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMmapFile.class, "wrotePosition");
COMMITTED_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMmapFile.class, "committedPosition");
FLUSHED_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMmapFile.class, "flushedPosition");
}

private volatile int startPosition = 0;
private volatile int wrotePosition = 0;
private volatile int committedPosition = 0;
private volatile int flushedPosition = 0;

protected File file;
int fileSize;
long fileFromOffset;
Expand Down Expand Up @@ -178,13 +192,13 @@ public boolean appendMessage(final byte[] data) {
*/
@Override
public boolean appendMessage(final byte[] data, final int offset, final int length) {
int currentPos = this.wrotePosition.get();
int currentPos = this.wrotePosition;

if ((currentPos + length) <= this.fileSize) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
byteBuffer.put(data, offset, length);
this.wrotePosition.addAndGet(length);
WROTE_POSITION_UPDATER.addAndGet(this, length);
return true;
}
return false;
Expand All @@ -204,24 +218,24 @@ public int flush(final int flushLeastPages) {
logger.error("Error occurred when force data to disk.", e);
}

this.flushedPosition.set(value);
FLUSHED_POSITION_UPDATER.set(this, value);
this.release();
} else {
logger.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
logger.warn("in flush, hold failed, flush offset = " + this.flushedPosition);
FLUSHED_POSITION_UPDATER.set(this, getReadPosition());
}
}
return this.getFlushedPosition();
}

@Override
public int commit(final int commitLeastPages) {
this.committedPosition.set(this.wrotePosition.get());
return this.committedPosition.get();
COMMITTED_POSITION_UPDATER.set(this, this.wrotePosition);
return this.committedPosition;
}

private boolean isAbleToFlush(final int flushLeastPages) {
int flushedPos = this.flushedPosition.get();
int flushedPos = this.flushedPosition;
int writePos = getReadPosition();

if (this.isFull()) {
Expand All @@ -237,27 +251,27 @@ private boolean isAbleToFlush(final int flushLeastPages) {

@Override
public int getFlushedPosition() {
return flushedPosition.get();
return this.flushedPosition;
}

@Override
public void setFlushedPosition(int pos) {
this.flushedPosition.set(pos);
FLUSHED_POSITION_UPDATER.set(this, pos);
}

@Override
public int getStartPosition() {
return startPosition.get();
return this.startPosition;
}

@Override
public void setStartPosition(int startPosition) {
this.startPosition.set(startPosition);
START_POSITION_UPDATER.set(this, startPosition);
}

@Override
public boolean isFull() {
return this.fileSize == this.wrotePosition.get();
return this.fileSize == this.wrotePosition;
}

@Override
Expand Down Expand Up @@ -381,25 +395,25 @@ public boolean destroy(final long intervalForcibly) {

@Override
public int getWrotePosition() {
return wrotePosition.get();
return this.wrotePosition;
}

@Override
public void setWrotePosition(int pos) {
this.wrotePosition.set(pos);
WROTE_POSITION_UPDATER.set(this, pos);
}

/**
* @return The max position which have valid data
*/
@Override
public int getReadPosition() {
return this.wrotePosition.get();
return this.wrotePosition;
}

@Override
public void setCommittedPosition(int pos) {
this.committedPosition.set(pos);
COMMITTED_POSITION_UPDATER.set(this, pos);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2017-2022 The DLedger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.store.file;

import io.openmessaging.storage.dledger.ServerTestBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class DefaultMmapFileTest extends ServerTestBase {

@Test
public void testDefaultMmapFile() throws Exception{

String path = "/opt/file/a/";
bases.add(path);
DefaultMmapFile file = new DefaultMmapFile(path+"0000",1023);

file.setStartPosition(1);
file.setWrotePosition(2);
file.setFlushedPosition(3);
file.setCommittedPosition(1);

Assertions.assertEquals(1, file.getStartPosition());
Assertions.assertEquals(2, file.getWrotePosition());
Assertions.assertEquals(3, file.getFlushedPosition());
Assertions.assertEquals(2, file.commit(1));
}


}