diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index 95d2be5a43a36..e777b5a6a5d58 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -6,27 +6,21 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *
* http://www.apache.org/licenses/LICENSE-2.0 - * + *
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package io.netty.buffer; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.channels.GatheringByteChannel; -import java.nio.channels.ScatteringByteChannel; -import java.nio.charset.Charset; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import com.google.common.base.Preconditions; + +import io.netty.util.internal.PlatformDependent; import org.apache.arrow.memory.AllocationManager.BufferLedger; import org.apache.arrow.memory.ArrowByteBufAllocator; @@ -37,15 +31,23 @@ import org.apache.arrow.memory.BufferManager; import org.apache.arrow.memory.util.HistoricalLog; -import com.google.common.base.Preconditions; - -import io.netty.util.internal.PlatformDependent; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ArrowBuf.class); private static final AtomicLong idGenerator = new AtomicLong(0); - + private static final int LOG_BYTES_PER_ROW = 10; private final long id = idGenerator.incrementAndGet(); private final AtomicInteger refCnt; private final UnsafeDirectLittleEndian udle; @@ -55,9 +57,9 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { private final BufferManager bufManager; private final ArrowByteBufAllocator alloc; private final boolean isEmpty; - private volatile int length; private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id) : null; + private volatile int length; public ArrowBuf( final AtomicInteger refCnt, @@ -85,6 +87,17 @@ public ArrowBuf( } + public static String bufferState(final ByteBuf buf) { + final int cap = buf.capacity(); + final int mcap = buf.maxCapacity(); + final int ri = buf.readerIndex(); + final int rb = buf.readableBytes(); + final int wi = buf.writerIndex(); + final int wb = buf.writableBytes(); + return String.format("cap/max: %d/%d, ri: %d, rb: %d, wi: %d, wb: %d", + cap, mcap, ri, rb, wi, wb); + } + public ArrowBuf reallocIfNeeded(final int size) { Preconditions.checkArgument(size >= 0, "reallocation size must be non-negative"); @@ -95,7 +108,8 @@ public ArrowBuf reallocIfNeeded(final int size) { if (bufManager != null) { return bufManager.replace(this, size); } else { - throw new UnsupportedOperationException("Realloc is only available in the context of an operator's UDFs"); + throw new UnsupportedOperationException("Realloc is only available in the context of an " + + "operator's UDFs"); } } @@ -128,14 +142,13 @@ private final void checkIndexD(int index, int fieldLength) { /** * Allows a function to determine whether not reading a particular string of bytes is valid. - * - * Will throw an exception if the memory is not readable for some reason. Only doesn't something in the case that + *
+ * Will throw an exception if the memory is not readable for some reason. Only doesn't + * something in the case that * AssertionUtil.BOUNDS_CHECKING_ENABLED is true. * - * @param start - * The starting position of the bytes to be read. - * @param end - * The exclusive endpoint of the bytes to be read. + * @param start The starting position of the bytes to be read. + * @param end The exclusive endpoint of the bytes to be read. */ public void checkBytes(int start, int end) { if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { @@ -156,17 +169,21 @@ private void ensure(int width) { } /** - * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of memory ownership and - * accounting. This has no impact on the reference counting for the current ArrowBuf except in the situation where the + * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of + * memory ownership and + * accounting. This has no impact on the reference counting for the current ArrowBuf except in + * the situation where the * passed in Allocator is the same as the current buffer. - * - * This operation has no impact on the reference count of this ArrowBuf. The newly created ArrowBuf with either have a - * reference count of 1 (in the case that this is the first time this memory is being associated with the new - * allocator) or the current value of the reference count + 1 for the other AllocationManager/BufferLedger combination + *
+ * This operation has no impact on the reference count of this ArrowBuf. The newly created + * ArrowBuf with either have a + * reference count of 1 (in the case that this is the first time this memory is being + * associated with the new + * allocator) or the current value of the reference count + 1 for the other + * AllocationManager/BufferLedger combination * in the case that the provided allocator already had an association to this underlying memory. * - * @param target - * The target allocator to create an association with. + * @param target The target allocator to create an association with. * @return A new ArrowBuf which shares the same underlying memory as this ArrowBuf. */ public ArrowBuf retain(BufferAllocator target) { @@ -186,28 +203,39 @@ public ArrowBuf retain(BufferAllocator target) { } /** - * Transfer the memory accounting ownership of this ArrowBuf to another allocator. This will generate a new ArrowBuf - * that carries an association with the underlying memory of this ArrowBuf. If this ArrowBuf is connected to the - * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the taret allocator. If - * this ArrowBuf does not currently own the memory underlying it (and is only associated with it), this does not + * Transfer the memory accounting ownership of this ArrowBuf to another allocator. This will + * generate a new ArrowBuf + * that carries an association with the underlying memory of this ArrowBuf. If this ArrowBuf is + * connected to the + * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to + * the taret allocator. If + * this ArrowBuf does not currently own the memory underlying it (and is only associated with + * it), this does not * transfer any ownership to the newly created ArrowBuf. - * - * This operation has no impact on the reference count of this ArrowBuf. The newly created ArrowBuf with either have a - * reference count of 1 (in the case that this is the first time this memory is being associated with the new - * allocator) or the current value of the reference count for the other AllocationManager/BufferLedger combination in + *
+ * This operation has no impact on the reference count of this ArrowBuf. The newly created + * ArrowBuf with either have a + * reference count of 1 (in the case that this is the first time this memory is being + * associated with the new + * allocator) or the current value of the reference count for the other + * AllocationManager/BufferLedger combination in * the case that the provided allocator already had an association to this underlying memory. - * - * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible - * due to the fact that the original owning allocator may have allocated this memory out of a local reservation - * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. This operation is done - * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could occur slightly prematurely - * to an actual overlimit==true condition. This is simply conservative behavior which means we may return overlimit + *
+ * Transfers will always succeed, even if that puts the other allocator into an overlimit + * situation. This is possible + * due to the fact that the original owning allocator may have allocated this memory out of a + * local reservation + * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. + * This operation is done + * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could + * occur slightly prematurely + * to an actual overlimit==true condition. This is simply conservative behavior which means we + * may return overlimit * slightly sooner than is necessary. * - * @param target - * The allocator to transfer ownership to. - * @return A new transfer result with the impact of the transfer (whether it was overlimit) as well as the newly - * created ArrowBuf. + * @param target The allocator to transfer ownership to. + * @return A new transfer result with the impact of the transfer (whether it was overlimit) as + * well as the newly created ArrowBuf. */ public TransferResult transferOwnership(BufferAllocator target) { @@ -223,28 +251,6 @@ public TransferResult transferOwnership(BufferAllocator target) { return new TransferResult(allocationFit, newBuf); } - /** - * The outcome of a Transfer. - */ - public class TransferResult { - - /** - * Whether this transfer fit within the target allocator's capacity. - */ - public final boolean allocationFit; - - /** - * The newly created buffer associated with the target allocator. - */ - public final ArrowBuf buffer; - - private TransferResult(boolean allocationFit, ArrowBuf buffer) { - this.allocationFit = allocationFit; - this.buffer = buffer; - } - - } - @Override public boolean release() { return release(1); @@ -261,7 +267,8 @@ public boolean release(int decrement) { } if (decrement < 1) { - throw new IllegalStateException(String.format("release(%d) argument is not positive. Buffer Info: %s", + throw new IllegalStateException(String.format("release(%d) argument is not positive. Buffer" + + " Info: %s", decrement, toVerboseString())); } @@ -273,7 +280,8 @@ public boolean release(int decrement) { if (refCnt < 0) { throw new IllegalStateException( - String.format("ArrowBuf[%d] refCnt has gone negative. Buffer Info: %s", id, toVerboseString())); + String.format("ArrowBuf[%d] refCnt has gone negative. Buffer Info: %s", id, + toVerboseString())); } return refCnt == 0; @@ -299,7 +307,8 @@ public synchronized ArrowBuf capacity(int newCapacity) { return this; } - throw new UnsupportedOperationException("Buffers don't support resizing that increases the size."); + throw new UnsupportedOperationException("Buffers don't support resizing that increases the " + + "size."); } @Override @@ -354,17 +363,6 @@ public ArrowBuf slice() { return slice(readerIndex(), readableBytes()); } - public static String bufferState(final ByteBuf buf) { - final int cap = buf.capacity(); - final int mcap = buf.maxCapacity(); - final int ri = buf.readerIndex(); - final int rb = buf.readableBytes(); - final int wi = buf.writerIndex(); - final int wb = buf.writableBytes(); - return String.format("cap/max: %d/%d, ri: %d, rb: %d, wi: %d, wb: %d", - cap, mcap, ri, rb, wi, wb); - } - @Override public ArrowBuf slice(int index, int length) { @@ -373,7 +371,8 @@ public ArrowBuf slice(int index, int length) { } /* - * Re the behavior of reference counting, see http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, which + * Re the behavior of reference counting, see http://netty.io/wiki/reference-counted-objects + * .html#wiki-h3-5, which * explains that derived buffers share their reference count with their parent */ final ArrowBuf newBuf = ledger.newArrowBuf(offset + index, length); @@ -408,12 +407,12 @@ public ByteBuffer internalNioBuffer(int index, int length) { @Override public ByteBuffer[] nioBuffers() { - return new ByteBuffer[] { nioBuffer() }; + return new ByteBuffer[]{nioBuffer()}; } @Override public ByteBuffer[] nioBuffers(int index, int length) { - return new ByteBuffer[] { nioBuffer(index, length) }; + return new ByteBuffer[]{nioBuffer(index, length)}; } @Override @@ -443,7 +442,8 @@ public long memoryAddress() { @Override public String toString() { - return String.format("ArrowBuf[%d], udle: [%d %d..%d]", id, udle.id, offset, offset + capacity()); + return String.format("ArrowBuf[%d], udle: [%d %d..%d]", id, udle.id, offset, offset + + capacity()); } @Override @@ -738,7 +738,8 @@ public ArrowBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { public ArrowBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) { if (src.isDirect()) { checkIndex(index, length); - PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this.memoryAddress() + index, + PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this + .memoryAddress() + index, length); } else { if (srcIndex == 0 && src.capacity() == length) { @@ -788,7 +789,8 @@ public void close() { } /** - * Returns the possible memory consumed by this ArrowBuf in the worse case scenario. (not shared, connected to larger + * Returns the possible memory consumed by this ArrowBuf in the worse case scenario. (not + * shared, connected to larger * underlying buffer of allocated memory) * * @return Size in bytes. @@ -798,7 +800,8 @@ public int getPossibleMemoryConsumed() { } /** - * Return that is Accounted for by this buffer (and its potentially shared siblings within the context of the + * Return that is Accounted for by this buffer (and its potentially shared siblings within the + * context of the * associated allocator). * * @return Size in bytes. @@ -807,15 +810,11 @@ public int getActualMemoryConsumed() { return ledger.getAccountedSize(); } - private final static int LOG_BYTES_PER_ROW = 10; - /** * Return the buffer's byte contents in the form of a hex dump. * - * @param start - * the starting byte index - * @param length - * how many bytes to log + * @param start the starting byte index + * @param length how many bytes to log * @return A hex dump in a String. */ public String toHexString(final int start, final int length) { @@ -878,5 +877,27 @@ public ArrowBuf writerIndex(int writerIndex) { return this; } + /** + * The outcome of a Transfer. + */ + public class TransferResult { + + /** + * Whether this transfer fit within the target allocator's capacity. + */ + public final boolean allocationFit; + + /** + * The newly created buffer associated with the target allocator. + */ + public final ArrowBuf buffer; + + private TransferResult(boolean allocationFit, ArrowBuf buffer) { + this.allocationFit = allocationFit; + this.buffer = buffer; + } + + } + } diff --git a/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java b/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java index 7fb884daa3952..9f8af93109739 100644 --- a/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java @@ -6,21 +6,23 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *
* http://www.apache.org/licenses/LICENSE-2.0 - * + *
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package io.netty.buffer; import org.apache.arrow.memory.BufferAllocator; /** - * Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of the Netty framework + * Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of + * the Netty framework * (thus supporting RPC level memory accounting). */ public class ExpandableByteBuf extends MutableWrappedByteBuf { diff --git a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java index c026e430d77f3..9a6e402dad53e 100644 --- a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java +++ b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java @@ -6,21 +6,24 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *
* http://www.apache.org/licenses/LICENSE-2.0 - * + *
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package io.netty.buffer; /** - * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts. + * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and + * counts. */ public class LargeBuffer extends MutableWrappedByteBuf { + public LargeBuffer(ByteBuf buffer) { super(buffer); } diff --git a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java index 5709473135e4b..a5683adccbc32 100644 --- a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java @@ -6,15 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *
* http://www.apache.org/licenses/LICENSE-2.0 - * + *
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package io.netty.buffer; import java.io.IOException; @@ -26,16 +27,12 @@ import java.nio.channels.ScatteringByteChannel; /** - * This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override some behaviors and make + * This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override + * some behaviors and make * buffer mutable. */ abstract class MutableWrappedByteBuf extends AbstractByteBuf { - @Override - public ByteBuffer nioBuffer(int index, int length) { - return unwrap().nioBuffer(index, length); - } - ByteBuf buffer; public MutableWrappedByteBuf(ByteBuf buffer) { @@ -50,6 +47,11 @@ public MutableWrappedByteBuf(ByteBuf buffer) { setIndex(buffer.readerIndex(), buffer.writerIndex()); } + @Override + public ByteBuffer nioBuffer(int index, int length) { + return unwrap().nioBuffer(index, length); + } + @Override public ByteBuf unwrap() { return buffer; diff --git a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java index a843ac5586e79..b6de2e3aa2acb 100644 --- a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java +++ b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java @@ -6,42 +6,44 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *
* http://www.apache.org/licenses/LICENSE-2.0 - * + *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.netty.buffer;
-import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
+import io.netty.util.internal.StringUtil;
+
+import org.apache.arrow.memory.OutOfMemoryException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.arrow.memory.OutOfMemoryException;
-
-import io.netty.util.internal.StringUtil;
+import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
/**
- * The base allocator that we use for all of Arrow's memory management. Returns UnsafeDirectLittleEndian buffers.
+ * The base allocator that we use for all of Arrow's memory management. Returns
+ * UnsafeDirectLittleEndian buffers.
*/
public class PooledByteBufAllocatorL {
- private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");
- private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+ private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow" +
+ ".allocator");
+ private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+ public final UnsafeDirectLittleEndian empty;
private final AtomicLong hugeBufferSize = new AtomicLong(0);
private final AtomicLong hugeBufferCount = new AtomicLong(0);
private final AtomicLong normalBufferSize = new AtomicLong(0);
private final AtomicLong normalBufferCount = new AtomicLong(0);
-
private final InnerAllocator allocator;
- public final UnsafeDirectLittleEndian empty;
public PooledByteBufAllocatorL() {
allocator = new InnerAllocator();
@@ -78,6 +80,7 @@ public long getNormalBufferCount() {
}
private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
+
private final long initialCapacity;
private final AtomicLong count;
private final AtomicLong size;
@@ -89,7 +92,8 @@ private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, Ato
this.size = size;
}
- private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, AtomicLong size) {
+ private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count,
+ AtomicLong size) {
super(buf);
this.initialCapacity = buf.capacity();
this.count = count;
@@ -119,6 +123,7 @@ public boolean release(int decrement) {
}
private class InnerAllocator extends PooledByteBufAllocator {
+
private final PoolArena
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,22 +18,31 @@
package io.netty.buffer;
+import io.netty.util.internal.PlatformDependent;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicLong;
-import io.netty.util.internal.PlatformDependent;
-
/**
- * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs to abstract away the
+ * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs
+ * to abstract away the
* Netty classes and underlying Netty memory management.
*/
public class UnsafeDirectLittleEndian extends WrappedByteBuf {
+
+ public static final boolean ASSERT_ENABLED;
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+ static {
+ boolean isAssertEnabled = false;
+ assert isAssertEnabled = true;
+ ASSERT_ENABLED = isAssertEnabled;
+ }
+
public final long id = ID_GENERATOR.incrementAndGet();
private final AbstractByteBuf wrapped;
private final long memoryAddress;
@@ -60,21 +69,22 @@ private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
this.wrapped = buf;
this.memoryAddress = buf.memoryAddress();
}
- private long addr(int index) {
- return memoryAddress + index;
- }
- @Override
- public long getLong(int index) {
+ private long addr(int index) {
+ return memoryAddress + index;
+ }
+
+ @Override
+ public long getLong(int index) {
// wrapped.checkIndex(index, 8);
- long v = PlatformDependent.getLong(addr(index));
- return v;
- }
+ long v = PlatformDependent.getLong(addr(index));
+ return v;
+ }
- @Override
- public float getFloat(int index) {
- return Float.intBitsToFloat(getInt(index));
- }
+ @Override
+ public float getFloat(int index) {
+ return Float.intBitsToFloat(getInt(index));
+ }
@Override
public ByteBuf slice() {
@@ -259,12 +269,4 @@ public int hashCode() {
return System.identityHashCode(this);
}
- public static final boolean ASSERT_ENABLED;
-
- static {
- boolean isAssertEnabled = false;
- assert isAssertEnabled = true;
- ASSERT_ENABLED = isAssertEnabled;
- }
-
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
index 37c598ad89ece..6ddc8f784bc4a 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
@@ -6,30 +6,33 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
+import com.google.common.base.Preconditions;
+
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
-import com.google.common.base.Preconditions;
-
/**
- * Provides a concurrent way to manage account for memory usage without locking. Used as basis for Allocators. All
+ * Provides a concurrent way to manage account for memory usage without locking. Used as basis
+ * for Allocators. All
* operations are threadsafe (except for close).
*/
@ThreadSafe
class Accountant implements AutoCloseable {
- // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant.class);
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant
+ // .class);
/**
* The parent allocator
@@ -37,7 +40,8 @@ class Accountant implements AutoCloseable {
protected final Accountant parent;
/**
- * The amount of memory reserved for this allocator. Releases below this amount of memory will not be returned to the
+ * The amount of memory reserved for this allocator. Releases below this amount of memory will
+ * not be returned to the
* parent Accountant until this Accountant is closed.
*/
protected final long reservation;
@@ -45,7 +49,8 @@ class Accountant implements AutoCloseable {
private final AtomicLong peakAllocation = new AtomicLong();
/**
- * Maximum local memory that can be held. This can be externally updated. Changing it won't cause past memory to
+ * Maximum local memory that can be held. This can be externally updated. Changing it won't
+ * cause past memory to
* change but will change responses to future allocation efforts
*/
private final AtomicLong allocationLimit = new AtomicLong();
@@ -56,11 +61,14 @@ class Accountant implements AutoCloseable {
private final AtomicLong locallyHeldMemory = new AtomicLong();
public Accountant(Accountant parent, long reservation, long maxAllocation) {
- Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be non-negative.");
- Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be non-negative.");
+ Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be " +
+ "non-negative.");
+ Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be " +
+ "non-negative.");
Preconditions.checkArgument(reservation <= maxAllocation,
"The initial reservation size must be <= the maximum allocation.");
- Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't reserve memory.");
+ Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't " +
+ "reserve memory.");
this.parent = parent;
this.reservation = reservation;
@@ -72,19 +80,20 @@ public Accountant(Accountant parent, long reservation, long maxAllocation) {
if (!outcome.isOk()) {
throw new OutOfMemoryException(String.format(
"Failure trying to allocate initial reservation for Allocator. "
- + "Attempted to allocate %d bytes and received an outcome of %s.", reservation, outcome.name()));
+ + "Attempted to allocate %d bytes and received an outcome of %s.", reservation,
+ outcome.name()));
}
}
}
/**
- * Attempt to allocate the requested amount of memory. Either completely succeeds or completely fails. Constructs a a
+ * Attempt to allocate the requested amount of memory. Either completely succeeds or completely
+ * fails. Constructs a a
* log of delta
- *
+ *
* If it fails, no changes are made to accounting.
*
- * @param size
- * The amount of memory to reserve in bytes.
+ * @param size The amount of memory to reserve in bytes.
* @return True if the allocation was successful, false if the allocation failed.
*/
AllocationOutcome allocateBytes(long size) {
@@ -116,8 +125,7 @@ private void updatePeak() {
/**
* Increase the accounting. Returns whether the allocation fit within limits.
*
- * @param size
- * to increase
+ * @param size to increase
* @return Whether the allocation fit within limits.
*/
boolean forceAllocate(long size) {
@@ -126,24 +134,29 @@ boolean forceAllocate(long size) {
}
/**
- * Internal method for allocation. This takes a forced approach to allocation to ensure that we manage reservation
- * boundary issues consistently. Allocation is always done through the entire tree. The two options that we influence
- * are whether the allocation should be forced and whether or not the peak memory allocation should be updated. If at
- * some point during allocation escalation we determine that the allocation is no longer possible, we will continue to
- * do a complete and consistent allocation but we will stop updating the peak allocation. We do this because we know
- * that we will be directly unwinding this allocation (and thus never actually making the allocation). If force
- * allocation is passed, then we continue to update the peak limits since we now know that this allocation will occur
+ * Internal method for allocation. This takes a forced approach to allocation to ensure that we
+ * manage reservation
+ * boundary issues consistently. Allocation is always done through the entire tree. The two
+ * options that we influence
+ * are whether the allocation should be forced and whether or not the peak memory allocation
+ * should be updated. If at
+ * some point during allocation escalation we determine that the allocation is no longer
+ * possible, we will continue to
+ * do a complete and consistent allocation but we will stop updating the peak allocation. We do
+ * this because we know
+ * that we will be directly unwinding this allocation (and thus never actually making the
+ * allocation). If force
+ * allocation is passed, then we continue to update the peak limits since we now know that this
+ * allocation will occur
* despite our moving past one or more limits.
*
- * @param size
- * The size of the allocation.
- * @param incomingUpdatePeak
- * Whether we should update the local peak for this allocation.
- * @param forceAllocation
- * Whether we should force the allocation.
+ * @param size The size of the allocation.
+ * @param incomingUpdatePeak Whether we should update the local peak for this allocation.
+ * @param forceAllocation Whether we should force the allocation.
* @return The outcome of the allocation.
*/
- private AllocationOutcome allocate(final long size, final boolean incomingUpdatePeak, final boolean forceAllocation) {
+ private AllocationOutcome allocate(final long size, final boolean incomingUpdatePeak, final
+ boolean forceAllocation) {
final long newLocal = locallyHeldMemory.addAndGet(size);
final long beyondReservation = newLocal - reservation;
final boolean beyondLimit = newLocal > allocationLimit.get();
@@ -173,7 +186,7 @@ public void releaseBytes(long size) {
Preconditions.checkArgument(newSize >= 0, "Accounted size went negative.");
final long originalSize = newSize + size;
- if(originalSize > reservation && parent != null){
+ if (originalSize > reservation && parent != null) {
// we deallocated memory that we should release to our parent.
final long possibleAmountToReleaseToParent = originalSize - reservation;
final long actualToReleaseToParent = Math.min(size, possibleAmountToReleaseToParent);
@@ -182,16 +195,6 @@ public void releaseBytes(long size) {
}
- /**
- * Set the maximum amount of memory that can be allocated in the this Accountant before failing an allocation.
- *
- * @param newLimit
- * The limit in bytes.
- */
- public void setLimit(long newLimit) {
- allocationLimit.set(newLimit);
- }
-
public boolean isOverLimit() {
return getAllocatedMemory() > getLimit() || (parent != null && parent.isOverLimit());
}
@@ -216,7 +219,18 @@ public long getLimit() {
}
/**
- * Return the current amount of allocated memory that this Accountant is managing accounting for. Note this does not
+ * Set the maximum amount of memory that can be allocated in the this Accountant before failing
+ * an allocation.
+ *
+ * @param newLimit The limit in bytes.
+ */
+ public void setLimit(long newLimit) {
+ allocationLimit.set(newLimit);
+ }
+
+ /**
+ * Return the current amount of allocated memory that this Accountant is managing accounting
+ * for. Note this does not
* include reservation memory that hasn't been allocated.
*
* @return Currently allocate memory in bytes.
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
index 1b127f8181222..d36cb37fc2e24 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
@@ -15,15 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
/**
* An allocation listener being notified for allocation/deallocation
- *
+ *
* It is expected to be called from multiple threads and as such,
* provider should take care of making the implementation thread-safe
*/
public interface AllocationListener {
+
public static final AllocationListener NOOP = new AllocationListener() {
@Override
public void onAllocation(long size) {
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
index f15bb8a40fa01..683752e6a4980 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -6,53 +6,62 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
-import static org.apache.arrow.memory.BaseAllocator.indent;
+import com.google.common.base.Preconditions;
-import java.util.IdentityHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import io.netty.buffer.ArrowBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
import org.apache.arrow.memory.BaseAllocator.Verbosity;
import org.apache.arrow.memory.util.AutoCloseableLock;
import org.apache.arrow.memory.util.HistoricalLog;
-import com.google.common.base.Preconditions;
+import java.util.IdentityHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
-import io.netty.buffer.ArrowBuf;
-import io.netty.buffer.PooledByteBufAllocatorL;
-import io.netty.buffer.UnsafeDirectLittleEndian;
+import static org.apache.arrow.memory.BaseAllocator.indent;
/**
- * Manages the relationship between one or more allocators and a particular UDLE. Ensures that one allocator owns the
- * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its associated allocators.
- * This class is also responsible for managing when memory is allocated and returned to the Netty-based
+ * Manages the relationship between one or more allocators and a particular UDLE. Ensures that
+ * one allocator owns the
+ * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its
+ * associated allocators.
+ * This class is also responsible for managing when memory is allocated and returned to the
+ * Netty-based
* PooledByteBufAllocatorL.
- *
- * The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's package which need access
+ *
+ * The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's
+ * package which need access
* to these objects or methods.
- *
- * Threading: AllocationManager manages thread-safety internally. Operations within the context of a single BufferLedger
- * are lockless in nature and can be leveraged by multiple threads. Operations that cross the context of two ledgers
- * will acquire a lock on the AllocationManager instance. Important note, there is one AllocationManager per
- * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a typical query. The
+ *
+ * Threading: AllocationManager manages thread-safety internally. Operations within the context
+ * of a single BufferLedger
+ * are lockless in nature and can be leveraged by multiple threads. Operations that cross the
+ * context of two ledgers
+ * will acquire a lock on the AllocationManager instance. Important note, there is one
+ * AllocationManager per
+ * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a
+ * typical query. The
* contention of acquiring a lock on AllocationManager should be very low.
- *
*/
public class AllocationManager {
- // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationManager.class);
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger
+ // (AllocationManager.class);
private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
@@ -81,17 +90,19 @@ public class AllocationManager {
this.root = accountingAllocator.root;
this.underlying = INNER_ALLOCATOR.allocate(size);
- // we do a no retain association since our creator will want to retrieve the newly created ledger and will create a
+ // we do a no retain association since our creator will want to retrieve the newly created
+ // ledger and will create a
// reference count at that point
this.owningLedger = associate(accountingAllocator, false);
this.size = underlying.capacity();
}
/**
- * Associate the existing underlying buffer with a new allocator. This will increase the reference count to the
+ * Associate the existing underlying buffer with a new allocator. This will increase the
+ * reference count to the
* provided ledger by 1.
- * @param allocator
- * The target allocator to associate this buffer with.
+ *
+ * @param allocator The target allocator to associate this buffer with.
* @return The Ledger (new or existing) that associates the underlying buffer to this new ledger.
*/
BufferLedger associate(final BaseAllocator allocator) {
@@ -118,7 +129,8 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
}
try (AutoCloseableLock write = writeLock.open()) {
- // we have to recheck existing ledger since a second reader => writer could be competing with us.
+ // we have to recheck existing ledger since a second reader => writer could be competing
+ // with us.
final BufferLedger existingLedger = map.get(allocator);
if (existingLedger != null) {
@@ -141,7 +153,8 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
/**
- * The way that a particular BufferLedger communicates back to the AllocationManager that it now longer needs to hold
+ * The way that a particular BufferLedger communicates back to the AllocationManager that it
+ * now longer needs to hold
* a reference to particular piece of memory.
*/
private class ReleaseListener {
@@ -169,16 +182,19 @@ public void release() {
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
- // we need to change the owning allocator. we've been removed so we'll get whatever is top of list
+ // we need to change the owning allocator. we've been removed so we'll get whatever is
+ // top of list
BufferLedger newLedger = map.values().iterator().next();
- // we'll forcefully transfer the ownership and not worry about whether we exceeded the limit
+ // we'll forcefully transfer the ownership and not worry about whether we exceeded the
+ // limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
} else {
if (map.isEmpty()) {
- throw new IllegalStateException("The final removal of a ledger should be connected to the owning ledger.");
+ throw new IllegalStateException("The final removal of a ledger should be connected to " +
+ "the owning ledger.");
}
}
@@ -187,25 +203,30 @@ public void release() {
}
/**
- * The reference manager that binds an allocator manager to a particular BaseAllocator. Also responsible for creating
+ * The reference manager that binds an allocator manager to a particular BaseAllocator. Also
+ * responsible for creating
* a set of ArrowBufs that share a common fate and set of reference counts.
- * As with AllocationManager, the only reason this is public is due to ArrowBuf being in io.netty.buffer package.
+ * As with AllocationManager, the only reason this is public is due to ArrowBuf being in io
+ * .netty.buffer package.
*/
public class BufferLedger {
private final IdentityHashMap
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
import io.netty.buffer.ArrowBuf;
/**
- * Supports cumulative allocation reservation. Clients may increase the size of the reservation repeatedly until they
- * call for an allocation of the current total size. The reservation can only be used once, and will throw an exception
+ * Supports cumulative allocation reservation. Clients may increase the size of the reservation
+ * repeatedly until they
+ * call for an allocation of the current total size. The reservation can only be used once, and
+ * will throw an exception
* if it is used more than once.
*
- * For the purposes of airtight memory accounting, the reservation must be close()d whether it is used or not.
+ * For the purposes of airtight memory accounting, the reservation must be close()d whether it is
+ * used or not.
* This is not threadsafe.
*/
public interface AllocationReservation extends AutoCloseable {
/**
* Add to the current reservation.
- *
+ *
* Adding may fail if the allocator is not allowed to consume any more space.
*
* @param nBytes the number of bytes to add
@@ -42,7 +46,7 @@ public interface AllocationReservation extends AutoCloseable {
/**
* Requests a reservation of additional space.
- *
+ *
* The implementation of the allocator's inner class provides this.
*
* @param nBytes the amount to reserve
@@ -52,7 +56,7 @@ public interface AllocationReservation extends AutoCloseable {
/**
* Allocate a buffer whose size is the total of all the add()s made.
- *
+ *
* The allocation request can still fail, even if the amount of space
* requested is available, if the allocation cannot be made contiguously.
*
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java
index 3274642dedd59..d5b638e1ed298 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java
@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
/**
@@ -23,6 +24,7 @@
*/
@SuppressWarnings("serial")
public class AllocatorClosedException extends RuntimeException {
+
/**
* @param message string associated with the cause
*/
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
index 5dc5ac397bd93..b8b5283423c82 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
import io.netty.buffer.ByteBuf;
@@ -23,9 +24,12 @@
import io.netty.buffer.ExpandableByteBuf;
/**
- * An implementation of ByteBufAllocator that wraps a Arrow BufferAllocator. This allows the RPC layer to be accounted
- * and managed using Arrow's BufferAllocator infrastructure. The only thin different from a typical BufferAllocator is
- * the signature and the fact that this Allocator returns ExpandableByteBufs which enable otherwise non-expandable
+ * An implementation of ByteBufAllocator that wraps a Arrow BufferAllocator. This allows the RPC
+ * layer to be accounted
+ * and managed using Arrow's BufferAllocator infrastructure. The only thin different from a
+ * typical BufferAllocator is
+ * the signature and the fact that this Allocator returns ExpandableByteBufs which enable
+ * otherwise non-expandable
* ArrowBufs to be expandable.
*/
public class ArrowByteBufAllocator implements ByteBufAllocator {
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 9edafbce082cb..aaa7ce804c3e5 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -6,57 +6,54 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
-import java.util.Arrays;
-import java.util.IdentityHashMap;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Preconditions;
+
+import io.netty.buffer.ArrowBuf;
+import io.netty.buffer.UnsafeDirectLittleEndian;
import org.apache.arrow.memory.AllocationManager.BufferLedger;
import org.apache.arrow.memory.util.AssertionUtil;
import org.apache.arrow.memory.util.HistoricalLog;
-import com.google.common.base.Preconditions;
-
-import io.netty.buffer.ArrowBuf;
-import io.netty.buffer.UnsafeDirectLittleEndian;
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
public abstract class BaseAllocator extends Accountant implements BufferAllocator {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
-
public static final int DEBUG_LOG_LENGTH = 6;
public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
|| Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false"));
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator
+ .class);
+ // Package exposed for sharing between AllocatorManger and BaseAllocator objects
+ final String name;
+ final RootAllocator root;
private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
-
private final AllocationListener listener;
private final BaseAllocator parentAllocator;
private final ArrowByteBufAllocator thisAsByteBufAllocator;
private final IdentityHashMap
- * The implementation of the allocator's inner class provides this.
- *
- * @param nBytes
- * the size of the buffer requested
- * @return the buffer, or null, if the request cannot be satisfied
- */
- private ArrowBuf allocate(int nBytes) {
- assertOpen();
-
- boolean success = false;
-
- /*
- * The reservation already added the requested bytes to the allocators owned and allocated bytes via reserve().
- * This ensures that they can't go away. But when we ask for the buffer here, that will add to the allocated bytes
- * as well, so we need to return the same number back to avoid double-counting them.
- */
- try {
- final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null);
-
- listener.onAllocation(nBytes);
- if (DEBUG) {
- historicalLog.recordEvent("allocate() => %s", String.format("ArrowBuf[%d]", arrowBuf.getId()));
- }
- success = true;
- return arrowBuf;
- } finally {
- if (!success) {
- releaseBytes(nBytes);
- }
- }
- }
-
- /**
- * Return the reservation back to the allocator without having used it.
- *
- * @param nBytes
- * the size of the reservation
- */
- private void releaseReservation(int nBytes) {
- assertOpen();
-
- releaseBytes(nBytes);
-
- if (DEBUG) {
- historicalLog.recordEvent("releaseReservation(%d)", nBytes);
- }
- }
-
- }
-
@Override
public AllocationReservation newReservation() {
assertOpen();
@@ -460,7 +327,6 @@ public AllocationReservation newReservation() {
return new Reservation();
}
-
@Override
public synchronized void close() {
/*
@@ -474,7 +340,7 @@ public synchronized void close() {
isClosed = true;
if (DEBUG) {
- synchronized(DEBUG_LOCK) {
+ synchronized (DEBUG_LOCK) {
verifyAllocator();
// are there outstanding child allocators?
@@ -488,7 +354,8 @@ public synchronized void close() {
}
throw new IllegalStateException(
- String.format("Allocator[%s] closed with outstanding child allocators.\n%s", name, toString()));
+ String.format("Allocator[%s] closed with outstanding child allocators.\n%s", name,
+ toString()));
}
// are there outstanding buffers?
@@ -501,7 +368,8 @@ public synchronized void close() {
if (reservations.size() != 0) {
throw new IllegalStateException(
- String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name, reservations.size(),
+ String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name,
+ reservations.size(),
toString()));
}
@@ -512,7 +380,8 @@ public synchronized void close() {
final long allocated = getAllocatedMemory();
if (allocated > 0) {
throw new IllegalStateException(
- String.format("Memory was leaked by query. Memory leaked: (%d)\n%s", allocated, toString()));
+ String.format("Memory was leaked by query. Memory leaked: (%d)\n%s", allocated,
+ toString()));
}
// we need to release our memory to our parent before we tell it we've closed.
@@ -543,7 +412,8 @@ public String toString() {
}
/**
- * Provide a verbose string of the current allocator state. Includes the state of all child allocators, along with
+ * Provide a verbose string of the current allocator state. Includes the state of all child
+ * allocators, along with
* historical logs of each object and including stacktraces.
*
* @return A Verbose string of current allocator state.
@@ -559,48 +429,32 @@ private void hist(String noteFormat, Object... args) {
historicalLog.recordEvent(noteFormat, args);
}
- /**
- * Rounds up the provided value to the nearest power of two.
- *
- * @param val
- * An integer value.
- * @return The closest power of two of that value.
- */
- static int nextPowerOfTwo(int val) {
- int highestBit = Integer.highestOneBit(val);
- if (highestBit == val) {
- return val;
- } else {
- return highestBit << 1;
- }
- }
-
-
/**
* Verifies the accounting state of the allocator. Only works for DEBUG.
*
- * @throws IllegalStateException
- * when any problems are found
+ * @throws IllegalStateException when any problems are found
*/
void verifyAllocator() {
- final IdentityHashMap
- * This overload is used for recursive calls, allowing for checking that ArrowBufs are unique across all allocators
+ *
+ * This overload is used for recursive calls, allowing for checking that ArrowBufs are unique
+ * across all allocators
* that are checked.
*
+ *
+ * The implementation of the allocator's inner class provides this.
+ *
+ * @param nBytes the size of the buffer requested
+ * @return the buffer, or null, if the request cannot be satisfied
+ */
+ private ArrowBuf allocate(int nBytes) {
+ assertOpen();
+
+ boolean success = false;
+
+ /*
+ * The reservation already added the requested bytes to the allocators owned and allocated
+ * bytes via reserve().
+ * This ensures that they can't go away. But when we ask for the buffer here, that will add
+ * to the allocated bytes
+ * as well, so we need to return the same number back to avoid double-counting them.
+ */
+ try {
+ final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null);
+
+ listener.onAllocation(nBytes);
+ if (DEBUG) {
+ historicalLog.recordEvent("allocate() => %s", String.format("ArrowBuf[%d]", arrowBuf
+ .getId()));
+ }
+ success = true;
+ return arrowBuf;
+ } finally {
+ if (!success) {
+ releaseBytes(nBytes);
+ }
+ }
+ }
+
+ /**
+ * Return the reservation back to the allocator without having used it.
+ *
+ * @param nBytes the size of the reservation
+ */
+ private void releaseReservation(int nBytes) {
+ assertOpen();
+
+ releaseBytes(nBytes);
+
+ if (DEBUG) {
+ historicalLog.recordEvent("releaseReservation(%d)", nBytes);
+ }
+ }
+
}
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java b/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java
index 4e88c734ab4be..b0e9cd8c1a0e9 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java
@@ -6,21 +6,22 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
public class BoundsChecking {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BoundsChecking.class);
public static final boolean BOUNDS_CHECKING_ENABLED;
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BoundsChecking.class);
static {
boolean isAssertEnabled = false;
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
index 356a3416cbf85..81ffb1bec780e 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -6,47 +6,48 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ArrowBuf;
+import io.netty.buffer.ByteBufAllocator;
/**
* Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods.
*/
public interface BufferAllocator extends AutoCloseable {
+
/**
- * Allocate a new or reused buffer of the provided size. Note that the buffer may technically be larger than the
- * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size.
+ * Allocate a new or reused buffer of the provided size. Note that the buffer may technically
+ * be larger than the
+ * requested size for rounding purposes. However, the buffer's capacity will be set to the
+ * configured size.
*
- * @param size
- * The size in bytes.
+ * @param size The size in bytes.
* @return a new ArrowBuf, or null if the request can't be satisfied
- * @throws OutOfMemoryException
- * if buffer cannot be allocated
+ * @throws OutOfMemoryException if buffer cannot be allocated
*/
public ArrowBuf buffer(int size);
/**
- * Allocate a new or reused buffer of the provided size. Note that the buffer may technically be larger than the
- * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size.
+ * Allocate a new or reused buffer of the provided size. Note that the buffer may technically
+ * be larger than the
+ * requested size for rounding purposes. However, the buffer's capacity will be set to the
+ * configured size.
*
- * @param size
- * The size in bytes.
- * @param manager
- * A buffer manager to manage reallocation.
+ * @param size The size in bytes.
+ * @param manager A buffer manager to manage reallocation.
* @return a new ArrowBuf, or null if the request can't be satisfied
- * @throws OutOfMemoryException
- * if buffer cannot be allocated
+ * @throws OutOfMemoryException if buffer cannot be allocated
*/
public ArrowBuf buffer(int size, BufferManager manager);
@@ -60,19 +61,16 @@ public interface BufferAllocator extends AutoCloseable {
/**
* Create a new child allocator.
*
- * @param name
- * the name of the allocator.
- * @param initReservation
- * the initial space reservation (obtained from this allocator)
- * @param maxAllocation
- * maximum amount of space the new allocator can allocate
+ * @param name the name of the allocator.
+ * @param initReservation the initial space reservation (obtained from this allocator)
+ * @param maxAllocation maximum amount of space the new allocator can allocate
* @return the new allocator, or null if it can't be created
*/
public BufferAllocator newChildAllocator(String name, long initReservation, long maxAllocation);
/**
* Close and release all buffers generated from this buffer pool.
- *
+ *
* When assertions are on, complains if there are any outstanding buffers; to avoid
* that, release all buffers before the allocator is closed.
*/
@@ -87,19 +85,18 @@ public interface BufferAllocator extends AutoCloseable {
public long getAllocatedMemory();
/**
- * Set the maximum amount of memory this allocator is allowed to allocate.
+ * Return the current maximum limit this allocator imposes.
*
- * @param newLimit
- * The new Limit to apply to allocations
+ * @return Limit in number of bytes.
*/
- public void setLimit(long newLimit);
+ public long getLimit();
/**
- * Return the current maximum limit this allocator imposes.
+ * Set the maximum amount of memory this allocator is allowed to allocate.
*
- * @return Limit in number of bytes.
+ * @param newLimit The new Limit to apply to allocations
*/
- public long getLimit();
+ public void setLimit(long newLimit);
/**
* Returns the peak amount of memory allocated from this allocator.
@@ -118,25 +115,31 @@ public interface BufferAllocator extends AutoCloseable {
public AllocationReservation newReservation();
/**
- * Get a reference to the empty buffer associated with this allocator. Empty buffers are special because we don't
- * worry about them leaking or managing reference counts on them since they don't actually point to any memory.
+ * Get a reference to the empty buffer associated with this allocator. Empty buffers are
+ * special because we don't
+ * worry about them leaking or managing reference counts on them since they don't actually
+ * point to any memory.
*/
public ArrowBuf getEmpty();
/**
- * Return the name of this allocator. This is a human readable name that can help debugging. Typically provides
+ * Return the name of this allocator. This is a human readable name that can help debugging.
+ * Typically provides
* coordinates about where this allocator was created
*/
public String getName();
/**
- * Return whether or not this allocator (or one if its parents) is over its limits. In the case that an allocator is
- * over its limit, all consumers of that allocator should aggressively try to addrss the overlimit situation.
+ * Return whether or not this allocator (or one if its parents) is over its limits. In the case
+ * that an allocator is
+ * over its limit, all consumers of that allocator should aggressively try to addrss the
+ * overlimit situation.
*/
public boolean isOverLimit();
/**
- * Return a verbose string describing this allocator. If in DEBUG mode, this will also include relevant stacktraces
+ * Return a verbose string describing this allocator. If in DEBUG mode, this will also include
+ * relevant stacktraces
* and historical logs for underlying objects
*
* @return A very verbose description of the allocator hierarchy.
@@ -144,7 +147,8 @@ public interface BufferAllocator extends AutoCloseable {
public String toVerboseString();
/**
- * Asserts (using java assertions) that the provided allocator is currently open. If assertions are disabled, this is
+ * Asserts (using java assertions) that the provided allocator is currently open. If assertions
+ * are disabled, this is
* a no-op.
*/
public void assertOpen();
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java
index 8969434791012..2fe763e10aff9 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
+
package org.apache.arrow.memory;
import io.netty.buffer.ArrowBuf;
@@ -24,7 +25,7 @@
* re-allocation the old buffer will be freed. Managing a list of these buffers
* prevents some parts of the system from needing to define a correct location
* to place the final call to free them.
- *
+ *
* The current uses of these types of buffers are within the pluggable components of Drill.
* In UDFs, memory management should not be a concern. We provide access to re-allocatable
* ArrowBufs to give UDF writers general purpose buffers we can account for. To prevent the need
@@ -38,12 +39,9 @@ public interface BufferManager extends AutoCloseable {
/**
* Replace an old buffer with a new version at least of the provided size. Does not copy data.
*
- * @param old
- * Old Buffer that the user is no longer going to use.
- * @param newSize
- * Size of new replacement buffer.
- * @return
- * A new version of the buffer.
+ * @param old Old Buffer that the user is no longer going to use.
+ * @param newSize Size of new replacement buffer.
+ * @return A new version of the buffer.
*/
public ArrowBuf replace(ArrowBuf old, int newSize);
@@ -57,8 +55,7 @@ public interface BufferManager extends AutoCloseable {
/**
* Get a managed buffer of at least a certain size.
*
- * @param size
- * The desired size
+ * @param size The desired size
* @return A buffer
*/
public ArrowBuf getManagedBuffer(int size);
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
index 11c9063fc9c69..f9a6dc72ece8c 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
@@ -22,21 +23,22 @@
* Child allocator class. Only slightly different from the {@see RootAllocator},
* in that these can't be created directly, but must be obtained from
* {@see BufferAllocator#newChildAllocator(AllocatorOwner, long, long, int)}.
-
+ *
* Child allocators can only be created by the root, or other children, so
* this class is package private.
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
public class OutOfMemoryException extends RuntimeException {
- private static final long serialVersionUID = -6858052345185793382L;
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutOfMemoryException.class);
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutOfMemoryException
+ .class);
+ private static final long serialVersionUID = -6858052345185793382L;
public OutOfMemoryException() {
super();
}
- public OutOfMemoryException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ public OutOfMemoryException(String message, Throwable cause, boolean enableSuppression, boolean
+ writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
index 57a2c0cdae8d8..1dc6bf0c92fa0 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
import com.google.common.annotations.VisibleForTesting;
@@ -24,6 +25,7 @@
* tree of descendant child allocators.
*/
public class RootAllocator extends BaseAllocator {
+
public RootAllocator(final long limit) {
this(AllocationListener.NOOP, limit);
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/package-info.java b/java/memory/src/main/java/org/apache/arrow/memory/package-info.java
index 40d25cada4519..cef382d1e044e 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/package-info.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/package-info.java
@@ -1,24 +1,43 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ * Memory Allocation, Account and Management
+ *
+ * See the README.md file in this directory for detailed information about Arrow's memory allocation
+ * subsystem.
+ *
+ * Memory Allocation, Account and Management
+ *
+ * See the README.md file in this directory for detailed information about Arrow's memory
+ * allocation subsystem.
+ *
+ * Memory Allocation, Account and Management
+ *
+ * See the README.md file in this directory for detailed information about Arrow's memory
+ * allocation subsystem.
+ *
+ * Memory Allocation, Account and Management
+ *
+ * See the README.md file in this directory for detailed information about Arrow's memory
+ * allocation subsystem.
*/
/**
* Memory Allocation, Account and Management
*
- * See the README.md file in this directory for detailed information about Arrow's memory allocation subsystem.
+ * See the README.md file in this directory for detailed information about Arrow's memory
+ * allocation subsystem.
*
*/
+
package org.apache.arrow.memory;
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java b/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java
index 28d078528974e..710f572e06027 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java
@@ -6,32 +6,33 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory.util;
public class AssertionUtil {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssertionUtil.class);
public static final boolean ASSERT_ENABLED;
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssertionUtil.class);
- static{
+ static {
boolean isAssertEnabled = false;
assert isAssertEnabled = true;
ASSERT_ENABLED = isAssertEnabled;
}
- public static boolean isAssertionsEnabled(){
- return ASSERT_ENABLED;
+ private AssertionUtil() {
}
- private AssertionUtil() {
+ public static boolean isAssertionsEnabled() {
+ return ASSERT_ENABLED;
}
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java b/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java
index 94e5cc5fded4f..8d9008c894ac8 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java
@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory.util;
import java.util.concurrent.locks.Lock;
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/HistoricalLog.java b/java/memory/src/main/java/org/apache/arrow/memory/util/HistoricalLog.java
index c9b5c5385c596..c464598bfb856 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/util/HistoricalLog.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/util/HistoricalLog.java
@@ -6,53 +6,43 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory.util;
+import org.slf4j.Logger;
+
import java.util.Arrays;
import java.util.LinkedList;
-import org.slf4j.Logger;
-
/**
* Utility class that can be used to log activity within a class
* for later logging and debugging. Supports recording events and
* recording the stack at the time they occur.
*/
public class HistoricalLog {
- private static class Event {
- private final String note; // the event text
- private final StackTrace stackTrace; // where the event occurred
- private final long time;
-
- public Event(final String note) {
- this.note = note;
- this.time = System.nanoTime();
- stackTrace = new StackTrace();
- }
- }
private final LinkedList
* This form supports the specification of a limit that will limit the
* number of historical entries kept (which keeps down the amount of memory
* used). With the limit, the first entry made is always kept (under the
@@ -70,12 +60,12 @@ public HistoricalLog(final String idStringFormat, Object... args) {
* Each time a new entry is made, the oldest that is not the first is dropped.
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory.util;
import java.util.Arrays;
@@ -23,6 +24,7 @@
* Convenient way of obtaining and manipulating stack traces for debugging.
*/
public class StackTrace {
+
private final StackTraceElement[] stackTraceElements;
/**
@@ -36,10 +38,9 @@ public StackTrace() {
/**
* Write the stack trace to a StringBuilder.
- * @param sb
- * where to write it
- * @param indent
- * how many double spaces to indent each line
+ *
+ * @param sb where to write it
+ * @param indent how many double spaces to indent each line
*/
public void writeToBuilder(final StringBuilder sb, final int indent) {
// create the indentation string
@@ -47,7 +48,7 @@ public void writeToBuilder(final StringBuilder sb, final int indent) {
Arrays.fill(indentation, ' ');
// write the stack trace in standard Java format
- for(StackTraceElement ste : stackTraceElements) {
+ for (StackTraceElement ste : stackTraceElements) {
sb.append(indentation)
.append("at ")
.append(ste.getClassName())
diff --git a/java/pom.xml b/java/pom.xml
index fa03783396ffb..774761f0c1e66 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -35,6 +35,7 @@
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.arrow.tools;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
+package org.apache.arrow.tools;
import com.google.common.base.Preconditions;
@@ -31,11 +28,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
public class EchoServer {
private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class);
-
- private boolean closed = false;
private final ServerSocket serverSocket;
+ private boolean closed = false;
public EchoServer(int port) throws IOException {
LOGGER.info("Starting echo server.");
@@ -43,22 +43,64 @@ public EchoServer(int port) throws IOException {
LOGGER.info("Running echo server on port: " + port());
}
- public int port() { return serverSocket.getLocalPort(); }
+ public static void main(String[] args) throws Exception {
+ int port;
+ if (args.length > 0) {
+ port = Integer.parseInt(args[0]);
+ } else {
+ port = 8080;
+ }
+ new EchoServer(port).run();
+ }
+
+ public int port() {
+ return serverSocket.getLocalPort();
+ }
+
+ public void run() throws IOException {
+ try {
+ while (!closed) {
+ LOGGER.info("Waiting to accept new client connection.");
+ Socket clientSocket = serverSocket.accept();
+ LOGGER.info("Accepted new client connection.");
+ try (ClientConnection client = new ClientConnection(clientSocket)) {
+ try {
+ client.run();
+ } catch (IOException e) {
+ LOGGER.warn("Error handling client connection.", e);
+ }
+ }
+ LOGGER.info("Closed connection with client");
+ }
+ } catch (java.net.SocketException ex) {
+ if (!closed) throw ex;
+ } finally {
+ serverSocket.close();
+ LOGGER.info("Server closed.");
+ }
+ }
+
+ public void close() throws IOException {
+ closed = true;
+ serverSocket.close();
+ }
public static class ClientConnection implements AutoCloseable {
public final Socket socket;
+
public ClientConnection(Socket socket) {
this.socket = socket;
}
public void run() throws IOException {
- BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
// Read the entire input stream and write it back
try (ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
// load the first batch before instantiating the writer so that we have any dictionaries
reader.loadNextBatch();
- try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream())) {
+ try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket
+ .getOutputStream())) {
writer.start();
int echoed = 0;
while (true) {
@@ -83,42 +125,4 @@ public void close() throws IOException {
socket.close();
}
}
-
- public void run() throws IOException {
- try {
- while (!closed) {
- LOGGER.info("Waiting to accept new client connection.");
- Socket clientSocket = serverSocket.accept();
- LOGGER.info("Accepted new client connection.");
- try (ClientConnection client = new ClientConnection(clientSocket)) {
- try {
- client.run();
- } catch (IOException e) {
- LOGGER.warn("Error handling client connection.", e);
- }
- }
- LOGGER.info("Closed connection with client");
- }
- } catch (java.net.SocketException ex) {
- if (!closed) throw ex;
- } finally {
- serverSocket.close();
- LOGGER.info("Server closed.");
- }
- }
-
- public void close() throws IOException {
- closed = true;
- serverSocket.close();
- }
-
- public static void main(String[] args) throws Exception {
- int port;
- if (args.length > 0) {
- port = Integer.parseInt(args[0]);
- } else {
- port = 8080;
- }
- new EchoServer(port).run();
- }
}
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
index 9fa7b761a5772..b8621920d3348 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
@@ -16,13 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.arrow.tools;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
+package org.apache.arrow.tools;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
@@ -38,17 +33,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
public class FileRoundtrip {
private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class);
-
- public static void main(String[] args) {
- System.exit(new FileRoundtrip(System.out, System.err).run(args));
- }
-
private final Options options;
private final PrintStream out;
private final PrintStream err;
-
FileRoundtrip(PrintStream out, PrintStream err) {
this.out = out;
this.err = err;
@@ -58,6 +53,10 @@ public static void main(String[] args) {
}
+ public static void main(String[] args) {
+ System.exit(new FileRoundtrip(System.out, System.err).run(args));
+ }
+
private File validateFile(String type, String fileName) {
if (fileName == null) {
throw new IllegalArgumentException("missing " + type + " file parameter");
@@ -81,7 +80,8 @@ int run(String[] args) {
File outFile = validateFile("output", outFileName);
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close
try (FileInputStream fileInputStream = new FileInputStream(inFile);
- ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(),
+ allocator)) {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
Schema schema = root.getSchema();
@@ -89,7 +89,8 @@ int run(String[] args) {
LOGGER.debug("Found schema: " + schema);
try (FileOutputStream fileOutputStream = new FileOutputStream(outFile);
- ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader, fileOutputStream.getChannel())) {
+ ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader,
+ fileOutputStream.getChannel())) {
arrowWriter.start();
while (true) {
arrowReader.loadNextBatch();
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
index d5345535d19dc..be404fd4c5950 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
@@ -6,22 +6,17 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.arrow.tools;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
+package org.apache.arrow.tools;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
@@ -29,6 +24,12 @@
import org.apache.arrow.vector.file.ArrowFileReader;
import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
/**
* Converts an Arrow file to an Arrow stream. The file should be specified as the
* first argument and the output is written to standard out.
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
index 5d4849c234383..453693d7fa489 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
@@ -16,15 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.arrow.tools;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+package org.apache.arrow.tools;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
@@ -44,8 +37,25 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
public class Integration {
private static final Logger LOGGER = LoggerFactory.getLogger(Integration.class);
+ private final Options options;
+
+ Integration() {
+ this.options = new Options();
+ this.options.addOption("a", "arrow", true, "arrow file");
+ this.options.addOption("j", "json", true, "json file");
+ this.options.addOption("c", "command", true, "command to execute: " + Arrays.toString(Command
+ .values()));
+ }
public static void main(String[] args) {
try {
@@ -59,20 +69,61 @@ public static void main(String[] args) {
}
}
- private final Options options;
+ private static void fatalError(String message, Throwable e) {
+ System.err.println(message);
+ System.err.println(e.getMessage());
+ LOGGER.error(message, e);
+ System.exit(1);
+ }
+
+ private File validateFile(String type, String fileName, boolean shouldExist) {
+ if (fileName == null) {
+ throw new IllegalArgumentException("missing " + type + " file parameter");
+ }
+ File f = new File(fileName);
+ if (shouldExist && (!f.exists() || f.isDirectory())) {
+ throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath());
+ }
+ if (!shouldExist && f.exists()) {
+ throw new IllegalArgumentException(type + " file already exists: " + f.getAbsolutePath());
+ }
+ return f;
+ }
+
+ void run(String[] args) throws ParseException, IOException {
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args, false);
+
+
+ Command command = toCommand(cmd.getOptionValue("command"));
+ File arrowFile = validateFile("arrow", cmd.getOptionValue("arrow"), command.arrowExists);
+ File jsonFile = validateFile("json", cmd.getOptionValue("json"), command.jsonExists);
+ command.execute(arrowFile, jsonFile);
+ }
+
+ private Command toCommand(String commandName) {
+ try {
+ return Command.valueOf(commandName);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Unknown command: " + commandName + " expected one of "
+ + Arrays.toString(Command.values()));
+ }
+ }
enum Command {
ARROW_TO_JSON(true, false) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
- try(BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
- FileInputStream fileInputStream = new FileInputStream(arrowFile);
- ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
+ try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ FileInputStream fileInputStream = new FileInputStream(arrowFile);
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(),
+ allocator)) {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
Schema schema = root.getSchema();
LOGGER.debug("Input file size: " + arrowFile.length());
LOGGER.debug("Found schema: " + schema);
- try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true))) {
+ try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config()
+ .pretty(true))) {
writer.start(schema);
for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
arrowReader.loadRecordBatch(rbBlock);
@@ -94,7 +145,8 @@ public void execute(File arrowFile, File jsonFile) throws IOException {
try (FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
// TODO json dictionaries
- ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
+ ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream
+ .getChannel())) {
arrowWriter.start();
reader.read(root);
while (root.getRowCount() != 0) {
@@ -113,7 +165,8 @@ public void execute(File arrowFile, File jsonFile) throws IOException {
try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
FileInputStream fileInputStream = new FileInputStream(arrowFile);
- ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(),
+ allocator)) {
Schema jsonSchema = jsonReader.start();
VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot();
Schema arrowSchema = arrowRoot.getSchema();
@@ -135,7 +188,8 @@ public void execute(File arrowFile, File jsonFile) throws IOException {
boolean hasMoreJSON = jsonRoot != null;
boolean hasMoreArrow = iterator.hasNext();
if (hasMoreJSON || hasMoreArrow) {
- throw new IllegalArgumentException("Unexpected RecordBatches. J:" + hasMoreJSON + " A:" + hasMoreArrow);
+ throw new IllegalArgumentException("Unexpected RecordBatches. J:" + hasMoreJSON + " "
+ + "A:" + hasMoreArrow);
}
}
}
@@ -153,51 +207,4 @@ public void execute(File arrowFile, File jsonFile) throws IOException {
}
- Integration() {
- this.options = new Options();
- this.options.addOption("a", "arrow", true, "arrow file");
- this.options.addOption("j", "json", true, "json file");
- this.options.addOption("c", "command", true, "command to execute: " + Arrays.toString(Command.values()));
- }
-
- private File validateFile(String type, String fileName, boolean shouldExist) {
- if (fileName == null) {
- throw new IllegalArgumentException("missing " + type + " file parameter");
- }
- File f = new File(fileName);
- if (shouldExist && (!f.exists() || f.isDirectory())) {
- throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath());
- }
- if (!shouldExist && f.exists()) {
- throw new IllegalArgumentException(type + " file already exists: " + f.getAbsolutePath());
- }
- return f;
- }
-
- void run(String[] args) throws ParseException, IOException {
- CommandLineParser parser = new PosixParser();
- CommandLine cmd = parser.parse(options, args, false);
-
-
- Command command = toCommand(cmd.getOptionValue("command"));
- File arrowFile = validateFile("arrow", cmd.getOptionValue("arrow"), command.arrowExists);
- File jsonFile = validateFile("json", cmd.getOptionValue("json"), command.jsonExists);
- command.execute(arrowFile, jsonFile);
- }
-
- private Command toCommand(String commandName) {
- try {
- return Command.valueOf(commandName);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Unknown command: " + commandName + " expected one of " + Arrays.toString(Command.values()));
- }
- }
-
- private static void fatalError(String message, Throwable e) {
- System.err.println(message);
- System.err.println(e.getMessage());
- LOGGER.error(message, e);
- System.exit(1);
- }
-
}
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
index 3b79d5b05e116..41dfd347be579 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
@@ -6,17 +6,24 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.tools;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.file.ArrowFileWriter;
+import org.apache.arrow.vector.stream.ArrowStreamReader;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -25,12 +32,6 @@
import java.io.OutputStream;
import java.nio.channels.Channels;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.file.ArrowFileWriter;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
-
/**
* Converts an Arrow stream to an Arrow file.
*/
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
index f752f7eaa74b9..1a389098b4f47 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
@@ -16,13 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.arrow.tools;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
+package org.apache.arrow.tools;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
@@ -39,6 +34,12 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
public class ArrowFileTestFixtures {
static final int COUNT = 10;
@@ -58,9 +59,11 @@ static void writeData(int count, MapVector parent) {
static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception {
// read
- try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+ try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer
+ .MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(testOutFile);
- ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(),
+ readerAllocator)) {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
Schema schema = root.getSchema();
for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
@@ -81,16 +84,19 @@ static void validateContent(int count, VectorSchemaRoot root) {
static void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
VectorSchemaRoot root = new VectorSchemaRoot(parent);
try (FileOutputStream fileOutputStream = new FileOutputStream(file);
- ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
+ ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream
+ .getChannel())) {
arrowWriter.writeBatch();
}
}
- static void writeInput(File testInFile, BufferAllocator allocator) throws FileNotFoundException, IOException {
+ static void writeInput(File testInFile, BufferAllocator allocator) throws
+ FileNotFoundException, IOException {
int count = ArrowFileTestFixtures.COUNT;
try (
- BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0,
+ Integer.MAX_VALUE);
MapVector parent = new MapVector("parent", vectorAllocator, null)) {
writeData(count, parent);
write(parent.getChild("root"), testInFile);
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
index 706f8e2ca4d36..5970c57f46583 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -6,28 +6,17 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.arrow.tools;
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
+package org.apache.arrow.tools;
import com.google.common.collect.ImmutableList;
@@ -57,6 +46,18 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class EchoServerTest {
private static EchoServer server;
@@ -94,8 +95,8 @@ private void testEchoServer(int serverPort,
BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0);
try (Socket socket = new Socket("localhost", serverPort);
- ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream());
- ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) {
+ ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream());
+ ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) {
writer.start();
for (int i = 0; i < batches; i++) {
vector.allocateNew(16);
@@ -111,7 +112,8 @@ private void testEchoServer(int serverPort,
assertEquals(new Schema(asList(field)), reader.getVectorSchemaRoot().getSchema());
- NullableTinyIntVector readVector = (NullableTinyIntVector) reader.getVectorSchemaRoot().getFieldVectors().get(0);
+ NullableTinyIntVector readVector = (NullableTinyIntVector) reader.getVectorSchemaRoot()
+ .getFieldVectors().get(0);
for (int i = 0; i < batches; i++) {
reader.loadNextBatch();
assertEquals(16, reader.getVectorSchemaRoot().getRowCount());
@@ -131,7 +133,8 @@ private void testEchoServer(int serverPort,
public void basicTest() throws InterruptedException, IOException {
BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
- Field field = new Field("testField", true, new ArrowType.Int(8, true), Collections.