From ffb158454392bd640f0cbd18b5ab138da5fc514e Mon Sep 17 00:00:00 2001
From: riverzzz <455405955@qq.com>
Date: Thu, 16 Dec 2021 19:51:52 +0800
Subject: [PATCH 1/3] master → origin/master cb39a3e0(Merge branch 'master' of
 https://github.com/riverzzz/nebula-exchange) 6587cbd6(Merge branch 'master'
 of https://github.com/riverzzz/nebula-exchange) 53e37ee7(Merge branch
 'master' of https://github.com/riverzzz/nebula-exchange) 261677e2(add
 fields, parse Kafka JSON data) 47eea8e5(Merge branch 'master' of
 https://github.com/riverzzz/nebula-spark-utils) 24ae4250(Merge pull request
 #166 from Nicole00/readme) a8fd6623(Merge branch 'master' into readme)
 6c2700f4(Merge pull request #164 from Thericecookers/master)
 d1724aa5(format) a13c2415(format) 6196e6d8(add repo transfer note)
 f41f41dc(Merge branch 'master' into master) 4cd75f07(Merge pull request
 #165 from Nicole00/louvain) 8a837127(Merge branch 'master' into louvain)
 40dbe339(fix louvain's result format) 138a1a11(bugfix: Reverse edge has
 wrong partitionId) 4d184f1e(remove redundant pom dependencies to avoid
 errors) 93a8ff74(support parsing Kafka data; support configuring offset and
 a pull limit; make streaming and offline import parsing compatible;
 configure the pom for stream processing)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
nebula-exchange/pom.xml | 23 ++++-
.../nebula/exchange/config/Configs.scala | 11 ++-
.../exchange/config/SourceConfigs.scala | 8 +-
.../exchange/processor/EdgeProcessor.scala | 91 ++++++++++++-------
.../nebula/exchange/processor/Processor.scala | 23 ++---
.../processor/VerticesProcessor.scala | 52 ++++++-----
.../exchange/reader/StreamingBaseReader.scala | 33 ++++---
7 files changed, 155 insertions(+), 86 deletions(-)
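
The heart of this patch is two new Kafka source options, "startingOffsets"
and "maxOffsetsPerTrigger". A minimal sketch (not part of the patch) of how
they are read, mirroring the defaulting logic in the Configs.scala hunk
below; the HOCON fragment is an illustrative assumption, not a complete
Exchange config:

    import com.typesafe.config.ConfigFactory

    object KafkaOptionsSketch extends App {
      // Illustrative kafka source block: "service" and "topic" already
      // existed, the last two keys are the ones this patch introduces.
      val conf = ConfigFactory.parseString(
        """
          |service = "127.0.0.1:9092"
          |topic = "nebula-data"
          |startingOffsets = "earliest"
          |maxOffsetsPerTrigger = 10000
          |""".stripMargin)

      // Same defaulting as the patched Configs.scala below.
      val startingOffsets =
        if (conf.hasPath("startingOffsets")) conf.getString("startingOffsets") else "latest"
      val maxOffsetsPerTrigger =
        if (conf.hasPath("maxOffsetsPerTrigger")) Some(conf.getLong("maxOffsetsPerTrigger")) else None

      println(s"startingOffsets=$startingOffsets maxOffsetsPerTrigger=$maxOffsetsPerTrigger")
    }
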
diff --git a/nebula-exchange/pom.xml b/nebula-exchange/pom.xml
index 8f1b6cb7..ae432333 100644
--- a/nebula-exchange/pom.xml
+++ b/nebula-exchange/pom.xml
@@ -37,6 +37,7 @@
1.14
2.6.1
1.2.0
+ 2.0.0
@@ -132,7 +133,7 @@
                                 false
-                                <exclude>org.apache.spark:*</exclude>
+
                                 <exclude>org.apache.hadoop:*</exclude>
                                 <exclude>org.apache.hive:*</exclude>
                                 <exclude>log4j:log4j</exclude>
@@ -254,6 +255,17 @@
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>io.streamnative.connectors</groupId>
             <artifactId>pulsar-spark-connector_2.11</artifactId>
@@ -263,6 +275,7 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.11</artifactId>
             <version>${spark.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>snappy-java</artifactId>
@@ -362,6 +375,7 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.11</artifactId>
             <version>${spark.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>snappy-java</artifactId>
@@ -401,11 +415,13 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-catalyst_2.11</artifactId>
             <version>${spark.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_2.11</artifactId>
             <version>${spark.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>commons-codec</artifactId>
@@ -455,12 +471,17 @@
                     <artifactId>commons-io</artifactId>
                     <groupId>commons-io</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>hive-metastore</artifactId>
+                    <groupId>org.spark-project.hive</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-yarn_2.11</artifactId>
             <version>${spark.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>guava</artifactId>
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
index 819ee782..ddbb7544 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
@@ -249,6 +249,7 @@ object Configs {
private[this] val DEFAULT_LOCAL_PATH = None
private[this] val DEFAULT_REMOTE_PATH = None
private[this] val DEFAULT_STREAM_INTERVAL = 30
+ private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest"
private[this] val DEFAULT_PARALLEL = 1
/**
@@ -659,10 +660,18 @@ object Configs {
val intervalSeconds =
if (config.hasPath("interval.seconds")) config.getInt("interval.seconds")
else DEFAULT_STREAM_INTERVAL
+ val startingOffsets =
+ if (config.hasPath("startingOffsets")) config.getString("startingOffsets")
+ else DEFAULT_KAFKA_STARTINGOFFSETS
+ val maxOffsetsPerTrigger =
+ if (config.hasPath("maxOffsetsPerTrigger")) Some(config.getLong("maxOffsetsPerTrigger"))
+ else None
KafkaSourceConfigEntry(SourceCategory.KAFKA,
intervalSeconds,
config.getString("service"),
- config.getString("topic"))
+ config.getString("topic"),
+ startingOffsets,
+ maxOffsetsPerTrigger)
case SourceCategory.PULSAR =>
val options =
config.getObject("options").unwrapped.asScala.map(x => x._1 -> x._2.toString).toMap
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
index 44a296cb..50a0f6eb 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
@@ -162,16 +162,20 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
*
* @param server
* @param topic
+ * @param startingOffsets
+ * @param maxOffsetsPerTrigger
*/
case class KafkaSourceConfigEntry(override val category: SourceCategory.Value,
override val intervalSeconds: Int,
server: String,
- topic: String)
+ topic: String,
+ startingOffsets: String,
+ maxOffsetsPerTrigger: Option[Long] = None)
extends StreamingDataSourceConfigEntry {
require(server.trim.nonEmpty && topic.trim.nonEmpty)
override def toString: String = {
- s"Kafka source server: ${server} topic:${topic}"
+ s"Kafka source server: ${server} topic:${topic} startingOffsets:${startingOffsets} maxOffsetsPerTrigger:${maxOffsetsPerTrigger}"
}
}
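
For reference, constructing the extended case class (a sketch that assumes
the Exchange classes from this diff are on the classpath); the new
maxOffsetsPerTrigger parameter falls back to None when omitted:

    val entry = KafkaSourceConfigEntry(
      category = SourceCategory.KAFKA,
      intervalSeconds = 30,
      server = "127.0.0.1:9092",
      topic = "nebula-data",
      startingOffsets = "latest") // maxOffsetsPerTrigger defaults to None
    println(entry)
    // Kafka source server: 127.0.0.1:9092 topic:nebula-data startingOffsets:latest maxOffsetsPerTrigger:None
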
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index 7e7cb900..d9e8cdf6 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -274,12 +274,59 @@ class EdgeProcessor(data: DataFrame,
}
}
} else {
+ val streamFlag = data.isStreaming
val edgeFrame = data
+ .filter { row => // filter and check row data; if streaming, only log invalid rows
+ val sourceField = if (!edgeConfig.isGeo) {
+ val sourceIndex = row.schema.fieldIndex(edgeConfig.sourceField)
+ if (sourceIndex < 0 || row.isNullAt(sourceIndex)) {
+ printChoice(streamFlag, s"source vertexId must exist and cannot be null, your row data is $row")
+ None
+ } else Some(row.get(sourceIndex).toString)
+ } else {
+ val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
+ val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
+ Some(indexCells(lat, lng).mkString(","))
+ }
+
+ val sourceFlag = sourceField.isDefined
+ val sourcePolicyFlag = if (sourceFlag && edgeConfig.sourcePolicy.isEmpty && !isVidStringType
+ && !NebulaUtils.isNumic(sourceField.get)) {
+ printChoice(streamFlag, s"space vidType is int, but your srcId ${sourceField.get} is not numeric. Your row data is $row")
+ false
+ } else if (sourceFlag && edgeConfig.sourcePolicy.isDefined && isVidStringType) {
+ printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING. Your row data is $row")
+ false
+ } else true
+
+ val targetIndex = row.schema.fieldIndex(edgeConfig.targetField)
+ val targetFlag = if (targetIndex < 0 || row.isNullAt(targetIndex)) {
+ printChoice(streamFlag, s"target vertexId must exist and cannot be null, your row data is $row")
+ false
+ } else {
+ val targetField = row.get(targetIndex).toString
+ if (edgeConfig.targetPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(targetField)) {
+ printChoice(streamFlag, s"space vidType is int, but your dstId $targetField is not numeric. Your row data is $row")
+ false
+ } else if (edgeConfig.targetPolicy.isDefined && isVidStringType) {
+ printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING. Your row data is $row")
+ false
+ } else true
+ }
+
+ val edgeRankFlag = if (edgeConfig.rankingField.isDefined) {
+ val index = row.schema.fieldIndex(edgeConfig.rankingField.get)
+ val ranking = row.get(index).toString
+ if (!NebulaUtils.isNumic(ranking)) {
+ printChoice(streamFlag, s"Non-numeric type is not supported for ranking field. Your row data is $row")
+ false
+ } else true
+ } else true
+ sourceFlag && sourcePolicyFlag && targetFlag && edgeRankFlag
+ }
.map { row =>
var sourceField = if (!edgeConfig.isGeo) {
val sourceIndex = row.schema.fieldIndex(edgeConfig.sourceField)
- assert(sourceIndex >= 0 && !row.isNullAt(sourceIndex),
- s"source vertexId must exist and cannot be null, your row data is $row")
val value = row.get(sourceIndex).toString
if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
} else {
@@ -287,36 +334,17 @@ class EdgeProcessor(data: DataFrame,
val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
indexCells(lat, lng).mkString(",")
}
-
- if (edgeConfig.sourcePolicy.isEmpty) {
- // process string type vid
- if (isVidStringType) {
- sourceField = NebulaUtils.escapeUtil(sourceField).mkString("\"", "", "\"")
- } else {
- assert(NebulaUtils.isNumic(sourceField),
- s"space vidType is int, but your srcId $sourceField is not numeric.")
- }
- } else {
- assert(!isVidStringType,
- "only int vidType can use policy, but your vidType is FIXED_STRING.")
+ // process string type vid
+ if (edgeConfig.sourcePolicy.isEmpty && isVidStringType) {
+ sourceField = NebulaUtils.escapeUtil(sourceField).mkString("\"", "", "\"")
}
val targetIndex = row.schema.fieldIndex(edgeConfig.targetField)
- assert(targetIndex >= 0 && !row.isNullAt(targetIndex),
- s"target vertexId must exist and cannot be null, your row data is $row")
var targetField = row.get(targetIndex).toString
if (targetField.equals(DEFAULT_EMPTY_VALUE)) targetField = ""
- if (edgeConfig.targetPolicy.isEmpty) {
- // process string type vid
- if (isVidStringType) {
- targetField = NebulaUtils.escapeUtil(targetField).mkString("\"", "", "\"")
- } else {
- assert(NebulaUtils.isNumic(targetField),
- s"space vidType is int, but your dstId $targetField is not numeric.")
- }
- } else {
- assert(!isVidStringType,
- "only int vidType can use policy, but your vidType is FIXED_STRING.")
+ // process string type vid
+ if (edgeConfig.targetPolicy.isEmpty && isVidStringType) {
+ targetField = NebulaUtils.escapeUtil(targetField).mkString("\"", "", "\"")
}
val values = for {
@@ -326,8 +354,6 @@ class EdgeProcessor(data: DataFrame,
if (edgeConfig.rankingField.isDefined) {
val index = row.schema.fieldIndex(edgeConfig.rankingField.get)
val ranking = row.get(index).toString
- assert(NebulaUtils.isNumic(ranking), s"Not support non-Numeric type for ranking field")
-
Edge(sourceField, targetField, Some(ranking.toLong), values)
} else {
Edge(sourceField, targetField, None, values)
@@ -335,10 +361,13 @@ class EdgeProcessor(data: DataFrame,
}(Encoders.kryo[Edge])
// streaming write
- if (data.isStreaming) {
+ if (streamFlag) {
val streamingDataSourceConfig =
edgeConfig.dataSourceConfigEntry.asInstanceOf[StreamingDataSourceConfigEntry]
- edgeFrame.writeStream
+ val wStream = edgeFrame.writeStream
+ if (edgeConfig.checkPointPath.isDefined) wStream.option("checkpointLocation", edgeConfig.checkPointPath.get)
+
+ wStream
.foreachBatch((edges, batchId) => {
LOG.info(s"${edgeConfig.name} edge start batch ${batchId}.")
edges.foreachPartition(processEachPartition _)
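
The streaming branch above threads an optional checkpoint path into the
writer before calling foreachBatch. A self-contained sketch of that pattern
against Spark's built-in rate source, so it runs without Kafka or Nebula;
the checkpoint path is an assumed example value:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object CheckpointWriteSketch extends App {
      val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
      val stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

      // DataStreamWriter.option mutates the writer and returns it, which is
      // why the patch can call it conditionally and keep using wStream.
      val wStream = stream.writeStream
      val checkPointPath: Option[String] = Some("/tmp/exchange-checkpoint") // assumed path
      if (checkPointPath.isDefined) wStream.option("checkpointLocation", checkPointPath.get)

      wStream
        .foreachBatch { (batch: DataFrame, batchId: Long) =>
          println(s"start batch $batchId with ${batch.count()} rows")
        }
        .start()
        .awaitTermination(10000) // stop the demo after ~10 seconds
    }
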
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala
index 7dd7ede4..65cc1004 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala
@@ -5,21 +5,10 @@
package com.vesoft.nebula.exchange.processor
-import com.vesoft.nebula.{
- Coordinate,
- Date,
- DateTime,
- Geography,
- LineString,
- NullType,
- Point,
- Polygon,
- PropertyType,
- Time,
- Value
-}
+import com.vesoft.nebula.{Coordinate, Date, DateTime, Geography, LineString, NullType, Point, Polygon, PropertyType, Time, Value}
import com.vesoft.nebula.exchange.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils}
+import org.apache.log4j.Logger
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
@@ -33,6 +22,9 @@ import scala.collection.mutable.ListBuffer
*/
trait Processor extends Serializable {
+ @transient
+ private[this] lazy val LOG = Logger.getLogger(this.getClass)
+
/**
* process dataframe to vertices or edges
*/
@@ -230,4 +222,9 @@ trait Processor extends Serializable {
}
}
}
+
+ def printChoice(streamFlag: Boolean, context: String): Unit = {
+ if (streamFlag) LOG.info(context)
+ else assert(assertion = false, context)
+ }
}
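
printChoice carries the whole streaming/batch compatibility rule: a
streaming job must not die on one bad row, so the row is logged (and
filtered out by the callers above), while a batch job keeps the old
fail-fast assert. A standalone illustration of the same method body:

    import org.apache.log4j.Logger

    object PrintChoiceSketch extends App {
      private val LOG = Logger.getLogger(getClass)

      // Same body as the trait method added above.
      def printChoice(streamFlag: Boolean, context: String): Unit =
        if (streamFlag) LOG.info(context)
        else assert(assertion = false, context)

      printChoice(streamFlag = true, "bad row: logged and skipped")
      // printChoice(streamFlag = false, "bad row") // would throw java.lang.AssertionError
    }
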
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index b646b2d7..e7b5ccc0 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -229,42 +229,46 @@ class VerticesProcessor(data: DataFrame,
}
}
} else {
+ val streamFlag = data.isStreaming
val vertices = data
+ .filter { row => // filter and check row data; if streaming, only log invalid rows
+ val index = row.schema.fieldIndex(tagConfig.vertexField)
+ if (index < 0 || row.isNullAt(index)) {
+ printChoice(streamFlag, s"vertexId must exist and cannot be null, your row data is $row")
+ false
+ } else {
+ val vertexId = row.get(index).toString
+ // process int type vid
+ if (tagConfig.vertexPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(vertexId)) {
+ printChoice(streamFlag, s"space vidType is int, but your vertex id $vertexId is not numeric. Your row data is $row")
+ false
+ } else if (tagConfig.vertexPolicy.isDefined && isVidStringType) {
+ printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING. Your row data is $row")
+ false
+ } else true
+ }
+ }
.map { row =>
- val vertexID = {
- val index = row.schema.fieldIndex(tagConfig.vertexField)
- assert(index >= 0 && !row.isNullAt(index),
- s"vertexId must exist and cannot be null, your row data is $row")
- var value = row.get(index).toString
- if (value.equals(DEFAULT_EMPTY_VALUE)) { value = "" }
- if (tagConfig.vertexPolicy.isEmpty) {
- // process string type vid
- if (isVidStringType) {
- NebulaUtils.escapeUtil(value).mkString("\"", "", "\"")
- } else {
- // process int type vid
- assert(NebulaUtils.isNumic(value),
- s"space vidType is int, but your vertex id $value is not numeric.")
- value
- }
- } else {
- assert(!isVidStringType,
- "only int vidType can use policy, but your vidType is FIXED_STRING.")
- value
- }
+ val index = row.schema.fieldIndex(tagConfig.vertexField)
+ var vertexId = row.get(index).toString
+ if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
+ vertexId = ""
}
val values = for {
property <- fieldKeys if property.trim.length != 0
} yield extraValueForClient(row, property, fieldTypeMap)
- Vertex(vertexID, values)
+ Vertex(vertexId, values)
}(Encoders.kryo[Vertex])
// streaming write
- if (data.isStreaming) {
+ if (streamFlag) {
val streamingDataSourceConfig =
tagConfig.dataSourceConfigEntry.asInstanceOf[StreamingDataSourceConfigEntry]
- vertices.writeStream
+ val wStream = vertices.writeStream
+ if (tagConfig.checkPointPath.isDefined) wStream.option("checkpointLocation", tagConfig.checkPointPath.get)
+
+ wStream
.foreachBatch((vertexSet, batchId) => {
LOG.info(s"${tagConfig.name} tag start batch ${batchId}.")
vertexSet.foreachPartition(processEachPartition _)
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
index 332060ab..874fbaaa 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
@@ -6,10 +6,8 @@
package com.vesoft.nebula.exchange.reader
import com.vesoft.nebula.exchange.config.{KafkaSourceConfigEntry, PulsarSourceConfigEntry}
-import org.apache.spark.sql.{DataFrame, SparkSession, Row, Encoders}
-import com.alibaba.fastjson.{JSON, JSONObject}
-import org.apache.spark.sql.types.{StructField, StructType, StringType}
-import org.apache.spark.sql.functions.{from_json,col}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Spark Streaming
@@ -27,6 +25,7 @@ abstract class StreamingBaseReader(override val session: SparkSession) extends R
*
* @param session
* @param kafkaConfig
+ * @param targetFields
*/
class KafkaReader(override val session: SparkSession,
kafkaConfig: KafkaSourceConfigEntry,
@@ -36,18 +35,24 @@ class KafkaReader(override val session: SparkSession,
require(kafkaConfig.server.trim.nonEmpty && kafkaConfig.topic.trim.nonEmpty && targetFields.nonEmpty)
override def read(): DataFrame = {
+ import org.apache.spark.sql.functions._
import session.implicits._
val fields = targetFields.distinct
- val jsonSchema = StructType(fields.map(field => StructField(field, StringType, true)))
- session.readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", kafkaConfig.server)
- .option("subscribe", kafkaConfig.topic)
- .load()
- .selectExpr("CAST(value AS STRING)")
- .as[(String)]
- .withColumn("value", from_json(col("value"), jsonSchema))
- .select("value.*")
+ val reader =
+ session.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", kafkaConfig.server)
+ .option("subscribe", kafkaConfig.topic)
+ .option("startingOffsets", kafkaConfig.startingOffsets)
+
+ val maxOffsetsPerTrigger = kafkaConfig.maxOffsetsPerTrigger
+ if (maxOffsetsPerTrigger.isDefined) reader.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger.get)
+
+ reader.load()
+ .select($"value".cast(StringType))
+ .select($"value", json_tuple($"value", fields: _*))
+ .toDF("value" :: fields: _*)
+
}
}
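
The reworked reader drops the from_json/StructType parsing in favor of
json_tuple, which extracts the configured top-level fields from each
record's JSON value. The same transformation demonstrated on a static
DataFrame (the streaming plan is identical); the sample record is assumed:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.json_tuple

    object JsonTupleSketch extends App {
      val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
      import spark.implicits._

      val fields = List("name", "age") // would come from targetFields.distinct
      val df = Seq("""{"name":"Tom","age":"10"}""").toDF("value")

      df.select($"value", json_tuple($"value", fields: _*))
        .toDF("value" :: fields: _*)
        .show(false)
      // +-------------------------+----+---+
      // |value                    |name|age|
      // +-------------------------+----+---+
      // |{"name":"Tom","age":"10"}|Tom |10 |
      // +-------------------------+----+---+
    }
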
From 7de738983c0775f708a373ee2369f255c40b859b Mon Sep 17 00:00:00 2001
From: riverzzz <455405955@qq.com>
Date: Thu, 16 Dec 2021 22:01:28 +0800
Subject: [PATCH 2/3] master → origin/master cb39a3e0(Merge branch 'master' of
 https://github.com/riverzzz/nebula-exchange) 6587cbd6(Merge branch 'master'
 of https://github.com/riverzzz/nebula-exchange) 53e37ee7(Merge branch
 'master' of https://github.com/riverzzz/nebula-exchange) 261677e2(add
 fields, parse Kafka JSON data) 47eea8e5(Merge branch 'master' of
 https://github.com/riverzzz/nebula-spark-utils) 24ae4250(Merge pull request
 #166 from Nicole00/readme) a8fd6623(Merge branch 'master' into readme)
 6c2700f4(Merge pull request #164 from Thericecookers/master)
 d1724aa5(format) a13c2415(format) 6196e6d8(add repo transfer note)
 f41f41dc(Merge branch 'master' into master) 4cd75f07(Merge pull request
 #165 from Nicole00/louvain) 8a837127(Merge branch 'master' into louvain)
 40dbe339(fix louvain's result format) 138a1a11(bugfix: Reverse edge has
 wrong partitionId) 4d184f1e(remove redundant pom dependencies to avoid
 errors) 93a8ff74(support parsing Kafka data; support configuring offset and
 a pull limit; make streaming and offline import parsing compatible;
 configure the pom for stream processing)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../vesoft/nebula/exchange/processor/VerticesProcessor.scala | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index e7b5ccc0..0507577d 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -255,6 +255,10 @@ class VerticesProcessor(data: DataFrame,
vertexId = ""
}
+ if (tagConfig.vertexPolicy.isEmpty && isVidStringType) {
+ vertexId = NebulaUtils.escapeUtil(vertexId).mkString("\"", "", "\"")
+ }
+
val values = for {
property <- fieldKeys if property.trim.length != 0
} yield extraValueForClient(row, property, fieldTypeMap)
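
This restores the quoting and escaping of string-typed vertex ids that the
patch-1 refactor dropped from the map step. A sketch of the effect, with a
hypothetical escape function standing in for NebulaUtils.escapeUtil:

    object VidQuoteSketch extends App {
      // Hypothetical stand-in for NebulaUtils.escapeUtil: escape backslashes and quotes.
      def escape(s: String): String = s.replace("\\", "\\\\").replace("\"", "\\\"")

      def processVid(vertexId: String, policyDefined: Boolean, isVidStringType: Boolean): String =
        if (!policyDefined && isVidStringType) escape(vertexId).mkString("\"", "", "\"")
        else vertexId

      println(processVid("player001", policyDefined = false, isVidStringType = true)) // "player001"
      println(processVid("1001", policyDefined = false, isVidStringType = false))     // 1001
    }
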
From 14711662efa73f80b039ce0fe26052687353ef44 Mon Sep 17 00:00:00 2001
From: riverzzz <455405955@qq.com>
Date: Wed, 22 Dec 2021 20:21:55 +0800
Subject: [PATCH 3/3] format the code style; remove duplicated value-extraction code
---
.../nebula/exchange/config/Configs.scala | 40 +++----
.../exchange/config/SourceConfigs.scala | 4 +-
.../exchange/processor/EdgeProcessor.scala | 103 ++++++++----------
.../exchange/reader/StreamingBaseReader.scala | 4 +-
4 files changed, 71 insertions(+), 80 deletions(-)
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
index ddbb7544..41a041d6 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
@@ -231,26 +231,26 @@ case class Configs(databaseConfig: DataBaseConfigEntry,
object Configs {
private[this] val LOG = Logger.getLogger(this.getClass)
- private[this] val DEFAULT_CONNECTION_TIMEOUT = Integer.MAX_VALUE
- private[this] val DEFAULT_CONNECTION_RETRY = 3
- private[this] val DEFAULT_EXECUTION_RETRY = 3
- private[this] val DEFAULT_EXECUTION_TIMEOUT = Integer.MAX_VALUE
- private[this] val DEFAULT_EXECUTION_INTERVAL = 3000
- private[this] val DEFAULT_ERROR_OUTPUT_PATH = "/tmp/nebula.writer.errors/"
- private[this] val DEFAULT_ERROR_MAX_BATCH_SIZE = Int.MaxValue
- private[this] val DEFAULT_RATE_LIMIT = 1024
- private[this] val DEFAULT_RATE_TIMEOUT = 100
- private[this] val DEFAULT_ENABLE_SSL = false
- private[this] val DEFAULT_SSL_SIGN_TYPE = "CA"
- private[this] val DEFAULT_EDGE_RANKING = 0L
- private[this] val DEFAULT_BATCH = 2
- private[this] val DEFAULT_PARTITION = -1
- private[this] val DEFAULT_CHECK_POINT_PATH = None
- private[this] val DEFAULT_LOCAL_PATH = None
- private[this] val DEFAULT_REMOTE_PATH = None
- private[this] val DEFAULT_STREAM_INTERVAL = 30
- private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest"
- private[this] val DEFAULT_PARALLEL = 1
+ private[this] val DEFAULT_CONNECTION_TIMEOUT = Integer.MAX_VALUE
+ private[this] val DEFAULT_CONNECTION_RETRY = 3
+ private[this] val DEFAULT_EXECUTION_RETRY = 3
+ private[this] val DEFAULT_EXECUTION_TIMEOUT = Integer.MAX_VALUE
+ private[this] val DEFAULT_EXECUTION_INTERVAL = 3000
+ private[this] val DEFAULT_ERROR_OUTPUT_PATH = "/tmp/nebula.writer.errors/"
+ private[this] val DEFAULT_ERROR_MAX_BATCH_SIZE = Int.MaxValue
+ private[this] val DEFAULT_RATE_LIMIT = 1024
+ private[this] val DEFAULT_RATE_TIMEOUT = 100
+ private[this] val DEFAULT_ENABLE_SSL = false
+ private[this] val DEFAULT_SSL_SIGN_TYPE = "CA"
+ private[this] val DEFAULT_EDGE_RANKING = 0L
+ private[this] val DEFAULT_BATCH = 2
+ private[this] val DEFAULT_PARTITION = -1
+ private[this] val DEFAULT_CHECK_POINT_PATH = None
+ private[this] val DEFAULT_LOCAL_PATH = None
+ private[this] val DEFAULT_REMOTE_PATH = None
+ private[this] val DEFAULT_STREAM_INTERVAL = 30
+ private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest"
+ private[this] val DEFAULT_PARALLEL = 1
/**
*
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
index 50a0f6eb..2a21c1ed 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
@@ -162,8 +162,8 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
*
* @param server
* @param topic
- * @param startingOffsets
- * @param maxOffsetsPerTrigger
+ * @param startingOffsets
+ * @param maxOffsetsPerTrigger
*/
case class KafkaSourceConfigEntry(override val category: SourceCategory.Value,
override val intervalSeconds: Int,
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index d9e8cdf6..425ae802 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -277,42 +277,9 @@ class EdgeProcessor(data: DataFrame,
val streamFlag = data.isStreaming
val edgeFrame = data
.filter { row => // filter and check row data; if streaming, only log invalid rows
- val sourceField = if (!edgeConfig.isGeo) {
- val sourceIndex = row.schema.fieldIndex(edgeConfig.sourceField)
- if (sourceIndex < 0 || row.isNullAt(sourceIndex)) {
- printChoice(streamFlag, s"source vertexId must exist and cannot be null, your row data is $row")
- None
- } else Some(row.get(sourceIndex).toString)
- } else {
- val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
- val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
- Some(indexCells(lat, lng).mkString(","))
- }
-
- val sourceFlag = sourceField.isDefined
- val sourcePolicyFlag = if (sourceFlag && edgeConfig.sourcePolicy.isEmpty && !isVidStringType
- && !NebulaUtils.isNumic(sourceField.get)) {
- printChoice(streamFlag, s"space vidType is int, but your srcId ${sourceField.get} is not numeric. Your row data is $row")
- false
- } else if (sourceFlag && edgeConfig.sourcePolicy.isDefined && isVidStringType) {
- printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING. Your row data is $row")
- false
- } else true
+ val sourceFlag = checkField(edgeConfig.sourceField, "source_field", row, edgeConfig.sourcePolicy, streamFlag, isVidStringType)
- val targetIndex = row.schema.fieldIndex(edgeConfig.targetField)
- val targetFlag = if (targetIndex < 0 || row.isNullAt(targetIndex)) {
- printChoice(streamFlag, s"target vertexId must exist and cannot be null, your row data is $row")
- false
- } else {
- val targetField = row.get(targetIndex).toString
- if (edgeConfig.targetPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(targetField)) {
- printChoice(streamFlag, s"space vidType is int, but your dstId $targetField is not numeric. Your row data is $row")
- false
- } else if (edgeConfig.targetPolicy.isDefined && isVidStringType) {
- printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING. Your row data is $row")
- false
- } else true
- }
+ val targetFlag = checkField(edgeConfig.targetField, "target_field", row, edgeConfig.targetPolicy, streamFlag, isVidStringType)
val edgeRankFlag = if (edgeConfig.rankingField.isDefined) {
val index = row.schema.fieldIndex(edgeConfig.rankingField.get)
@@ -322,30 +289,12 @@ class EdgeProcessor(data: DataFrame,
false
} else true
} else true
- sourceFlag && sourcePolicyFlag && targetFlag && edgeRankFlag
+ sourceFlag && targetFlag && edgeRankFlag
}
.map { row =>
- var sourceField = if (!edgeConfig.isGeo) {
- val sourceIndex = row.schema.fieldIndex(edgeConfig.sourceField)
- val value = row.get(sourceIndex).toString
- if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
- } else {
- val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
- val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
- indexCells(lat, lng).mkString(",")
- }
- // process string type vid
- if (edgeConfig.sourcePolicy.isEmpty && isVidStringType) {
- sourceField = NebulaUtils.escapeUtil(sourceField).mkString("\"", "", "\"")
- }
+ val sourceField = processField(edgeConfig.sourceField, "source_field", row, edgeConfig.sourcePolicy, isVidStringType)
- val targetIndex = row.schema.fieldIndex(edgeConfig.targetField)
- var targetField = row.get(targetIndex).toString
- if (targetField.equals(DEFAULT_EMPTY_VALUE)) targetField = ""
- // process string type vid
- if (edgeConfig.targetPolicy.isEmpty && isVidStringType) {
- targetField = NebulaUtils.escapeUtil(targetField).mkString("\"", "", "\"")
- }
+ val targetField = processField(edgeConfig.targetField, "target_field", row, edgeConfig.targetPolicy, isVidStringType)
val values = for {
property <- fieldKeys if property.trim.length != 0
@@ -386,4 +335,46 @@ class EdgeProcessor(data: DataFrame,
for (index <- DEFAULT_MIN_CELL_LEVEL to DEFAULT_MAX_CELL_LEVEL)
yield s2CellId.parent(index).id()
}
+
+ private[this] def checkField(field: String, fieldType: String, row: Row, policy: Option[KeyPolicy.Value], streamFlag: Boolean, isVidStringType: Boolean): Boolean = {
+ val fieldValue = if (edgeConfig.isGeo && "source_field".equals(fieldType)) {
+ val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
+ val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
+ Some(indexCells(lat, lng).mkString(","))
+ } else {
+ val index = row.schema.fieldIndex(field)
+ if (index < 0 || row.isNullAt(index)) {
+ printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row")
+ None
+ } else Some(row.get(index).toString)
+ }
+
+ val idFlag = fieldValue.isDefined
+ val policyFlag = if (idFlag && policy.isEmpty && !isVidStringType
+ && !NebulaUtils.isNumic(fieldValue.get)) {
+ printChoice(streamFlag, s"space vidType is int, but your $fieldType ${fieldValue.get} is not numeric. Your row data is $row")
+ false
+ } else if (idFlag && policy.isDefined && isVidStringType) {
+ printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING. Your row data is $row")
+ false
+ } else true
+ idFlag && policyFlag
+ }
+
+ private[this] def processField(field: String, fieldType: String, row: Row, policy: Option[KeyPolicy.Value], isVidStringType: Boolean): String = {
+ var fieldValue = if (edgeConfig.isGeo && "source_field".equals(fieldType)) {
+ val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
+ val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
+ indexCells(lat, lng).mkString(",")
+ } else {
+ val index = row.schema.fieldIndex(field)
+ val value = row.get(index).toString
+ if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
+ }
+ // process string type vid
+ if (policy.isEmpty && isVidStringType) {
+ fieldValue = NebulaUtils.escapeUtil(fieldValue).mkString("\"", "", "\"")
+ }
+ fieldValue
+ }
}
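
checkField and processField fold the duplicated source/target branches into
one code path keyed by fieldType. A distilled, runnable sketch of the
validation policy with the Nebula types stubbed out (the isNumic stand-in is
an assumption, not the project implementation):

    object FieldCheckSketch extends App {
      // Stand-in for NebulaUtils.isNumic.
      def isNumic(s: String): Boolean = s.nonEmpty && s.forall(_.isDigit)

      def checkField(value: Option[String],
                     fieldType: String,
                     policyDefined: Boolean,
                     isVidStringType: Boolean): Boolean = {
        if (value.isEmpty) {
          println(s"$fieldType must exist and cannot be null"); false
        } else if (!policyDefined && !isVidStringType && !isNumic(value.get)) {
          println(s"space vidType is int, but your $fieldType ${value.get} is not numeric"); false
        } else if (policyDefined && isVidStringType) {
          println("only int vidType can use policy, but your vidType is FIXED_STRING"); false
        } else true
      }

      assert(checkField(Some("42"), "source_field", policyDefined = false, isVidStringType = false))
      assert(!checkField(Some("abc"), "target_field", policyDefined = false, isVidStringType = false))
      assert(checkField(Some("abc"), "target_field", policyDefined = false, isVidStringType = true))
    }
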
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
index 874fbaaa..25c8fd50 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
@@ -50,8 +50,8 @@ class KafkaReader(override val session: SparkSession,
reader.load()
.select($"value".cast(StringType))
- .select($"value", json_tuple($"value", fields: _*))
- .toDF("value" :: fields: _*)
+ .select(json_tuple($"value", fields: _*))
+ .toDF(fields: _*)
}
}