From ffb158454392bd640f0cbd18b5ab138da5fc514e Mon Sep 17 00:00:00 2001
From: riverzzz <455405955@qq.com>
Date: Thu, 16 Dec 2021 19:51:52 +0800
Subject: [PATCH 1/3] master → origin/master
 cb39a3e0 (Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
 6587cbd6 (Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
 53e37ee7 (Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
 261677e2 (add fields; parse Kafka JSON data)
 47eea8e5 (Merge branch 'master' of https://github.com/riverzzz/nebula-spark-utils)
 24ae4250 (Merge pull request #166 from Nicole00/readme)
 a8fd6623 (Merge branch 'master' into readme)
 6c2700f4 (Merge pull request #164 from Thericecookers/master)
 d1724aa5 (format)
 a13c2415 (format)
 6196e6d8 (add repo transfer note)
 f41f41dc (Merge branch 'master' into master)
 4cd75f07 (Merge pull request #165 from Nicole00/louvain)
 8a837127 (Merge branch 'master' into louvain)
 40dbe339 (fix louvain's result format)
 138a1a11 (bugfix: Reverse edge has wrong partitionId)
 4d184f1e (remove redundant pom dependencies to avoid errors)
 93a8ff74 (support parsing Kafka data; make offsets and the pull limit
 configurable; keep streaming and offline import parsing compatible; support
 stream processing in the pom configuration)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 nebula-exchange/pom.xml                       | 23 ++++-
 .../nebula/exchange/config/Configs.scala      | 11 ++-
 .../exchange/config/SourceConfigs.scala       |  8 +-
 .../exchange/processor/EdgeProcessor.scala    | 91 ++++++++++++-------
 .../nebula/exchange/processor/Processor.scala | 23 ++---
 .../processor/VerticesProcessor.scala         | 52 ++++++-----
 .../exchange/reader/StreamingBaseReader.scala | 33 ++++---
 7 files changed, 155 insertions(+), 86 deletions(-)

diff --git a/nebula-exchange/pom.xml b/nebula-exchange/pom.xml
index 8f1b6cb7..ae432333 100644
--- a/nebula-exchange/pom.xml
+++ b/nebula-exchange/pom.xml
@@ -37,6 +37,7 @@
         1.14
         2.6.1
         1.2.0
+        2.0.0

@@ -132,7 +133,7 @@
                     false
-                    org.apache.spark:*
+                    org.apache.hadoop:*
                     org.apache.hive:*
                     log4j:log4j

@@ -254,6 +255,17 @@
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>io.streamnative.connectors</groupId>
             <artifactId>pulsar-spark-connector_2.11</artifactId>
@@ -263,6 +275,7 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.11</artifactId>
             <version>${spark.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>snappy-java</artifactId>
@@ -362,6 +375,7 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.11</artifactId>
             <version>${spark.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>snappy-java</artifactId>
@@ -401,11 +415,13 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-catalyst_2.11</artifactId>
             <version>${spark.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_2.11</artifactId>
             <version>${spark.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>commons-codec</groupId>
@@ -455,12 +471,17 @@
                 <exclusion>
                     <groupId>commons-io</groupId>
                     <artifactId>commons-io</artifactId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>hive-metastore</artifactId>
+                    <groupId>org.spark-project.hive</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-yarn_2.11</artifactId>
             <version>${spark.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>guava</artifactId>
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
index 819ee782..ddbb7544 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
@@ -249,6 +249,7 @@ object Configs {
   private[this] val DEFAULT_LOCAL_PATH = None
   private[this] val DEFAULT_REMOTE_PATH = None
   private[this] val DEFAULT_STREAM_INTERVAL = 30
+  private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest"
   private[this] val DEFAULT_PARALLEL = 1

   /**
@@ -659,10 +660,18 @@ object Configs {
         val intervalSeconds =
           if (config.hasPath("interval.seconds")) config.getInt("interval.seconds")
           else DEFAULT_STREAM_INTERVAL
+        val startingOffsets =
+          if (config.hasPath("startingOffsets")) config.getString("startingOffsets")
+          else DEFAULT_KAFKA_STARTINGOFFSETS
+        val maxOffsetsPerTrigger =
+          if (config.hasPath("maxOffsetsPerTrigger")) Some(config.getLong("maxOffsetsPerTrigger"))
+          else None
         KafkaSourceConfigEntry(SourceCategory.KAFKA,
                                intervalSeconds,
                                config.getString("service"),
-                               config.getString("topic"))
+                               config.getString("topic"),
+                               startingOffsets,
+                               maxOffsetsPerTrigger)
       case SourceCategory.PULSAR =>
         val options =
           config.getObject("options").unwrapped.asScala.map(x => x._1 -> x._2.toString).toMap
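Read together with the defaults above: startingOffsets falls back to "latest" and
maxOffsetsPerTrigger to None when the source block omits them. A minimal sketch of
that behavior using the Typesafe Config API that Configs is built on (the broker
address, topic name, and values here are made up):

    import com.typesafe.config.ConfigFactory

    // Hypothetical Kafka source block exercising both new options.
    val kafka = ConfigFactory.parseString(
      """
        |service = "127.0.0.1:9092"
        |topic = "nebula-player"
        |startingOffsets = earliest
        |maxOffsetsPerTrigger = 5000
      """.stripMargin)

    // Mirrors the parsing above: each option falls back to its default.
    val startingOffsets =
      if (kafka.hasPath("startingOffsets")) kafka.getString("startingOffsets") else "latest"
    val maxOffsetsPerTrigger =
      if (kafka.hasPath("maxOffsetsPerTrigger")) Some(kafka.getLong("maxOffsetsPerTrigger")) else None
    // startingOffsets == "earliest", maxOffsetsPerTrigger == Some(5000L)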
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
index 44a296cb..50a0f6eb 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
@@ -162,16 +162,20 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
  *
  * @param server
  * @param topic
+ * @param startingOffsets
+ * @param maxOffsetsPerTrigger
  */
 case class KafkaSourceConfigEntry(override val category: SourceCategory.Value,
                                   override val intervalSeconds: Int,
                                   server: String,
-                                  topic: String)
+                                  topic: String,
+                                  startingOffsets: String,
+                                  maxOffsetsPerTrigger: Option[Long] = None)
     extends StreamingDataSourceConfigEntry {
   require(server.trim.nonEmpty && topic.trim.nonEmpty)

   override def toString: String = {
-    s"Kafka source server: ${server} topic:${topic}"
+    s"Kafka source server: ${server} topic:${topic} startingOffsets:${startingOffsets} maxOffsetsPerTrigger:${maxOffsetsPerTrigger}"
   }
 }

diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index 7e7cb900..d9e8cdf6 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -274,12 +274,59 @@ class EdgeProcessor(data: DataFrame,
         }
       }
     } else {
+      val streamFlag = data.isStreaming
       val edgeFrame = data
+        .filter { row => // filter and check row data; if streaming, only log the error
+          val sourceField = if (!edgeConfig.isGeo) {
+            val sourceIndex = row.schema.fieldIndex(edgeConfig.sourceField)
+            if (sourceIndex < 0 || row.isNullAt(sourceIndex)) {
+              printChoice(streamFlag, s"source vertexId must exist and cannot be null, your row data is $row")
+              None
+            } else Some(row.get(sourceIndex).toString)
+          } else {
+            val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
+            val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
+            Some(indexCells(lat, lng).mkString(","))
+          }
+
+          val sourceFlag = sourceField.isDefined
+          val sourcePolicyFlag = if (sourceFlag && edgeConfig.sourcePolicy.isEmpty && !isVidStringType
+                                     && !NebulaUtils.isNumic(sourceField.get)) {
+            printChoice(streamFlag, s"space vidType is int, but your srcId $sourceField is not numeric. your row data is $row")
+            false
+          } else if (sourceFlag && edgeConfig.sourcePolicy.isDefined && isVidStringType) {
+            printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING. your row data is $row")
+            false
+          } else true
+
+          val targetIndex = row.schema.fieldIndex(edgeConfig.targetField)
+          val targetFlag = if (targetIndex < 0 || row.isNullAt(targetIndex)) {
+            printChoice(streamFlag, s"target vertexId must exist and cannot be null, your row data is $row")
+            false
+          } else {
+            val targetField = row.get(targetIndex).toString
+            if (edgeConfig.targetPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(targetField)) {
+              printChoice(streamFlag, s"space vidType is int, but your dstId $targetField is not numeric. your row data is $row")
+              false
+            } else if (edgeConfig.targetPolicy.isDefined && isVidStringType) {
+              printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING. your row data is $row")
+              false
+            } else true
+          }
+
+          val edgeRankFlag = if (edgeConfig.rankingField.isDefined) {
+            val index   = row.schema.fieldIndex(edgeConfig.rankingField.get)
+            val ranking = row.get(index).toString
+            if (!NebulaUtils.isNumic(ranking)) {
+              printChoice(streamFlag, s"Not support non-Numeric type for ranking field. your row data is $row")
+              false
+            } else true
+          } else true
+          sourceFlag && sourcePolicyFlag && targetFlag && edgeRankFlag
+        }
         .map { row =>
           var sourceField = if (!edgeConfig.isGeo) {
             val sourceIndex = row.schema.fieldIndex(edgeConfig.sourceField)
-            assert(sourceIndex >= 0 && !row.isNullAt(sourceIndex),
-                   s"source vertexId must exist and cannot be null, your row data is $row")
             val value = row.get(sourceIndex).toString
             if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
           } else {
@@ -287,36 +334,17 @@ class EdgeProcessor(data: DataFrame,
             val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
             indexCells(lat, lng).mkString(",")
           }
-
-          if (edgeConfig.sourcePolicy.isEmpty) {
-            // process string type vid
-            if (isVidStringType) {
-              sourceField = NebulaUtils.escapeUtil(sourceField).mkString("\"", "", "\"")
-            } else {
-              assert(NebulaUtils.isNumic(sourceField),
-                     s"space vidType is int, but your srcId $sourceField is not numeric.")
-            }
-          } else {
-            assert(!isVidStringType,
-                   "only int vidType can use policy, but your vidType is FIXED_STRING.")
+          // process string type vid
+          if (edgeConfig.sourcePolicy.isEmpty && isVidStringType) {
+            sourceField = NebulaUtils.escapeUtil(sourceField).mkString("\"", "", "\"")
           }

           val targetIndex = row.schema.fieldIndex(edgeConfig.targetField)
-          assert(targetIndex >= 0 && !row.isNullAt(targetIndex),
-                 s"target vertexId must exist and cannot be null, your row data is $row")
           var targetField = row.get(targetIndex).toString
           if (targetField.equals(DEFAULT_EMPTY_VALUE)) targetField = ""
-          if (edgeConfig.targetPolicy.isEmpty) {
-            // process string type vid
-            if (isVidStringType) {
-              targetField = NebulaUtils.escapeUtil(targetField).mkString("\"", "", "\"")
-            } else {
-              assert(NebulaUtils.isNumic(targetField),
-                     s"space vidType is int, but your dstId $targetField is not numeric.")
-            }
-          } else {
-            assert(!isVidStringType,
-                   "only int vidType can use policy, but your vidType is FIXED_STRING.")
+          // process string type vid
+          if (edgeConfig.targetPolicy.isEmpty && isVidStringType) {
+            targetField = NebulaUtils.escapeUtil(targetField).mkString("\"", "", "\"")
           }

           val values = for {
@@ -326,8 +354,6 @@ class EdgeProcessor(data: DataFrame,
           if (edgeConfig.rankingField.isDefined) {
             val index   = row.schema.fieldIndex(edgeConfig.rankingField.get)
             val ranking = row.get(index).toString
-            assert(NebulaUtils.isNumic(ranking), s"Not support non-Numeric type for ranking field")
-
             Edge(sourceField, targetField, Some(ranking.toLong), values)
           } else {
             Edge(sourceField, targetField, None, values)
@@ -335,10 +361,13 @@ class EdgeProcessor(data: DataFrame,
         }(Encoders.kryo[Edge])

       // streaming write
-      if (data.isStreaming) {
+      if (streamFlag) {
         val streamingDataSourceConfig =
           edgeConfig.dataSourceConfigEntry.asInstanceOf[StreamingDataSourceConfigEntry]
-        edgeFrame.writeStream
+        val wStream = edgeFrame.writeStream
+        if (edgeConfig.checkPointPath.isDefined) wStream.option("checkpointLocation", edgeConfig.checkPointPath.get)
+
+        wStream
           .foreachBatch((edges, batchId) => {
             LOG.info(s"${edgeConfig.name} edge start batch ${batchId}.")
             edges.foreachPartition(processEachPartition _)
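The streaming branch reduces to the pattern below; a minimal sketch, assuming a
streaming Dataset[Edge], an optional checkpoint path, and a hypothetical
per-partition writer `process`. The trigger/start tail is cut off by the hunk
above, so its exact form here is an assumption based on the stock Structured
Streaming API:

    import org.apache.spark.sql.Dataset
    import org.apache.spark.sql.streaming.Trigger

    def startEdgeStream(edges: Dataset[Edge],
                        checkPointPath: Option[String],
                        intervalSeconds: Int)(process: Iterator[Edge] => Unit): Unit = {
      // DataStreamWriter.option mutates the builder in place,
      // so the result does not need to be reassigned.
      val writer = edges.writeStream
      checkPointPath.foreach(path => writer.option("checkpointLocation", path))
      writer
        .foreachBatch { (batch: Dataset[Edge], batchId: Long) =>
          println(s"edge start batch $batchId.")
          batch.foreachPartition(process)
        }
        .trigger(Trigger.ProcessingTime(s"$intervalSeconds seconds"))
        .start()
        .awaitTermination()
    }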
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala
index 7dd7ede4..65cc1004 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala
@@ -5,21 +5,10 @@

 package com.vesoft.nebula.exchange.processor

-import com.vesoft.nebula.{
-  Coordinate,
-  Date,
-  DateTime,
-  Geography,
-  LineString,
-  NullType,
-  Point,
-  Polygon,
-  PropertyType,
-  Time,
-  Value
-}
+import com.vesoft.nebula.{Coordinate, Date, DateTime, Geography, LineString, NullType, Point, Polygon, PropertyType, Time, Value}
 import com.vesoft.nebula.exchange.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
 import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils}
+import org.apache.log4j.Logger
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType}

@@ -33,6 +22,9 @@ import scala.collection.mutable.ListBuffer
  */
 trait Processor extends Serializable {

+  @transient
+  private[this] lazy val LOG = Logger.getLogger(this.getClass)
+
   /**
    * process dataframe to vertices or edges
    */
@@ -230,4 +222,9 @@ trait Processor extends Serializable {
       }
     }
   }
+
+  def printChoice(streamFlag: Boolean, context: String): Unit = {
+    if (streamFlag) LOG.info(context)
+    else assert(assertion = false, context)
+  }
 }
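printChoice encodes the error policy now shared by both processors: a streaming
job should not die because of one bad record, so the message is logged and the
filter drops the row, while a batch import still fails fast. In short:

    printChoice(streamFlag = true, "bad row, skipping")   // logs the message at INFO and returns
    printChoice(streamFlag = false, "bad row, aborting")  // throws java.lang.AssertionError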
is $row") + false + } else { + val vertexId = row.get(index).toString + // process int type vid + if (tagConfig.vertexPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(vertexId)) { + printChoice(streamFlag, s"space vidType is int, but your vertex id $vertexId is not numeric.your row data is $row") + false + } else if (tagConfig.vertexPolicy.isDefined && isVidStringType) { + printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING.your row data is $row") + false + } else true + } + } .map { row => - val vertexID = { - val index = row.schema.fieldIndex(tagConfig.vertexField) - assert(index >= 0 && !row.isNullAt(index), - s"vertexId must exist and cannot be null, your row data is $row") - var value = row.get(index).toString - if (value.equals(DEFAULT_EMPTY_VALUE)) { value = "" } - if (tagConfig.vertexPolicy.isEmpty) { - // process string type vid - if (isVidStringType) { - NebulaUtils.escapeUtil(value).mkString("\"", "", "\"") - } else { - // process int type vid - assert(NebulaUtils.isNumic(value), - s"space vidType is int, but your vertex id $value is not numeric.") - value - } - } else { - assert(!isVidStringType, - "only int vidType can use policy, but your vidType is FIXED_STRING.") - value - } + val index = row.schema.fieldIndex(tagConfig.vertexField) + var vertexId = row.get(index).toString + if (vertexId.equals(DEFAULT_EMPTY_VALUE)) { + vertexId = "" } val values = for { property <- fieldKeys if property.trim.length != 0 } yield extraValueForClient(row, property, fieldTypeMap) - Vertex(vertexID, values) + Vertex(vertexId, values) }(Encoders.kryo[Vertex]) // streaming write - if (data.isStreaming) { + if (streamFlag) { val streamingDataSourceConfig = tagConfig.dataSourceConfigEntry.asInstanceOf[StreamingDataSourceConfigEntry] - vertices.writeStream + val wStream = vertices.writeStream + if (tagConfig.checkPointPath.isDefined) wStream.option("checkpointLocation", tagConfig.checkPointPath.get) + + wStream .foreachBatch((vertexSet, batchId) => { LOG.info(s"${tagConfig.name} tag start batch ${batchId}.") vertexSet.foreachPartition(processEachPartition _) diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala index 332060ab..874fbaaa 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala @@ -6,10 +6,8 @@ package com.vesoft.nebula.exchange.reader import com.vesoft.nebula.exchange.config.{KafkaSourceConfigEntry, PulsarSourceConfigEntry} -import org.apache.spark.sql.{DataFrame, SparkSession, Row, Encoders} -import com.alibaba.fastjson.{JSON, JSONObject} -import org.apache.spark.sql.types.{StructField, StructType, StringType} -import org.apache.spark.sql.functions.{from_json,col} +import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.{DataFrame, SparkSession} /** * Spark Streaming @@ -27,6 +25,7 @@ abstract class StreamingBaseReader(override val session: SparkSession) extends R * * @param session * @param kafkaConfig + * @param targetFields */ class KafkaReader(override val session: SparkSession, kafkaConfig: KafkaSourceConfigEntry, @@ -36,18 +35,24 @@ class KafkaReader(override val session: SparkSession, require(kafkaConfig.server.trim.nonEmpty && kafkaConfig.topic.trim.nonEmpty && targetFields.nonEmpty) override def read(): DataFrame = { + import 
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
index 332060ab..874fbaaa 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
@@ -6,10 +6,8 @@
 package com.vesoft.nebula.exchange.reader

 import com.vesoft.nebula.exchange.config.{KafkaSourceConfigEntry, PulsarSourceConfigEntry}
-import org.apache.spark.sql.{DataFrame, SparkSession, Row, Encoders}
-import com.alibaba.fastjson.{JSON, JSONObject}
-import org.apache.spark.sql.types.{StructField, StructType, StringType}
-import org.apache.spark.sql.functions.{from_json,col}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{DataFrame, SparkSession}

 /**
  * Spark Streaming
@@ -27,6 +25,7 @@ abstract class StreamingBaseReader(override val session: SparkSession) extends R
  *
  * @param session
  * @param kafkaConfig
+ * @param targetFields
  */
 class KafkaReader(override val session: SparkSession,
                   kafkaConfig: KafkaSourceConfigEntry,
@@ -36,18 +35,24 @@ class KafkaReader(override val session: SparkSession,
           kafkaConfig.server.trim.nonEmpty && kafkaConfig.topic.trim.nonEmpty && targetFields.nonEmpty)

   override def read(): DataFrame = {
+    import org.apache.spark.sql.functions._
     import session.implicits._
     val fields = targetFields.distinct
-    val jsonSchema = StructType(fields.map(field => StructField(field, StringType, true)))
-    session.readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", kafkaConfig.server)
-      .option("subscribe", kafkaConfig.topic)
-      .load()
-      .selectExpr("CAST(value AS STRING)")
-      .as[(String)]
-      .withColumn("value", from_json(col("value"), jsonSchema))
-      .select("value.*")
+    val reader =
+      session.readStream
+        .format("kafka")
+        .option("kafka.bootstrap.servers", kafkaConfig.server)
+        .option("subscribe", kafkaConfig.topic)
+        .option("startingOffsets", kafkaConfig.startingOffsets)
+
+    val maxOffsetsPerTrigger = kafkaConfig.maxOffsetsPerTrigger
+    if (maxOffsetsPerTrigger.isDefined) reader.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger.get)
+
+    reader.load()
+      .select($"value".cast(StringType))
+      .select($"value", json_tuple($"value", fields: _*))
+      .toDF("value" :: fields: _*)
   }
 }
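json_tuple replaces the previous from_json-plus-StructType approach: every target
field is extracted from the Kafka value as a plain string column, and the raw
value column is kept alongside. A quick self-contained sketch on a static
DataFrame (session setup, field names, and the sample record are made up):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.json_tuple

    val spark = SparkSession.builder().master("local[1]").appName("json-tuple-demo").getOrCreate()
    import spark.implicits._

    val fields = List("name", "age")
    val raw    = Seq("""{"name":"Tom","age":"20","ignored":"x"}""").toDF("value")

    raw.select($"value", json_tuple($"value", fields: _*))
       .toDF("value" :: fields: _*)
       .show()
    // one row: the raw JSON in `value`, plus name = Tom and age = 20;
    // fields missing from a record come back as null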
From 7de738983c0775f708a373ee2369f255c40b859b Mon Sep 17 00:00:00 2001
From: riverzzz <455405955@qq.com>
Date: Thu, 16 Dec 2021 22:01:28 +0800
Subject: [PATCH 2/3] master → origin/master (same commit list as PATCH 1/3)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../vesoft/nebula/exchange/processor/VerticesProcessor.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index e7b5ccc0..0507577d 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -255,6 +255,10 @@ class VerticesProcessor(data: DataFrame,
             vertexId = ""
           }

+          if (tagConfig.vertexPolicy.isEmpty && isVidStringType) {
+            vertexId = NebulaUtils.escapeUtil(vertexId).mkString("\"", "", "\"")
+          }
+
           val values = for {
             property <- fieldKeys if property.trim.length != 0
           } yield extraValueForClient(row, property, fieldTypeMap)
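This restores the string-vid quoting that the PATCH 1/3 rewrite of the map step
dropped: without it, string-typed vertex ids were sent unescaped and unquoted.
The mkString call wraps the escaped id in double quotes; a tiny sketch, assuming
NebulaUtils.escapeUtil leaves a plain alphanumeric id unchanged:

    val vertexId = "player100"
    val quoted   = NebulaUtils.escapeUtil(vertexId).mkString("\"", "", "\"")
    // quoted == "\"player100\"" -- the id as a quoted nGQL string literal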
From 14711662efa73f80b039ce0fe26052687353ef44 Mon Sep 17 00:00:00 2001
From: riverzzz <455405955@qq.com>
Date: Wed, 22 Dec 2021 20:21:55 +0800
Subject: [PATCH 3/3] format the code style; remove duplicated value-extraction
 code

---
 .../nebula/exchange/config/Configs.scala      |  40 +++----
 .../exchange/config/SourceConfigs.scala       |   4 +-
 .../exchange/processor/EdgeProcessor.scala    | 103 ++++++++----------
 .../exchange/reader/StreamingBaseReader.scala |   4 +-
 4 files changed, 71 insertions(+), 80 deletions(-)

diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
index ddbb7544..41a041d6 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
@@ -231,26 +231,26 @@ case class Configs(databaseConfig: DataBaseConfigEntry,
 object Configs {
   private[this] val LOG = Logger.getLogger(this.getClass)

-  private[this] val DEFAULT_CONNECTION_TIMEOUT = Integer.MAX_VALUE
-  private[this] val DEFAULT_CONNECTION_RETRY = 3
-  private[this] val DEFAULT_EXECUTION_RETRY = 3
-  private[this] val DEFAULT_EXECUTION_TIMEOUT = Integer.MAX_VALUE
-  private[this] val DEFAULT_EXECUTION_INTERVAL = 3000
-  private[this] val DEFAULT_ERROR_OUTPUT_PATH = "/tmp/nebula.writer.errors/"
-  private[this] val DEFAULT_ERROR_MAX_BATCH_SIZE = Int.MaxValue
-  private[this] val DEFAULT_RATE_LIMIT = 1024
-  private[this] val DEFAULT_RATE_TIMEOUT = 100
-  private[this] val DEFAULT_ENABLE_SSL = false
-  private[this] val DEFAULT_SSL_SIGN_TYPE = "CA"
-  private[this] val DEFAULT_EDGE_RANKING = 0L
-  private[this] val DEFAULT_BATCH = 2
-  private[this] val DEFAULT_PARTITION = -1
-  private[this] val DEFAULT_CHECK_POINT_PATH = None
-  private[this] val DEFAULT_LOCAL_PATH = None
-  private[this] val DEFAULT_REMOTE_PATH = None
-  private[this] val DEFAULT_STREAM_INTERVAL = 30
-  private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest"
-  private[this] val DEFAULT_PARALLEL = 1
+  private[this] val DEFAULT_CONNECTION_TIMEOUT    = Integer.MAX_VALUE
+  private[this] val DEFAULT_CONNECTION_RETRY      = 3
+  private[this] val DEFAULT_EXECUTION_RETRY       = 3
+  private[this] val DEFAULT_EXECUTION_TIMEOUT     = Integer.MAX_VALUE
+  private[this] val DEFAULT_EXECUTION_INTERVAL    = 3000
+  private[this] val DEFAULT_ERROR_OUTPUT_PATH     = "/tmp/nebula.writer.errors/"
+  private[this] val DEFAULT_ERROR_MAX_BATCH_SIZE  = Int.MaxValue
+  private[this] val DEFAULT_RATE_LIMIT            = 1024
+  private[this] val DEFAULT_RATE_TIMEOUT          = 100
+  private[this] val DEFAULT_ENABLE_SSL            = false
+  private[this] val DEFAULT_SSL_SIGN_TYPE         = "CA"
+  private[this] val DEFAULT_EDGE_RANKING          = 0L
+  private[this] val DEFAULT_BATCH                 = 2
+  private[this] val DEFAULT_PARTITION             = -1
+  private[this] val DEFAULT_CHECK_POINT_PATH      = None
+  private[this] val DEFAULT_LOCAL_PATH            = None
+  private[this] val DEFAULT_REMOTE_PATH           = None
+  private[this] val DEFAULT_STREAM_INTERVAL       = 30
+  private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest"
+  private[this] val DEFAULT_PARALLEL              = 1

   /**

diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
index 50a0f6eb..2a21c1ed 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
@@ -162,8 +162,8 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
  *
  * @param server
  * @param topic
- * @param startingOffsets
- * @param maxOffsetsPerTrigger
+ * @param startingOffsets
+ * @param maxOffsetsPerTrigger
  */
 case class KafkaSourceConfigEntry(override val category: SourceCategory.Value,
                                   override val intervalSeconds: Int,
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index d9e8cdf6..425ae802 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -277,42 +277,9 @@ class EdgeProcessor(data: DataFrame,
       val streamFlag = data.isStreaming
       val edgeFrame = data
         .filter { row => // filter and check row data; if streaming, only log the error
-          val sourceField = if (!edgeConfig.isGeo) {
-            val sourceIndex = row.schema.fieldIndex(edgeConfig.sourceField)
-            if (sourceIndex < 0 || row.isNullAt(sourceIndex)) {
-              printChoice(streamFlag, s"source vertexId must exist and cannot be null, your row data is $row")
-              None
-            } else Some(row.get(sourceIndex).toString)
-          } else {
-            val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
-            val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
-            Some(indexCells(lat, lng).mkString(","))
-          }
-
-          val sourceFlag = sourceField.isDefined
-          val sourcePolicyFlag = if (sourceFlag && edgeConfig.sourcePolicy.isEmpty && !isVidStringType
-                                     && !NebulaUtils.isNumic(sourceField.get)) {
-            printChoice(streamFlag, s"space vidType is int, but your srcId $sourceField is not numeric. your row data is $row")
-            false
-          } else if (sourceFlag && edgeConfig.sourcePolicy.isDefined && isVidStringType) {
-            printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING. your row data is $row")
-            false
-          } else true
+          val sourceFlag = checkField(edgeConfig.sourceField, "source_field", row, edgeConfig.sourcePolicy, streamFlag, isVidStringType)

-          val targetIndex = row.schema.fieldIndex(edgeConfig.targetField)
-          val targetFlag = if (targetIndex < 0 || row.isNullAt(targetIndex)) {
-            printChoice(streamFlag, s"target vertexId must exist and cannot be null, your row data is $row")
-            false
-          } else {
-            val targetField = row.get(targetIndex).toString
-            if (edgeConfig.targetPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(targetField)) {
-              printChoice(streamFlag, s"space vidType is int, but your dstId $targetField is not numeric. your row data is $row")
-              false
-            } else if (edgeConfig.targetPolicy.isDefined && isVidStringType) {
-              printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING. your row data is $row")
-              false
-            } else true
-          }
+          val targetFlag = checkField(edgeConfig.targetField, "target_field", row, edgeConfig.targetPolicy, streamFlag, isVidStringType)

           val edgeRankFlag = if (edgeConfig.rankingField.isDefined) {
             val index   = row.schema.fieldIndex(edgeConfig.rankingField.get)
             val ranking = row.get(index).toString
@@ -322,30 +289,12 @@ class EdgeProcessor(data: DataFrame,
               false
             } else true
           } else true
-          sourceFlag && sourcePolicyFlag && targetFlag && edgeRankFlag
+          sourceFlag && targetFlag && edgeRankFlag
         }
         .map { row =>
-          var sourceField = if (!edgeConfig.isGeo) {
-            val sourceIndex = row.schema.fieldIndex(edgeConfig.sourceField)
-            val value = row.get(sourceIndex).toString
-            if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
-          } else {
-            val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
-            val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
-            indexCells(lat, lng).mkString(",")
-          }
-          // process string type vid
-          if (edgeConfig.sourcePolicy.isEmpty && isVidStringType) {
-            sourceField = NebulaUtils.escapeUtil(sourceField).mkString("\"", "", "\"")
-          }
+          val sourceField = processField(edgeConfig.sourceField, "source_field", row, edgeConfig.sourcePolicy, isVidStringType)

-          val targetIndex = row.schema.fieldIndex(edgeConfig.targetField)
-          var targetField = row.get(targetIndex).toString
-          if (targetField.equals(DEFAULT_EMPTY_VALUE)) targetField = ""
-          // process string type vid
-          if (edgeConfig.targetPolicy.isEmpty && isVidStringType) {
-            targetField = NebulaUtils.escapeUtil(targetField).mkString("\"", "", "\"")
-          }
+          val targetField = processField(edgeConfig.targetField, "target_field", row, edgeConfig.targetPolicy, isVidStringType)

           val values = for {
             property <- fieldKeys if property.trim.length != 0
@@ -386,4 +335,46 @@ class EdgeProcessor(data: DataFrame,
     for (index <- DEFAULT_MIN_CELL_LEVEL to DEFAULT_MAX_CELL_LEVEL)
       yield s2CellId.parent(index).id()
   }
+
+  private[this] def checkField(field: String,
+                               fieldType: String,
+                               row: Row,
+                               policy: Option[KeyPolicy.Value],
+                               streamFlag: Boolean,
+                               isVidStringType: Boolean): Boolean = {
+    val fieldValue = if (edgeConfig.isGeo && "source_field".equals(fieldType)) {
+      val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
+      val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
+      Some(indexCells(lat, lng).mkString(","))
+    } else {
+      val index = row.schema.fieldIndex(field)
+      if (index < 0 || row.isNullAt(index)) {
+        printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row")
+        None
+      } else Some(row.get(index).toString)
+    }
+
+    val idFlag = fieldValue.isDefined
+    val policyFlag =
+      if (idFlag && policy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(fieldValue.get)) {
+        printChoice(streamFlag, s"space vidType is int, but your $fieldType $fieldValue is not numeric. your row data is $row")
+        false
+      } else if (idFlag && policy.isDefined && isVidStringType) {
+        printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING. your row data is $row")
+        false
+      } else true
+    idFlag && policyFlag
+  }
+
+  private[this] def processField(field: String,
+                                 fieldType: String,
+                                 row: Row,
+                                 policy: Option[KeyPolicy.Value],
+                                 isVidStringType: Boolean): String = {
+    var fieldValue = if (edgeConfig.isGeo && "source_field".equals(fieldType)) {
+      val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
+      val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
+      indexCells(lat, lng).mkString(",")
+    } else {
+      val index = row.schema.fieldIndex(field)
+      val value = row.get(index).toString
+      if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
+    }
+    // process string type vid
+    if (policy.isEmpty && isVidStringType) {
+      fieldValue = NebulaUtils.escapeUtil(fieldValue).mkString("\"", "", "\"")
+    }
+    fieldValue
+  }
 }
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
index 874fbaaa..25c8fd50 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
@@ -50,8 +50,8 @@ class KafkaReader(override val session: SparkSession,

     reader.load()
       .select($"value".cast(StringType))
-      .select($"value", json_tuple($"value", fields: _*))
-      .toDF("value" :: fields: _*)
+      .select(json_tuple($"value", fields: _*))
+      .toDF(fields: _*)
   }
 }
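After this change read() no longer carries the raw Kafka value along: only the
extracted target fields reach the processors, matching the column set the vertex
and edge processors index into. Reusing the session and imports from the earlier
static-DataFrame sketch:

    val fields = List("name", "age")
    val df     = Seq("""{"name":"Tom","age":"20"}""").toDF("value")

    df.select(json_tuple($"value", fields: _*))
      .toDF(fields: _*)
      .show()
    // +----+---+
    // |name|age|
    // +----+---+
    // | Tom| 20|
    // +----+---+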