Skip to content
This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

Commit

Permalink
[SPARK-22143][SQL][BRANCH-2.2] Fix memory leak in OffHeapColumnVector
Browse files Browse the repository at this point in the history
This is a backport of apache@02bb068.

## What changes were proposed in this pull request?
`WriteableColumnVector` does not close its child column vectors. This can create memory leaks for `OffHeapColumnVector`, where we do not clean up the memory allocated by a vector's children. This can be especially bad for string columns (which use a child byte column vector).

## How was this patch tested?
I have updated the existing tests to always use both on-heap and off-heap vectors. Testing and diagnosis was done locally.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes apache#19378 from hvanhovell/SPARK-22143-2.2.
  • Loading branch information
hvanhovell authored and MatthewRBruce committed Jul 31, 2018
1 parent 632dafc commit 0e3f910
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,21 @@ public void reset() {
* Cleans up memory for this column. The column is not usable after this.
* TODO: this should probably have ref-counted semantics.
*/
public abstract void close();
public void close() {
// Recursively close child column vectors first (e.g. the child byte vector
// backing a string column). For off-heap vectors this is what actually frees
// the natively allocated memory of the children — skipping it is the leak
// this change fixes. Slots are nulled so the vector is unusable after close.
if (childColumns != null) {
for (int i = 0; i < childColumns.length; i++) {
if (childColumns[i] != null) {
childColumns[i].close();
childColumns[i] = null;
}
}
}
// The dictionary-id vector is itself a column vector and must be released too.
if (dictionaryIds != null) {
dictionaryIds.close();
dictionaryIds = null;
}
// Drop the dictionary reference; subclasses release their own buffers after
// calling super.close().
dictionary = null;
}

public void reserve(int requiredCapacity) {
if (requiredCapacity > capacity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public long nullsNativeAddress() {

@Override
public void close() {
super.close();
Platform.freeMemory(nulls);
Platform.freeMemory(data);
Platform.freeMemory(lengthData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public long nullsNativeAddress() {

@Override
public void close() {
// Let the parent release child vectors and dictionary ids first.
super.close();
// Null out every on-heap buffer so the arrays become eligible for GC.
// The vector must not be used after this point.
nulls = null;
byteData = null;
shortData = null;
intData = null;
longData = null;
floatData = null;
doubleData = null;
arrayLengths = null;
arrayOffsets = null;
}

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,24 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {

var testVector: ColumnVector = _

private def allocate(capacity: Int, dt: DataType): ColumnVector = {
new OnHeapColumnVector(capacity, dt)
/**
 * Runs `block` against the given vector and guarantees the vector is closed
 * afterwards, even if the block throws. Closing matters for off-heap vectors,
 * which hold natively allocated memory.
 */
private def withVector(
    vector: ColumnVector)(
    block: ColumnVector => Unit): Unit = {
  try {
    block(vector)
  } finally {
    vector.close()
  }
}

override def afterEach(): Unit = {
testVector.close()
// Registers a test that runs the same `block` of assertions against both the
// on-heap and the off-heap implementation, so every case exercises both code
// paths. Each vector is closed by `withVector` before the next is allocated.
private def testVectors(
name: String,
size: Int,
dt: DataType)(
block: ColumnVector => Unit): Unit = {
test(name) {
withVector(new OnHeapColumnVector(size, dt))(block)
withVector(new OffHeapColumnVector(size, dt))(block)
}
}

test("boolean") {
testVector = allocate(10, BooleanType)
testVectors("boolean", 10, BooleanType) { testVector =>
(0 until 10).foreach { i =>
testVector.appendBoolean(i % 2 == 0)
}
Expand All @@ -49,8 +54,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}

test("byte") {
testVector = allocate(10, ByteType)
testVectors("byte", 10, ByteType) { testVector =>
(0 until 10).foreach { i =>
testVector.appendByte(i.toByte)
}
Expand All @@ -62,8 +66,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}

test("short") {
testVector = allocate(10, ShortType)
testVectors("short", 10, ShortType) { testVector =>
(0 until 10).foreach { i =>
testVector.appendShort(i.toShort)
}
Expand All @@ -75,8 +78,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}

test("int") {
testVector = allocate(10, IntegerType)
testVectors("int", 10, IntegerType) { testVector =>
(0 until 10).foreach { i =>
testVector.appendInt(i)
}
Expand All @@ -88,8 +90,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}

test("long") {
testVector = allocate(10, LongType)
testVectors("long", 10, LongType) { testVector =>
(0 until 10).foreach { i =>
testVector.appendLong(i)
}
Expand All @@ -101,8 +102,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}

test("float") {
testVector = allocate(10, FloatType)
testVectors("float", 10, FloatType) { testVector =>
(0 until 10).foreach { i =>
testVector.appendFloat(i.toFloat)
}
Expand All @@ -114,8 +114,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}

test("double") {
testVector = allocate(10, DoubleType)
testVectors("double", 10, DoubleType) { testVector =>
(0 until 10).foreach { i =>
testVector.appendDouble(i.toDouble)
}
Expand All @@ -127,8 +126,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}

test("string") {
testVector = allocate(10, StringType)
testVectors("string", 10, StringType) { testVector =>
(0 until 10).map { i =>
val utf8 = s"str$i".getBytes("utf8")
testVector.appendByteArray(utf8, 0, utf8.length)
Expand All @@ -141,8 +139,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}

test("binary") {
testVector = allocate(10, BinaryType)
testVectors("binary", 10, BinaryType) { testVector =>
(0 until 10).map { i =>
val utf8 = s"str$i".getBytes("utf8")
testVector.appendByteArray(utf8, 0, utf8.length)
Expand All @@ -156,9 +153,8 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}

test("array") {
val arrayType = ArrayType(IntegerType, true)
testVector = allocate(10, arrayType)
val arrayType: ArrayType = ArrayType(IntegerType, containsNull = true)
testVectors("array", 10, arrayType) { testVector =>

val data = testVector.arrayData()
var i = 0
Expand All @@ -181,9 +177,8 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(array.getArray(3).asInstanceOf[ArrayData].toIntArray() === Array(3, 4, 5))
}

test("struct") {
val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
testVector = allocate(10, schema)
val structType: StructType = new StructType().add("int", IntegerType).add("double", DoubleType)
testVectors("struct", 10, structType) { testVector =>
val c1 = testVector.getChildColumn(0)
val c2 = testVector.getChildColumn(1)
c1.putInt(0, 123)
Expand All @@ -200,28 +195,27 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
}

test("[SPARK-22092] off-heap column vector reallocation corrupts array data") {
val arrayType = ArrayType(IntegerType, true)
testVector = new OffHeapColumnVector(8, arrayType)
withVector(new OffHeapColumnVector(8, arrayType)) { testVector =>
val data = testVector.arrayData()
(0 until 8).foreach(i => data.putInt(i, i))
(0 until 8).foreach(i => testVector.putArray(i, i, 1))

val data = testVector.arrayData()
(0 until 8).foreach(i => data.putInt(i, i))
(0 until 8).foreach(i => testVector.putArray(i, i, 1))
// Increase vector's capacity and reallocate the data to new bigger buffers.
testVector.reserve(16)

// Increase vector's capacity and reallocate the data to new bigger buffers.
testVector.reserve(16)

// Check that none of the values got lost/overwritten.
val array = new ColumnVector.Array(testVector)
(0 until 8).foreach { i =>
assert(array.getArray(i).toIntArray() === Array(i))
// Check that none of the values got lost/overwritten.
val array = new ColumnVector.Array(testVector)
(0 until 8).foreach { i =>
assert(array.getArray(i).toIntArray() === Array(i))
}
}
}

test("[SPARK-22092] off-heap column vector reallocation corrupts struct nullability") {
val structType = new StructType().add("int", IntegerType).add("double", DoubleType)
testVector = new OffHeapColumnVector(8, structType)
(0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i))
testVector.reserve(16)
(0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
withVector(new OffHeapColumnVector(8, structType)) { testVector =>
(0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i))
testVector.reserve(16)
(0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(v._1 == Platform.getByte(null, addr + v._2))
}
}
column.close()
}}
}

Expand Down Expand Up @@ -317,6 +318,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(v._1 == Platform.getLong(null, addr + 8 * v._2))
}
}
column.close()
}}
}

Expand Down Expand Up @@ -443,6 +445,7 @@ class ColumnarBatchSuite extends SparkFunSuite {

column.reset()
assert(column.arrayData().elementsAppended == 0)
column.close()
}}
}

Expand Down Expand Up @@ -498,6 +501,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
column.putArray(0, 0, array.length)
assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
=== array)
column.close()
}}
}

Expand Down Expand Up @@ -528,6 +532,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
val s2 = column.getStruct(1)
assert(s2.getInt(0) == 456)
assert(s2.getDouble(1) == 5.67)
column.close()
}}
}

Expand Down

0 comments on commit 0e3f910

Please sign in to comment.