Commit 9c49205

arrow reader

gnehil committed Aug 5, 2024
1 parent 3901055 commit 9c49205
Showing 15 changed files with 383 additions and 120 deletions.
49 changes: 40 additions & 9 deletions spark-doris-connector/pom.xml
@@ -73,7 +73,7 @@
<scala.version>2.12.10</scala.version>
<scala.major.version>2.12</scala.major.version>
<libthrift.version>0.16.0</libthrift.version>
- <arrow.version>13.0.0</arrow.version>
+ <arrow.version>15.0.2</arrow.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.scm.id>github</project.scm.id>
<netty.version>4.1.77.Final</netty.version>
@@ -248,6 +248,37 @@
<version>4.5.13</version>
</dependency>

+ <dependency>
+     <groupId>org.apache.spark</groupId>
+     <artifactId>spark-hive_${scala.major.version}</artifactId>
+     <version>${spark.version}</version>
+     <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+     <groupId>org.apache.arrow.adbc</groupId>
+     <artifactId>adbc-driver-flight-sql</artifactId>
+     <version>0.13.0</version>
+     <exclusions>
+         <exclusion>
+             <groupId>org.apache.arrow</groupId>
+             <artifactId>flight-sql-jdbc-core</artifactId>
+         </exclusion>
+     </exclusions>
+ </dependency>
+
+ <dependency>
+     <groupId>org.apache.arrow</groupId>
+     <artifactId>flight-sql-jdbc-core</artifactId>
+     <version>${arrow.version}</version>
+ </dependency>
+
+ <dependency>
+     <groupId>com.google.protobuf</groupId>
+     <artifactId>protobuf-java</artifactId>
+     <version>3.22.3</version>
+ </dependency>

</dependencies>

<build>
@@ -291,7 +322,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
- <version>3.2.1</version>
+ <version>3.4.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
@@ -317,7 +348,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>3.2.1</version>
+ <version>3.4.1</version>
<configuration>
<artifactSet>
<excludes>
@@ -327,10 +358,10 @@
</excludes>
</artifactSet>
<relocations>
- <relocation>
-     <pattern>org.apache.arrow</pattern>
-     <shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern>
- </relocation>
+ <!-- <relocation> -->
+ <!--     <pattern>org.apache.arrow</pattern> -->
+ <!--     <shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern> -->
+ <!-- </relocation> -->
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.doris.shaded.io.netty</shadedPattern>
@@ -370,8 +401,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>8</source>
- <target>8</target>
+ <source>1.8</source>
+ <target>1.8</target>
</configuration>
</plugin>
<plugin>
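Taken together, the pom changes wire up an Arrow-based read path: Arrow moves to 15.0.2, the ADBC Flight SQL driver comes in (its transitive flight-sql-jdbc-core is excluded and re-declared at ${arrow.version} so all Arrow artifacts resolve to one version), and the Arrow relocation in the shade plugin is commented out, presumably because relocated Arrow classes would no longer match what the ADBC driver expects. For orientation, a minimal sketch of reading over Arrow Flight SQL with the ADBC Java driver follows; the class and method names are the ADBC library's, while the endpoint and query are placeholders rather than anything taken from this commit:

import java.util.HashMap;
import java.util.Map;

import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcDriver;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;

public class FlightSqlReadSketch {
    public static void main(String[] args) throws Exception {
        try (BufferAllocator allocator = new RootAllocator()) {
            FlightSqlDriver driver = new FlightSqlDriver(allocator);
            Map<String, Object> params = new HashMap<>();
            // Placeholder endpoint; in the connector this would point at a Doris
            // node exposing Arrow Flight SQL.
            AdbcDriver.PARAM_URI.set(params, "grpc://127.0.0.1:9090");
            try (AdbcDatabase database = driver.open(params);
                 AdbcConnection connection = database.connect();
                 AdbcStatement statement = connection.createStatement()) {
                statement.setSqlQuery("select * from `db`.`tbl`");
                AdbcStatement.QueryResult result = statement.executeQuery();
                try (ArrowReader reader = result.getReader()) {
                    while (reader.loadNextBatch()) {
                        // Each batch arrives as an Arrow VectorSchemaRoot.
                        System.out.println(reader.getVectorSchemaRoot().getRowCount());
                    }
                }
            }
        }
    }
}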
org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -140,4 +140,9 @@ public interface ConfigurationOptions {
String LOAD_MODE = "doris.sink.load.mode";
String DEFAULT_LOAD_MODE = "stream_load";

+ String DORIS_READ_MODE = "doris.read.mode";
+ String DORIS_READ_MODE_DEFAULT = "thrift";
+
+ String DORIS_ARROW_FLIGHT_SQL_PORT = "doris.arrow-flight-sql.port";

}
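These two options presumably choose between the existing thrift read path and the new Arrow Flight SQL one. A hedged usage sketch follows; only the option keys and the "thrift" default are taken from this diff, while the "arrow" value, the port, and the connection details are illustrative assumptions:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DorisArrowReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("doris-arrow-read").getOrCreate();
        Dataset<Row> df = spark.read()
                .format("doris")
                .option("doris.table.identifier", "db.tbl")
                .option("doris.fenodes", "127.0.0.1:8030")
                // New in this commit: select the read path ("thrift" is the default);
                // "arrow" as the alternative value is an assumption here.
                .option("doris.read.mode", "arrow")
                // New in this commit: Arrow Flight SQL port on the Doris side (placeholder value).
                .option("doris.arrow-flight-sql.port", "9090")
                .load();
        df.show();
    }
}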
org/apache/doris/spark/cfg/SparkSettings.java
@@ -17,16 +17,14 @@

package org.apache.doris.spark.cfg;

- import java.util.Properties;

- import org.apache.spark.SparkConf;

import com.google.common.base.Preconditions;

+ import org.apache.spark.SparkConf;
import scala.Option;
import scala.Serializable;
import scala.Tuple2;

+ import java.util.Properties;

public class SparkSettings extends Settings implements Serializable {

private final SparkConf cfg;
@@ -36,6 +34,16 @@ public SparkSettings(SparkConf cfg) {
this.cfg = cfg;
}

+ public static SparkSettings fromProperties(Properties props) {
+     SparkConf sparkConf = new SparkConf();
+     props.forEach((k, v) -> {
+         if (k instanceof String) {
+             sparkConf.set((String) k, v.toString());
+         }
+     });
+     return new SparkSettings(sparkConf);
+ }

public SparkSettings copy() {
return new SparkSettings(cfg.clone());
}
@@ -74,4 +82,5 @@ public Properties asProperties() {

return props;
}

}
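The new fromProperties factory rebuilds a SparkSettings from a plain java.util.Properties by copying every string-keyed entry into a fresh SparkConf, which pairs naturally with the serialized-Properties form that PartitionDefinition carries. A small sketch of how it might be called (the option keys are real connector settings; the values are placeholders):

import java.util.Properties;

import org.apache.doris.spark.cfg.SparkSettings;

public class SparkSettingsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("doris.fenodes", "127.0.0.1:8030");
        props.setProperty("doris.table.identifier", "db.tbl");

        // Non-string keys are silently skipped by the instanceof check in fromProperties.
        SparkSettings settings = SparkSettings.fromProperties(props);
        System.out.println(settings.getProperty("doris.fenodes"));
    }
}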
org/apache/doris/spark/rest/PartitionDefinition.java
@@ -17,16 +17,16 @@

package org.apache.doris.spark.rest;

+ import org.apache.doris.spark.cfg.PropertiesSettings;
+ import org.apache.doris.spark.cfg.Settings;
+ import org.apache.doris.spark.exception.IllegalArgumentException;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

- import org.apache.doris.spark.cfg.PropertiesSettings;
- import org.apache.doris.spark.cfg.Settings;
- import org.apache.doris.spark.exception.IllegalArgumentException;

/**
* Doris RDD partition info.
*/
@@ -124,12 +124,12 @@ public boolean equals(Object o) {
return false;
}
PartitionDefinition that = (PartitionDefinition) o;
- return Objects.equals(database, that.database) &&
-         Objects.equals(table, that.table) &&
-         Objects.equals(beAddress, that.beAddress) &&
-         Objects.equals(tabletIds, that.tabletIds) &&
-         Objects.equals(queryPlan, that.queryPlan) &&
-         Objects.equals(serializedSettings, that.serializedSettings);
+ return Objects.equals(database, that.database)
+         && Objects.equals(table, that.table)
+         && Objects.equals(beAddress, that.beAddress)
+         && Objects.equals(tabletIds, that.tabletIds)
+         && Objects.equals(queryPlan, that.queryPlan)
+         && Objects.equals(serializedSettings, that.serializedSettings);
}

@Override
@@ -144,12 +144,12 @@ public int hashCode() {

@Override
public String toString() {
return "PartitionDefinition{" +
", database='" + database + '\'' +
", table='" + table + '\'' +
", beAddress='" + beAddress + '\'' +
", tabletIds=" + tabletIds +
", queryPlan='" + queryPlan + '\'' +
'}';
return "PartitionDefinition{"
+ ", database='" + database + '\''
+ ", table='" + table + '\''
+ ", beAddress='" + beAddress + '\''
+ ", tabletIds=" + tabletIds
+ ", queryPlan='" + queryPlan + '\''
+ '}';
}
}
org/apache/doris/spark/rest/RestService.java
@@ -42,6 +42,7 @@
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
import org.apache.doris.spark.sql.SchemaUtils;
+ import org.apache.doris.spark.sql.Utils;
import org.apache.doris.spark.util.HttpUtil;
import org.apache.doris.spark.util.URLs;

@@ -51,7 +52,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.annotations.VisibleForTesting;
- import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
@@ -64,6 +64,7 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
+ import scala.Option;

import java.io.IOException;
import java.io.Serializable;
@@ -227,23 +228,11 @@ public static List<PartitionDefinition> findPartitions(Settings cfg, Logger logger) {
String[] tableIdentifiers =
parseIdentifier(cfg.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER), logger);
String readFields = cfg.getProperty(ConfigurationOptions.DORIS_READ_FIELD, "*");
if (!"*".equals(readFields)) {
String[] readFieldArr = readFields.split(",");
String[] bitmapColumns = cfg.getProperty(SchemaUtils.DORIS_BITMAP_COLUMNS(), "").split(",");
String[] hllColumns = cfg.getProperty(SchemaUtils.DORIS_HLL_COLUMNS(), "").split(",");
for (int i = 0; i < readFieldArr.length; i++) {
String readFieldName = readFieldArr[i].replaceAll("`", "");
if (ArrayUtils.contains(bitmapColumns, readFieldName)
|| ArrayUtils.contains(hllColumns, readFieldName)) {
readFieldArr[i] = "'READ UNSUPPORTED' AS " + readFieldArr[i];
}
}
readFields = StringUtils.join(readFieldArr, ",");
}
String sql = "select " + readFields + " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
if (!StringUtils.isEmpty(cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY))) {
sql += " where " + cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY);
}
String[] bitmapColumns = cfg.getProperty(SchemaUtils.DORIS_BITMAP_COLUMNS(), "").split(",");
String[] hllColumns = cfg.getProperty(SchemaUtils.DORIS_HLL_COLUMNS(), "").split(",");
String sql = Utils.generateQueryStatement(readFields.split(","), bitmapColumns, hllColumns,
"`" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`",
cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY, ""), Option.empty());
logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);

String finalSql = sql;
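The bitmap/HLL substitution and SQL assembly formerly inlined here now live in Utils.generateQueryStatement, whose body is not part of this excerpt. A rough Java rendering of the equivalent logic, reconstructed from the removed lines (the trailing Option argument is passed as Option.empty() above and its role is not visible here, so this sketch omits it):

import java.util.Arrays;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;

public final class QueryStatementSketch {

    // Bitmap/HLL columns cannot be read over this protocol, so they are replaced
    // with a literal marker before the select statement is assembled.
    public static String generate(String[] readFields, String[] bitmapColumns,
                                  String[] hllColumns, String table, String filterQuery) {
        String[] fields = Arrays.copyOf(readFields, readFields.length);
        if (!(fields.length == 1 && "*".equals(fields[0]))) {
            for (int i = 0; i < fields.length; i++) {
                String name = fields[i].replaceAll("`", "");
                if (ArrayUtils.contains(bitmapColumns, name) || ArrayUtils.contains(hllColumns, name)) {
                    fields[i] = "'READ UNSUPPORTED' AS " + fields[i];
                }
            }
        }
        String sql = "select " + StringUtils.join(fields, ",") + " from " + table;
        if (StringUtils.isNotEmpty(filterQuery)) {
            sql += " where " + filterQuery;
        }
        return sql;
    }

    public static void main(String[] args) {
        System.out.println(generate(new String[] {"id", "`bm`"}, new String[] {"bm"},
                new String[] {}, "`db`.`tbl`", "id > 0"));
        // -> select id,'READ UNSUPPORTED' AS `bm` from `db`.`tbl` where id > 0
    }
}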