fix columnar batch (pingcap#1559)
birdstorm authored Aug 19, 2020
1 parent 9e88b0c commit 5451b76
Showing 13 changed files with 320 additions and 30 deletions.
@@ -23,11 +23,17 @@

public class TiColumnVectorAdapter extends ColumnVector {
private final TiColumnVector tiColumnVector;
private final ColumnVector offsets;

/** Sets up the data type of this column vector. */
public TiColumnVectorAdapter(TiColumnVector tiColumnVector) {
super(TypeMapping.toSparkType(tiColumnVector.dataType()));
this.tiColumnVector = tiColumnVector;
if (tiColumnVector.getOffset() == null) {
this.offsets = null;
} else {
this.offsets = new TiColumnVectorAdapter(tiColumnVector.getOffset());
}
}

/**
@@ -122,7 +128,13 @@ public double getDouble(int rowId) {
*/
@Override
public ColumnarArray getArray(int rowId) {
throw new UnsupportedOperationException("TiColumnVectorAdapter is not supported this method");
if (tiColumnVector.isNullAt(rowId)) {
return null;
}
int index = rowId * 8;
int start = offsets.getInt(index);
int end = offsets.getInt(index + 1);
return new ColumnarArray(this, start, end - start);
}

/**
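A minimal standalone sketch of the offsets convention behind the new getArray above: consecutive offset entries mark where row i's array starts and ends in the flattened element data, which is what ColumnarArray(this, start, end - start) consumes. The plain long[] here stands in for the offsets column vector the adapter actually reads; the demo class is hypothetical.

import java.util.Arrays;

public final class OffsetSliceDemo {
  public static void main(String[] args) {
    // Row i owns elements [offsets[i], offsets[i+1]) of the flattened element data.
    long[] offsets = {0, 3, 3, 7}; // three rows with lengths 3, 0 and 4
    long[] flattened = {10, 11, 12, 20, 21, 22, 23};

    for (int rowId = 0; rowId < offsets.length - 1; rowId++) {
      int start = (int) offsets[rowId];
      int end = (int) offsets[rowId + 1];
      // Mirrors new ColumnarArray(this, start, end - start) in the adapter.
      System.out.println("row " + rowId + " -> "
          + Arrays.toString(Arrays.copyOfRange(flattened, start, end)));
    }
  }
}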
5 changes: 5 additions & 0 deletions core/src/main/java/com/pingcap/tikv/datatype/TypeMapping.java
@@ -18,6 +18,7 @@
import static com.pingcap.tikv.types.MySQLType.TypeLonglong;

import com.pingcap.tikv.types.AbstractDateTimeType;
import com.pingcap.tikv.types.ArrayType;
import com.pingcap.tikv.types.BytesType;
import com.pingcap.tikv.types.DataType;
import com.pingcap.tikv.types.DateType;
@@ -96,6 +97,10 @@ public static org.apache.spark.sql.types.DataType toSparkType(DataType type) {
return DataTypes.LongType;
}

if (type instanceof ArrayType) {
return DataTypes.createArrayType(DataTypes.LongType);
}

throw new UnsupportedOperationException(
String.format("found unsupported type %s", type.getClass().getCanonicalName()));
}
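The new ArrayType branch maps the client's ArrayType (used for the handle lists in this commit) to a plain Spark array-of-long type. A small hypothetical check of what that Spark type looks like (assumes Spark SQL on the classpath; the demo class is made up):

import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;

public final class ArrayTypeMappingDemo {
  public static void main(String[] args) {
    // Same construction as the new branch in TypeMapping.toSparkType above.
    ArrayType handleListType = DataTypes.createArrayType(DataTypes.LongType);
    System.out.println(handleListType.simpleString()); // array<bigint>
    System.out.println(handleListType.elementType());  // LongType
  }
}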
@@ -408,7 +408,10 @@ class TiBatchWrite(
logger.info(s"sampleSize=$sampleSize")

val sampleData =
rdd.map(_._1).sample(withReplacement = false, sampleSize.toDouble / count).collect()
rdd
.map(_._1)
.sample(withReplacement = false, fraction = sampleSize.toDouble / count)
.collect()
logger.info(s"sampleData size=${sampleData.length}")

val sortedSampleData = sampleData.sorted(new Ordering[SerializableKey] {
@@ -187,7 +187,7 @@ case class ColumnarRegionTaskExec(
val batchSize = tiConf.getIndexScanBatchSize
val downgradeThreshold = tiConf.getDowngradeThreshold

iter.flatMap { row =>
def computeWithRowIterator(row: InternalRow): Iterator[InternalRow] = {
val handles = row.getArray(1).toLongArray()
val handleIterator: util.Iterator[Long] = handles.iterator
var taskCount = 0
@@ -376,6 +376,20 @@
}
}.asInstanceOf[Iterator[InternalRow]]
}

iter match {
case batch: Iterator[ColumnarBatch] =>
batch.asInstanceOf[Iterator[ColumnarBatch]].flatMap { it =>
it.rowIterator().flatMap { row =>
computeWithRowIterator(row)
}
}
case _: Iterator[InternalRow] =>
iter.flatMap { row =>
computeWithRowIterator(row)
}
}

}

override protected def doExecute(): RDD[InternalRow] = {
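The new match above keeps two paths: when the upstream operator yields ColumnarBatch values, each batch is unpacked through rowIterator() and fed to the same per-row handle logic. A standalone sketch of that unpacking with a hand-built single-column batch (demo class and data are hypothetical; assumes Spark SQL on the classpath):

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public final class BatchUnpackDemo {
  public static void main(String[] args) {
    // Build a tiny one-column batch of longs.
    OnHeapColumnVector col = new OnHeapColumnVector(3, DataTypes.LongType);
    for (int i = 0; i < 3; i++) {
      col.putLong(i, 100L + i);
    }
    ColumnarBatch batch = new ColumnarBatch(new ColumnVector[] {col});
    batch.setNumRows(3);

    // Equivalent of the Scala flatMap over it.rowIterator() above.
    java.util.Iterator<InternalRow> rows = batch.rowIterator();
    while (rows.hasNext()) {
      System.out.println(rows.next().getLong(0));
    }
    batch.close();
  }
}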
85 changes: 73 additions & 12 deletions core/src/main/scala/org/apache/spark/sql/tispark/TiHandleRDD.scala
@@ -15,19 +15,31 @@

package org.apache.spark.sql.tispark

import java.nio.ByteBuffer

import com.pingcap.tikv.codec.CodecDataOutput
import com.pingcap.tikv.columnar.{
TiChunk,
TiChunkColumnVector,
TiColumnVector,
TiColumnarBatchHelper
}
import com.pingcap.tikv.meta.TiDAGRequest
import com.pingcap.tikv.types.{ArrayType, DataType, IntegerType}
import com.pingcap.tikv.util.RangeSplitter
import com.pingcap.tikv.{TiConfiguration, TiSession}
import com.pingcap.tispark.utils.TiUtil
import com.pingcap.tispark.{TiPartition, TiTableReference}
import gnu.trove.list.array.TLongArrayList
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.SparkSession
import org.apache.spark.{Partition, TaskContext, TaskKilledException}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
* RDD used for retrieving handles from TiKV. Result is arranged as
@@ -53,15 +65,15 @@ class TiHandleRDD(
outputTypes.map(CatalystTypeConverters.createToCatalystConverter)

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] =
new Iterator[InternalRow] {
new Iterator[ColumnarBatch] {
checkTimezone()

private val tiPartition = split.asInstanceOf[TiPartition]
private val session = TiSession.getInstance(tiConf)
private val snapshot = session.createSnapshot(dagRequest.getStartTs)
private[this] val tasks = tiPartition.tasks

private val handleIterator = snapshot.indexHandleRead(dagRequest, tasks)
private val handleIterator = snapshot.indexHandleReadRow(dagRequest, tasks)
private val regionManager = session.getRegionManager
private lazy val handleList = {
val lst = new TLongArrayList()
@@ -92,14 +104,63 @@
iterator.hasNext
}

override def next(): InternalRow = {
val next = iterator.next
val regionId = next._1
val handleList = next._2
override def next(): ColumnarBatch = {
var numRows = 0
val batchSize = 20480
val cdi0 = new CodecDataOutput()
val cdi1 = new CodecDataOutput()
var offsets = new mutable.ArrayBuffer[Long]
var curOffset = 0L
while (hasNext && numRows < batchSize) {
val next = iterator.next
val regionId = next._1
val handleList = next._2
if (!handleList.isEmpty) {
// Returns RegionId:[handle1, handle2, handle3...] K-V pair
// val sparkRow = Row.apply(regionId, handleList.toArray())
// TiUtil.rowToInternalRow(sparkRow, outputTypes, converters)
cdi0.writeLong(regionId)
cdi1.writeLong(handleList.size())
for (i <- 0 until handleList.size()) {
cdi1.writeLong(handleList.get(i))
}
offsets += curOffset
curOffset += handleList.size().toLong
numRows += 1
}
}
offsets += curOffset

val buffer0 = ByteBuffer.wrap(cdi0.toBytes)
val buffer1 = ByteBuffer.wrap(cdi1.toBytes)

val nullBitMaps = DataType.setAllNotNullBitMapWithNumRows(numRows)

val regionIdType = IntegerType.BIGINT
val handleListType = ArrayType.ARRAY

// Returns RegionId:[handle1, handle2, handle3...] K-V pair
val sparkRow = Row.apply(regionId, handleList.toArray())
TiUtil.rowToInternalRow(sparkRow, outputTypes, converters)
val childColumnVectors = new ArrayBuffer[TiColumnVector]
childColumnVectors +=
new TiChunkColumnVector(
regionIdType,
regionIdType.getFixLen,
numRows,
0,
nullBitMaps,
null,
buffer0)
childColumnVectors +=
// any type will do? actual type is array[Long]
new TiChunkColumnVector(
handleListType,
8,
curOffset.toInt,
0,
nullBitMaps,
offsets.toArray,
buffer1)
val chunk = new TiChunk(childColumnVectors.toArray)
TiColumnarBatchHelper.createColumnarBatch(chunk)
}
}
}.asInstanceOf[Iterator[InternalRow]]
}
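The rewritten next() above packs up to batchSize (regionId, handle list) pairs into a single two-column TiChunk: column 0 holds one region id per row, while column 1 holds all handles flattened end to end plus an offsets array recording where each row's handles start. A simplified standalone sketch of that packing, using plain lists instead of CodecDataOutput and TiChunkColumnVector (demo class and data are hypothetical):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class HandleBatchDemo {
  public static void main(String[] args) {
    Map<Long, long[]> regionToHandles = new LinkedHashMap<>();
    regionToHandles.put(1L, new long[] {100, 101, 102});
    regionToHandles.put(2L, new long[] {200});

    List<Long> regionIds = new ArrayList<>(); // column 0: one region id per row
    List<Long> handles = new ArrayList<>();   // column 1: flattened handle data
    List<Long> offsets = new ArrayList<>();   // column 1: per-row boundaries
    long curOffset = 0;

    for (Map.Entry<Long, long[]> entry : regionToHandles.entrySet()) {
      regionIds.add(entry.getKey());
      offsets.add(curOffset);
      for (long handle : entry.getValue()) {
        handles.add(handle);
      }
      curOffset += entry.getValue().length;
    }
    offsets.add(curOffset); // closing boundary, so row i spans [offsets[i], offsets[i+1])

    System.out.println("regionIds = " + regionIds);
    System.out.println("handles   = " + handles);
    System.out.println("offsets   = " + offsets);
  }
}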
@@ -224,8 +224,8 @@ class BaseDataSourceTest(val table: String, val database: String = "tispark_test

if (!compResult(jdbcResult, tidbResult)) {
logger.error(s"""Failed on $tblName\n
|DataSourceAPI result: ${listToString(jdbcResult)}\n
|TiDB via JDBC result: ${listToString(tidbResult)}""".stripMargin)
|TiDB via JDBC result: ${listToString(jdbcResult)}\n
|DataSourceAPI result: ${listToString(tidbResult)}""".stripMargin)
fail()
}
}
7 changes: 6 additions & 1 deletion tikv-client/src/main/java/com/pingcap/tikv/Snapshot.java
@@ -131,6 +131,11 @@ private Iterator<Row> tableReadRow(TiDAGRequest dagRequest, List<RegionTask> tas
}
}

public Iterator<TiChunk> indexHandleReadChunk(
TiDAGRequest dagRequest, List<RegionTask> tasks, int numOfRows) {
return getTiChunkIterator(dagRequest, tasks, getSession(), numOfRows);
}

/**
* Below is lower level API for env like Spark which already did key range split Perform handle
* scan
@@ -139,7 +144,7 @@ private Iterator<Row> tableReadRow(TiDAGRequest dagRequest, List<RegionTask> tas
* @param tasks RegionTask of the coprocessor request to send
* @return Row iterator to iterate over resulting rows
*/
public Iterator<Long> indexHandleRead(TiDAGRequest dagRequest, List<RegionTask> tasks) {
public Iterator<Long> indexHandleReadRow(TiDAGRequest dagRequest, List<RegionTask> tasks) {
return getHandleIterator(dagRequest, tasks, session);
}

@@ -44,10 +44,6 @@ public BatchedTiChunkColumnVector(List<TiChunkColumnVector> child, int numOfRows
}
}

public final String typeName() {
return dataType().getType().name();
}

// TODO: once we switch off_heap mode, we need control memory access pattern.
public void free() {}

@@ -17,8 +17,10 @@

import com.google.common.primitives.UnsignedLong;
import com.pingcap.tikv.codec.CodecDataInput;
import com.pingcap.tikv.codec.CodecDataOutput;
import com.pingcap.tikv.codec.MyDecimal;
import com.pingcap.tikv.types.AbstractDateTimeType;
import com.pingcap.tikv.types.ArrayType;
import com.pingcap.tikv.types.BitType;
import com.pingcap.tikv.types.DataType;
import com.pingcap.tikv.types.DateTimeType;
@@ -55,16 +57,33 @@ public TiChunkColumnVector(
byte[] nullBitMaps,
long[] offsets,
ByteBuffer data) {
super(dataType, numOfRows);
super(dataType, numOfRows, buildColumnVectorFromOffsets(numOfRows, offsets));
this.fixLength = fixLength;
this.numOfNulls = numOfNulls;
this.nullBitMaps = nullBitMaps;
this.data = data;
this.offsets = offsets;
}

public final String typeName() {
return dataType().getType().name();
private static TiChunkColumnVector buildColumnVectorFromOffsets(int numOfRows, long[] offsets) {
if (offsets == null) {
return null;
} else {
DataType type = IntegerType.BIGINT;
int fixLength = type.getFixLen();
CodecDataOutput cdo = new CodecDataOutput();
for (long offset : offsets) {
cdo.writeLong(offset);
}
return new TiChunkColumnVector(
type,
fixLength,
numOfRows,
0,
DataType.setAllNotNullBitMapWithNumRows(numOfRows),
null,
ByteBuffer.wrap(cdo.toBytes()));
}
}

// TODO: once we switch off_heap mode, we need control memory access pattern.
@@ -176,6 +195,8 @@ public long getLong(int rowId) {
return getTime(rowId);
} else if (type instanceof TimeType) {
return data.getLong(rowId * fixLength);
} else if (type instanceof ArrayType) {
return data.getLong(rowId * fixLength + fixLength);
}

throw new UnsupportedOperationException("only IntegerType and Time related are supported.");
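buildColumnVectorFromOffsets above stores the offsets themselves as a fixed-length BIGINT column: every offset is written out as an 8-byte long and read back by absolute position (rowId * fixLength). A standalone sketch of that layout using java.nio in place of CodecDataOutput (demo class is hypothetical):

import java.nio.ByteBuffer;

public final class OffsetEncodingDemo {
  public static void main(String[] args) {
    long[] offsets = {0, 3, 3, 7};

    // Write each offset as a fixed-length 8-byte long, mirroring the writeLong loop above.
    ByteBuffer buffer = ByteBuffer.allocate(offsets.length * Long.BYTES);
    for (long offset : offsets) {
      buffer.putLong(offset);
    }

    // Read back by absolute position, the way the fixed-length column is addressed.
    int fixLength = Long.BYTES;
    for (int rowId = 0; rowId < offsets.length; rowId++) {
      System.out.println("offset[" + rowId + "] = " + buffer.getLong(rowId * fixLength));
    }
  }
}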
@@ -37,13 +37,22 @@
public abstract class TiColumnVector implements AutoCloseable {

private final int numOfRows;
private final TiColumnVector offsets;
/** Data type for this column. */
protected DataType type;

/** Sets up the data type of this column vector. */
protected TiColumnVector(DataType type, int numOfRows) {
this.type = type;
this.numOfRows = numOfRows;
this.offsets = null;
}

/** Sets up the data type of this column vector. */
protected TiColumnVector(DataType type, int numOfRows, TiColumnVector offsets) {
this.type = type;
this.numOfRows = numOfRows;
this.offsets = offsets;
}

/** Returns the data type of this column vector. */
@@ -218,4 +227,8 @@ public double[] getDoubles(int rowId, int count) {
public int numOfRows() {
return numOfRows;
}

public TiColumnVector getOffset() {
return offsets;
}
}