Skip to content

Commit

Permalink
ARROW-208: Add checkstyle policy to java project
Browse files Browse the repository at this point in the history
Author: Tsuyoshi Ozawa <ozawa@apache.org>

Closes #96 from oza/ARROW-208 and squashes the following commits:

809e729 [Tsuyoshi Ozawa] reformatted code in memory and tools dir with IDE
40ee6a3 [Tsuyoshi Ozawa] ARROW-208: Add checkstyle policy to java project
  • Loading branch information
oza authored and julienledem committed Mar 21, 2017
1 parent a8bf0fb commit a9a5701
Show file tree
Hide file tree
Showing 34 changed files with 1,218 additions and 963 deletions.
219 changes: 120 additions & 99 deletions java/memory/src/main/java/io/netty/buffer/ArrowBuf.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,23 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.netty.buffer;

import org.apache.arrow.memory.BufferAllocator;

/**
* Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of the Netty framework
* Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of
* the Netty framework
* (thus supporting RPC level memory accounting).
*/
public class ExpandableByteBuf extends MutableWrappedByteBuf {
Expand Down
9 changes: 6 additions & 3 deletions java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,24 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.netty.buffer;

/**
* A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts.
* A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and
* counts.
*/
public class LargeBuffer extends MutableWrappedByteBuf {

public LargeBuffer(ByteBuf buffer) {
super(buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.netty.buffer;

import java.io.IOException;
Expand All @@ -26,16 +27,12 @@
import java.nio.channels.ScatteringByteChannel;

/**
* This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override some behaviors and make
* This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override
* some behaviors and make
* buffer mutable.
*/
abstract class MutableWrappedByteBuf extends AbstractByteBuf {

@Override
public ByteBuffer nioBuffer(int index, int length) {
return unwrap().nioBuffer(index, length);
}

ByteBuf buffer;

public MutableWrappedByteBuf(ByteBuf buffer) {
Expand All @@ -50,6 +47,11 @@ public MutableWrappedByteBuf(ByteBuf buffer) {
setIndex(buffer.readerIndex(), buffer.writerIndex());
}

@Override
public ByteBuffer nioBuffer(int index, int length) {
return unwrap().nioBuffer(index, length);
}

@Override
public ByteBuf unwrap() {
return buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,44 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.netty.buffer;

import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
import io.netty.util.internal.StringUtil;

import org.apache.arrow.memory.OutOfMemoryException;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.arrow.memory.OutOfMemoryException;

import io.netty.util.internal.StringUtil;
import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;

/**
* The base allocator that we use for all of Arrow's memory management. Returns UnsafeDirectLittleEndian buffers.
* The base allocator that we use for all of Arrow's memory management. Returns
* UnsafeDirectLittleEndian buffers.
*/
public class PooledByteBufAllocatorL {
private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");

private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow" +
".allocator");

private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
public final UnsafeDirectLittleEndian empty;
private final AtomicLong hugeBufferSize = new AtomicLong(0);
private final AtomicLong hugeBufferCount = new AtomicLong(0);
private final AtomicLong normalBufferSize = new AtomicLong(0);
private final AtomicLong normalBufferCount = new AtomicLong(0);

private final InnerAllocator allocator;
public final UnsafeDirectLittleEndian empty;

public PooledByteBufAllocatorL() {
allocator = new InnerAllocator();
Expand Down Expand Up @@ -78,6 +80,7 @@ public long getNormalBufferCount() {
}

private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {

private final long initialCapacity;
private final AtomicLong count;
private final AtomicLong size;
Expand All @@ -89,7 +92,8 @@ private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, Ato
this.size = size;
}

private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, AtomicLong size) {
private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count,
AtomicLong size) {
super(buf);
this.initialCapacity = buf.capacity();
this.count = count;
Expand Down Expand Up @@ -119,6 +123,7 @@ public boolean release(int decrement) {
}

private class InnerAllocator extends PooledByteBufAllocator {

private final PoolArena<ByteBuffer>[] directArenas;
private final MemoryStatusThread statusThread;
private final int chunkSize;
Expand All @@ -131,7 +136,8 @@ public InnerAllocator() {
f.setAccessible(true);
this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
} catch (Exception e) {
throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e);
throw new RuntimeException("Failure while initializing allocator. Unable to retrieve " +
"direct arenas field.", e);
}

this.chunkSize = directArenas[0].chunkSize;
Expand All @@ -158,7 +164,8 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa
hugeBufferCount.incrementAndGet();

// logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, hugeBufferSize);
return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount,
hugeBufferSize);
} else {
// within chunk, use arena.
ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
Expand All @@ -173,7 +180,8 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa
normalBufferSize.addAndGet(buf.capacity());
normalBufferCount.incrementAndGet();

return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize);
return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf,
normalBufferCount, normalBufferSize);
}

} else {
Expand All @@ -183,7 +191,8 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa

private UnsupportedOperationException fail() {
return new UnsupportedOperationException(
"Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
"Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " +
"didn't provide that functionality.");
}

@Override
Expand All @@ -203,7 +212,8 @@ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {

private void validate(int initialCapacity, int maxCapacity) {
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: 0+)");
throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: " +
"0+)");
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
Expand All @@ -212,26 +222,6 @@ private void validate(int initialCapacity, int maxCapacity) {
}
}

private class MemoryStatusThread extends Thread {

public MemoryStatusThread() {
super("allocation.logger");
this.setDaemon(true);
}

@Override
public void run() {
while (true) {
memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
try {
Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
} catch (InterruptedException e) {
return;
}
}
}
}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
Expand All @@ -256,6 +246,26 @@ public String toString() {
return buf.toString();
}

private class MemoryStatusThread extends Thread {

public MemoryStatusThread() {
super("allocation.logger");
this.setDaemon(true);
}

@Override
public void run() {
while (true) {
memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
try {
Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
} catch (InterruptedException e) {
return;
}
}
}
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,22 +18,31 @@

package io.netty.buffer;

import io.netty.util.internal.PlatformDependent;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicLong;

import io.netty.util.internal.PlatformDependent;

/**
* The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs to abstract away the
* The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs
* to abstract away the
* Netty classes and underlying Netty memory management.
*/
public class UnsafeDirectLittleEndian extends WrappedByteBuf {

public static final boolean ASSERT_ENABLED;
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);

static {
boolean isAssertEnabled = false;
assert isAssertEnabled = true;
ASSERT_ENABLED = isAssertEnabled;
}

public final long id = ID_GENERATOR.incrementAndGet();
private final AbstractByteBuf wrapped;
private final long memoryAddress;
Expand All @@ -60,21 +69,22 @@ private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
this.wrapped = buf;
this.memoryAddress = buf.memoryAddress();
}
private long addr(int index) {
return memoryAddress + index;
}

@Override
public long getLong(int index) {
private long addr(int index) {
return memoryAddress + index;
}

@Override
public long getLong(int index) {
// wrapped.checkIndex(index, 8);
long v = PlatformDependent.getLong(addr(index));
return v;
}
long v = PlatformDependent.getLong(addr(index));
return v;
}

@Override
public float getFloat(int index) {
return Float.intBitsToFloat(getInt(index));
}
@Override
public float getFloat(int index) {
return Float.intBitsToFloat(getInt(index));
}

@Override
public ByteBuf slice() {
Expand Down Expand Up @@ -259,12 +269,4 @@ public int hashCode() {
return System.identityHashCode(this);
}

public static final boolean ASSERT_ENABLED;

static {
boolean isAssertEnabled = false;
assert isAssertEnabled = true;
ASSERT_ENABLED = isAssertEnabled;
}

}
Loading

0 comments on commit a9a5701

Please sign in to comment.