From e57e443845abbdf56c378659b667a12b0fa3ebeb Mon Sep 17 00:00:00 2001
From: Nicole00 <16240361+Nicole00@users.noreply.github.com>
Date: Wed, 28 Jul 2021 11:22:40 +0800
Subject: [PATCH 1/7] remove useless repository
---
nebula-exchange/pom.xml | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/nebula-exchange/pom.xml b/nebula-exchange/pom.xml
index 13cd01ab..cb92bc5e 100644
--- a/nebula-exchange/pom.xml
+++ b/nebula-exchange/pom.xml
@@ -141,7 +141,8 @@
javax.inject:javax.inject
org.spark-project.hive:hive-exec
stax:stax-api
- org.glassfish.hk2.external:aopalliance-repackaged
+ org.glassfish.hk2.external:aopalliance-repackaged
+
@@ -209,7 +210,8 @@
maven-javadoc-plugin
3.2.0
- com.facebook.thrift:com.facebook.thrift.*
+ com.facebook.thrift:com.facebook.thrift.*
+
@@ -721,11 +723,6 @@
SparkPackagesRepo
https://repos.spark-packages.org
-
- bintray-streamnative-maven
- bintray
- https://dl.bintray.com/streamnative/maven
-
snapshots
https://oss.sonatype.org/content/repositories/snapshots/
From a33b2080fc58a6b09df47d386866e09fabdf5c38 Mon Sep 17 00:00:00 2001
From: Nicole00 <16240361+Nicole00@users.noreply.github.com>
Date: Wed, 28 Jul 2021 11:59:38 +0800
Subject: [PATCH 2/7] add pulsar dependency
---
.github/workflows/maven.yml | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 3154bc7a..84762c72 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -30,12 +30,14 @@ jobs:
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-maven-
- - name: download neo4j-contrib & graphframes dependency
+ - name: download neo4j-contrib & graphframes & pulsar-spark-connector dependency
run: |
wget https://oss-cdn.nebula-graph.com.cn/jar-packages/neo4j-contrib.zip
wget https://oss-cdn.nebula-graph.com.cn/jar-packages/graphframes.zip
+ wget https://oss-cdn.nebula-graph.com.cn/jar-packages/streamnative.zip
unzip -o -d ~/.m2/repository/ neo4j-contrib.zip
unzip -o -d ~/.m2/repository/ graphframes.zip
+ unzip -o -d ~/.m2/repository/io streamnative.zip
- name: Install nebula-graph
run: |
From 3abe0fba32b68c937fda458c2d22ac57a69a8dc2 Mon Sep 17 00:00:00 2001
From: Nicole00 <16240361+Nicole00@users.noreply.github.com>
Date: Mon, 2 Aug 2021 09:45:43 +0800
Subject: [PATCH 3/7] add / for directory
---
.github/workflows/maven.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 84762c72..0bf453e0 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -37,7 +37,7 @@ jobs:
wget https://oss-cdn.nebula-graph.com.cn/jar-packages/streamnative.zip
unzip -o -d ~/.m2/repository/ neo4j-contrib.zip
unzip -o -d ~/.m2/repository/ graphframes.zip
- unzip -o -d ~/.m2/repository/io streamnative.zip
+ unzip -o -d ~/.m2/repository/io/ streamnative.zip
- name: Install nebula-graph
run: |
From a183ef0d6d1646b483cea1cd9a436d02588aad92 Mon Sep 17 00:00:00 2001
From: Nicole00 <16240361+Nicole00@users.noreply.github.com>
Date: Mon, 2 Aug 2021 14:57:21 +0800
Subject: [PATCH 4/7] remove streamnative before unzip
---
.github/workflows/maven.yml | 1 +
1 file changed, 1 insertion(+)
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 0bf453e0..059388d4 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -37,6 +37,7 @@ jobs:
wget https://oss-cdn.nebula-graph.com.cn/jar-packages/streamnative.zip
unzip -o -d ~/.m2/repository/ neo4j-contrib.zip
unzip -o -d ~/.m2/repository/ graphframes.zip
+ rm -rf ~/.m2/repository/io/streamnative
unzip -o -d ~/.m2/repository/io/ streamnative.zip
- name: Install nebula-graph
From 25cf0bbc636b7a7ec88d649bfc3f72288ae89220 Mon Sep 17 00:00:00 2001
From: Nicole00 <16240361+Nicole00@users.noreply.github.com>
Date: Mon, 2 Aug 2021 16:19:11 +0800
Subject: [PATCH 5/7] format example
---
.../examples/connector/NebulaSparkWriterExample.scala | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala
index 22319de9..d62c2ddf 100644
--- a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala
+++ b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala
@@ -34,14 +34,13 @@ object NebulaSparkWriterExample {
.config(sparkConf)
.getOrCreate()
-// writeVertex(spark)
-// writeEdge(spark)
+ writeVertex(spark)
+ writeEdge(spark)
updateVertex(spark)
- //updateEdge(spark)
+ updateEdge(spark)
spark.close()
- sys.exit()
}
/**
From 994dd7855b1880f06b092517692f02040a5f8fba Mon Sep 17 00:00:00 2001
From: Nicole00 <16240361+Nicole00@users.noreply.github.com>
Date: Mon, 2 Aug 2021 16:19:40 +0800
Subject: [PATCH 6/7] add close for writer & add test plugin
---
nebula-spark-connector/pom.xml | 32 ++++++++++++++++---
.../nebula/connector/NebulaConfig.scala | 4 +++
.../connector/writer/NebulaEdgeWriter.scala | 1 +
.../connector/writer/NebulaVertexWriter.scala | 1 +
.../writer/NebulaExecutorSuite.scala | 4 +--
5 files changed, 36 insertions(+), 6 deletions(-)
diff --git a/nebula-spark-connector/pom.xml b/nebula-spark-connector/pom.xml
index 35854dfd..a946aa44 100644
--- a/nebula-spark-connector/pom.xml
+++ b/nebula-spark-connector/pom.xml
@@ -117,7 +117,8 @@
javax.inject:javax.inject
org.spark-project.hive:hive-exec
stax:stax-api
- org.glassfish.hk2.external:aopalliance-repackaged
+ org.glassfish.hk2.external:aopalliance-repackaged
+
@@ -150,13 +151,36 @@
-
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ **/*Test.*
+ **/*Suite.*
+
+
+
+
+ org.scalatest
+ scalatest-maven-plugin
+ 2.0.0
+
+
+ test
+
+ test
+
+
+
+
org.apache.maven.plugins
maven-javadoc-plugin
3.2.0
- com.facebook.thrift:com.facebook.thrift.*
+ com.facebook.thrift:com.facebook.thrift.*
+
@@ -179,4 +203,4 @@
-
\ No newline at end of file
+
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala
index f20d58a2..83d66271 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala
@@ -226,6 +226,8 @@ object WriteNebulaVertexConfig {
|| vidPolicy.equalsIgnoreCase(KeyPolicy.UUID.toString),
"config vidPolicy is illegal, please don't set vidPolicy or set vidPolicy \"HASH\" or \"UUID\""
)
+ assert(user != null && !user.isEmpty, "user is empty")
+ assert(passwd != null && !passwd.isEmpty, "passwd is empty")
try {
WriteMode.withName(writeMode.toLowerCase())
} catch {
@@ -442,6 +444,8 @@ object WriteNebulaEdgeConfig {
"config dstPolicy is illegal, please don't set dstPolicy or set dstPolicy \"HASH\" or \"UUID\""
)
assert(batch > 0, s"config batch must be positive, your batch is $batch.")
+ assert(user != null && !user.isEmpty, "user is empty")
+ assert(passwd != null && !passwd.isEmpty, "passwd is empty")
try {
WriteMode.withName(writeMode.toLowerCase)
} catch {
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaEdgeWriter.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaEdgeWriter.scala
index cde3b7f1..7dfe2834 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaEdgeWriter.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaEdgeWriter.scala
@@ -93,6 +93,7 @@ class NebulaEdgeWriter(nebulaOptions: NebulaOptions,
if (edges.nonEmpty) {
execute()
}
+ graphProvider.close()
NebulaCommitMessage.apply(failedExecs.toList)
}
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaVertexWriter.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaVertexWriter.scala
index 1039e910..c6637657 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaVertexWriter.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaVertexWriter.scala
@@ -73,6 +73,7 @@ class NebulaVertexWriter(nebulaOptions: NebulaOptions, vertexIndex: Int, schema:
if (vertices.nonEmpty) {
execute()
}
+ graphProvider.close()
NebulaCommitMessage(failedExecs.toList)
}
diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala
index b9400823..c273a5b6 100644
--- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala
+++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala
@@ -186,7 +186,7 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
NebulaExecutor.toUpdateExecuteStatement("person", propNames, nebulaVertex)
val expectVertexUpdate =
"UPDATE VERTEX ON `person` \"vid1\" SET `col_string`=\"name\",`col_fixed_string`=\"name\"," +
- "`col_bool`=true,`col_int`=10,`col_int64`=100,`col_double`=1.0,`col_date`=2021-11-12;"
+ "`col_bool`=true,`col_int`=10,`col_int64`=100,`col_double`=1.0,`col_date`=2021-11-12"
assert(expectVertexUpdate.equals(updateVertexStatement))
}
@@ -209,7 +209,7 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
val expectEdgeUpdate =
"UPDATE EDGE ON `friend` \"source\"->\"target\"@0 SET `col_string`=\"name\"," +
"`col_fixed_string`=\"name\",`col_bool`=true,`col_int`=10,`col_int64`=100," +
- "`col_double`=1.0,`col_date`=2021-11-12;"
+ "`col_double`=1.0,`col_date`=2021-11-12"
assert(expectEdgeUpdate.equals(updateEdgeStatement))
}
}
From 947fd097fdd781185e82d08409cfe71d1bf9f79c Mon Sep 17 00:00:00 2001
From: Nicole00 <16240361+Nicole00@users.noreply.github.com>
Date: Mon, 2 Aug 2021 16:20:19 +0800
Subject: [PATCH 7/7] add log for switch error & remove sys.exit
---
.../src/main/scala/com/vesoft/nebula/exchange/Exchange.scala | 1 -
.../main/scala/com/vesoft/nebula/exchange/GraphProvider.scala | 4 ++--
.../com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala | 4 ++--
3 files changed, 4 insertions(+), 5 deletions(-)
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 21ffcc7c..9ddb6694 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -212,7 +212,6 @@ object Exchange {
LOG.info(s"batchFailure.reimport: ${batchFailure.value}")
}
spark.close()
- sys.exit(0)
}
/**
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala
index 84b83076..6404df07 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala
@@ -47,11 +47,11 @@ class GraphProvider(addresses: List[HostAndPort], timeout: Int)
pool.close()
}
- def switchSpace(session: Session, space: String): Boolean = {
+ def switchSpace(session: Session, space: String): ResultSet = {
val switchStatment = s"use $space"
LOG.info(s"switch space $space")
val result = submit(session, switchStatment)
- result.isSucceeded
+ result
}
def submit(session: Session, statement: String): ResultSet = {
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala
index e469dd8a..8dba5205 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala
@@ -137,9 +137,9 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
def prepare(): Unit = {
val switchResult = graphProvider.switchSpace(session, dataBaseConfigEntry.space)
- if (!switchResult) {
+ if (!switchResult.isSucceeded) {
this.close()
- throw new RuntimeException("Switch Failed")
+ throw new RuntimeException("Switch Failed for " + switchResult.getErrorMessage)
}
LOG.info(s"Connection to ${dataBaseConfigEntry.graphAddress}")