From a9a570139966593ed84ddd842da73b60ace89e1e Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ozawa Date: Tue, 21 Mar 2017 15:24:19 -0700 Subject: [PATCH] ARROW-208: Add checkstyle policy to java project Author: Tsuyoshi Ozawa Closes #96 from oza/ARROW-208 and squashes the following commits: 809e729 [Tsuyoshi Ozawa] reformatted code in memory and tools dir with IDE 40ee6a3 [Tsuyoshi Ozawa] ARROW-208: Add checkstyle policy to java project --- .../main/java/io/netty/buffer/ArrowBuf.java | 219 +++---- .../io/netty/buffer/ExpandableByteBuf.java | 8 +- .../java/io/netty/buffer/LargeBuffer.java | 9 +- .../netty/buffer/MutableWrappedByteBuf.java | 18 +- .../netty/buffer/PooledByteBufAllocatorL.java | 84 +-- .../buffer/UnsafeDirectLittleEndian.java | 52 +- .../org/apache/arrow/memory/Accountant.java | 102 ++-- .../arrow/memory/AllocationListener.java | 4 +- .../arrow/memory/AllocationManager.java | 177 +++--- .../arrow/memory/AllocationReservation.java | 20 +- .../memory/AllocatorClosedException.java | 6 +- .../arrow/memory/ArrowByteBufAllocator.java | 14 +- .../apache/arrow/memory/BaseAllocator.java | 539 +++++++++--------- .../apache/arrow/memory/BoundsChecking.java | 7 +- .../apache/arrow/memory/BufferAllocator.java | 80 +-- .../apache/arrow/memory/BufferManager.java | 15 +- .../apache/arrow/memory/ChildAllocator.java | 18 +- .../arrow/memory/OutOfMemoryException.java | 13 +- .../apache/arrow/memory/RootAllocator.java | 6 +- .../org/apache/arrow/memory/package-info.java | 49 +- .../arrow/memory/util/AssertionUtil.java | 15 +- .../arrow/memory/util/AutoCloseableLock.java | 5 +- .../arrow/memory/util/HistoricalLog.java | 85 +-- .../apache/arrow/memory/util/StackTrace.java | 15 +- java/pom.xml | 55 ++ .../org/apache/arrow/tools/EchoServer.java | 102 ++-- .../org/apache/arrow/tools/FileRoundtrip.java | 29 +- .../org/apache/arrow/tools/FileToStream.java | 17 +- .../org/apache/arrow/tools/Integration.java | 133 +++-- .../org/apache/arrow/tools/StreamToFile.java | 17 +- .../arrow/tools/ArrowFileTestFixtures.java | 28 +- .../apache/arrow/tools/EchoServerTest.java | 66 ++- .../apache/arrow/tools/TestFileRoundtrip.java | 15 +- .../apache/arrow/tools/TestIntegration.java | 159 +++--- 34 files changed, 1218 insertions(+), 963 deletions(-) diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index 95d2be5a43a36..e777b5a6a5d58 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -6,27 +6,21 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package io.netty.buffer; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.channels.GatheringByteChannel; -import java.nio.channels.ScatteringByteChannel; -import java.nio.charset.Charset; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import com.google.common.base.Preconditions; + +import io.netty.util.internal.PlatformDependent; import org.apache.arrow.memory.AllocationManager.BufferLedger; import org.apache.arrow.memory.ArrowByteBufAllocator; @@ -37,15 +31,23 @@ import org.apache.arrow.memory.BufferManager; import org.apache.arrow.memory.util.HistoricalLog; -import com.google.common.base.Preconditions; - -import io.netty.util.internal.PlatformDependent; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ArrowBuf.class); private static final AtomicLong idGenerator = new AtomicLong(0); - + private static final int LOG_BYTES_PER_ROW = 10; private final long id = idGenerator.incrementAndGet(); private final AtomicInteger refCnt; private final UnsafeDirectLittleEndian udle; @@ -55,9 +57,9 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { private final BufferManager bufManager; private final ArrowByteBufAllocator alloc; private final boolean isEmpty; - private volatile int length; private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id) : null; + private volatile int length; public ArrowBuf( final AtomicInteger refCnt, @@ -85,6 +87,17 @@ public ArrowBuf( } + public static String bufferState(final ByteBuf buf) { + final int cap = buf.capacity(); + final int mcap = buf.maxCapacity(); + final int ri = buf.readerIndex(); + final int rb = buf.readableBytes(); + final int wi = buf.writerIndex(); + final int wb = buf.writableBytes(); + return String.format("cap/max: %d/%d, ri: %d, rb: %d, wi: %d, wb: %d", + cap, mcap, ri, rb, wi, wb); + } + public ArrowBuf reallocIfNeeded(final int size) { Preconditions.checkArgument(size >= 0, "reallocation size must be non-negative"); @@ -95,7 +108,8 @@ public ArrowBuf reallocIfNeeded(final int size) { if (bufManager != null) { return bufManager.replace(this, size); } else { - throw new UnsupportedOperationException("Realloc is only available in the context of an operator's UDFs"); + throw new UnsupportedOperationException("Realloc is only available in the context of an " + + "operator's UDFs"); } } @@ -128,14 +142,13 @@ private final void checkIndexD(int index, int fieldLength) { /** * Allows a function to determine whether not reading a particular string of bytes is valid. - * - * Will throw an exception if the memory is not readable for some reason. Only doesn't something in the case that + *

+ * Will throw an exception if the memory is not readable for some reason. Only doesn't + * something in the case that * AssertionUtil.BOUNDS_CHECKING_ENABLED is true. * - * @param start - * The starting position of the bytes to be read. - * @param end - * The exclusive endpoint of the bytes to be read. + * @param start The starting position of the bytes to be read. + * @param end The exclusive endpoint of the bytes to be read. */ public void checkBytes(int start, int end) { if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { @@ -156,17 +169,21 @@ private void ensure(int width) { } /** - * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of memory ownership and - * accounting. This has no impact on the reference counting for the current ArrowBuf except in the situation where the + * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of + * memory ownership and + * accounting. This has no impact on the reference counting for the current ArrowBuf except in + * the situation where the * passed in Allocator is the same as the current buffer. - * - * This operation has no impact on the reference count of this ArrowBuf. The newly created ArrowBuf with either have a - * reference count of 1 (in the case that this is the first time this memory is being associated with the new - * allocator) or the current value of the reference count + 1 for the other AllocationManager/BufferLedger combination + *

+ * This operation has no impact on the reference count of this ArrowBuf. The newly created + * ArrowBuf with either have a + * reference count of 1 (in the case that this is the first time this memory is being + * associated with the new + * allocator) or the current value of the reference count + 1 for the other + * AllocationManager/BufferLedger combination * in the case that the provided allocator already had an association to this underlying memory. * - * @param target - * The target allocator to create an association with. + * @param target The target allocator to create an association with. * @return A new ArrowBuf which shares the same underlying memory as this ArrowBuf. */ public ArrowBuf retain(BufferAllocator target) { @@ -186,28 +203,39 @@ public ArrowBuf retain(BufferAllocator target) { } /** - * Transfer the memory accounting ownership of this ArrowBuf to another allocator. This will generate a new ArrowBuf - * that carries an association with the underlying memory of this ArrowBuf. If this ArrowBuf is connected to the - * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the taret allocator. If - * this ArrowBuf does not currently own the memory underlying it (and is only associated with it), this does not + * Transfer the memory accounting ownership of this ArrowBuf to another allocator. This will + * generate a new ArrowBuf + * that carries an association with the underlying memory of this ArrowBuf. If this ArrowBuf is + * connected to the + * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to + * the taret allocator. If + * this ArrowBuf does not currently own the memory underlying it (and is only associated with + * it), this does not * transfer any ownership to the newly created ArrowBuf. - * - * This operation has no impact on the reference count of this ArrowBuf. The newly created ArrowBuf with either have a - * reference count of 1 (in the case that this is the first time this memory is being associated with the new - * allocator) or the current value of the reference count for the other AllocationManager/BufferLedger combination in + *

+ * This operation has no impact on the reference count of this ArrowBuf. The newly created + * ArrowBuf with either have a + * reference count of 1 (in the case that this is the first time this memory is being + * associated with the new + * allocator) or the current value of the reference count for the other + * AllocationManager/BufferLedger combination in * the case that the provided allocator already had an association to this underlying memory. - * - * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible - * due to the fact that the original owning allocator may have allocated this memory out of a local reservation - * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. This operation is done - * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could occur slightly prematurely - * to an actual overlimit==true condition. This is simply conservative behavior which means we may return overlimit + *

+ * Transfers will always succeed, even if that puts the other allocator into an overlimit + * situation. This is possible + * due to the fact that the original owning allocator may have allocated this memory out of a + * local reservation + * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. + * This operation is done + * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could + * occur slightly prematurely + * to an actual overlimit==true condition. This is simply conservative behavior which means we + * may return overlimit * slightly sooner than is necessary. * - * @param target - * The allocator to transfer ownership to. - * @return A new transfer result with the impact of the transfer (whether it was overlimit) as well as the newly - * created ArrowBuf. + * @param target The allocator to transfer ownership to. + * @return A new transfer result with the impact of the transfer (whether it was overlimit) as + * well as the newly created ArrowBuf. */ public TransferResult transferOwnership(BufferAllocator target) { @@ -223,28 +251,6 @@ public TransferResult transferOwnership(BufferAllocator target) { return new TransferResult(allocationFit, newBuf); } - /** - * The outcome of a Transfer. - */ - public class TransferResult { - - /** - * Whether this transfer fit within the target allocator's capacity. - */ - public final boolean allocationFit; - - /** - * The newly created buffer associated with the target allocator. - */ - public final ArrowBuf buffer; - - private TransferResult(boolean allocationFit, ArrowBuf buffer) { - this.allocationFit = allocationFit; - this.buffer = buffer; - } - - } - @Override public boolean release() { return release(1); @@ -261,7 +267,8 @@ public boolean release(int decrement) { } if (decrement < 1) { - throw new IllegalStateException(String.format("release(%d) argument is not positive. Buffer Info: %s", + throw new IllegalStateException(String.format("release(%d) argument is not positive. Buffer" + + " Info: %s", decrement, toVerboseString())); } @@ -273,7 +280,8 @@ public boolean release(int decrement) { if (refCnt < 0) { throw new IllegalStateException( - String.format("ArrowBuf[%d] refCnt has gone negative. Buffer Info: %s", id, toVerboseString())); + String.format("ArrowBuf[%d] refCnt has gone negative. Buffer Info: %s", id, + toVerboseString())); } return refCnt == 0; @@ -299,7 +307,8 @@ public synchronized ArrowBuf capacity(int newCapacity) { return this; } - throw new UnsupportedOperationException("Buffers don't support resizing that increases the size."); + throw new UnsupportedOperationException("Buffers don't support resizing that increases the " + + "size."); } @Override @@ -354,17 +363,6 @@ public ArrowBuf slice() { return slice(readerIndex(), readableBytes()); } - public static String bufferState(final ByteBuf buf) { - final int cap = buf.capacity(); - final int mcap = buf.maxCapacity(); - final int ri = buf.readerIndex(); - final int rb = buf.readableBytes(); - final int wi = buf.writerIndex(); - final int wb = buf.writableBytes(); - return String.format("cap/max: %d/%d, ri: %d, rb: %d, wi: %d, wb: %d", - cap, mcap, ri, rb, wi, wb); - } - @Override public ArrowBuf slice(int index, int length) { @@ -373,7 +371,8 @@ public ArrowBuf slice(int index, int length) { } /* - * Re the behavior of reference counting, see http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, which + * Re the behavior of reference counting, see http://netty.io/wiki/reference-counted-objects + * .html#wiki-h3-5, which * explains that derived buffers share their reference count with their parent */ final ArrowBuf newBuf = ledger.newArrowBuf(offset + index, length); @@ -408,12 +407,12 @@ public ByteBuffer internalNioBuffer(int index, int length) { @Override public ByteBuffer[] nioBuffers() { - return new ByteBuffer[] { nioBuffer() }; + return new ByteBuffer[]{nioBuffer()}; } @Override public ByteBuffer[] nioBuffers(int index, int length) { - return new ByteBuffer[] { nioBuffer(index, length) }; + return new ByteBuffer[]{nioBuffer(index, length)}; } @Override @@ -443,7 +442,8 @@ public long memoryAddress() { @Override public String toString() { - return String.format("ArrowBuf[%d], udle: [%d %d..%d]", id, udle.id, offset, offset + capacity()); + return String.format("ArrowBuf[%d], udle: [%d %d..%d]", id, udle.id, offset, offset + + capacity()); } @Override @@ -738,7 +738,8 @@ public ArrowBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { public ArrowBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) { if (src.isDirect()) { checkIndex(index, length); - PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this.memoryAddress() + index, + PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this + .memoryAddress() + index, length); } else { if (srcIndex == 0 && src.capacity() == length) { @@ -788,7 +789,8 @@ public void close() { } /** - * Returns the possible memory consumed by this ArrowBuf in the worse case scenario. (not shared, connected to larger + * Returns the possible memory consumed by this ArrowBuf in the worse case scenario. (not + * shared, connected to larger * underlying buffer of allocated memory) * * @return Size in bytes. @@ -798,7 +800,8 @@ public int getPossibleMemoryConsumed() { } /** - * Return that is Accounted for by this buffer (and its potentially shared siblings within the context of the + * Return that is Accounted for by this buffer (and its potentially shared siblings within the + * context of the * associated allocator). * * @return Size in bytes. @@ -807,15 +810,11 @@ public int getActualMemoryConsumed() { return ledger.getAccountedSize(); } - private final static int LOG_BYTES_PER_ROW = 10; - /** * Return the buffer's byte contents in the form of a hex dump. * - * @param start - * the starting byte index - * @param length - * how many bytes to log + * @param start the starting byte index + * @param length how many bytes to log * @return A hex dump in a String. */ public String toHexString(final int start, final int length) { @@ -878,5 +877,27 @@ public ArrowBuf writerIndex(int writerIndex) { return this; } + /** + * The outcome of a Transfer. + */ + public class TransferResult { + + /** + * Whether this transfer fit within the target allocator's capacity. + */ + public final boolean allocationFit; + + /** + * The newly created buffer associated with the target allocator. + */ + public final ArrowBuf buffer; + + private TransferResult(boolean allocationFit, ArrowBuf buffer) { + this.allocationFit = allocationFit; + this.buffer = buffer; + } + + } + } diff --git a/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java b/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java index 7fb884daa3952..9f8af93109739 100644 --- a/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java @@ -6,21 +6,23 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package io.netty.buffer; import org.apache.arrow.memory.BufferAllocator; /** - * Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of the Netty framework + * Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of + * the Netty framework * (thus supporting RPC level memory accounting). */ public class ExpandableByteBuf extends MutableWrappedByteBuf { diff --git a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java index c026e430d77f3..9a6e402dad53e 100644 --- a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java +++ b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java @@ -6,21 +6,24 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package io.netty.buffer; /** - * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts. + * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and + * counts. */ public class LargeBuffer extends MutableWrappedByteBuf { + public LargeBuffer(ByteBuf buffer) { super(buffer); } diff --git a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java index 5709473135e4b..a5683adccbc32 100644 --- a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java @@ -6,15 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package io.netty.buffer; import java.io.IOException; @@ -26,16 +27,12 @@ import java.nio.channels.ScatteringByteChannel; /** - * This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override some behaviors and make + * This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override + * some behaviors and make * buffer mutable. */ abstract class MutableWrappedByteBuf extends AbstractByteBuf { - @Override - public ByteBuffer nioBuffer(int index, int length) { - return unwrap().nioBuffer(index, length); - } - ByteBuf buffer; public MutableWrappedByteBuf(ByteBuf buffer) { @@ -50,6 +47,11 @@ public MutableWrappedByteBuf(ByteBuf buffer) { setIndex(buffer.readerIndex(), buffer.writerIndex()); } + @Override + public ByteBuffer nioBuffer(int index, int length) { + return unwrap().nioBuffer(index, length); + } + @Override public ByteBuf unwrap() { return buffer; diff --git a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java index a843ac5586e79..b6de2e3aa2acb 100644 --- a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java +++ b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java @@ -6,42 +6,44 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package io.netty.buffer; -import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED; +import io.netty.util.internal.StringUtil; + +import org.apache.arrow.memory.OutOfMemoryException; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicLong; -import org.apache.arrow.memory.OutOfMemoryException; - -import io.netty.util.internal.StringUtil; +import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED; /** - * The base allocator that we use for all of Arrow's memory management. Returns UnsafeDirectLittleEndian buffers. + * The base allocator that we use for all of Arrow's memory management. Returns + * UnsafeDirectLittleEndian buffers. */ public class PooledByteBufAllocatorL { - private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator"); - private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60; + private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow" + + ".allocator"); + private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60; + public final UnsafeDirectLittleEndian empty; private final AtomicLong hugeBufferSize = new AtomicLong(0); private final AtomicLong hugeBufferCount = new AtomicLong(0); private final AtomicLong normalBufferSize = new AtomicLong(0); private final AtomicLong normalBufferCount = new AtomicLong(0); - private final InnerAllocator allocator; - public final UnsafeDirectLittleEndian empty; public PooledByteBufAllocatorL() { allocator = new InnerAllocator(); @@ -78,6 +80,7 @@ public long getNormalBufferCount() { } private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian { + private final long initialCapacity; private final AtomicLong count; private final AtomicLong size; @@ -89,7 +92,8 @@ private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, Ato this.size = size; } - private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, AtomicLong size) { + private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, + AtomicLong size) { super(buf); this.initialCapacity = buf.capacity(); this.count = count; @@ -119,6 +123,7 @@ public boolean release(int decrement) { } private class InnerAllocator extends PooledByteBufAllocator { + private final PoolArena[] directArenas; private final MemoryStatusThread statusThread; private final int chunkSize; @@ -131,7 +136,8 @@ public InnerAllocator() { f.setAccessible(true); this.directArenas = (PoolArena[]) f.get(this); } catch (Exception e) { - throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e); + throw new RuntimeException("Failure while initializing allocator. Unable to retrieve " + + "direct arenas field.", e); } this.chunkSize = directArenas[0].chunkSize; @@ -158,7 +164,8 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa hugeBufferCount.incrementAndGet(); // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception()); - return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, hugeBufferSize); + return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, + hugeBufferSize); } else { // within chunk, use arena. ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity); @@ -173,7 +180,8 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa normalBufferSize.addAndGet(buf.capacity()); normalBufferCount.incrementAndGet(); - return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize); + return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, + normalBufferCount, normalBufferSize); } } else { @@ -183,7 +191,8 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa private UnsupportedOperationException fail() { return new UnsupportedOperationException( - "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality."); + "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " + + "didn't provide that functionality."); } @Override @@ -203,7 +212,8 @@ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { private void validate(int initialCapacity, int maxCapacity) { if (initialCapacity < 0) { - throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: 0+)"); + throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: " + + "0+)"); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( @@ -212,26 +222,6 @@ private void validate(int initialCapacity, int maxCapacity) { } } - private class MemoryStatusThread extends Thread { - - public MemoryStatusThread() { - super("allocation.logger"); - this.setDaemon(true); - } - - @Override - public void run() { - while (true) { - memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString()); - try { - Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000); - } catch (InterruptedException e) { - return; - } - } - } - } - @Override public String toString() { StringBuilder buf = new StringBuilder(); @@ -256,6 +246,26 @@ public String toString() { return buf.toString(); } + private class MemoryStatusThread extends Thread { + + public MemoryStatusThread() { + super("allocation.logger"); + this.setDaemon(true); + } + + @Override + public void run() { + while (true) { + memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString()); + try { + Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000); + } catch (InterruptedException e) { + return; + } + } + } + } + } } diff --git a/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java index 5ea176745f25e..87d822f58a315 100644 --- a/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java +++ b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,22 +18,31 @@ package io.netty.buffer; +import io.netty.util.internal.PlatformDependent; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteOrder; import java.util.concurrent.atomic.AtomicLong; -import io.netty.util.internal.PlatformDependent; - /** - * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs to abstract away the + * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs + * to abstract away the * Netty classes and underlying Netty memory management. */ public class UnsafeDirectLittleEndian extends WrappedByteBuf { + + public static final boolean ASSERT_ENABLED; private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; private static final AtomicLong ID_GENERATOR = new AtomicLong(0); + static { + boolean isAssertEnabled = false; + assert isAssertEnabled = true; + ASSERT_ENABLED = isAssertEnabled; + } + public final long id = ID_GENERATOR.incrementAndGet(); private final AbstractByteBuf wrapped; private final long memoryAddress; @@ -60,21 +69,22 @@ private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) { this.wrapped = buf; this.memoryAddress = buf.memoryAddress(); } - private long addr(int index) { - return memoryAddress + index; - } - @Override - public long getLong(int index) { + private long addr(int index) { + return memoryAddress + index; + } + + @Override + public long getLong(int index) { // wrapped.checkIndex(index, 8); - long v = PlatformDependent.getLong(addr(index)); - return v; - } + long v = PlatformDependent.getLong(addr(index)); + return v; + } - @Override - public float getFloat(int index) { - return Float.intBitsToFloat(getInt(index)); - } + @Override + public float getFloat(int index) { + return Float.intBitsToFloat(getInt(index)); + } @Override public ByteBuf slice() { @@ -259,12 +269,4 @@ public int hashCode() { return System.identityHashCode(this); } - public static final boolean ASSERT_ENABLED; - - static { - boolean isAssertEnabled = false; - assert isAssertEnabled = true; - ASSERT_ENABLED = isAssertEnabled; - } - } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java index 37c598ad89ece..6ddc8f784bc4a 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java @@ -6,30 +6,33 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; +import com.google.common.base.Preconditions; + import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.ThreadSafe; -import com.google.common.base.Preconditions; - /** - * Provides a concurrent way to manage account for memory usage without locking. Used as basis for Allocators. All + * Provides a concurrent way to manage account for memory usage without locking. Used as basis + * for Allocators. All * operations are threadsafe (except for close). */ @ThreadSafe class Accountant implements AutoCloseable { - // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant.class); + // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant + // .class); /** * The parent allocator @@ -37,7 +40,8 @@ class Accountant implements AutoCloseable { protected final Accountant parent; /** - * The amount of memory reserved for this allocator. Releases below this amount of memory will not be returned to the + * The amount of memory reserved for this allocator. Releases below this amount of memory will + * not be returned to the * parent Accountant until this Accountant is closed. */ protected final long reservation; @@ -45,7 +49,8 @@ class Accountant implements AutoCloseable { private final AtomicLong peakAllocation = new AtomicLong(); /** - * Maximum local memory that can be held. This can be externally updated. Changing it won't cause past memory to + * Maximum local memory that can be held. This can be externally updated. Changing it won't + * cause past memory to * change but will change responses to future allocation efforts */ private final AtomicLong allocationLimit = new AtomicLong(); @@ -56,11 +61,14 @@ class Accountant implements AutoCloseable { private final AtomicLong locallyHeldMemory = new AtomicLong(); public Accountant(Accountant parent, long reservation, long maxAllocation) { - Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be non-negative."); - Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be non-negative."); + Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be " + + "non-negative."); + Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be " + + "non-negative."); Preconditions.checkArgument(reservation <= maxAllocation, "The initial reservation size must be <= the maximum allocation."); - Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't reserve memory."); + Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't " + + "reserve memory."); this.parent = parent; this.reservation = reservation; @@ -72,19 +80,20 @@ public Accountant(Accountant parent, long reservation, long maxAllocation) { if (!outcome.isOk()) { throw new OutOfMemoryException(String.format( "Failure trying to allocate initial reservation for Allocator. " - + "Attempted to allocate %d bytes and received an outcome of %s.", reservation, outcome.name())); + + "Attempted to allocate %d bytes and received an outcome of %s.", reservation, + outcome.name())); } } } /** - * Attempt to allocate the requested amount of memory. Either completely succeeds or completely fails. Constructs a a + * Attempt to allocate the requested amount of memory. Either completely succeeds or completely + * fails. Constructs a a * log of delta - * + *

* If it fails, no changes are made to accounting. * - * @param size - * The amount of memory to reserve in bytes. + * @param size The amount of memory to reserve in bytes. * @return True if the allocation was successful, false if the allocation failed. */ AllocationOutcome allocateBytes(long size) { @@ -116,8 +125,7 @@ private void updatePeak() { /** * Increase the accounting. Returns whether the allocation fit within limits. * - * @param size - * to increase + * @param size to increase * @return Whether the allocation fit within limits. */ boolean forceAllocate(long size) { @@ -126,24 +134,29 @@ boolean forceAllocate(long size) { } /** - * Internal method for allocation. This takes a forced approach to allocation to ensure that we manage reservation - * boundary issues consistently. Allocation is always done through the entire tree. The two options that we influence - * are whether the allocation should be forced and whether or not the peak memory allocation should be updated. If at - * some point during allocation escalation we determine that the allocation is no longer possible, we will continue to - * do a complete and consistent allocation but we will stop updating the peak allocation. We do this because we know - * that we will be directly unwinding this allocation (and thus never actually making the allocation). If force - * allocation is passed, then we continue to update the peak limits since we now know that this allocation will occur + * Internal method for allocation. This takes a forced approach to allocation to ensure that we + * manage reservation + * boundary issues consistently. Allocation is always done through the entire tree. The two + * options that we influence + * are whether the allocation should be forced and whether or not the peak memory allocation + * should be updated. If at + * some point during allocation escalation we determine that the allocation is no longer + * possible, we will continue to + * do a complete and consistent allocation but we will stop updating the peak allocation. We do + * this because we know + * that we will be directly unwinding this allocation (and thus never actually making the + * allocation). If force + * allocation is passed, then we continue to update the peak limits since we now know that this + * allocation will occur * despite our moving past one or more limits. * - * @param size - * The size of the allocation. - * @param incomingUpdatePeak - * Whether we should update the local peak for this allocation. - * @param forceAllocation - * Whether we should force the allocation. + * @param size The size of the allocation. + * @param incomingUpdatePeak Whether we should update the local peak for this allocation. + * @param forceAllocation Whether we should force the allocation. * @return The outcome of the allocation. */ - private AllocationOutcome allocate(final long size, final boolean incomingUpdatePeak, final boolean forceAllocation) { + private AllocationOutcome allocate(final long size, final boolean incomingUpdatePeak, final + boolean forceAllocation) { final long newLocal = locallyHeldMemory.addAndGet(size); final long beyondReservation = newLocal - reservation; final boolean beyondLimit = newLocal > allocationLimit.get(); @@ -173,7 +186,7 @@ public void releaseBytes(long size) { Preconditions.checkArgument(newSize >= 0, "Accounted size went negative."); final long originalSize = newSize + size; - if(originalSize > reservation && parent != null){ + if (originalSize > reservation && parent != null) { // we deallocated memory that we should release to our parent. final long possibleAmountToReleaseToParent = originalSize - reservation; final long actualToReleaseToParent = Math.min(size, possibleAmountToReleaseToParent); @@ -182,16 +195,6 @@ public void releaseBytes(long size) { } - /** - * Set the maximum amount of memory that can be allocated in the this Accountant before failing an allocation. - * - * @param newLimit - * The limit in bytes. - */ - public void setLimit(long newLimit) { - allocationLimit.set(newLimit); - } - public boolean isOverLimit() { return getAllocatedMemory() > getLimit() || (parent != null && parent.isOverLimit()); } @@ -216,7 +219,18 @@ public long getLimit() { } /** - * Return the current amount of allocated memory that this Accountant is managing accounting for. Note this does not + * Set the maximum amount of memory that can be allocated in the this Accountant before failing + * an allocation. + * + * @param newLimit The limit in bytes. + */ + public void setLimit(long newLimit) { + allocationLimit.set(newLimit); + } + + /** + * Return the current amount of allocated memory that this Accountant is managing accounting + * for. Note this does not * include reservation memory that hasn't been allocated. * * @return Currently allocate memory in bytes. diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java index 1b127f8181222..d36cb37fc2e24 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java @@ -15,15 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; /** * An allocation listener being notified for allocation/deallocation - * + *

* It is expected to be called from multiple threads and as such, * provider should take care of making the implementation thread-safe */ public interface AllocationListener { + public static final AllocationListener NOOP = new AllocationListener() { @Override public void onAllocation(long size) { diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index f15bb8a40fa01..683752e6a4980 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -6,53 +6,62 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; -import static org.apache.arrow.memory.BaseAllocator.indent; +import com.google.common.base.Preconditions; -import java.util.IdentityHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import io.netty.buffer.ArrowBuf; +import io.netty.buffer.PooledByteBufAllocatorL; +import io.netty.buffer.UnsafeDirectLittleEndian; import org.apache.arrow.memory.BaseAllocator.Verbosity; import org.apache.arrow.memory.util.AutoCloseableLock; import org.apache.arrow.memory.util.HistoricalLog; -import com.google.common.base.Preconditions; +import java.util.IdentityHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; -import io.netty.buffer.ArrowBuf; -import io.netty.buffer.PooledByteBufAllocatorL; -import io.netty.buffer.UnsafeDirectLittleEndian; +import static org.apache.arrow.memory.BaseAllocator.indent; /** - * Manages the relationship between one or more allocators and a particular UDLE. Ensures that one allocator owns the - * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its associated allocators. - * This class is also responsible for managing when memory is allocated and returned to the Netty-based + * Manages the relationship between one or more allocators and a particular UDLE. Ensures that + * one allocator owns the + * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its + * associated allocators. + * This class is also responsible for managing when memory is allocated and returned to the + * Netty-based * PooledByteBufAllocatorL. - * - * The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's package which need access + *

+ * The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's + * package which need access * to these objects or methods. - * - * Threading: AllocationManager manages thread-safety internally. Operations within the context of a single BufferLedger - * are lockless in nature and can be leveraged by multiple threads. Operations that cross the context of two ledgers - * will acquire a lock on the AllocationManager instance. Important note, there is one AllocationManager per - * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a typical query. The + *

+ * Threading: AllocationManager manages thread-safety internally. Operations within the context + * of a single BufferLedger + * are lockless in nature and can be leveraged by multiple threads. Operations that cross the + * context of two ledgers + * will acquire a lock on the AllocationManager instance. Important note, there is one + * AllocationManager per + * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a + * typical query. The * contention of acquiring a lock on AllocationManager should be very low. - * */ public class AllocationManager { - // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationManager.class); + // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger + // (AllocationManager.class); private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0); private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0); @@ -81,17 +90,19 @@ public class AllocationManager { this.root = accountingAllocator.root; this.underlying = INNER_ALLOCATOR.allocate(size); - // we do a no retain association since our creator will want to retrieve the newly created ledger and will create a + // we do a no retain association since our creator will want to retrieve the newly created + // ledger and will create a // reference count at that point this.owningLedger = associate(accountingAllocator, false); this.size = underlying.capacity(); } /** - * Associate the existing underlying buffer with a new allocator. This will increase the reference count to the + * Associate the existing underlying buffer with a new allocator. This will increase the + * reference count to the * provided ledger by 1. - * @param allocator - * The target allocator to associate this buffer with. + * + * @param allocator The target allocator to associate this buffer with. * @return The Ledger (new or existing) that associates the underlying buffer to this new ledger. */ BufferLedger associate(final BaseAllocator allocator) { @@ -118,7 +129,8 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta } try (AutoCloseableLock write = writeLock.open()) { - // we have to recheck existing ledger since a second reader => writer could be competing with us. + // we have to recheck existing ledger since a second reader => writer could be competing + // with us. final BufferLedger existingLedger = map.get(allocator); if (existingLedger != null) { @@ -141,7 +153,8 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta /** - * The way that a particular BufferLedger communicates back to the AllocationManager that it now longer needs to hold + * The way that a particular BufferLedger communicates back to the AllocationManager that it + * now longer needs to hold * a reference to particular piece of memory. */ private class ReleaseListener { @@ -169,16 +182,19 @@ public void release() { amDestructionTime = System.nanoTime(); owningLedger = null; } else { - // we need to change the owning allocator. we've been removed so we'll get whatever is top of list + // we need to change the owning allocator. we've been removed so we'll get whatever is + // top of list BufferLedger newLedger = map.values().iterator().next(); - // we'll forcefully transfer the ownership and not worry about whether we exceeded the limit + // we'll forcefully transfer the ownership and not worry about whether we exceeded the + // limit // since this consumer can't do anything with this. oldLedger.transferBalance(newLedger); } } else { if (map.isEmpty()) { - throw new IllegalStateException("The final removal of a ledger should be connected to the owning ledger."); + throw new IllegalStateException("The final removal of a ledger should be connected to " + + "the owning ledger."); } } @@ -187,25 +203,30 @@ public void release() { } /** - * The reference manager that binds an allocator manager to a particular BaseAllocator. Also responsible for creating + * The reference manager that binds an allocator manager to a particular BaseAllocator. Also + * responsible for creating * a set of ArrowBufs that share a common fate and set of reference counts. - * As with AllocationManager, the only reason this is public is due to ArrowBuf being in io.netty.buffer package. + * As with AllocationManager, the only reason this is public is due to ArrowBuf being in io + * .netty.buffer package. */ public class BufferLedger { private final IdentityHashMap buffers = BaseAllocator.DEBUG ? new IdentityHashMap() : null; - private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet(); // unique ID assigned to each ledger - private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can manage request for retain - // correctly + private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet(); // unique ID assigned to + // each ledger + private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can + // manage request for retain + // correctly private final long lCreationTime = System.nanoTime(); - private volatile long lDestructionTime = 0; private final BaseAllocator allocator; private final ReleaseListener listener; - private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, - "BufferLedger[%d]", 1) + private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog + (BaseAllocator.DEBUG_LOG_LENGTH, + "BufferLedger[%d]", 1) : null; + private volatile long lDestructionTime = 0; private BufferLedger(BaseAllocator allocator, ReleaseListener listener) { this.allocator = allocator; @@ -213,10 +234,11 @@ private BufferLedger(BaseAllocator allocator, ReleaseListener listener) { } /** - * Transfer any balance the current ledger has to the target ledger. In the case that the current ledger holds no + * Transfer any balance the current ledger has to the target ledger. In the case that the + * current ledger holds no * memory, no transfer is made to the new ledger. - * @param target - * The ledger to transfer ownership account to. + * + * @param target The ledger to transfer ownership account to. * @return Whether transfer fit within target ledgers limits. */ public boolean transferBalance(final BufferLedger target) { @@ -231,7 +253,8 @@ public boolean transferBalance(final BufferLedger target) { return true; } - // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure + // since two balance transfers out from the allocator manager could cause incorrect + // accounting, we need to ensure // that this won't happen by synchronizing on the allocator manager instance. try (AutoCloseableLock write = writeLock.open()) { if (owningLedger != this) { @@ -253,12 +276,10 @@ public boolean transferBalance(final BufferLedger target) { /** * Print the current ledger state to a the provided StringBuilder. - * @param sb - * The StringBuilder to populate. - * @param indent - * The level of indentation to position the data. - * @param verbosity - * The level of verbosity to print. + * + * @param sb The StringBuilder to populate. + * @param indent The level of indentation to position the data. + * @param verbosity The level of verbosity to print. */ public void print(StringBuilder sb, int indent, Verbosity verbosity) { indent(sb, indent) @@ -304,7 +325,8 @@ private void inc() { } /** - * Decrement the ledger's reference count. If the ledger is decremented to zero, this ledger should release its + * Decrement the ledger's reference count. If the ledger is decremented to zero, this ledger + * should release its * ownership back to the AllocationManager */ public int decrement(int decrement) { @@ -323,15 +345,19 @@ public int decrement(int decrement) { } /** - * Returns the ledger associated with a particular BufferAllocator. If the BufferAllocator doesn't currently have a - * ledger associated with this AllocationManager, a new one is created. This is placed on BufferLedger rather than - * AllocationManager directly because ArrowBufs don't have access to AllocationManager and they are the ones - * responsible for exposing the ability to associate multiple allocators with a particular piece of underlying - * memory. Note that this will increment the reference count of this ledger by one to ensure the ledger isn't + * Returns the ledger associated with a particular BufferAllocator. If the BufferAllocator + * doesn't currently have a + * ledger associated with this AllocationManager, a new one is created. This is placed on + * BufferLedger rather than + * AllocationManager directly because ArrowBufs don't have access to AllocationManager and + * they are the ones + * responsible for exposing the ability to associate multiple allocators with a particular + * piece of underlying + * memory. Note that this will increment the reference count of this ledger by one to ensure + * the ledger isn't * destroyed before use. * - * @param allocator - * A BufferAllocator. + * @param allocator A BufferAllocator. * @return The ledger associated with the BufferAllocator. */ public BufferLedger getLedgerForAllocator(BufferAllocator allocator) { @@ -339,13 +365,14 @@ public BufferLedger getLedgerForAllocator(BufferAllocator allocator) { } /** - * Create a new ArrowBuf associated with this AllocationManager and memory. Does not impact reference count. + * Create a new ArrowBuf associated with this AllocationManager and memory. Does not impact + * reference count. * Typically used for slicing. - * @param offset - * The offset in bytes to start this new ArrowBuf. - * @param length - * The length in bytes that this ArrowBuf will provide access to. - * @return A new ArrowBuf that shares references with all ArrowBufs associated with this BufferLedger + * + * @param offset The offset in bytes to start this new ArrowBuf. + * @param length The length in bytes that this ArrowBuf will provide access to. + * @return A new ArrowBuf that shares references with all ArrowBufs associated with this + * BufferLedger */ public ArrowBuf newArrowBuf(int offset, int length) { allocator.assertOpen(); @@ -354,13 +381,13 @@ public ArrowBuf newArrowBuf(int offset, int length) { /** * Create a new ArrowBuf associated with this AllocationManager and memory. - * @param offset - * The offset in bytes to start this new ArrowBuf. - * @param length - * The length in bytes that this ArrowBuf will provide access to. - * @param manager - * An optional BufferManager argument that can be used to manage expansion of this ArrowBuf - * @return A new ArrowBuf that shares references with all ArrowBufs associated with this BufferLedger + * + * @param offset The offset in bytes to start this new ArrowBuf. + * @param length The length in bytes that this ArrowBuf will provide access to. + * @param manager An optional BufferManager argument that can be used to manage expansion of + * this ArrowBuf + * @return A new ArrowBuf that shares references with all ArrowBufs associated with this + * BufferLedger */ public ArrowBuf newArrowBuf(int offset, int length, BufferManager manager) { allocator.assertOpen(); @@ -377,7 +404,8 @@ public ArrowBuf newArrowBuf(int offset, int length, BufferManager manager) { if (BaseAllocator.DEBUG) { historicalLog.recordEvent( - "ArrowBuf(BufferLedger, BufferAllocator[%s], UnsafeDirectLittleEndian[identityHashCode == " + "ArrowBuf(BufferLedger, BufferAllocator[%s], " + + "UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d", allocator.name, System.identityHashCode(buf), buf.toString(), System.identityHashCode(this)); @@ -401,7 +429,8 @@ public int getSize() { } /** - * How much memory is accounted for by this ledger. This is either getSize() if this is the owning ledger for the + * How much memory is accounted for by this ledger. This is either getSize() if this is the + * owning ledger for the * memory or zero in the case that this is not the owning ledger associated with this memory. * * @return Amount of accounted(owned) memory associated with this ledger. diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java index 68d1244d1e328..7f5aa313779a7 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java @@ -6,32 +6,36 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; import io.netty.buffer.ArrowBuf; /** - * Supports cumulative allocation reservation. Clients may increase the size of the reservation repeatedly until they - * call for an allocation of the current total size. The reservation can only be used once, and will throw an exception + * Supports cumulative allocation reservation. Clients may increase the size of the reservation + * repeatedly until they + * call for an allocation of the current total size. The reservation can only be used once, and + * will throw an exception * if it is used more than once. *

- * For the purposes of airtight memory accounting, the reservation must be close()d whether it is used or not. + * For the purposes of airtight memory accounting, the reservation must be close()d whether it is + * used or not. * This is not threadsafe. */ public interface AllocationReservation extends AutoCloseable { /** * Add to the current reservation. - * + *

*

Adding may fail if the allocator is not allowed to consume any more space. * * @param nBytes the number of bytes to add @@ -42,7 +46,7 @@ public interface AllocationReservation extends AutoCloseable { /** * Requests a reservation of additional space. - * + *

*

The implementation of the allocator's inner class provides this. * * @param nBytes the amount to reserve @@ -52,7 +56,7 @@ public interface AllocationReservation extends AutoCloseable { /** * Allocate a buffer whose size is the total of all the add()s made. - * + *

*

The allocation request can still fail, even if the amount of space * requested is available, if the allocation cannot be made contiguously. * diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java index 3274642dedd59..d5b638e1ed298 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java @@ -6,15 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; /** @@ -23,6 +24,7 @@ */ @SuppressWarnings("serial") public class AllocatorClosedException extends RuntimeException { + /** * @param message string associated with the cause */ diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java index 5dc5ac397bd93..b8b5283423c82 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java @@ -6,15 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; import io.netty.buffer.ByteBuf; @@ -23,9 +24,12 @@ import io.netty.buffer.ExpandableByteBuf; /** - * An implementation of ByteBufAllocator that wraps a Arrow BufferAllocator. This allows the RPC layer to be accounted - * and managed using Arrow's BufferAllocator infrastructure. The only thin different from a typical BufferAllocator is - * the signature and the fact that this Allocator returns ExpandableByteBufs which enable otherwise non-expandable + * An implementation of ByteBufAllocator that wraps a Arrow BufferAllocator. This allows the RPC + * layer to be accounted + * and managed using Arrow's BufferAllocator infrastructure. The only thin different from a + * typical BufferAllocator is + * the signature and the fact that this Allocator returns ExpandableByteBufs which enable + * otherwise non-expandable * ArrowBufs to be expandable. */ public class ArrowByteBufAllocator implements ByteBufAllocator { diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 9edafbce082cb..aaa7ce804c3e5 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -6,57 +6,54 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; -import java.util.Arrays; -import java.util.IdentityHashMap; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.base.Preconditions; + +import io.netty.buffer.ArrowBuf; +import io.netty.buffer.UnsafeDirectLittleEndian; import org.apache.arrow.memory.AllocationManager.BufferLedger; import org.apache.arrow.memory.util.AssertionUtil; import org.apache.arrow.memory.util.HistoricalLog; -import com.google.common.base.Preconditions; - -import io.netty.buffer.ArrowBuf; -import io.netty.buffer.UnsafeDirectLittleEndian; +import java.util.Arrays; +import java.util.IdentityHashMap; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; public abstract class BaseAllocator extends Accountant implements BufferAllocator { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class); public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator"; - public static final int DEBUG_LOG_LENGTH = 6; public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled() || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false")); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator + .class); + // Package exposed for sharing between AllocatorManger and BaseAllocator objects + final String name; + final RootAllocator root; private final Object DEBUG_LOCK = DEBUG ? new Object() : null; - private final AllocationListener listener; private final BaseAllocator parentAllocator; private final ArrowByteBufAllocator thisAsByteBufAllocator; private final IdentityHashMap childAllocators; private final ArrowBuf empty; - - private volatile boolean isClosed = false; // the allocator has been closed - - // Package exposed for sharing between AllocatorManger and BaseAllocator objects - final String name; - final RootAllocator root; - // members used purely for debugging private final IdentityHashMap childLedgers; private final IdentityHashMap reservations; private final HistoricalLog historicalLog; + private volatile boolean isClosed = false; // the allocator has been closed protected BaseAllocator( final AllocationListener listener, @@ -91,7 +88,8 @@ private BaseAllocator( this.root = (RootAllocator) this; empty = createEmpty(); } else { - throw new IllegalStateException("An parent allocator must either carry a root or be the root."); + throw new IllegalStateException("An parent allocator must either carry a root or be the " + + "root."); } this.parentAllocator = parentAllocator; @@ -114,11 +112,52 @@ private BaseAllocator( } + private static String createErrorMsg(final BufferAllocator allocator, final int rounded, final + int requested) { + if (rounded != requested) { + return String.format( + "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current " + + "allocation: %d", + rounded, requested, allocator.getAllocatedMemory()); + } else { + return String.format("Unable to allocate buffer of size %d due to memory limit. Current " + + "allocation: %d", + rounded, allocator.getAllocatedMemory()); + } + } + + /** + * Rounds up the provided value to the nearest power of two. + * + * @param val An integer value. + * @return The closest power of two of that value. + */ + static int nextPowerOfTwo(int val) { + int highestBit = Integer.highestOneBit(val); + if (highestBit == val) { + return val; + } else { + return highestBit << 1; + } + } + + public static StringBuilder indent(StringBuilder sb, int indent) { + final char[] indentation = new char[indent * 2]; + Arrays.fill(indentation, ' '); + sb.append(indentation); + return sb; + } + + public static boolean isDebug() { + return DEBUG; + } + @Override public void assertOpen() { if (AssertionUtil.ASSERT_ENABLED) { if (isClosed) { - throw new IllegalStateException("Attempting operation on allocator when allocator is closed.\n" + throw new IllegalStateException("Attempting operation on allocator when allocator is " + + "closed.\n" + toVerboseString()); } } @@ -136,7 +175,8 @@ public ArrowBuf getEmpty() { } /** - * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that we have a new ledger + * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that + * we have a new ledger * associated with this allocator. */ void associateLedger(BufferLedger ledger) { @@ -149,7 +189,8 @@ void associateLedger(BufferLedger ledger) { } /** - * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that we are removing a + * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that + * we are removing a * ledger associated with this allocator */ void dissociateLedger(BufferLedger ledger) { @@ -167,8 +208,7 @@ void dissociateLedger(BufferLedger ledger) { /** * Track when a ChildAllocator of this BaseAllocator is closed. Used for debugging purposes. * - * @param childAllocator - * The child allocator that has been closed. + * @param childAllocator The child allocator that has been closed. */ private void childClosed(final BaseAllocator childAllocator) { assertOpen(); @@ -187,17 +227,6 @@ private void childClosed(final BaseAllocator childAllocator) { } } - private static String createErrorMsg(final BufferAllocator allocator, final int rounded, final int requested) { - if (rounded != requested) { - return String.format( - "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current allocation: %d", - rounded, requested, allocator.getAllocatedMemory()); - } else { - return String.format("Unable to allocate buffer of size %d due to memory limit. Current allocation: %d", - rounded, allocator.getAllocatedMemory()); - } - } - @Override public ArrowBuf buffer(final int initialRequestSize) { assertOpen(); @@ -205,7 +234,7 @@ public ArrowBuf buffer(final int initialRequestSize) { return buffer(initialRequestSize, null); } - private ArrowBuf createEmpty(){ + private ArrowBuf createEmpty() { assertOpen(); return new ArrowBuf(new AtomicInteger(), null, AllocationManager.EMPTY, null, null, 0, 0, true); @@ -221,7 +250,8 @@ public ArrowBuf buffer(final int initialRequestSize, BufferManager manager) { return empty; } - // round to next largest power of two if we're within a chunk since that is how our allocator operates + // round to next largest power of two if we're within a chunk since that is how our allocator + // operates final int actualRequestSize = initialRequestSize < AllocationManager.CHUNK_SIZE ? nextPowerOfTwo(initialRequestSize) : initialRequestSize; @@ -245,10 +275,12 @@ public ArrowBuf buffer(final int initialRequestSize, BufferManager manager) { } /** - * Used by usual allocation as well as for allocating a pre-reserved buffer. Skips the typical accounting associated + * Used by usual allocation as well as for allocating a pre-reserved buffer. Skips the typical + * accounting associated * with creating a new buffer. */ - private ArrowBuf bufferWithoutReservation(final int size, BufferManager bufferManager) throws OutOfMemoryException { + private ArrowBuf bufferWithoutReservation(final int size, BufferManager bufferManager) throws + OutOfMemoryException { assertOpen(); final AllocationManager manager = new AllocationManager(this, size); @@ -274,185 +306,20 @@ public BufferAllocator newChildAllocator( final long maxAllocation) { assertOpen(); - final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation, maxAllocation); + final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation, + maxAllocation); if (DEBUG) { synchronized (DEBUG_LOCK) { childAllocators.put(childAllocator, childAllocator); - historicalLog.recordEvent("allocator[%s] created new child allocator[%s]", name, childAllocator.name); + historicalLog.recordEvent("allocator[%s] created new child allocator[%s]", name, + childAllocator.name); } } return childAllocator; } - public class Reservation implements AllocationReservation { - private int nBytes = 0; - private boolean used = false; - private boolean closed = false; - private final HistoricalLog historicalLog; - - public Reservation() { - if (DEBUG) { - historicalLog = new HistoricalLog("Reservation[allocator[%s], %d]", name, System.identityHashCode(this)); - historicalLog.recordEvent("created"); - synchronized (DEBUG_LOCK) { - reservations.put(this, this); - } - } else { - historicalLog = null; - } - } - - @Override - public boolean add(final int nBytes) { - assertOpen(); - - Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes); - Preconditions.checkState(!closed, "Attempt to increase reservation after reservation has been closed"); - Preconditions.checkState(!used, "Attempt to increase reservation after reservation has been used"); - - // we round up to next power of two since all reservations are done in powers of two. This may overestimate the - // preallocation since someone may perceive additions to be power of two. If this becomes a problem, we can look - // at - // modifying this behavior so that we maintain what we reserve and what the user asked for and make sure to only - // round to power of two as necessary. - final int nBytesTwo = BaseAllocator.nextPowerOfTwo(nBytes); - if (!reserve(nBytesTwo)) { - return false; - } - - this.nBytes += nBytesTwo; - return true; - } - - @Override - public ArrowBuf allocateBuffer() { - assertOpen(); - - Preconditions.checkState(!closed, "Attempt to allocate after closed"); - Preconditions.checkState(!used, "Attempt to allocate more than once"); - - final ArrowBuf arrowBuf = allocate(nBytes); - used = true; - return arrowBuf; - } - - @Override - public int getSize() { - return nBytes; - } - - @Override - public boolean isUsed() { - return used; - } - - @Override - public boolean isClosed() { - return closed; - } - - @Override - public void close() { - assertOpen(); - - if (closed) { - return; - } - - if (DEBUG) { - if (!isClosed()) { - final Object object; - synchronized (DEBUG_LOCK) { - object = reservations.remove(this); - } - if (object == null) { - final StringBuilder sb = new StringBuilder(); - print(sb, 0, Verbosity.LOG_WITH_STACKTRACE); - logger.debug(sb.toString()); - throw new IllegalStateException( - String.format("Didn't find closing reservation[%d]", System.identityHashCode(this))); - } - - historicalLog.recordEvent("closed"); - } - } - - if (!used) { - releaseReservation(nBytes); - } - - closed = true; - } - - @Override - public boolean reserve(int nBytes) { - assertOpen(); - - final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes); - - if (DEBUG) { - historicalLog.recordEvent("reserve(%d) => %s", nBytes, Boolean.toString(outcome.isOk())); - } - - return outcome.isOk(); - } - - /** - * Allocate the a buffer of the requested size. - * - *

- * The implementation of the allocator's inner class provides this. - * - * @param nBytes - * the size of the buffer requested - * @return the buffer, or null, if the request cannot be satisfied - */ - private ArrowBuf allocate(int nBytes) { - assertOpen(); - - boolean success = false; - - /* - * The reservation already added the requested bytes to the allocators owned and allocated bytes via reserve(). - * This ensures that they can't go away. But when we ask for the buffer here, that will add to the allocated bytes - * as well, so we need to return the same number back to avoid double-counting them. - */ - try { - final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null); - - listener.onAllocation(nBytes); - if (DEBUG) { - historicalLog.recordEvent("allocate() => %s", String.format("ArrowBuf[%d]", arrowBuf.getId())); - } - success = true; - return arrowBuf; - } finally { - if (!success) { - releaseBytes(nBytes); - } - } - } - - /** - * Return the reservation back to the allocator without having used it. - * - * @param nBytes - * the size of the reservation - */ - private void releaseReservation(int nBytes) { - assertOpen(); - - releaseBytes(nBytes); - - if (DEBUG) { - historicalLog.recordEvent("releaseReservation(%d)", nBytes); - } - } - - } - @Override public AllocationReservation newReservation() { assertOpen(); @@ -460,7 +327,6 @@ public AllocationReservation newReservation() { return new Reservation(); } - @Override public synchronized void close() { /* @@ -474,7 +340,7 @@ public synchronized void close() { isClosed = true; if (DEBUG) { - synchronized(DEBUG_LOCK) { + synchronized (DEBUG_LOCK) { verifyAllocator(); // are there outstanding child allocators? @@ -488,7 +354,8 @@ public synchronized void close() { } throw new IllegalStateException( - String.format("Allocator[%s] closed with outstanding child allocators.\n%s", name, toString())); + String.format("Allocator[%s] closed with outstanding child allocators.\n%s", name, + toString())); } // are there outstanding buffers? @@ -501,7 +368,8 @@ public synchronized void close() { if (reservations.size() != 0) { throw new IllegalStateException( - String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name, reservations.size(), + String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name, + reservations.size(), toString())); } @@ -512,7 +380,8 @@ public synchronized void close() { final long allocated = getAllocatedMemory(); if (allocated > 0) { throw new IllegalStateException( - String.format("Memory was leaked by query. Memory leaked: (%d)\n%s", allocated, toString())); + String.format("Memory was leaked by query. Memory leaked: (%d)\n%s", allocated, + toString())); } // we need to release our memory to our parent before we tell it we've closed. @@ -543,7 +412,8 @@ public String toString() { } /** - * Provide a verbose string of the current allocator state. Includes the state of all child allocators, along with + * Provide a verbose string of the current allocator state. Includes the state of all child + * allocators, along with * historical logs of each object and including stacktraces. * * @return A Verbose string of current allocator state. @@ -559,48 +429,32 @@ private void hist(String noteFormat, Object... args) { historicalLog.recordEvent(noteFormat, args); } - /** - * Rounds up the provided value to the nearest power of two. - * - * @param val - * An integer value. - * @return The closest power of two of that value. - */ - static int nextPowerOfTwo(int val) { - int highestBit = Integer.highestOneBit(val); - if (highestBit == val) { - return val; - } else { - return highestBit << 1; - } - } - - /** * Verifies the accounting state of the allocator. Only works for DEBUG. * - * @throws IllegalStateException - * when any problems are found + * @throws IllegalStateException when any problems are found */ void verifyAllocator() { - final IdentityHashMap buffersSeen = new IdentityHashMap<>(); + final IdentityHashMap buffersSeen = new + IdentityHashMap<>(); verifyAllocator(buffersSeen); } /** * Verifies the accounting state of the allocator. Only works for DEBUG. - * *

- * This overload is used for recursive calls, allowing for checking that ArrowBufs are unique across all allocators + *

+ * This overload is used for recursive calls, allowing for checking that ArrowBufs are unique + * across all allocators * that are checked. *

* - * @param buffersSeen - * a map of buffers that have already been seen when walking a tree of allocators - * @throws IllegalStateException - * when any problems are found + * @param buffersSeen a map of buffers that have already been seen when walking a tree of + * allocators + * @throws IllegalStateException when any problems are found */ - private void verifyAllocator(final IdentityHashMap buffersSeen) { + private void verifyAllocator(final IdentityHashMap + buffersSeen) { // The remaining tests can only be performed if we're in debug mode. if (!DEBUG) { return; @@ -618,7 +472,8 @@ private void verifyAllocator(final IdentityHashMap ledgerS } } - - public static StringBuilder indent(StringBuilder sb, int indent) { - final char[] indentation = new char[indent * 2]; - Arrays.fill(indentation, ' '); - sb.append(indentation); - return sb; - } - public static enum Verbosity { BASIC(false, false), // only include basic information LOG(true, false), // include basic @@ -800,7 +651,179 @@ public static enum Verbosity { } } - public static boolean isDebug() { - return DEBUG; + public class Reservation implements AllocationReservation { + + private final HistoricalLog historicalLog; + private int nBytes = 0; + private boolean used = false; + private boolean closed = false; + + public Reservation() { + if (DEBUG) { + historicalLog = new HistoricalLog("Reservation[allocator[%s], %d]", name, System + .identityHashCode(this)); + historicalLog.recordEvent("created"); + synchronized (DEBUG_LOCK) { + reservations.put(this, this); + } + } else { + historicalLog = null; + } + } + + @Override + public boolean add(final int nBytes) { + assertOpen(); + + Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes); + Preconditions.checkState(!closed, "Attempt to increase reservation after reservation has " + + "been closed"); + Preconditions.checkState(!used, "Attempt to increase reservation after reservation has been" + + " used"); + + // we round up to next power of two since all reservations are done in powers of two. This + // may overestimate the + // preallocation since someone may perceive additions to be power of two. If this becomes a + // problem, we can look + // at + // modifying this behavior so that we maintain what we reserve and what the user asked for + // and make sure to only + // round to power of two as necessary. + final int nBytesTwo = BaseAllocator.nextPowerOfTwo(nBytes); + if (!reserve(nBytesTwo)) { + return false; + } + + this.nBytes += nBytesTwo; + return true; + } + + @Override + public ArrowBuf allocateBuffer() { + assertOpen(); + + Preconditions.checkState(!closed, "Attempt to allocate after closed"); + Preconditions.checkState(!used, "Attempt to allocate more than once"); + + final ArrowBuf arrowBuf = allocate(nBytes); + used = true; + return arrowBuf; + } + + @Override + public int getSize() { + return nBytes; + } + + @Override + public boolean isUsed() { + return used; + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public void close() { + assertOpen(); + + if (closed) { + return; + } + + if (DEBUG) { + if (!isClosed()) { + final Object object; + synchronized (DEBUG_LOCK) { + object = reservations.remove(this); + } + if (object == null) { + final StringBuilder sb = new StringBuilder(); + print(sb, 0, Verbosity.LOG_WITH_STACKTRACE); + logger.debug(sb.toString()); + throw new IllegalStateException( + String.format("Didn't find closing reservation[%d]", System.identityHashCode + (this))); + } + + historicalLog.recordEvent("closed"); + } + } + + if (!used) { + releaseReservation(nBytes); + } + + closed = true; + } + + @Override + public boolean reserve(int nBytes) { + assertOpen(); + + final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes); + + if (DEBUG) { + historicalLog.recordEvent("reserve(%d) => %s", nBytes, Boolean.toString(outcome.isOk())); + } + + return outcome.isOk(); + } + + /** + * Allocate the a buffer of the requested size. + *

+ *

+ * The implementation of the allocator's inner class provides this. + * + * @param nBytes the size of the buffer requested + * @return the buffer, or null, if the request cannot be satisfied + */ + private ArrowBuf allocate(int nBytes) { + assertOpen(); + + boolean success = false; + + /* + * The reservation already added the requested bytes to the allocators owned and allocated + * bytes via reserve(). + * This ensures that they can't go away. But when we ask for the buffer here, that will add + * to the allocated bytes + * as well, so we need to return the same number back to avoid double-counting them. + */ + try { + final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null); + + listener.onAllocation(nBytes); + if (DEBUG) { + historicalLog.recordEvent("allocate() => %s", String.format("ArrowBuf[%d]", arrowBuf + .getId())); + } + success = true; + return arrowBuf; + } finally { + if (!success) { + releaseBytes(nBytes); + } + } + } + + /** + * Return the reservation back to the allocator without having used it. + * + * @param nBytes the size of the reservation + */ + private void releaseReservation(int nBytes) { + assertOpen(); + + releaseBytes(nBytes); + + if (DEBUG) { + historicalLog.recordEvent("releaseReservation(%d)", nBytes); + } + } + } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java b/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java index 4e88c734ab4be..b0e9cd8c1a0e9 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java @@ -6,21 +6,22 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; public class BoundsChecking { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BoundsChecking.class); public static final boolean BOUNDS_CHECKING_ENABLED; + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BoundsChecking.class); static { boolean isAssertEnabled = false; diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java index 356a3416cbf85..81ffb1bec780e 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java @@ -6,47 +6,48 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ArrowBuf; +import io.netty.buffer.ByteBufAllocator; /** * Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods. */ public interface BufferAllocator extends AutoCloseable { + /** - * Allocate a new or reused buffer of the provided size. Note that the buffer may technically be larger than the - * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size. + * Allocate a new or reused buffer of the provided size. Note that the buffer may technically + * be larger than the + * requested size for rounding purposes. However, the buffer's capacity will be set to the + * configured size. * - * @param size - * The size in bytes. + * @param size The size in bytes. * @return a new ArrowBuf, or null if the request can't be satisfied - * @throws OutOfMemoryException - * if buffer cannot be allocated + * @throws OutOfMemoryException if buffer cannot be allocated */ public ArrowBuf buffer(int size); /** - * Allocate a new or reused buffer of the provided size. Note that the buffer may technically be larger than the - * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size. + * Allocate a new or reused buffer of the provided size. Note that the buffer may technically + * be larger than the + * requested size for rounding purposes. However, the buffer's capacity will be set to the + * configured size. * - * @param size - * The size in bytes. - * @param manager - * A buffer manager to manage reallocation. + * @param size The size in bytes. + * @param manager A buffer manager to manage reallocation. * @return a new ArrowBuf, or null if the request can't be satisfied - * @throws OutOfMemoryException - * if buffer cannot be allocated + * @throws OutOfMemoryException if buffer cannot be allocated */ public ArrowBuf buffer(int size, BufferManager manager); @@ -60,19 +61,16 @@ public interface BufferAllocator extends AutoCloseable { /** * Create a new child allocator. * - * @param name - * the name of the allocator. - * @param initReservation - * the initial space reservation (obtained from this allocator) - * @param maxAllocation - * maximum amount of space the new allocator can allocate + * @param name the name of the allocator. + * @param initReservation the initial space reservation (obtained from this allocator) + * @param maxAllocation maximum amount of space the new allocator can allocate * @return the new allocator, or null if it can't be created */ public BufferAllocator newChildAllocator(String name, long initReservation, long maxAllocation); /** * Close and release all buffers generated from this buffer pool. - * + *

*

When assertions are on, complains if there are any outstanding buffers; to avoid * that, release all buffers before the allocator is closed. */ @@ -87,19 +85,18 @@ public interface BufferAllocator extends AutoCloseable { public long getAllocatedMemory(); /** - * Set the maximum amount of memory this allocator is allowed to allocate. + * Return the current maximum limit this allocator imposes. * - * @param newLimit - * The new Limit to apply to allocations + * @return Limit in number of bytes. */ - public void setLimit(long newLimit); + public long getLimit(); /** - * Return the current maximum limit this allocator imposes. + * Set the maximum amount of memory this allocator is allowed to allocate. * - * @return Limit in number of bytes. + * @param newLimit The new Limit to apply to allocations */ - public long getLimit(); + public void setLimit(long newLimit); /** * Returns the peak amount of memory allocated from this allocator. @@ -118,25 +115,31 @@ public interface BufferAllocator extends AutoCloseable { public AllocationReservation newReservation(); /** - * Get a reference to the empty buffer associated with this allocator. Empty buffers are special because we don't - * worry about them leaking or managing reference counts on them since they don't actually point to any memory. + * Get a reference to the empty buffer associated with this allocator. Empty buffers are + * special because we don't + * worry about them leaking or managing reference counts on them since they don't actually + * point to any memory. */ public ArrowBuf getEmpty(); /** - * Return the name of this allocator. This is a human readable name that can help debugging. Typically provides + * Return the name of this allocator. This is a human readable name that can help debugging. + * Typically provides * coordinates about where this allocator was created */ public String getName(); /** - * Return whether or not this allocator (or one if its parents) is over its limits. In the case that an allocator is - * over its limit, all consumers of that allocator should aggressively try to addrss the overlimit situation. + * Return whether or not this allocator (or one if its parents) is over its limits. In the case + * that an allocator is + * over its limit, all consumers of that allocator should aggressively try to addrss the + * overlimit situation. */ public boolean isOverLimit(); /** - * Return a verbose string describing this allocator. If in DEBUG mode, this will also include relevant stacktraces + * Return a verbose string describing this allocator. If in DEBUG mode, this will also include + * relevant stacktraces * and historical logs for underlying objects * * @return A very verbose description of the allocator hierarchy. @@ -144,7 +147,8 @@ public interface BufferAllocator extends AutoCloseable { public String toVerboseString(); /** - * Asserts (using java assertions) that the provided allocator is currently open. If assertions are disabled, this is + * Asserts (using java assertions) that the provided allocator is currently open. If assertions + * are disabled, this is * a no-op. */ public void assertOpen(); diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java index 8969434791012..2fe763e10aff9 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************/ + package org.apache.arrow.memory; import io.netty.buffer.ArrowBuf; @@ -24,7 +25,7 @@ * re-allocation the old buffer will be freed. Managing a list of these buffers * prevents some parts of the system from needing to define a correct location * to place the final call to free them. - * + *

* The current uses of these types of buffers are within the pluggable components of Drill. * In UDFs, memory management should not be a concern. We provide access to re-allocatable * ArrowBufs to give UDF writers general purpose buffers we can account for. To prevent the need @@ -38,12 +39,9 @@ public interface BufferManager extends AutoCloseable { /** * Replace an old buffer with a new version at least of the provided size. Does not copy data. * - * @param old - * Old Buffer that the user is no longer going to use. - * @param newSize - * Size of new replacement buffer. - * @return - * A new version of the buffer. + * @param old Old Buffer that the user is no longer going to use. + * @param newSize Size of new replacement buffer. + * @return A new version of the buffer. */ public ArrowBuf replace(ArrowBuf old, int newSize); @@ -57,8 +55,7 @@ public interface BufferManager extends AutoCloseable { /** * Get a managed buffer of at least a certain size. * - * @param size - * The desired size + * @param size The desired size * @return A buffer */ public ArrowBuf getManagedBuffer(int size); diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java index 11c9063fc9c69..f9a6dc72ece8c 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java @@ -6,15 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; @@ -22,21 +23,22 @@ * Child allocator class. Only slightly different from the {@see RootAllocator}, * in that these can't be created directly, but must be obtained from * {@see BufferAllocator#newChildAllocator(AllocatorOwner, long, long, int)}. - + *

*

Child allocators can only be created by the root, or other children, so * this class is package private.

*/ class ChildAllocator extends BaseAllocator { + /** * Constructor. * * @param parentAllocator parent allocator -- the one creating this child - * @param name the name of this child allocator + * @param name the name of this child allocator * @param initReservation initial amount of space to reserve (obtained from the parent) - * @param maxAllocation maximum amount of space that can be obtained from this allocator; - * note this includes direct allocations (via {@see BufferAllocator#buffer(int, int)} - * et al) and requests from descendant allocators. Depending on the allocation policy in - * force, even less memory may be available + * @param maxAllocation maximum amount of space that can be obtained from this allocator; note + * this includes direct allocations (via {@see BufferAllocator#buffer(int, + *int)} et al) and requests from descendant allocators. Depending on the + * allocation policy in force, even less memory may be available */ ChildAllocator( BaseAllocator parentAllocator, diff --git a/java/memory/src/main/java/org/apache/arrow/memory/OutOfMemoryException.java b/java/memory/src/main/java/org/apache/arrow/memory/OutOfMemoryException.java index 6ba0284d8d449..c36584c9538b0 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/OutOfMemoryException.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/OutOfMemoryException.java @@ -6,28 +6,31 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; public class OutOfMemoryException extends RuntimeException { - private static final long serialVersionUID = -6858052345185793382L; - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutOfMemoryException.class); + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutOfMemoryException + .class); + private static final long serialVersionUID = -6858052345185793382L; public OutOfMemoryException() { super(); } - public OutOfMemoryException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + public OutOfMemoryException(String message, Throwable cause, boolean enableSuppression, boolean + writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java index 57a2c0cdae8d8..1dc6bf0c92fa0 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java @@ -6,15 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory; import com.google.common.annotations.VisibleForTesting; @@ -24,6 +25,7 @@ * tree of descendant child allocators. */ public class RootAllocator extends BaseAllocator { + public RootAllocator(final long limit) { this(AllocationListener.NOOP, limit); } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/package-info.java b/java/memory/src/main/java/org/apache/arrow/memory/package-info.java index 40d25cada4519..cef382d1e044e 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/package-info.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/package-info.java @@ -1,24 +1,43 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + *

* http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + *

+ * Memory Allocation, Account and Management + *

+ * See the README.md file in this directory for detailed information about Arrow's memory allocation + * subsystem. + *

+ * Memory Allocation, Account and Management + *

+ * See the README.md file in this directory for detailed information about Arrow's memory + * allocation subsystem. + *

+ * Memory Allocation, Account and Management + *

+ * See the README.md file in this directory for detailed information about Arrow's memory + * allocation subsystem. + *

+ * Memory Allocation, Account and Management + *

+ * See the README.md file in this directory for detailed information about Arrow's memory + * allocation subsystem. */ /** * Memory Allocation, Account and Management * - * See the README.md file in this directory for detailed information about Arrow's memory allocation subsystem. + * See the README.md file in this directory for detailed information about Arrow's memory + * allocation subsystem. * */ + package org.apache.arrow.memory; diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java b/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java index 28d078528974e..710f572e06027 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java @@ -6,32 +6,33 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory.util; public class AssertionUtil { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssertionUtil.class); public static final boolean ASSERT_ENABLED; + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssertionUtil.class); - static{ + static { boolean isAssertEnabled = false; assert isAssertEnabled = true; ASSERT_ENABLED = isAssertEnabled; } - public static boolean isAssertionsEnabled(){ - return ASSERT_ENABLED; + private AssertionUtil() { } - private AssertionUtil() { + public static boolean isAssertionsEnabled() { + return ASSERT_ENABLED; } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java b/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java index 94e5cc5fded4f..8d9008c894ac8 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java @@ -6,15 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory.util; import java.util.concurrent.locks.Lock; diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/HistoricalLog.java b/java/memory/src/main/java/org/apache/arrow/memory/util/HistoricalLog.java index c9b5c5385c596..c464598bfb856 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/util/HistoricalLog.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/util/HistoricalLog.java @@ -6,53 +6,43 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory.util; +import org.slf4j.Logger; + import java.util.Arrays; import java.util.LinkedList; -import org.slf4j.Logger; - /** * Utility class that can be used to log activity within a class * for later logging and debugging. Supports recording events and * recording the stack at the time they occur. */ public class HistoricalLog { - private static class Event { - private final String note; // the event text - private final StackTrace stackTrace; // where the event occurred - private final long time; - - public Event(final String note) { - this.note = note; - this.time = System.nanoTime(); - stackTrace = new StackTrace(); - } - } private final LinkedList history = new LinkedList<>(); private final String idString; // the formatted id string - private Event firstEvent; // the first stack trace recorded private final int limit; // the limit on the number of events kept + private Event firstEvent; // the first stack trace recorded /** * Constructor. The format string will be formatted and have its arguments * substituted at the time this is called. * - * @param idStringFormat {@link String#format} format string that can be used - * to identify this object in a log. Including some kind of unique identifier - * that can be associated with the object instance is best. - * @param args for the format string, or nothing if none are required + * @param idStringFormat {@link String#format} format string that can be used to identify this + * object in a log. Including some kind of unique identifier that can be + * associated with the object instance is best. + * @param args for the format string, or nothing if none are required */ public HistoricalLog(final String idStringFormat, Object... args) { this(Integer.MAX_VALUE, idStringFormat, args); @@ -61,7 +51,7 @@ public HistoricalLog(final String idStringFormat, Object... args) { /** * Constructor. The format string will be formatted and have its arguments * substituted at the time this is called. - * + *

*

This form supports the specification of a limit that will limit the * number of historical entries kept (which keeps down the amount of memory * used). With the limit, the first entry made is always kept (under the @@ -70,12 +60,12 @@ public HistoricalLog(final String idStringFormat, Object... args) { * Each time a new entry is made, the oldest that is not the first is dropped. *

* - * @param limit the maximum number of historical entries that will be kept, - * not including the first entry made - * @param idStringFormat {@link String#format} format string that can be used - * to identify this object in a log. Including some kind of unique identifier - * that can be associated with the object instance is best. - * @param args for the format string, or nothing if none are required + * @param limit the maximum number of historical entries that will be kept, not including + * the first entry made + * @param idStringFormat {@link String#format} format string that can be used to identify this + * object in a log. Including some kind of unique identifier that can be + * associated with the object instance is best. + * @param args for the format string, or nothing if none are required */ public HistoricalLog(final int limit, final String idStringFormat, Object... args) { this.limit = limit; @@ -88,7 +78,7 @@ public HistoricalLog(final int limit, final String idStringFormat, Object... arg * at the time this is called. * * @param noteFormat {@link String#format} format string that describes the event - * @param args for the format string, or nothing if none are required + * @param args for the format string, or nothing if none are required */ public synchronized void recordEvent(final String noteFormat, Object... args) { final String note = String.format(noteFormat, args); @@ -113,23 +103,14 @@ public void buildHistory(final StringBuilder sb, boolean includeStackTrace) { buildHistory(sb, 0, includeStackTrace); } - /** - * Write the history of this object to the given {@link StringBuilder}. The history - * includes the identifying string provided at construction time, and all the recorded - * events with their stack traces. - * - * @param sb {@link StringBuilder} to write to - * @param additional an extra string that will be written between the identifying - * information and the history; often used for a current piece of state - */ - /** * * @param sb * @param indent * @param includeStackTrace */ - public synchronized void buildHistory(final StringBuilder sb, int indent, boolean includeStackTrace) { + public synchronized void buildHistory(final StringBuilder sb, int indent, boolean + includeStackTrace) { final char[] indentation = new char[indent]; final char[] innerIndentation = new char[indent + 2]; Arrays.fill(indentation, ' '); @@ -140,7 +121,6 @@ public synchronized void buildHistory(final StringBuilder sb, int indent, boolea .append(idString) .append('\n'); - if (firstEvent != null) { sb.append(innerIndentation) .append(firstEvent.time) @@ -151,7 +131,7 @@ public synchronized void buildHistory(final StringBuilder sb, int indent, boolea firstEvent.stackTrace.writeToBuilder(sb, indent + 2); } - for(final Event event : history) { + for (final Event event : history) { if (event == firstEvent) { continue; } @@ -170,6 +150,16 @@ public synchronized void buildHistory(final StringBuilder sb, int indent, boolea } } + /** + * Write the history of this object to the given {@link StringBuilder}. The history + * includes the identifying string provided at construction time, and all the recorded + * events with their stack traces. + * + * @param sb {@link StringBuilder} to write to + * @param additional an extra string that will be written between the identifying + * information and the history; often used for a current piece of state + */ + /** * Write the history of this object to the given {@link Logger}. The history * includes the identifying string provided at construction time, and all the recorded @@ -182,4 +172,17 @@ public void logHistory(final Logger logger) { buildHistory(sb, 0, true); logger.debug(sb.toString()); } + + private static class Event { + + private final String note; // the event text + private final StackTrace stackTrace; // where the event occurred + private final long time; + + public Event(final String note) { + this.note = note; + this.time = System.nanoTime(); + stackTrace = new StackTrace(); + } + } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/StackTrace.java b/java/memory/src/main/java/org/apache/arrow/memory/util/StackTrace.java index 638c2fb9a959e..bb4ea6c46179e 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/util/StackTrace.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/util/StackTrace.java @@ -6,15 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.memory.util; import java.util.Arrays; @@ -23,6 +24,7 @@ * Convenient way of obtaining and manipulating stack traces for debugging. */ public class StackTrace { + private final StackTraceElement[] stackTraceElements; /** @@ -36,10 +38,9 @@ public StackTrace() { /** * Write the stack trace to a StringBuilder. - * @param sb - * where to write it - * @param indent - * how many double spaces to indent each line + * + * @param sb where to write it + * @param indent how many double spaces to indent each line */ public void writeToBuilder(final StringBuilder sb, final int indent) { // create the indentation string @@ -47,7 +48,7 @@ public void writeToBuilder(final StringBuilder sb, final int indent) { Arrays.fill(indentation, ' '); // write the stack trace in standard Java format - for(StackTraceElement ste : stackTraceElements) { + for (StackTraceElement ste : stackTraceElements) { sb.append(indentation) .append("at ") .append(ste.getClassName()) diff --git a/java/pom.xml b/java/pom.xml index fa03783396ffb..774761f0c1e66 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -35,6 +35,7 @@ 2 2.7.1 2.7.1 + false @@ -269,6 +270,47 @@ + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.17 + + + com.puppycrawl.tools + checkstyle + 6.15 + + + com.google.guava + guava + ${dep.guava.version} + + + + + validate + validate + + check + + + + + google_checks.xml + UTF-8 + true + ${checkstyle.failOnViolation} + ${checkstyle.failOnViolation} + warning + xml + html + ${project.build.directory}/test/checkstyle-errors.xml + false + + + + @@ -382,6 +424,19 @@ + + + org.apache.maven.plugins + maven-checkstyle-plugin + [0,) + + check + + + + + + diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java index 7c0cadd9d77dd..24079b62da919 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java @@ -6,20 +6,17 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.arrow.tools; -import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; +package org.apache.arrow.tools; import com.google.common.base.Preconditions; @@ -31,11 +28,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + public class EchoServer { private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class); - - private boolean closed = false; private final ServerSocket serverSocket; + private boolean closed = false; public EchoServer(int port) throws IOException { LOGGER.info("Starting echo server."); @@ -43,22 +43,64 @@ public EchoServer(int port) throws IOException { LOGGER.info("Running echo server on port: " + port()); } - public int port() { return serverSocket.getLocalPort(); } + public static void main(String[] args) throws Exception { + int port; + if (args.length > 0) { + port = Integer.parseInt(args[0]); + } else { + port = 8080; + } + new EchoServer(port).run(); + } + + public int port() { + return serverSocket.getLocalPort(); + } + + public void run() throws IOException { + try { + while (!closed) { + LOGGER.info("Waiting to accept new client connection."); + Socket clientSocket = serverSocket.accept(); + LOGGER.info("Accepted new client connection."); + try (ClientConnection client = new ClientConnection(clientSocket)) { + try { + client.run(); + } catch (IOException e) { + LOGGER.warn("Error handling client connection.", e); + } + } + LOGGER.info("Closed connection with client"); + } + } catch (java.net.SocketException ex) { + if (!closed) throw ex; + } finally { + serverSocket.close(); + LOGGER.info("Server closed."); + } + } + + public void close() throws IOException { + closed = true; + serverSocket.close(); + } public static class ClientConnection implements AutoCloseable { public final Socket socket; + public ClientConnection(Socket socket) { this.socket = socket; } public void run() throws IOException { - BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); // Read the entire input stream and write it back try (ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); // load the first batch before instantiating the writer so that we have any dictionaries reader.loadNextBatch(); - try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream())) { + try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket + .getOutputStream())) { writer.start(); int echoed = 0; while (true) { @@ -83,42 +125,4 @@ public void close() throws IOException { socket.close(); } } - - public void run() throws IOException { - try { - while (!closed) { - LOGGER.info("Waiting to accept new client connection."); - Socket clientSocket = serverSocket.accept(); - LOGGER.info("Accepted new client connection."); - try (ClientConnection client = new ClientConnection(clientSocket)) { - try { - client.run(); - } catch (IOException e) { - LOGGER.warn("Error handling client connection.", e); - } - } - LOGGER.info("Closed connection with client"); - } - } catch (java.net.SocketException ex) { - if (!closed) throw ex; - } finally { - serverSocket.close(); - LOGGER.info("Server closed."); - } - } - - public void close() throws IOException { - closed = true; - serverSocket.close(); - } - - public static void main(String[] args) throws Exception { - int port; - if (args.length > 0) { - port = Integer.parseInt(args[0]); - } else { - port = 8080; - } - new EchoServer(port).run(); - } } diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java index 9fa7b761a5772..b8621920d3348 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java @@ -16,13 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.arrow.tools; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; +package org.apache.arrow.tools; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -38,17 +33,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; + public class FileRoundtrip { private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class); - - public static void main(String[] args) { - System.exit(new FileRoundtrip(System.out, System.err).run(args)); - } - private final Options options; private final PrintStream out; private final PrintStream err; - FileRoundtrip(PrintStream out, PrintStream err) { this.out = out; this.err = err; @@ -58,6 +53,10 @@ public static void main(String[] args) { } + public static void main(String[] args) { + System.exit(new FileRoundtrip(System.out, System.err).run(args)); + } + private File validateFile(String type, String fileName) { if (fileName == null) { throw new IllegalArgumentException("missing " + type + " file parameter"); @@ -81,7 +80,8 @@ int run(String[] args) { File outFile = validateFile("output", outFileName); BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close try (FileInputStream fileInputStream = new FileInputStream(inFile); - ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) { + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + allocator)) { VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); Schema schema = root.getSchema(); @@ -89,7 +89,8 @@ int run(String[] args) { LOGGER.debug("Found schema: " + schema); try (FileOutputStream fileOutputStream = new FileOutputStream(outFile); - ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader, fileOutputStream.getChannel())) { + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader, + fileOutputStream.getChannel())) { arrowWriter.start(); while (true) { arrowReader.loadNextBatch(); diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java index d5345535d19dc..be404fd4c5950 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java @@ -6,22 +6,17 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.arrow.tools; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; +package org.apache.arrow.tools; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -29,6 +24,12 @@ import org.apache.arrow.vector.file.ArrowFileReader; import org.apache.arrow.vector.stream.ArrowStreamWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + /** * Converts an Arrow file to an Arrow stream. The file should be specified as the * first argument and the output is written to standard out. diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java index 5d4849c234383..453693d7fa489 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java @@ -16,15 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.arrow.tools; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; +package org.apache.arrow.tools; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -44,8 +37,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + public class Integration { private static final Logger LOGGER = LoggerFactory.getLogger(Integration.class); + private final Options options; + + Integration() { + this.options = new Options(); + this.options.addOption("a", "arrow", true, "arrow file"); + this.options.addOption("j", "json", true, "json file"); + this.options.addOption("c", "command", true, "command to execute: " + Arrays.toString(Command + .values())); + } public static void main(String[] args) { try { @@ -59,20 +69,61 @@ public static void main(String[] args) { } } - private final Options options; + private static void fatalError(String message, Throwable e) { + System.err.println(message); + System.err.println(e.getMessage()); + LOGGER.error(message, e); + System.exit(1); + } + + private File validateFile(String type, String fileName, boolean shouldExist) { + if (fileName == null) { + throw new IllegalArgumentException("missing " + type + " file parameter"); + } + File f = new File(fileName); + if (shouldExist && (!f.exists() || f.isDirectory())) { + throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath()); + } + if (!shouldExist && f.exists()) { + throw new IllegalArgumentException(type + " file already exists: " + f.getAbsolutePath()); + } + return f; + } + + void run(String[] args) throws ParseException, IOException { + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args, false); + + + Command command = toCommand(cmd.getOptionValue("command")); + File arrowFile = validateFile("arrow", cmd.getOptionValue("arrow"), command.arrowExists); + File jsonFile = validateFile("json", cmd.getOptionValue("json"), command.jsonExists); + command.execute(arrowFile, jsonFile); + } + + private Command toCommand(String commandName) { + try { + return Command.valueOf(commandName); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unknown command: " + commandName + " expected one of " + + Arrays.toString(Command.values())); + } + } enum Command { ARROW_TO_JSON(true, false) { @Override public void execute(File arrowFile, File jsonFile) throws IOException { - try(BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); - FileInputStream fileInputStream = new FileInputStream(arrowFile); - ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) { + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(arrowFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + allocator)) { VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); Schema schema = root.getSchema(); LOGGER.debug("Input file size: " + arrowFile.length()); LOGGER.debug("Found schema: " + schema); - try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true))) { + try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config() + .pretty(true))) { writer.start(schema); for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { arrowReader.loadRecordBatch(rbBlock); @@ -94,7 +145,8 @@ public void execute(File arrowFile, File jsonFile) throws IOException { try (FileOutputStream fileOutputStream = new FileOutputStream(arrowFile); VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); // TODO json dictionaries - ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) { + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream + .getChannel())) { arrowWriter.start(); reader.read(root); while (root.getRowCount() != 0) { @@ -113,7 +165,8 @@ public void execute(File arrowFile, File jsonFile) throws IOException { try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator); FileInputStream fileInputStream = new FileInputStream(arrowFile); - ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) { + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + allocator)) { Schema jsonSchema = jsonReader.start(); VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot(); Schema arrowSchema = arrowRoot.getSchema(); @@ -135,7 +188,8 @@ public void execute(File arrowFile, File jsonFile) throws IOException { boolean hasMoreJSON = jsonRoot != null; boolean hasMoreArrow = iterator.hasNext(); if (hasMoreJSON || hasMoreArrow) { - throw new IllegalArgumentException("Unexpected RecordBatches. J:" + hasMoreJSON + " A:" + hasMoreArrow); + throw new IllegalArgumentException("Unexpected RecordBatches. J:" + hasMoreJSON + " " + + "A:" + hasMoreArrow); } } } @@ -153,51 +207,4 @@ public void execute(File arrowFile, File jsonFile) throws IOException { } - Integration() { - this.options = new Options(); - this.options.addOption("a", "arrow", true, "arrow file"); - this.options.addOption("j", "json", true, "json file"); - this.options.addOption("c", "command", true, "command to execute: " + Arrays.toString(Command.values())); - } - - private File validateFile(String type, String fileName, boolean shouldExist) { - if (fileName == null) { - throw new IllegalArgumentException("missing " + type + " file parameter"); - } - File f = new File(fileName); - if (shouldExist && (!f.exists() || f.isDirectory())) { - throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath()); - } - if (!shouldExist && f.exists()) { - throw new IllegalArgumentException(type + " file already exists: " + f.getAbsolutePath()); - } - return f; - } - - void run(String[] args) throws ParseException, IOException { - CommandLineParser parser = new PosixParser(); - CommandLine cmd = parser.parse(options, args, false); - - - Command command = toCommand(cmd.getOptionValue("command")); - File arrowFile = validateFile("arrow", cmd.getOptionValue("arrow"), command.arrowExists); - File jsonFile = validateFile("json", cmd.getOptionValue("json"), command.jsonExists); - command.execute(arrowFile, jsonFile); - } - - private Command toCommand(String commandName) { - try { - return Command.valueOf(commandName); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Unknown command: " + commandName + " expected one of " + Arrays.toString(Command.values())); - } - } - - private static void fatalError(String message, Throwable e) { - System.err.println(message); - System.err.println(e.getMessage()); - LOGGER.error(message, e); - System.exit(1); - } - } diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java index 3b79d5b05e116..41dfd347be579 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java @@ -6,17 +6,24 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.tools; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.file.ArrowFileWriter; +import org.apache.arrow.vector.stream.ArrowStreamReader; + import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -25,12 +32,6 @@ import java.io.OutputStream; import java.nio.channels.Channels; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.file.ArrowFileWriter; -import org.apache.arrow.vector.stream.ArrowStreamReader; - /** * Converts an Arrow stream to an Arrow file. */ diff --git a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java index f752f7eaa74b9..1a389098b4f47 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java @@ -16,13 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.arrow.tools; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; +package org.apache.arrow.tools; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.FieldVector; @@ -39,6 +34,12 @@ import org.apache.arrow.vector.types.pojo.Schema; import org.junit.Assert; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; + public class ArrowFileTestFixtures { static final int COUNT = 10; @@ -58,9 +59,11 @@ static void writeData(int count, MapVector parent) { static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception { // read - try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer + .MAX_VALUE); FileInputStream fileInputStream = new FileInputStream(testOutFile); - ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) { + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + readerAllocator)) { VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); Schema schema = root.getSchema(); for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { @@ -81,16 +84,19 @@ static void validateContent(int count, VectorSchemaRoot root) { static void write(FieldVector parent, File file) throws FileNotFoundException, IOException { VectorSchemaRoot root = new VectorSchemaRoot(parent); try (FileOutputStream fileOutputStream = new FileOutputStream(file); - ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) { + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream + .getChannel())) { arrowWriter.writeBatch(); } } - static void writeInput(File testInFile, BufferAllocator allocator) throws FileNotFoundException, IOException { + static void writeInput(File testInFile, BufferAllocator allocator) throws + FileNotFoundException, IOException { int count = ArrowFileTestFixtures.COUNT; try ( - BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, + Integer.MAX_VALUE); MapVector parent = new MapVector("parent", vectorAllocator, null)) { writeData(count, parent); write(parent.getChild("root"), testInFile); diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java index 706f8e2ca4d36..5970c57f46583 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java @@ -6,28 +6,17 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.arrow.tools; -import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.net.Socket; -import java.net.UnknownHostException; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; +package org.apache.arrow.tools; import com.google.common.collect.ImmutableList; @@ -57,6 +46,18 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.io.IOException; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class EchoServerTest { private static EchoServer server; @@ -94,8 +95,8 @@ private void testEchoServer(int serverPort, BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0); try (Socket socket = new Socket("localhost", serverPort); - ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream()); - ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) { + ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream()); + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) { writer.start(); for (int i = 0; i < batches; i++) { vector.allocateNew(16); @@ -111,7 +112,8 @@ private void testEchoServer(int serverPort, assertEquals(new Schema(asList(field)), reader.getVectorSchemaRoot().getSchema()); - NullableTinyIntVector readVector = (NullableTinyIntVector) reader.getVectorSchemaRoot().getFieldVectors().get(0); + NullableTinyIntVector readVector = (NullableTinyIntVector) reader.getVectorSchemaRoot() + .getFieldVectors().get(0); for (int i = 0; i < batches; i++) { reader.loadNextBatch(); assertEquals(16, reader.getVectorSchemaRoot().getRowCount()); @@ -131,7 +133,8 @@ private void testEchoServer(int serverPort, public void basicTest() throws InterruptedException, IOException { BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); - Field field = new Field("testField", true, new ArrowType.Int(8, true), Collections.emptyList()); + Field field = new Field("testField", true, new ArrowType.Int(8, true), Collections + .emptyList()); NullableTinyIntVector vector = new NullableTinyIntVector("testField", alloc, null); Schema schema = new Schema(asList(field)); @@ -150,7 +153,8 @@ public void testFlatDictionary() throws IOException { DictionaryEncoding writeEncoding = new DictionaryEncoding(1L, false, null); try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); NullableIntVector writeVector = new NullableIntVector("varchar", allocator, writeEncoding); - NullableVarCharVector writeDictionaryVector = new NullableVarCharVector("dict", allocator, null)) { + NullableVarCharVector writeDictionaryVector = new NullableVarCharVector("dict", + allocator, null)) { writeVector.allocateNewSafe(); NullableIntVector.Mutator mutator = writeVector.getMutator(); mutator.set(0, 0); @@ -171,10 +175,12 @@ public void testFlatDictionary() throws IOException { List vectors = ImmutableList.of((FieldVector) writeVector); VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 6); - DictionaryProvider writeProvider = new MapDictionaryProvider(new Dictionary(writeDictionaryVector, writeEncoding)); + DictionaryProvider writeProvider = new MapDictionaryProvider(new Dictionary + (writeDictionaryVector, writeEncoding)); try (Socket socket = new Socket("localhost", serverPort); - ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket.getOutputStream()); + ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket + .getOutputStream()); ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { writer.start(); writer.writeBatch(); @@ -202,7 +208,8 @@ public void testFlatDictionary() throws IOException { Dictionary dictionary = reader.lookup(1L); Assert.assertNotNull(dictionary); - NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary.getVector()).getAccessor(); + NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary + .getVector()).getAccessor(); Assert.assertEquals(3, dictionaryAccessor.getValueCount()); Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0)); Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1)); @@ -215,7 +222,8 @@ public void testFlatDictionary() throws IOException { public void testNestedDictionary() throws IOException { DictionaryEncoding writeEncoding = new DictionaryEncoding(2L, false, null); try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); - NullableVarCharVector writeDictionaryVector = new NullableVarCharVector("dictionary", allocator, null); + NullableVarCharVector writeDictionaryVector = new NullableVarCharVector("dictionary", + allocator, null); ListVector writeVector = new ListVector("list", allocator, null, null)) { // data being written: @@ -245,10 +253,12 @@ public void testNestedDictionary() throws IOException { List vectors = ImmutableList.of((FieldVector) writeVector); VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 3); - DictionaryProvider writeProvider = new MapDictionaryProvider(new Dictionary(writeDictionaryVector, writeEncoding)); + DictionaryProvider writeProvider = new MapDictionaryProvider(new Dictionary + (writeDictionaryVector, writeEncoding)); try (Socket socket = new Socket("localhost", serverPort); - ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket.getOutputStream()); + ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket + .getOutputStream()); ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { writer.start(); writer.writeBatch(); @@ -262,7 +272,8 @@ public void testNestedDictionary() throws IOException { Assert.assertNotNull(readVector); Assert.assertNull(readVector.getField().getDictionary()); - DictionaryEncoding readEncoding = readVector.getField().getChildren().get(0).getDictionary(); + DictionaryEncoding readEncoding = readVector.getField().getChildren().get(0) + .getDictionary(); Assert.assertNotNull(readEncoding); Assert.assertEquals(2L, readEncoding.getId()); @@ -281,7 +292,8 @@ public void testNestedDictionary() throws IOException { Dictionary readDictionary = reader.lookup(2L); Assert.assertNotNull(readDictionary); - NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) readDictionary.getVector()).getAccessor(); + NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) + readDictionary.getVector()).getAccessor(); Assert.assertEquals(2, dictionaryAccessor.getValueCount()); Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0)); Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1)); diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java b/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java index ee39f5e92c7b0..78021f8ad076c 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java @@ -16,13 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.arrow.tools; -import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput; -import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput; -import static org.junit.Assert.assertEquals; - -import java.io.File; +package org.apache.arrow.tools; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -32,6 +27,12 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.File; + +import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput; +import static org.junit.Assert.assertEquals; + public class TestFileRoundtrip { @Rule @@ -56,7 +57,7 @@ public void test() throws Exception { writeInput(testInFile, allocator); - String[] args = { "-i", testInFile.getAbsolutePath(), "-o", testOutFile.getAbsolutePath()}; + String[] args = {"-i", testInFile.getAbsolutePath(), "-o", testOutFile.getAbsolutePath()}; int result = new FileRoundtrip(System.out, System.err).run(args); assertEquals(0, result); diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java index 9d4ef5c26505b..7d9a41985bbe3 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java @@ -16,22 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.arrow.tools; - -import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput; -import static org.apache.arrow.tools.ArrowFileTestFixtures.write; -import static org.apache.arrow.tools.ArrowFileTestFixtures.writeData; -import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.StringReader; -import java.util.Map; +package org.apache.arrow.tools; import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter; @@ -54,12 +40,75 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.util.Map; + +import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput; +import static org.apache.arrow.tools.ArrowFileTestFixtures.write; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeData; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class TestIntegration { @Rule public TemporaryFolder testFolder = new TemporaryFolder(); private BufferAllocator allocator; + private ObjectMapper om = new ObjectMapper(); + + { + DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter(); + prettyPrinter.indentArraysWith(NopIndenter.instance); + om.setDefaultPrettyPrinter(prettyPrinter); + om.enable(SerializationFeature.INDENT_OUTPUT); + om.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); + } + + static void writeInputFloat(File testInFile, BufferAllocator allocator, double... f) throws + FileNotFoundException, IOException { + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, + Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", vectorAllocator, null)) { + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + Float8Writer floatWriter = rootWriter.float8("float"); + for (int i = 0; i < f.length; i++) { + floatWriter.setPosition(i); + floatWriter.writeFloat8(f[i]); + } + writer.setValueCount(f.length); + write(parent.getChild("root"), testInFile); + } + } + + static void writeInput2(File testInFile, BufferAllocator allocator) throws + FileNotFoundException, IOException { + int count = ArrowFileTestFixtures.COUNT; + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, + Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", vectorAllocator, null)) { + writeData(count, parent); + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + IntWriter intWriter = rootWriter.integer("int"); + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); + intWriter.setPosition(5); + intWriter.writeInt(999); + bigIntWriter.setPosition(4); + bigIntWriter.writeBigInt(777L); + writer.setValueCount(count); + write(parent.getChild("root"), testInFile); + } + } @Before public void init() { @@ -85,18 +134,21 @@ public void testValid() throws Exception { Integration integration = new Integration(); // convert it to json - String[] args1 = { "-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + String[] args1 = {"-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; integration.run(args1); // convert back to arrow - String[] args2 = { "-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; + String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; integration.run(args2); // check it is the same validateOutput(testOutFile, allocator); // validate arrow against json - String[] args3 = { "-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.VALIDATE.name()}; + String[] args3 = {"-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.VALIDATE.name()}; integration.run(args3); } @@ -111,11 +163,13 @@ public void testJSONRoundTripWithVariableWidth() throws Exception { Integration integration = new Integration(); // convert to arrow - String[] args1 = { "-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; + String[] args1 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; integration.run(args1); // convert back to json - String[] args2 = { "-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile.getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; integration.run(args2); BufferedReader orig = readNormalized(testJSONFile); @@ -139,11 +193,13 @@ public void testJSONRoundTripWithStruct() throws Exception { Integration integration = new Integration(); // convert to arrow - String[] args1 = { "-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; + String[] args1 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; integration.run(args1); // convert back to json - String[] args2 = { "-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile.getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; integration.run(args2); BufferedReader orig = readNormalized(testJSONFile); @@ -156,22 +212,12 @@ public void testJSONRoundTripWithStruct() throws Exception { } } - private ObjectMapper om = new ObjectMapper(); - { - DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter(); - prettyPrinter.indentArraysWith(NopIndenter.instance); - om.setDefaultPrettyPrinter(prettyPrinter); - om.enable(SerializationFeature.INDENT_OUTPUT); - om.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); - } - private BufferedReader readNormalized(File f) throws IOException { - Map tree = om.readValue(f, Map.class); + Map tree = om.readValue(f, Map.class); String normalized = om.writeValueAsString(tree); return new BufferedReader(new StringReader(normalized)); } - /** * the test should not be sensitive to small variations in float representation */ @@ -190,11 +236,13 @@ public void testFloat() throws Exception { Integration integration = new Integration(); // convert the "valid" file to json - String[] args1 = { "-arrow", testValidInFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + String[] args1 = {"-arrow", testValidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; integration.run(args1); // compare the "invalid" file to the "valid" json - String[] args3 = { "-arrow", testInvalidInFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.VALIDATE.name()}; + String[] args3 = {"-arrow", testInvalidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.VALIDATE.name()}; // this should fail integration.run(args3); } @@ -214,11 +262,13 @@ public void testInvalid() throws Exception { Integration integration = new Integration(); // convert the "valid" file to json - String[] args1 = { "-arrow", testValidInFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + String[] args1 = {"-arrow", testValidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; integration.run(args1); // compare the "invalid" file to the "valid" json - String[] args3 = { "-arrow", testInvalidInFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.VALIDATE.name()}; + String[] args3 = {"-arrow", testInvalidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.VALIDATE.name()}; // this should fail try { integration.run(args3); @@ -229,39 +279,4 @@ public void testInvalid() throws Exception { } } - - static void writeInputFloat(File testInFile, BufferAllocator allocator, double... f) throws FileNotFoundException, IOException { - try ( - BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", vectorAllocator, null)) { - ComplexWriter writer = new ComplexWriterImpl("root", parent); - MapWriter rootWriter = writer.rootAsMap(); - Float8Writer floatWriter = rootWriter.float8("float"); - for (int i = 0; i < f.length; i++) { - floatWriter.setPosition(i); - floatWriter.writeFloat8(f[i]); - } - writer.setValueCount(f.length); - write(parent.getChild("root"), testInFile); - } - } - - static void writeInput2(File testInFile, BufferAllocator allocator) throws FileNotFoundException, IOException { - int count = ArrowFileTestFixtures.COUNT; - try ( - BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", vectorAllocator, null)) { - writeData(count, parent); - ComplexWriter writer = new ComplexWriterImpl("root", parent); - MapWriter rootWriter = writer.rootAsMap(); - IntWriter intWriter = rootWriter.integer("int"); - BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); - intWriter.setPosition(5); - intWriter.writeInt(999); - bigIntWriter.setPosition(4); - bigIntWriter.writeBigInt(777L); - writer.setValueCount(count); - write(parent.getChild("root"), testInFile); - } - } }