Export C Data buffers through FieldVector.exportCDataBuffers

ArrayExporter no longer walks getFieldBuffers() itself; it delegates to the new
FieldVector.exportCDataBuffers(), whose default implementation retains and exports
every field buffer. Variable-width and list vectors override it so that an empty
offset buffer is replaced by a freshly allocated buffer holding one offset.

allocateOffsetBuffer() now returns the newly allocated buffer instead of assigning
the offsetBuffer field, and callers assign the field explicitly. Because the new
buffer is a local variable, the variable-width vectors zero it directly rather than
through initOffsetBuffer(), which only sees the field.

RoundtripTest.roundtrip() additionally checks that buffer ref counts and allocated
memory are unchanged after a round trip.

diff --git a/java/c/src/main/java/org/apache/arrow/c/ArrayExporter.java b/java/c/src/main/java/org/apache/arrow/c/ArrayExporter.java
index d6479a3ba4ca8..05ab3e5ff6063 100644
--- a/java/c/src/main/java/org/apache/arrow/c/ArrayExporter.java
+++ b/java/c/src/main/java/org/apache/arrow/c/ArrayExporter.java
@@ -98,15 +98,7 @@ void export(ArrowArray array, FieldVector vector, DictionaryProvider dictionaryP
       if (buffers != null) {
         data.buffers = new ArrayList<>(buffers.size());
         data.buffers_ptrs = allocator.buffer((long) buffers.size() * Long.BYTES);
-        for (ArrowBuf arrowBuf : buffers) {
-          if (arrowBuf != null) {
-            arrowBuf.getReferenceManager().retain();
-            data.buffers_ptrs.writeLong(arrowBuf.memoryAddress());
-          } else {
-            data.buffers_ptrs.writeLong(NULL);
-          }
-          data.buffers.add(arrowBuf);
-        }
+        vector.exportCDataBuffers(data.buffers, data.buffers_ptrs, NULL);
       }
 
       if (dictionaryEncoding != null) {
diff --git a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
index a7e3cde2e7b4b..768394ef7ab60 100644
--- a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
+++ b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
@@ -33,6 +33,7 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.arrow.memory.ArrowBuf;
@@ -165,10 +166,25 @@ VectorSchemaRoot vectorSchemaRootRoundtrip(VectorSchemaRoot root) {
   }
 
   boolean roundtrip(FieldVector vector, Class<?> clazz) {
+    List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
+    List<Integer> orgRefCnts = fieldBuffers.stream().map(buf -> buf.refCnt()).collect(Collectors.toList());
+    long orgMemorySize = allocator.getAllocatedMemory();
+
+    boolean result = false;
     try (ValueVector imported = vectorRoundtrip(vector)) {
       assertTrue(clazz.isInstance(imported), String.format("expected %s but was %s", clazz, imported.getClass()));
-      return VectorEqualsVisitor.vectorEquals(vector, imported);
+      result = VectorEqualsVisitor.vectorEquals(vector, imported);
     }
+
+    // Check that the ref counts of the buffers are the same after the roundtrip
+    IntStream.range(0, orgRefCnts.size()).forEach(i -> {
+      ArrowBuf buf = fieldBuffers.get(i);
+      assertEquals(orgRefCnts.get(i), buf.refCnt());
+    });
+
+    assertEquals(orgMemorySize, allocator.getAllocatedMemory());
+
+    return result;
   }
 
   @Test
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseLargeVariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseLargeVariableWidthVector.java
index c239edbcc3c29..34c9e73a0b072 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseLargeVariableWidthVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseLargeVariableWidthVector.java
@@ -336,6 +336,34 @@ public List<ArrowBuf> getFieldBuffers() {
     return result;
   }
 
+  /**
+   * Export the buffers of the fields for the C Data Interface. This method traverses the buffers and
+   * exports each buffer and its memory address into a list of buffers and a pointer to the list of buffers.
+   */
+  @Override
+  public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
+    // Before exporting, we must bring the vector to a consistent state.
+    // It is possible that the offsets of some trailing values have not been updated,
+    // which may cause some data in the data buffer to be lost.
+    // For details, please see TestValueVector#testUnloadVariableWidthVector.
+    fillHoles(valueCount);
+
+    exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);
+
+    if (offsetBuffer.capacity() == 0) {
+      // An empty offset buffer is allowed for historical reasons.
+      // To export it through the C Data Interface, we need to allocate a buffer with one offset.
+      // We set `retain = false` to explicitly not increase the ref count for the exported buffer.
+      // The ref count of the newly created buffer (i.e., 1) already represents the usage
+      // on the importing side.
+      exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
+    } else {
+      exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
+    }
+
+    exportBuffer(valueBuffer, buffers, buffersPtr, nullValue, true);
+  }
+
   /**
    * Set the reader and writer indexes for the inner buffers.
    */
@@ -456,10 +484,11 @@ private void allocateBytes(final long valueBufferSize, final int valueCount) {
   }
 
   /* allocate offset buffer */
-  private void allocateOffsetBuffer(final long size) {
-    offsetBuffer = allocator.buffer(size);
+  private ArrowBuf allocateOffsetBuffer(final long size) {
+    ArrowBuf offsetBuffer = allocator.buffer(size);
     offsetBuffer.readerIndex(0);
-    initOffsetBuffer();
+    offsetBuffer.setZero(0, offsetBuffer.capacity());
+    return offsetBuffer;
   }
 
   /* allocate validity buffer */
@@ -760,7 +789,7 @@ private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseLargeV
     final long start = getStartOffset(startIndex);
     final long end = getStartOffset(startIndex + length);
     final long dataLength = end - start;
-    target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
+    target.offsetBuffer = target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
     for (int i = 0; i < length + 1; i++) {
       final long relativeSourceOffset = getStartOffset(startIndex + i) - start;
       target.offsetBuffer.setLong((long) i * OFFSET_WIDTH, relativeSourceOffset);
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java
index 4cf495a349f02..6b82dd7729a6c 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java
@@ -355,6 +355,34 @@ public List<ArrowBuf> getFieldBuffers() {
     return result;
   }
 
+  /**
+   * Export the buffers of the fields for the C Data Interface. This method traverses the buffers and
+   * exports each buffer and its memory address into a list of buffers and a pointer to the list of buffers.
+   */
+  @Override
+  public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
+    // Before exporting, we must bring the vector to a consistent state.
+    // It is possible that the offsets of some trailing values have not been updated,
+    // which may cause some data in the data buffer to be lost.
+    // For details, please see TestValueVector#testUnloadVariableWidthVector.
+    fillHoles(valueCount);
+
+    exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);
+
+    if (offsetBuffer.capacity() == 0) {
+      // An empty offset buffer is allowed for historical reasons.
+      // To export it through the C Data Interface, we need to allocate a buffer with one offset.
+      // We set `retain = false` to explicitly not increase the ref count for the exported buffer.
+      // The ref count of the newly created buffer (i.e., 1) already represents the usage
+      // on the importing side.
+      exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
+    } else {
+      exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
+    }
+
+    exportBuffer(valueBuffer, buffers, buffersPtr, nullValue, true);
+  }
+
   /**
    * Set the reader and writer indexes for the inner buffers.
    */
@@ -476,11 +504,12 @@ private void allocateBytes(final long valueBufferSize, final int valueCount) {
   }
 
   /* allocate offset buffer */
-  private void allocateOffsetBuffer(final long size) {
+  private ArrowBuf allocateOffsetBuffer(final long size) {
     final int curSize = (int) size;
-    offsetBuffer = allocator.buffer(curSize);
+    ArrowBuf offsetBuffer = allocator.buffer(curSize);
     offsetBuffer.readerIndex(0);
-    initOffsetBuffer();
+    offsetBuffer.setZero(0, offsetBuffer.capacity());
+    return offsetBuffer;
   }
 
   /* allocate validity buffer */
@@ -805,7 +834,7 @@ private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseVariab
               (1 + length) * ((long) OFFSET_WIDTH));
       target.offsetBuffer = transferBuffer(slicedOffsetBuffer, target.allocator);
     } else {
-      target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
+      target.offsetBuffer = target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
       for (int i = 0; i < length + 1; i++) {
         final int relativeSourceOffset = getStartOffset(startIndex + i) - start;
         target.offsetBuffer.setInt((long) i * OFFSET_WIDTH, relativeSourceOffset);
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
index 299828f6d9d08..04229563bcc67 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
@@ -60,6 +60,47 @@ public interface FieldVector extends ValueVector {
    */
   List<ArrowBuf> getFieldBuffers();
 
+  /**
+   * Export a given buffer and its memory address into a list of buffers and a pointer to the list of buffers.
+   *
+   * @param buffer the buffer to export
+   * @param buffers the list of buffers
+   * @param buffersPtr the pointer to the list of buffers
+   * @param nullValue the address value to write when the buffer is null
+   * @param retain whether to retain the buffer when exporting
+   */
+  default void exportBuffer(
+      ArrowBuf buffer,
+      List<ArrowBuf> buffers,
+      ArrowBuf buffersPtr,
+      long nullValue,
+      boolean retain) {
+    if (buffer != null) {
+      if (retain) {
+        buffer.getReferenceManager().retain();
+      }
+      buffersPtr.writeLong(buffer.memoryAddress());
+    } else {
+      buffersPtr.writeLong(nullValue);
+    }
+    buffers.add(buffer);
+  }
+
+  /**
+   * Export the buffers of the fields for the C Data Interface. This method traverses the buffers and
+   * exports each buffer and its memory address into a list of buffers and a pointer to the list of buffers.
+   *
+   * <p>By default, exporting a buffer increases its ref count to account for
+   * the usage on the importing side.
+   */
+  default void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
+    List<ArrowBuf> fieldBuffers = getFieldBuffers();
+
+    for (ArrowBuf arrowBuf : fieldBuffers) {
+      exportBuffer(arrowBuf, buffers, buffersPtr, nullValue, true);
+    }
+  }
+
   /**
    * Get the inner vectors.
    *
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
index 8ba2e48dc2fa3..7906d90c2fff0 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
@@ -83,7 +83,7 @@ public String getName() {
   public boolean allocateNewSafe() {
     boolean dataAlloc = false;
     try {
-      allocateOffsetBuffer(offsetAllocationSizeInBytes);
+      offsetBuffer = allocateOffsetBuffer(offsetAllocationSizeInBytes);
       dataAlloc = vector.allocateNewSafe();
     } catch (Exception e) {
       e.printStackTrace();
@@ -97,12 +97,13 @@ public boolean allocateNewSafe() {
     return dataAlloc;
   }
 
-  protected void allocateOffsetBuffer(final long size) {
+  protected ArrowBuf allocateOffsetBuffer(final long size) {
     final int curSize = (int) size;
-    offsetBuffer = allocator.buffer(curSize);
+    ArrowBuf offsetBuffer = allocator.buffer(curSize);
     offsetBuffer.readerIndex(0);
     offsetAllocationSizeInBytes = curSize;
     offsetBuffer.setZero(0, offsetBuffer.capacity());
+    return offsetBuffer;
   }
 
   @Override
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/LargeListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/LargeListVector.java
index b934cbd81db16..b29b72ad2b1a0 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/LargeListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/LargeListVector.java
@@ -287,6 +287,26 @@ public List<ArrowBuf> getFieldBuffers() {
     return result;
   }
 
+  /**
+   * Export the buffers of the fields for the C Data Interface. This method traverses the buffers and
+   * exports each buffer and its memory address into a list of buffers and a pointer to the list of buffers.
+   */
+  @Override
+  public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
+    exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);
+
+    if (offsetBuffer.capacity() == 0) {
+      // An empty offset buffer is allowed for historical reasons.
+      // To export it through the C Data Interface, we need to allocate a buffer with one offset.
+      // We set `retain = false` to explicitly not increase the ref count for the exported buffer.
+      // The ref count of the newly created buffer (i.e., 1) already represents the usage
+      // on the importing side.
+      exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
+    } else {
+      exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
+    }
+  }
+
   /**
    * Set the reader and writer indexes for the inner buffers.
    */
@@ -343,7 +363,7 @@ public boolean allocateNewSafe() {
     /* allocate offset and data buffer */
     boolean dataAlloc = false;
     try {
-      allocateOffsetBuffer(offsetAllocationSizeInBytes);
+      offsetBuffer = allocateOffsetBuffer(offsetAllocationSizeInBytes);
       dataAlloc = vector.allocateNewSafe();
     } catch (Exception e) {
       e.printStackTrace();
@@ -371,11 +391,12 @@ private void allocateValidityBuffer(final long size) {
     validityBuffer.setZero(0, validityBuffer.capacity());
   }
 
-  protected void allocateOffsetBuffer(final long size) {
-    offsetBuffer = allocator.buffer(size);
+  protected ArrowBuf allocateOffsetBuffer(final long size) {
+    ArrowBuf offsetBuffer = allocator.buffer(size);
     offsetBuffer.readerIndex(0);
     offsetAllocationSizeInBytes = size;
     offsetBuffer.setZero(0, offsetBuffer.capacity());
+    return offsetBuffer;
   }
 
   /**
@@ -656,7 +677,7 @@ public void splitAndTransfer(int startIndex, int length) {
       final long startPoint = offsetBuffer.getLong((long) startIndex * OFFSET_WIDTH);
       final long sliceLength = offsetBuffer.getLong((long) (startIndex + length) * OFFSET_WIDTH) - startPoint;
       to.clear();
-      to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
+      to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
       /* splitAndTransfer offset buffer */
       for (int i = 0; i < length + 1; i++) {
         final long relativeOffset = offsetBuffer.getLong((long) (startIndex + i) * OFFSET_WIDTH) - startPoint;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
index 7df659e4cc9da..91275ae73d2c3 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
@@ -242,6 +242,26 @@ public List<ArrowBuf> getFieldBuffers() {
     return result;
   }
 
+  /**
+   * Export the buffers of the fields for the C Data Interface. This method traverses the buffers and
+   * exports each buffer and its memory address into a list of buffers and a pointer to the list of buffers.
+   */
+  @Override
+  public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
+    exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);
+
+    if (offsetBuffer.capacity() == 0) {
+      // An empty offset buffer is allowed for historical reasons.
+      // To export it through the C Data Interface, we need to allocate a buffer with one offset.
+      // We set `retain = false` to explicitly not increase the ref count for the exported buffer.
+      // The ref count of the newly created buffer (i.e., 1) already represents the usage
+      // on the importing side.
+      exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
+    } else {
+      exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
+    }
+  }
+
   /**
    * Set the reader and writer indexes for the inner buffers.
    */
@@ -535,7 +555,7 @@ public void splitAndTransfer(int startIndex, int length) {
       final int startPoint = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
       final int sliceLength = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH) - startPoint;
       to.clear();
-      to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
+      to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
       /* splitAndTransfer offset buffer */
       for (int i = 0; i < length + 1; i++) {
         final int relativeOffset = offsetBuffer.getInt((startIndex + i) * OFFSET_WIDTH) - startPoint;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
index e082b2f43be64..c49f138b64c6b 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
@@ -209,7 +209,7 @@ public void splitAndTransfer(int startIndex, int length) {
       final int startPoint = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
       final int sliceLength = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH) - startPoint;
       to.clear();
-      to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
+      to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
       /* splitAndTransfer offset buffer */
       for (int i = 0; i < length + 1; i++) {
         final int relativeOffset = offsetBuffer.getInt((startIndex + i) * OFFSET_WIDTH) - startPoint;