
Commit

fix bitmap read and write
gnehil committed Jul 18, 2024
1 parent 3e745e7 commit 8149b3a
Showing 7 changed files with 124 additions and 90 deletions.
ConfigurationOptions.java
@@ -93,13 +93,6 @@ public interface ConfigurationOptions {

int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;

/**
* set types to ignore, split by comma
* e.g.
* "doris.ignore-type"="bitmap,hll"
*/
String DORIS_IGNORE_TYPE = "doris.ignore-type";

String DORIS_SINK_ENABLE_2PC = "doris.sink.enable-2pc";
boolean DORIS_SINK_ENABLE_2PC_DEFAULT = false;

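The removed doris.ignore-type option used to let callers drop BITMAP and HLL columns from the discovered schema entirely. After this commit those columns stay in the schema as strings and are read back as a placeholder value instead. A rough, self-contained Scala sketch of the reading side; the host, table, credentials, and the "doris" format name are illustrative assumptions, not values taken from this commit:

import org.apache.spark.sql.SparkSession

object BitmapReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("bitmap-read-sketch").master("local[*]").getOrCreate()

    // All connection values below are placeholders for illustration.
    val df = spark.read
      .format("doris")
      .option("doris.fenodes", "fe_host:8030")
      .option("doris.table.identifier", "demo.page_uv") // hypothetical db.table with a BITMAP column
      .option("user", "root")
      .option("password", "")
      .load()

    // With this change, BITMAP/HLL columns remain in the schema as StringType and each
    // row carries the literal placeholder "READ UNSUPPORTED" for them.
    df.printSchema()
    df.show(5)
  }
}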
RestService.java
@@ -17,56 +17,21 @@

package org.apache.doris.spark.rest;

import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_BENODES;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FENODES;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FILTER_QUERY;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_USER;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLE_IDENTIFIER;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_BENODES;
import static org.apache.doris.spark.util.ErrorMessages.CONNECT_FAILED_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.HashMap;
import java.util.Base64;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
@@ -76,23 +41,44 @@
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
import org.apache.doris.spark.sql.SchemaUtils;
import org.apache.doris.spark.util.HttpUtil;
import org.apache.doris.spark.util.URLs;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;

import com.google.common.annotations.VisibleForTesting;
import scala.collection.JavaConverters;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
* Service for communicating with Doris FE.
@@ -238,18 +224,33 @@ public static Schema parseSchema(String response, Logger logger) throws DorisExc
* @throws DorisException throw when find partition failed
*/
public static List<PartitionDefinition> findPartitions(Settings cfg, Logger logger) throws DorisException {
String[] tableIdentifiers = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
String sql = "select " + cfg.getProperty(DORIS_READ_FIELD, "*") +
" from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
if (!StringUtils.isEmpty(cfg.getProperty(DORIS_FILTER_QUERY))) {
sql += " where " + cfg.getProperty(DORIS_FILTER_QUERY);
String[] tableIdentifiers =
parseIdentifier(cfg.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER), logger);
String readFields = cfg.getProperty(ConfigurationOptions.DORIS_READ_FIELD, "*");
if (!"*".equals(readFields)) {
String[] readFieldArr = readFields.split(",");
String[] bitmapColumns = cfg.getProperty(SchemaUtils.DORIS_BITMAP_COLUMNS(), "").split(",");
String[] hllColumns = cfg.getProperty(SchemaUtils.DORIS_HLL_COLUMNS(), "").split(",");
for (int i = 0; i < readFieldArr.length; i++) {
String readFieldName = readFieldArr[i].replaceAll("`", "");
if (ArrayUtils.contains(bitmapColumns, readFieldName)
|| ArrayUtils.contains(hllColumns, readFieldName)) {
readFieldArr[i] = "'READ UNSUPPORTED' AS " + readFieldArr[i];
}
}
readFields = StringUtils.join(readFieldArr, ",");
}
String sql = "select " + readFields + " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
if (!StringUtils.isEmpty(cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY))) {
sql += " where " + cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY);
}
logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);

String finalSql = sql;
String response = queryAllFrontends((SparkSettings) cfg, (frontend, enableHttps) -> {
HttpPost httpPost = new HttpPost(URLs.queryPlan(frontend, tableIdentifiers[0], tableIdentifiers[1], enableHttps));
String entity = "{\"sql\": \""+ finalSql +"\"}";
HttpPost httpPost =
new HttpPost(URLs.queryPlan(frontend, tableIdentifiers[0], tableIdentifiers[1], enableHttps));
String entity = "{\"sql\": \"" + finalSql + "\"}";
logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
stringEntity.setContentEncoding("UTF-8");
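The loop above swaps any requested BITMAP or HLL column for a constant placeholder before the query plan is requested, so those values never have to be serialized back. A rough, standalone Scala sketch of the resulting SELECT, using hypothetical table and column names:

object ReadFieldRewriteSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical inputs: the requested read fields and the BITMAP column list that
    // schema discovery records; the HLL list is empty in this example.
    val readFields    = Array("`site`", "`dt`", "`user_bitmap`")
    val bitmapColumns = Array("user_bitmap")
    val hllColumns    = Array.empty[String]

    val rewritten = readFields.map { field =>
      val bare = field.replaceAll("`", "")
      if (bitmapColumns.contains(bare) || hllColumns.contains(bare))
        s"'READ UNSUPPORTED' AS $field"
      else field
    }

    // Prints: select `site`,`dt`,'READ UNSUPPORTED' AS `user_bitmap` from `demo`.`page_uv`
    println("select " + rewritten.mkString(",") + " from `demo`.`page_uv`")
  }
}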
@@ -630,10 +631,11 @@ private static String queryAllFrontends(SparkSettings settings, BiFunction<Strin
String user = settings.getProperty(DORIS_REQUEST_AUTH_USER, "");
String password = settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user);
request.setHeader(HttpHeaders.AUTHORIZATION, "Basic " +
Base64.getEncoder().encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8)));
request.setHeader(HttpHeaders.AUTHORIZATION, "Basic "
+ Base64.getEncoder().encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8)));
CloseableHttpResponse response = client.execute(request);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() == HttpStatus.SC_OK) {
String resStr = EntityUtils.toString(response.getEntity());
Map<String, Object> resMap = MAPPER.readValue(resStr,
new TypeReference<Map<String, Object>>() {
@@ -643,6 +645,8 @@ private static String queryAllFrontends(SparkSettings settings, BiFunction<Strin
}
return resStr;
}
logger.warn("Request for {} get a bad status, code: {}, msg: {}", request.getURI().toString(),
statusLine.getStatusCode(), statusLine.getReasonPhrase());
} catch (IOException e) {
logger.error("Doris FE node {} is unavailable, Request the next Doris FE node. Err: {}", frontend, e.getMessage());
}
@@ -652,4 +656,4 @@ private static String queryAllFrontends(SparkSettings settings, BiFunction<Strin
throw new DorisException(errMsg);
}

}
}
RowBatch.java
@@ -17,6 +17,11 @@

package org.apache.doris.spark.serialization;

import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.util.IPUtils;

import com.google.common.base.Preconditions;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BaseIntVector;
@@ -43,10 +48,6 @@
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.util.IPUtils;
import org.apache.spark.sql.types.Decimal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,8 +73,6 @@
import java.util.NoSuchElementException;
import java.util.Objects;

import static org.apache.doris.spark.util.IPUtils.convertLongToIPv4Address;

/**
* row batch data container.
*/
@@ -157,8 +156,7 @@ public boolean hasNext() {

private void addValueToRow(int rowIndex, Object obj) {
if (rowIndex > rowCountInOneBatch) {
String errMsg = "Get row offset: " + rowIndex + " larger than row size: " +
rowCountInOneBatch;
String errMsg = "Get row offset: " + rowIndex + " larger than row size: " + rowCountInOneBatch;
logger.error(errMsg);
throw new NoSuchElementException(errMsg);
}
Expand Down Expand Up @@ -261,7 +259,8 @@ public void convertArrowToRowBatch() throws DorisException {
ipv4Vector = (UInt4Vector) curFieldVector;
}
for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
Object fieldValue = ipv4Vector.isNull(rowIndex) ? null : convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
Object fieldValue = ipv4Vector.isNull(rowIndex) ? null :
IPUtils.convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
addValueToRow(rowIndex, fieldValue);
}
break;
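For reference, the now-qualified IPUtils.convertLongToIPv4Address call above turns the unsigned 32-bit value held in the Arrow UInt4Vector into dotted-quad text. A rough, independent Scala re-implementation for illustration only (not the connector's IPUtils):

object IPv4Sketch {
  def main(args: Array[String]): Unit = {
    // Doris IPV4 values arrive as unsigned 32-bit integers; split them into four octets.
    def longToIPv4(value: Long): String =
      Seq(24, 16, 8, 0).map(shift => (value >> shift) & 0xFF).mkString(".")

    println(longToIPv4(3232235777L)) // 192.168.1.1
  }
}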
ScalaValueReader.scala
@@ -138,7 +138,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
protected val openResult: TScanOpenResult = lockClient(_.openScanner(openParams))
protected val contextId: String = openResult.getContextId
protected val schema: Schema =
SchemaUtils.convertToSchema(openResult.getSelectedColumns)
SchemaUtils.convertToSchema(openResult.getSelectedColumns, settings)

private[this] val asyncThread: Thread = new Thread {
override def run(): Unit = {
DorisRelation.scala
@@ -70,6 +70,11 @@ private[sql] class DorisRelation(
.map(filter => s"($filter)").mkString(" and ")
}

val bitmapColumnStr = cfg.getProperty(SchemaUtils.DORIS_BITMAP_COLUMNS, "")
paramWithScan += (SchemaUtils.DORIS_BITMAP_COLUMNS -> bitmapColumnStr)
val hllColumnStr = cfg.getProperty(SchemaUtils.DORIS_HLL_COLUMNS, "")
paramWithScan += (SchemaUtils.DORIS_HLL_COLUMNS -> hllColumnStr)

// required columns for column pruner
if (requiredColumns != null && requiredColumns.length > 0) {
paramWithScan += (ConfigurationOptions.DORIS_READ_FIELD ->
SchemaUtils.scala
@@ -19,8 +19,9 @@ package org.apache.doris.spark.sql

import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.doris.sdk.thrift.TScanColumnDesc
import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_IGNORE_TYPE, DORIS_READ_FIELD}
import org.apache.commons.lang3.StringUtils
import org.apache.doris.sdk.thrift.{TPrimitiveType, TScanColumnDesc}
import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
import org.apache.doris.spark.cfg.Settings
import org.apache.doris.spark.exception.DorisException
import org.apache.doris.spark.rest.RestService
@@ -38,6 +39,9 @@ private[spark] object SchemaUtils {
private val logger = LoggerFactory.getLogger(SchemaUtils.getClass.getSimpleName.stripSuffix("$"))
private val MAPPER = JsonMapper.builder().addModule(DefaultScalaModule).build()

val DORIS_BITMAP_COLUMNS = "doris.bitmap.columns"
val DORIS_HLL_COLUMNS = "doris.hll.columns"

/**
* discover Doris table schema from Doris FE.
*
@@ -46,9 +50,12 @@
*/
def discoverSchema(cfg: Settings): StructType = {
val schema = discoverSchemaFromFe(cfg)
val bitmapColumns = schema.getProperties.filter(_.getType.equalsIgnoreCase("BITMAP")).map(_.getName).mkString(",")
cfg.setProperty(DORIS_BITMAP_COLUMNS, bitmapColumns)
val hllColumns = schema.getProperties.filter(_.getType.equalsIgnoreCase("HLL")).map(_.getName).mkString(",")
cfg.setProperty(DORIS_HLL_COLUMNS, hllColumns)
val dorisReadField = cfg.getProperty(DORIS_READ_FIELD)
val ignoreColumnType = cfg.getProperty(DORIS_IGNORE_TYPE)
convertToStruct(schema, dorisReadField, ignoreColumnType)
convertToStruct(schema, dorisReadField)
}
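discoverSchema now records which columns are BITMAP or HLL under the new doris.bitmap.columns and doris.hll.columns keys so later stages (DorisRelation, RestService, ScalaValueReader) can special-case them. A rough, self-contained Scala sketch of what gets recorded for a hypothetical table, with a plain mutable Map standing in for the connector's Settings:

import scala.collection.mutable

object BitmapColumnTrackingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical table layout: page_uv(site VARCHAR, dt DATE, user_bitmap BITMAP, uv_hll HLL).
    val columns = Seq(("site", "VARCHAR"), ("dt", "DATE"), ("user_bitmap", "BITMAP"), ("uv_hll", "HLL"))
    val cfg = mutable.Map[String, String]()

    cfg("doris.bitmap.columns") = columns.collect { case (name, "BITMAP") => name }.mkString(",")
    cfg("doris.hll.columns")    = columns.collect { case (name, "HLL") => name }.mkString(",")

    // cfg now holds: doris.bitmap.columns -> user_bitmap, doris.hll.columns -> uv_hll
    println(cfg)
  }
}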

/**
@@ -67,20 +74,14 @@
* @param schema inner schema
* @return Spark Catalyst StructType
*/
def convertToStruct(schema: Schema, dorisReadFields: String, ignoredTypes: String): StructType = {
def convertToStruct(schema: Schema, dorisReadFields: String): StructType = {
val fieldList = if (dorisReadFields != null && dorisReadFields.nonEmpty) {
dorisReadFields.split(",")
} else {
Array.empty[String]
}
val ignoredTypeList = if (ignoredTypes != null && ignoredTypes.nonEmpty) {
ignoredTypes.split(",").map(t => t.trim.toUpperCase)
} else {
Array.empty[String]
}
val fields = schema.getProperties
.filter(x => (fieldList.contains(x.getName) || fieldList.isEmpty)
&& !ignoredTypeList.contains(x.getType))
.filter(x => fieldList.contains(x.getName) || fieldList.isEmpty)
.map(f =>
DataTypes.createStructField(
f.getName,
@@ -132,8 +133,8 @@ private[spark] object SchemaUtils {
case "VARIANT" => DataTypes.StringType
case "IPV4" => DataTypes.StringType
case "IPV6" => DataTypes.StringType
case "HLL" =>
throw new DorisException("Unsupported type " + dorisType)
case "BITMAP" => DataTypes.StringType // Placeholder only, no support for reading
case "HLL" => DataTypes.StringType // Placeholder only, no support for reading
case _ =>
throw new DorisException("Unrecognized Doris type " + dorisType)
}
@@ -145,12 +146,39 @@
* @param tscanColumnDescs Doris BE return schema
* @return inner schema struct
*/
def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc]): Schema = {
val schema = new Schema(tscanColumnDescs.length)
tscanColumnDescs.foreach(desc => schema.put(new Field(desc.getName, desc.getType.name, "", 0, 0, "")))
def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc], settings: Settings): Schema = {
val readColumns = settings.getProperty(DORIS_READ_FIELD).split(",").map(_.replaceAll("`", ""))
val bitmapColumns = settings.getProperty(DORIS_BITMAP_COLUMNS, "").split(",")
val hllColumns = settings.getProperty(DORIS_HLL_COLUMNS, "").split(",")
val fieldList = fieldUnion(readColumns, bitmapColumns, hllColumns, tscanColumnDescs)
val schema = new Schema(fieldList.length)
fieldList.foreach(schema.put)
schema
}

private def fieldUnion(readColumns: Array[String], bitmapColumns: Array[String], hllColumns: Array[String],
tScanColumnDescSeq: Seq[TScanColumnDesc]): List[Field] = {
val fieldList = mutable.Buffer[Field]()
var rcIdx = 0;
var tsdIdx = 0;
while (rcIdx < readColumns.length || tsdIdx < tScanColumnDescSeq.length) {
if (rcIdx < readColumns.length) {
if (StringUtils.equals(readColumns(rcIdx), tScanColumnDescSeq(tsdIdx).getName)) {
fieldList += new Field(tScanColumnDescSeq(tsdIdx).getName, tScanColumnDescSeq(tsdIdx).getType.name, "", 0, 0, "")
rcIdx += 1
tsdIdx += 1
} else if (bitmapColumns.contains(readColumns(rcIdx)) || hllColumns.contains(readColumns(rcIdx))) {
fieldList += new Field(readColumns(rcIdx), TPrimitiveType.VARCHAR.name, "", 0, 0, "")
rcIdx += 1
}
} else {
fieldList += new Field(tScanColumnDescSeq(tsdIdx).getName, tScanColumnDescSeq(tsdIdx).getType.name, "", 0, 0, "")
tsdIdx += 1
}
}
fieldList.toList
}
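fieldUnion merges the requested read columns with the column descriptors returned by the scanner; requested BITMAP/HLL columns that the scanner does not report back are re-inserted at their original position with a VARCHAR placeholder type. A simplified, self-contained Scala sketch of that merge over plain tuples, with hypothetical column names; like the loop above, it assumes every other requested column appears in the scanner's list:

import scala.collection.mutable

object FieldUnionSketch {
  def main(args: Array[String]): Unit = {
    // Simplified model of the merge: BITMAP/HLL columns missing from the descriptor
    // list get a VARCHAR placeholder at their original position.
    val readColumns   = Array("site", "user_bitmap", "dt")       // what the query asked for
    val beColumns     = Seq("site" -> "VARCHAR", "dt" -> "DATE") // what the scanner reports back
    val bitmapColumns = Set("user_bitmap")

    val merged = mutable.Buffer[(String, String)]()
    var rc = 0
    var be = 0
    while (rc < readColumns.length || be < beColumns.length) {
      if (rc < readColumns.length && be < beColumns.length && readColumns(rc) == beColumns(be)._1) {
        merged += beColumns(be); rc += 1; be += 1
      } else if (rc < readColumns.length && bitmapColumns.contains(readColumns(rc))) {
        merged += (readColumns(rc) -> "VARCHAR"); rc += 1
      } else {
        merged += beColumns(be); be += 1
      }
    }

    // merged: (site,VARCHAR), (user_bitmap,VARCHAR), (dt,DATE)
    println(merged.toList)
  }
}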

def rowColumnValue(row: SpecializedGetters, ordinal: Int, dataType: DataType): Any = {

if (row.isNullAt(ordinal)) null