Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-208: Add checkstyle policy to java project #96

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 120 additions & 99 deletions java/memory/src/main/java/io/netty/buffer/ArrowBuf.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,23 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.netty.buffer;

import org.apache.arrow.memory.BufferAllocator;

/**
* Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of the Netty framework
* Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of
* the Netty framework
* (thus supporting RPC level memory accounting).
*/
public class ExpandableByteBuf extends MutableWrappedByteBuf {
Expand Down
9 changes: 6 additions & 3 deletions java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,24 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.netty.buffer;

/**
* A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts.
* A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and
* counts.
*/
public class LargeBuffer extends MutableWrappedByteBuf {

public LargeBuffer(ByteBuf buffer) {
super(buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.netty.buffer;

import java.io.IOException;
Expand All @@ -26,16 +27,12 @@
import java.nio.channels.ScatteringByteChannel;

/**
* This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override some behaviors and make
* This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override
* some behaviors and make
* buffer mutable.
*/
abstract class MutableWrappedByteBuf extends AbstractByteBuf {

@Override
public ByteBuffer nioBuffer(int index, int length) {
return unwrap().nioBuffer(index, length);
}

ByteBuf buffer;

public MutableWrappedByteBuf(ByteBuf buffer) {
Expand All @@ -50,6 +47,11 @@ public MutableWrappedByteBuf(ByteBuf buffer) {
setIndex(buffer.readerIndex(), buffer.writerIndex());
}

@Override
public ByteBuffer nioBuffer(int index, int length) {
return unwrap().nioBuffer(index, length);
}

@Override
public ByteBuf unwrap() {
return buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,44 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.netty.buffer;

import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
import io.netty.util.internal.StringUtil;

import org.apache.arrow.memory.OutOfMemoryException;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.arrow.memory.OutOfMemoryException;

import io.netty.util.internal.StringUtil;
import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;

/**
* The base allocator that we use for all of Arrow's memory management. Returns UnsafeDirectLittleEndian buffers.
* The base allocator that we use for all of Arrow's memory management. Returns
* UnsafeDirectLittleEndian buffers.
*/
public class PooledByteBufAllocatorL {
private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");

private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow" +
".allocator");

private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
public final UnsafeDirectLittleEndian empty;
private final AtomicLong hugeBufferSize = new AtomicLong(0);
private final AtomicLong hugeBufferCount = new AtomicLong(0);
private final AtomicLong normalBufferSize = new AtomicLong(0);
private final AtomicLong normalBufferCount = new AtomicLong(0);

private final InnerAllocator allocator;
public final UnsafeDirectLittleEndian empty;

public PooledByteBufAllocatorL() {
allocator = new InnerAllocator();
Expand Down Expand Up @@ -78,6 +80,7 @@ public long getNormalBufferCount() {
}

private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {

private final long initialCapacity;
private final AtomicLong count;
private final AtomicLong size;
Expand All @@ -89,7 +92,8 @@ private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, Ato
this.size = size;
}

private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, AtomicLong size) {
private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count,
AtomicLong size) {
super(buf);
this.initialCapacity = buf.capacity();
this.count = count;
Expand Down Expand Up @@ -119,6 +123,7 @@ public boolean release(int decrement) {
}

private class InnerAllocator extends PooledByteBufAllocator {

private final PoolArena<ByteBuffer>[] directArenas;
private final MemoryStatusThread statusThread;
private final int chunkSize;
Expand All @@ -131,7 +136,8 @@ public InnerAllocator() {
f.setAccessible(true);
this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
} catch (Exception e) {
throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e);
throw new RuntimeException("Failure while initializing allocator. Unable to retrieve " +
"direct arenas field.", e);
}

this.chunkSize = directArenas[0].chunkSize;
Expand All @@ -158,7 +164,8 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa
hugeBufferCount.incrementAndGet();

// logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, hugeBufferSize);
return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount,
hugeBufferSize);
} else {
// within chunk, use arena.
ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
Expand All @@ -173,7 +180,8 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa
normalBufferSize.addAndGet(buf.capacity());
normalBufferCount.incrementAndGet();

return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize);
return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf,
normalBufferCount, normalBufferSize);
}

} else {
Expand All @@ -183,7 +191,8 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa

private UnsupportedOperationException fail() {
return new UnsupportedOperationException(
"Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
"Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " +
"didn't provide that functionality.");
}

@Override
Expand All @@ -203,7 +212,8 @@ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {

private void validate(int initialCapacity, int maxCapacity) {
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: 0+)");
throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: " +
"0+)");
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
Expand All @@ -212,26 +222,6 @@ private void validate(int initialCapacity, int maxCapacity) {
}
}

private class MemoryStatusThread extends Thread {

public MemoryStatusThread() {
super("allocation.logger");
this.setDaemon(true);
}

@Override
public void run() {
while (true) {
memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
try {
Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
} catch (InterruptedException e) {
return;
}
}
}
}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
Expand All @@ -256,6 +246,26 @@ public String toString() {
return buf.toString();
}

private class MemoryStatusThread extends Thread {

public MemoryStatusThread() {
super("allocation.logger");
this.setDaemon(true);
}

@Override
public void run() {
while (true) {
memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
try {
Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
} catch (InterruptedException e) {
return;
}
}
}
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,22 +18,31 @@

package io.netty.buffer;

import io.netty.util.internal.PlatformDependent;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicLong;

import io.netty.util.internal.PlatformDependent;

/**
* The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs to abstract away the
* The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs
* to abstract away the
* Netty classes and underlying Netty memory management.
*/
public class UnsafeDirectLittleEndian extends WrappedByteBuf {

public static final boolean ASSERT_ENABLED;
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);

static {
boolean isAssertEnabled = false;
assert isAssertEnabled = true;
ASSERT_ENABLED = isAssertEnabled;
}

public final long id = ID_GENERATOR.incrementAndGet();
private final AbstractByteBuf wrapped;
private final long memoryAddress;
Expand All @@ -60,21 +69,22 @@ private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
this.wrapped = buf;
this.memoryAddress = buf.memoryAddress();
}
private long addr(int index) {
return memoryAddress + index;
}

@Override
public long getLong(int index) {
private long addr(int index) {
return memoryAddress + index;
}

@Override
public long getLong(int index) {
// wrapped.checkIndex(index, 8);
long v = PlatformDependent.getLong(addr(index));
return v;
}
long v = PlatformDependent.getLong(addr(index));
return v;
}

@Override
public float getFloat(int index) {
return Float.intBitsToFloat(getInt(index));
}
@Override
public float getFloat(int index) {
return Float.intBitsToFloat(getInt(index));
}

@Override
public ByteBuf slice() {
Expand Down Expand Up @@ -259,12 +269,4 @@ public int hashCode() {
return System.identityHashCode(this);
}

public static final boolean ASSERT_ENABLED;

static {
boolean isAssertEnabled = false;
assert isAssertEnabled = true;
ASSERT_ENABLED = isAssertEnabled;
}

}
Loading