diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a0fea83f..61d4563a 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -93,13 +93,6 @@ public interface ConfigurationOptions {
 
     int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
 
-    /**
-     * set types to ignore, split by comma
-     * e.g.
-     * "doris.ignore-type"="bitmap,hll"
-     */
-    String DORIS_IGNORE_TYPE = "doris.ignore-type";
-
     String DORIS_SINK_ENABLE_2PC = "doris.sink.enable-2pc";
 
     boolean DORIS_SINK_ENABLE_2PC_DEFAULT = false;
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index 412b6a8f..3f3516fb 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -17,56 +17,21 @@
 
 package org.apache.doris.spark.rest;
 
+import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_BENODES;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FENODES;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FILTER_QUERY;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_USER;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
 import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLE_IDENTIFIER;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_BENODES;
-import static org.apache.doris.spark.util.ErrorMessages.CONNECT_FAILED_MESSAGE;
 import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE;
 import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
 import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Base64;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.json.JsonMapper;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.Settings;
 import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.ConnectedFailedException;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.IllegalArgumentException;
 import org.apache.doris.spark.exception.ShouldNeverHappenException;
@@ -76,23 +41,44 @@
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.rest.models.Tablet;
+import org.apache.doris.spark.sql.SchemaUtils;
 import org.apache.doris.spark.util.HttpUtil;
 import org.apache.doris.spark.util.URLs;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
-import org.apache.http.client.config.RequestConfig;
+import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
-
-import com.google.common.annotations.VisibleForTesting;
-import scala.collection.JavaConverters;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
 
 /**
  * Service for communicate with Doris FE.
@@ -238,18 +224,33 @@ public static Schema parseSchema(String response, Logger logger) throws DorisExc
      * @throws DorisException throw when find partition failed
      */
     public static List<PartitionDefinition> findPartitions(Settings cfg, Logger logger) throws DorisException {
-        String[] tableIdentifiers = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
-        String sql = "select " + cfg.getProperty(DORIS_READ_FIELD, "*") +
-                " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
-        if (!StringUtils.isEmpty(cfg.getProperty(DORIS_FILTER_QUERY))) {
-            sql += " where " + cfg.getProperty(DORIS_FILTER_QUERY);
+        String[] tableIdentifiers =
+                parseIdentifier(cfg.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER), logger);
+        String readFields = cfg.getProperty(ConfigurationOptions.DORIS_READ_FIELD, "*");
+        if (!"*".equals(readFields)) {
+            String[] readFieldArr = readFields.split(",");
+            String[] bitmapColumns = cfg.getProperty(SchemaUtils.DORIS_BITMAP_COLUMNS(), "").split(",");
+            String[] hllColumns = cfg.getProperty(SchemaUtils.DORIS_HLL_COLUMNS(), "").split(",");
+            for (int i = 0; i < readFieldArr.length; i++) {
+                String readFieldName = readFieldArr[i].replaceAll("`", "");
+                if (ArrayUtils.contains(bitmapColumns, readFieldName)
+                        || ArrayUtils.contains(hllColumns, readFieldName)) {
+                    readFieldArr[i] = "'READ UNSUPPORTED' AS " + readFieldArr[i];
+                }
+            }
+            readFields = StringUtils.join(readFieldArr, ",");
+        }
+        String sql = "select " + readFields + " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
+        if (!StringUtils.isEmpty(cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY))) {
+            sql += " where " + cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY);
         }
 
         logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
         String finalSql = sql;
         String response = queryAllFrontends((SparkSettings) cfg, (frontend, enableHttps) -> {
-            HttpPost httpPost = new HttpPost(URLs.queryPlan(frontend, tableIdentifiers[0], tableIdentifiers[1], enableHttps));
-            String entity = "{\"sql\": \""+ finalSql +"\"}";
+            HttpPost httpPost =
+                    new HttpPost(URLs.queryPlan(frontend, tableIdentifiers[0], tableIdentifiers[1], enableHttps));
+            String entity = "{\"sql\": \"" + finalSql + "\"}";
             logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
             StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
             stringEntity.setContentEncoding("UTF-8");
@@ -630,10 +631,11 @@ private static String queryAllFrontends(SparkSettings settings, BiFunction
             Map<String, Object> resMap = MAPPER.readValue(resStr, new TypeReference<Map<String, Object>>() {
@@ -643,6 +645,8 @@ private static String queryAllFrontends(SparkSettings settings, BiFunction
             if (rowIndex > rowCountInOneBatch) {
-                String errMsg = "Get row offset: " + rowIndex + " larger than row size: " +
-                        rowCountInOneBatch;
+                String errMsg = "Get row offset: " + rowIndex + " larger than row size: " + rowCountInOneBatch;
                 logger.error(errMsg);
                 throw new NoSuchElementException(errMsg);
             }
@@ -261,7 +259,8 @@ public void convertArrowToRowBatch() throws DorisException {
                         ipv4Vector = (UInt4Vector) curFieldVector;
                     }
                     for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
-                        Object fieldValue = ipv4Vector.isNull(rowIndex) ? null : convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
+                        Object fieldValue = ipv4Vector.isNull(rowIndex) ? null :
+                                IPUtils.convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
                         addValueToRow(rowIndex, fieldValue);
                     }
                     break;
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 719b16be..f9124a65 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -138,7 +138,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
   protected val openResult: TScanOpenResult = lockClient(_.openScanner(openParams))
   protected val contextId: String = openResult.getContextId
   protected val schema: Schema =
-    SchemaUtils.convertToSchema(openResult.getSelectedColumns)
+    SchemaUtils.convertToSchema(openResult.getSelectedColumns, settings)
 
   private[this] val asyncThread: Thread = new Thread {
     override def run(): Unit = {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
index db546cef..aa014ffe 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
@@ -70,6 +70,11 @@ private[sql] class DorisRelation(
         .map(filter => s"($filter)").mkString(" and ")
     }
 
+    val bitmapColumnStr = cfg.getProperty(SchemaUtils.DORIS_BITMAP_COLUMNS, "")
+    paramWithScan += (SchemaUtils.DORIS_BITMAP_COLUMNS -> bitmapColumnStr)
+    val hllColumnStr = cfg.getProperty(SchemaUtils.DORIS_HLL_COLUMNS, "")
+    paramWithScan += (SchemaUtils.DORIS_HLL_COLUMNS -> hllColumnStr)
+
     // required columns for column pruner
     if (requiredColumns != null && requiredColumns.length > 0) {
       paramWithScan += (ConfigurationOptions.DORIS_READ_FIELD ->
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 76d231ae..937514ad 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -19,8 +19,9 @@ package org.apache.doris.spark.sql
 
 import com.fasterxml.jackson.databind.json.JsonMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.doris.sdk.thrift.TScanColumnDesc
-import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_IGNORE_TYPE, DORIS_READ_FIELD}
+import org.apache.commons.lang3.StringUtils
+import org.apache.doris.sdk.thrift.{TPrimitiveType, TScanColumnDesc}
+import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.exception.DorisException
 import org.apache.doris.spark.rest.RestService
@@ -38,6 +39,9 @@ private[spark] object SchemaUtils {
   private val logger = LoggerFactory.getLogger(SchemaUtils.getClass.getSimpleName.stripSuffix("$"))
   private val MAPPER = JsonMapper.builder().addModule(DefaultScalaModule).build()
 
+  val DORIS_BITMAP_COLUMNS = "doris.bitmap.columns"
+  val DORIS_HLL_COLUMNS = "doris.hll.columns"
+
   /**
    * discover Doris table schema from Doris FE.
    *
@@ -46,9 +50,12 @@ private[spark] object SchemaUtils {
    */
  def discoverSchema(cfg: Settings): StructType = {
     val schema = discoverSchemaFromFe(cfg)
+    val bitmapColumns = schema.getProperties.filter(_.getType.equalsIgnoreCase("BITMAP")).map(_.getName).mkString(",")
+    cfg.setProperty(DORIS_BITMAP_COLUMNS, bitmapColumns)
+    val hllColumns = schema.getProperties.filter(_.getType.equalsIgnoreCase("HLL")).map(_.getName).mkString(",")
+    cfg.setProperty(DORIS_HLL_COLUMNS, hllColumns)
     val dorisReadField = cfg.getProperty(DORIS_READ_FIELD)
-    val ignoreColumnType = cfg.getProperty(DORIS_IGNORE_TYPE)
-    convertToStruct(schema, dorisReadField, ignoreColumnType)
+    convertToStruct(schema, dorisReadField)
   }
 
   /**
@@ -67,20 +74,14 @@ private[spark] object SchemaUtils {
    * @param schema inner schema
    * @return Spark Catalyst StructType
    */
-  def convertToStruct(schema: Schema, dorisReadFields: String, ignoredTypes: String): StructType = {
+  def convertToStruct(schema: Schema, dorisReadFields: String): StructType = {
     val fieldList = if (dorisReadFields != null && dorisReadFields.nonEmpty) {
       dorisReadFields.split(",")
     } else {
       Array.empty[String]
     }
 
-    val ignoredTypeList = if (ignoredTypes != null && ignoredTypes.nonEmpty) {
-      ignoredTypes.split(",").map(t => t.trim.toUpperCase)
-    } else {
-      Array.empty[String]
-    }
     val fields = schema.getProperties
-      .filter(x => (fieldList.contains(x.getName) || fieldList.isEmpty)
-        && !ignoredTypeList.contains(x.getType))
+      .filter(x => fieldList.contains(x.getName) || fieldList.isEmpty)
       .map(f => DataTypes.createStructField(
         f.getName,
@@ -132,8 +133,8 @@ private[spark] object SchemaUtils {
      case "VARIANT" => DataTypes.StringType
      case "IPV4" => DataTypes.StringType
      case "IPV6" => DataTypes.StringType
-     case "HLL" =>
-       throw new DorisException("Unsupported type " + dorisType)
+     case "BITMAP" => DataTypes.StringType // Placeholder only, no support for reading
+     case "HLL" => DataTypes.StringType // Placeholder only, no support for reading
      case _ =>
       throw new DorisException("Unrecognized Doris type " + dorisType)
    }
@@ -145,12 +146,39 @@ private[spark] object SchemaUtils {
    * @param tscanColumnDescs Doris BE return schema
    * @return inner schema struct
    */
-  def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc]): Schema = {
-    val schema = new Schema(tscanColumnDescs.length)
-    tscanColumnDescs.foreach(desc => schema.put(new Field(desc.getName, desc.getType.name, "", 0, 0, "")))
+  def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc], settings: Settings): Schema = {
+    val readColumns = settings.getProperty(DORIS_READ_FIELD).split(",").map(_.replaceAll("`", ""))
+    val bitmapColumns = settings.getProperty(DORIS_BITMAP_COLUMNS, "").split(",")
+    val hllColumns = settings.getProperty(DORIS_HLL_COLUMNS, "").split(",")
+    val fieldList = fieldUnion(readColumns, bitmapColumns, hllColumns, tscanColumnDescs)
+    val schema = new Schema(fieldList.length)
+    fieldList.foreach(schema.put)
     schema
   }
 
+  private def fieldUnion(readColumns: Array[String], bitmapColumns: Array[String], hllColumns: Array[String],
+                         tScanColumnDescSeq: Seq[TScanColumnDesc]): List[Field] = {
+    val fieldList = mutable.Buffer[Field]()
+    var rcIdx = 0;
+    var tsdIdx = 0;
+    while (rcIdx < readColumns.length || tsdIdx < tScanColumnDescSeq.length) {
+      if (rcIdx < readColumns.length) {
+        if (StringUtils.equals(readColumns(rcIdx), tScanColumnDescSeq(tsdIdx).getName)) {
+          fieldList += new Field(tScanColumnDescSeq(tsdIdx).getName, tScanColumnDescSeq(tsdIdx).getType.name, "", 0, 0, "")
+          rcIdx += 1
+          tsdIdx += 1
+        } else if (bitmapColumns.contains(readColumns(rcIdx)) || hllColumns.contains(readColumns(rcIdx))) {
+          fieldList += new Field(readColumns(rcIdx), TPrimitiveType.VARCHAR.name, "", 0, 0, "")
+          rcIdx += 1
+        }
+      } else {
+        fieldList += new Field(tScanColumnDescSeq(tsdIdx).getName, tScanColumnDescSeq(tsdIdx).getType.name, "", 0, 0, "")
+        tsdIdx += 1
+      }
+    }
+    fieldList.toList
+  }
+
   def rowColumnValue(row: SpecializedGetters, ordinal: Int, dataType: DataType): Any = {
 
     if (row.isNullAt(ordinal)) null
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
index da797e2f..7fe6637b 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
@@ -18,8 +18,10 @@
 package org.apache.doris.spark.sql
 
 import org.apache.doris.sdk.thrift.{TPrimitiveType, TScanColumnDesc}
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
 import org.apache.doris.spark.exception.DorisException
 import org.apache.doris.spark.rest.models.{Field, Schema}
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 import org.hamcrest.core.StringStartsWith.startsWith
@@ -68,10 +70,8 @@ class TestSchemaUtils extends ExpectedExceptionTest {
     Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("STRING", 0, 0))
     Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("JSON", 0, 0))
     Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("JSONB", 0, 0))
-
-    thrown.expect(classOf[DorisException])
-    thrown.expectMessage(startsWith("Unsupported type"))
-    SchemaUtils.getCatalystType("HLL", 0, 0)
+    Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("BITMAP", 0, 0))
+    Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("HLL", 0, 0))
 
     thrown.expect(classOf[DorisException])
     thrown.expectMessage(startsWith("Unrecognized Doris type"))
@@ -80,6 +80,11 @@ class TestSchemaUtils extends ExpectedExceptionTest {
 
   @Test
   def testConvertToSchema(): Unit = {
+
+    val sparkConf = new SparkConf()
+    sparkConf.set(ConfigurationOptions.DORIS_READ_FIELD, "k1,k2")
+    val settings = new SparkSettings(sparkConf)
+
     val k1 = new TScanColumnDesc
     k1.setName("k1")
     k1.setType(TPrimitiveType.BOOLEAN)
@@ -95,7 +100,7 @@ class TestSchemaUtils extends ExpectedExceptionTest {
     expected.put(ek1)
     expected.put(ek2)
 
-    Assert.assertEquals(expected, SchemaUtils.convertToSchema(Seq(k1, k2)))
+    Assert.assertEquals(expected, SchemaUtils.convertToSchema(Seq(k1, k2), settings))
   }
 
   @Test
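Not part of the patch: a minimal read-side sketch of what the change means for a connector user, assuming the connector's standard `doris` data source read options. The FE address, database, table, and column names below are hypothetical. With this change, a BITMAP or HLL column selected through `doris.read.field` no longer fails schema discovery with "Unsupported type"; it is exposed as a StringType column whose values are the literal placeholder "READ UNSUPPORTED".

```scala
// Hypothetical table and FE address, for illustration only.
val df = spark.read
  .format("doris")
  .option("doris.fenodes", "fe_host:8030")
  .option("doris.table.identifier", "example_db.page_uv")
  .option("user", "root")
  .option("password", "")
  // `user_id_bitmap` is assumed to be a BITMAP column; after this patch it is
  // read as a string whose value is the placeholder "READ UNSUPPORTED".
  .option("doris.read.field", "page_id,user_id_bitmap")
  .load()

df.show()
```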