From 5ac0a8ee919b40a452111a3543c695003990ff0b Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Mon, 24 May 2021 11:04:54 +0800
Subject: [PATCH] Fix Index stuck issues (#122)

---
 .../oap/index/BTreeIndexRecordWriter.scala          | 12 ++++++------
 .../oap/statistics/PartByValueStatistics.scala      |  2 +-
 .../oap/statistics/SampleBasedStatistics.scala      |  2 +-
 .../oap/statistics/PartByValueStatisticsSuite.scala |  4 ++--
 .../oap/statistics/SampleBasedStatisticsSuite.scala |  4 ++--
 5 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/index/BTreeIndexRecordWriter.scala b/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/index/BTreeIndexRecordWriter.scala
index bc92093af..030ad08ab 100644
--- a/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/index/BTreeIndexRecordWriter.scala
+++ b/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/index/BTreeIndexRecordWriter.scala
@@ -74,14 +74,14 @@ abstract class BTreeIndexRecordWriter(
   @transient private lazy val genericProjector = SafeProjection.create(keySchema)
   @transient protected lazy val nnkw = new NonNullKeyWriter(keySchema)

-  private val combiner: Int => Seq[Int] = Seq(_)
-  private val merger: (Seq[Int], Int) => Seq[Int] = _ :+ _
-  private val mergeCombiner: (Seq[Int], Seq[Int]) => Seq[Int] = _ ++ _
+  private val combiner: Int => ArrayBuffer[Int] = ArrayBuffer[Int](_)
+  private val merger: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = _ += _
+  private val mergeCombiner: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] = _ ++= _
   private val aggregator =
-    new Aggregator[InternalRow, Int, Seq[Int]](combiner, merger, mergeCombiner)
+    new Aggregator[InternalRow, Int, ArrayBuffer[Int]](combiner, merger, mergeCombiner)
   private val externalSorter = {
     val taskContext = TaskContext.get()
-    val sorter = new OapExternalSorter[InternalRow, Int, Seq[Int]](
+    val sorter = new OapExternalSorter[InternalRow, Int, ArrayBuffer[Int]](
       taskContext, Some(aggregator), Some(ordering))
     taskContext.addTaskCompletionListener[Unit](_ => sorter.stop())
     sorter
@@ -260,7 +260,7 @@ abstract class BTreeIndexRecordWriter(
    * @return BTreeNodeMetaData
    */
   private def serializeNode(
-      uniqueKeys: Array[Product2[InternalRow, Seq[Int]]],
+      uniqueKeys: Array[Product2[InternalRow, ArrayBuffer[Int]]],
       initRowPos: Int,
       rowIdListWriter: IndexFileWriter,
       rowIdListBuffer: ByteArrayOutputStream): BTreeNodeMetaData = {
diff --git a/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/statistics/PartByValueStatistics.scala b/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/statistics/PartByValueStatistics.scala
index 92f90f91c..0512de127 100644
--- a/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/statistics/PartByValueStatistics.scala
+++ b/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/statistics/PartByValueStatistics.scala
@@ -265,7 +265,7 @@ private[oap] class PartByValueStatisticsWriter(schema: StructType, conf: Configu

   // This should provide the same function to get the metas as buildPartMeta().
   // And this will be used when using the oapExternalSorter data
-  def buildMetas(keyArray: Array[Product2[Key, Seq[Int]]], isLast: Boolean): Unit = {
+  def buildMetas(keyArray: Array[Product2[Key, ArrayBuffer[Int]]], isLast: Boolean): Unit = {
     var kv: Product2[Key, Seq[Int]] = null
     if (keyArray != null && keyArray.size != 0) {
       keyArray.foreach(
diff --git a/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/statistics/SampleBasedStatistics.scala b/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/statistics/SampleBasedStatistics.scala
index 8fa104087..c0fda0f86 100644
--- a/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/statistics/SampleBasedStatistics.scala
+++ b/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/statistics/SampleBasedStatistics.scala
@@ -150,7 +150,7 @@ private[oap] class SampleBasedStatisticsWriter(schema: StructType, conf: Configu
     }
   }

-  def buildSampleArray(keyArray: Array[Product2[Key, Seq[Int]]], isLast: Boolean): Unit = {
+  def buildSampleArray(keyArray: Array[Product2[Key, ArrayBuffer[Int]]], isLast: Boolean): Unit = {
     keyArray.foreach(
       value => {
         value._2.foreach(
diff --git a/Plasma-based-cache/src/test/scala/org/apache/spark/sql/execution/datasources/oap/statistics/PartByValueStatisticsSuite.scala b/Plasma-based-cache/src/test/scala/org/apache/spark/sql/execution/datasources/oap/statistics/PartByValueStatisticsSuite.scala
index 02c9ce07a..dfc9f800b 100644
--- a/Plasma-based-cache/src/test/scala/org/apache/spark/sql/execution/datasources/oap/statistics/PartByValueStatisticsSuite.scala
+++ b/Plasma-based-cache/src/test/scala/org/apache/spark/sql/execution/datasources/oap/statistics/PartByValueStatisticsSuite.scala
@@ -180,7 +180,7 @@ class PartByValueStatisticsSuite extends StatisticsTest {

     val keys = (1 to 300).map(i => rowGen(i)).toArray
     val product2Keys = keys.map(v => (v, Seq(1)))
-      .asInstanceOf[Array[Product2[Key, Seq[Int]]]]
+      .asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

     val testPartByValueWriter = new TestPartByValueWriter(schema)
     testPartByValueWriter.initParams(product2Keys.size)
@@ -213,7 +213,7 @@ class PartByValueStatisticsSuite extends StatisticsTest {

     val keys = (1 to 300).map(i => rowGen(i)).toArray
     val product2Keys = keys.map(v => (v, Seq(1)))
-      .asInstanceOf[Array[Product2[Key, Seq[Int]]]]
+      .asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

     val testPartByValueWriter = new TestPartByValueWriter(schema)
     testPartByValueWriter.initParams(product2Keys.size)
diff --git a/Plasma-based-cache/src/test/scala/org/apache/spark/sql/execution/datasources/oap/statistics/SampleBasedStatisticsSuite.scala b/Plasma-based-cache/src/test/scala/org/apache/spark/sql/execution/datasources/oap/statistics/SampleBasedStatisticsSuite.scala
index 458382a74..23685adc0 100644
--- a/Plasma-based-cache/src/test/scala/org/apache/spark/sql/execution/datasources/oap/statistics/SampleBasedStatisticsSuite.scala
+++ b/Plasma-based-cache/src/test/scala/org/apache/spark/sql/execution/datasources/oap/statistics/SampleBasedStatisticsSuite.scala
@@ -167,7 +167,7 @@ class SampleBasedStatisticsSuite extends StatisticsTest {

     val keys = (1 to 300).map(i => rowGen(i)).toArray
     val product2Keys = keys.map(v => (v, Seq(1)))
-      .asInstanceOf[Array[Product2[Key, Seq[Int]]]]
+      .asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

     val testSampleWriter = new TestSampleWriter(schema)
     testSampleWriter.initParams(product2Keys.size)
@@ -192,7 +192,7 @@ class SampleBasedStatisticsSuite extends StatisticsTest {

     val keys = (1 to 300).map(i => rowGen(i)).toArray
     val product2Keys = keys.map(v => (v, Seq(1)))
-      .asInstanceOf[Array[Product2[Key, Seq[Int]]]]
+      .asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

     val testSampleWriter = new TestSampleWriter(schema)
     testSampleWriter.initParams(product2Keys.size)
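
The patch replaces the immutable Seq[Int] accumulator used by the OapExternalSorter aggregator with a mutable ArrayBuffer[Int]. A likely explanation for the "stuck" symptom, though the patch itself does not spell it out, is that the old merger appended each duplicate-key row id with Seq's :+, which copies the whole sequence on every append, so keys with many duplicate rows degrade to quadratic time during index build; ArrayBuffer's += and ++= append in amortized constant time. The standalone sketch below illustrates the difference; it is not code from this repository, and the object and method names are invented for the example.

import scala.collection.mutable.ArrayBuffer

// Standalone sketch (not repository code): contrasts the two row-id merge
// strategies that the patch swaps. All names here are invented for illustration.
object RowIdMergeSketch {

  // Old strategy: Seq's :+ rebuilds the sequence on every append, so collecting
  // n row ids under one heavily duplicated key costs O(n^2) overall.
  def mergeWithSeq(rowIds: Iterator[Int]): Seq[Int] =
    rowIds.foldLeft(Seq.empty[Int])((acc, id) => acc :+ id)

  // New strategy: ArrayBuffer's += appends in amortized O(1), so the same work
  // is O(n) and large duplicate-key groups no longer stall the writer.
  def mergeWithBuffer(rowIds: Iterator[Int]): ArrayBuffer[Int] =
    rowIds.foldLeft(ArrayBuffer.empty[Int])((acc, id) => acc += id)

  def main(args: Array[String]): Unit = {
    val merged = mergeWithBuffer((1 to 100000).iterator)
    println(s"merged ${merged.length} row ids")  // prints: merged 100000 row ids
  }
}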