diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 3154bc7a..059388d4 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -30,12 +30,15 @@ jobs:
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-maven-
- - name: download neo4j-contrib & graphframes dependency
+ - name: download neo4j-contrib & graphframes & pulsar-spark-connector dependencies
run: |
wget https://oss-cdn.nebula-graph.com.cn/jar-packages/neo4j-contrib.zip
wget https://oss-cdn.nebula-graph.com.cn/jar-packages/graphframes.zip
+ wget https://oss-cdn.nebula-graph.com.cn/jar-packages/streamnative.zip
unzip -o -d ~/.m2/repository/ neo4j-contrib.zip
unzip -o -d ~/.m2/repository/ graphframes.zip
+ rm -rf ~/.m2/repository/io/streamnative
+ unzip -o -d ~/.m2/repository/io/ streamnative.zip
- name: Install nebula-graph
run: |
diff --git a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala
index 22319de9..d62c2ddf 100644
--- a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala
+++ b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala
@@ -34,14 +34,13 @@ object NebulaSparkWriterExample {
.config(sparkConf)
.getOrCreate()
-// writeVertex(spark)
-// writeEdge(spark)
+ writeVertex(spark)
+ writeEdge(spark)
updateVertex(spark)
- //updateEdge(spark)
+ updateEdge(spark)
spark.close()
- sys.exit()
}
/**
diff --git a/nebula-exchange/pom.xml b/nebula-exchange/pom.xml
index 13cd01ab..cb92bc5e 100644
--- a/nebula-exchange/pom.xml
+++ b/nebula-exchange/pom.xml
@@ -141,7 +141,8 @@
            <exclude>javax.inject:javax.inject</exclude>
            <exclude>org.spark-project.hive:hive-exec</exclude>
            <exclude>stax:stax-api</exclude>
-            <exclude>org.glassfish.hk2.external:aopalliance-repackaged</exclude>
+            <exclude>org.glassfish.hk2.external:aopalliance-repackaged
+            </exclude>
@@ -209,7 +210,8 @@
            <artifactId>maven-javadoc-plugin</artifactId>
            <version>3.2.0</version>
-                <excludePackageNames>com.facebook.thrift:com.facebook.thrift.*</excludePackageNames>
+                <excludePackageNames>com.facebook.thrift:com.facebook.thrift.*
+                </excludePackageNames>
@@ -721,11 +723,6 @@
            <id>SparkPackagesRepo</id>
            <url>https://repos.spark-packages.org</url>
        </repository>
-        <repository>
-            <id>bintray-streamnative-maven</id>
-            <name>bintray</name>
-            <url>https://dl.bintray.com/streamnative/maven</url>
-        </repository>
        <repository>
            <id>snapshots</id>
            <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 21ffcc7c..9ddb6694 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -212,7 +212,6 @@ object Exchange {
LOG.info(s"batchFailure.reimport: ${batchFailure.value}")
}
spark.close()
- sys.exit(0)
}
/**
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala
index 84b83076..6404df07 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala
@@ -47,11 +47,11 @@ class GraphProvider(addresses: List[HostAndPort], timeout: Int)
pool.close()
}
- def switchSpace(session: Session, space: String): Boolean = {
+ def switchSpace(session: Session, space: String): ResultSet = {
val switchStatment = s"use $space"
LOG.info(s"switch space $space")
val result = submit(session, switchStatment)
- result.isSucceeded
+ result
}
def submit(session: Session, statement: String): ResultSet = {
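Note on the signature change: switchSpace now hands the raw ResultSet back instead of a Boolean, so callers decide how to react and can surface the server's error message, as the ServerBaseWriter change below does. A minimal sketch of the intended call pattern (variable names are illustrative; isSucceeded and getErrorMessage are the nebula-java client's ResultSet methods):

    // sketch only: caller-side handling after switchSpace returns the ResultSet
    val result = graphProvider.switchSpace(session, space)
    if (!result.isSucceeded) {
      LOG.error(s"switch space $space failed: ${result.getErrorMessage}")
    }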
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala
index e469dd8a..8dba5205 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala
@@ -137,9 +137,9 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
def prepare(): Unit = {
val switchResult = graphProvider.switchSpace(session, dataBaseConfigEntry.space)
- if (!switchResult) {
+ if (!switchResult.isSucceeded) {
this.close()
- throw new RuntimeException("Switch Failed")
+ throw new RuntimeException("Switch Failed for " + switchResult.getErrorMessage)
}
LOG.info(s"Connection to ${dataBaseConfigEntry.graphAddress}")
diff --git a/nebula-spark-connector/pom.xml b/nebula-spark-connector/pom.xml
index 35854dfd..a946aa44 100644
--- a/nebula-spark-connector/pom.xml
+++ b/nebula-spark-connector/pom.xml
@@ -117,7 +117,8 @@
            <exclude>javax.inject:javax.inject</exclude>
            <exclude>org.spark-project.hive:hive-exec</exclude>
            <exclude>stax:stax-api</exclude>
-            <exclude>org.glassfish.hk2.external:aopalliance-repackaged</exclude>
+            <exclude>org.glassfish.hk2.external:aopalliance-repackaged
+            </exclude>
@@ -150,13 +151,36 @@
-
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>**/*Test.*</exclude>
+                        <exclude>**/*Suite.*</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>2.0.0</version>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>3.2.0</version>
-                    <excludePackageNames>com.facebook.thrift:com.facebook.thrift.*</excludePackageNames>
+                    <excludePackageNames>com.facebook.thrift:com.facebook.thrift.*
+                    </excludePackageNames>
@@ -179,4 +203,4 @@
-</project>
\ No newline at end of file
+</project>
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala
index f20d58a2..83d66271 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala
@@ -226,6 +226,8 @@ object WriteNebulaVertexConfig {
|| vidPolicy.equalsIgnoreCase(KeyPolicy.UUID.toString),
"config vidPolicy is illegal, please don't set vidPolicy or set vidPolicy \"HASH\" or \"UUID\""
)
+ assert(user != null && !user.isEmpty, "user is empty")
+ assert(passwd != null && !passwd.isEmpty, "passwd is empty")
try {
WriteMode.withName(writeMode.toLowerCase())
} catch {
@@ -442,6 +444,8 @@ object WriteNebulaEdgeConfig {
"config dstPolicy is illegal, please don't set dstPolicy or set dstPolicy \"HASH\" or \"UUID\""
)
assert(batch > 0, s"config batch must be positive, your batch is $batch.")
+ assert(user != null && !user.isEmpty, "user is empty")
+ assert(passwd != null && !passwd.isEmpty, "passwd is empty")
try {
WriteMode.withName(writeMode.toLowerCase)
} catch {
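With these asserts, WriteNebulaVertexConfig and WriteNebulaEdgeConfig now fail fast when the user or password is blank. A minimal sketch of a vertex writer config that satisfies the new checks (the space, tag, field, and credential values are illustrative; the builder methods are the connector's existing API):

    // sketch only: values below are placeholders for a real space/tag/credentials
    val writeConfig = WriteNebulaVertexConfig
      .builder()
      .withSpace("test")
      .withTag("person")
      .withVidField("id")
      .withUser("root")
      .withPasswd("nebula")
      .withBatch(512)
      .build()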
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaEdgeWriter.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaEdgeWriter.scala
index cde3b7f1..7dfe2834 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaEdgeWriter.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaEdgeWriter.scala
@@ -93,6 +93,7 @@ class NebulaEdgeWriter(nebulaOptions: NebulaOptions,
if (edges.nonEmpty) {
execute()
}
+ graphProvider.close()
NebulaCommitMessage.apply(failedExecs.toList)
}
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaVertexWriter.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaVertexWriter.scala
index 1039e910..c6637657 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaVertexWriter.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaVertexWriter.scala
@@ -73,6 +73,7 @@ class NebulaVertexWriter(nebulaOptions: NebulaOptions, vertexIndex: Int, schema:
if (vertices.nonEmpty) {
execute()
}
+ graphProvider.close()
NebulaCommitMessage(failedExecs.toList)
}
diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala
index b9400823..c273a5b6 100644
--- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala
+++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala
@@ -186,7 +186,7 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
NebulaExecutor.toUpdateExecuteStatement("person", propNames, nebulaVertex)
val expectVertexUpdate =
"UPDATE VERTEX ON `person` \"vid1\" SET `col_string`=\"name\",`col_fixed_string`=\"name\"," +
- "`col_bool`=true,`col_int`=10,`col_int64`=100,`col_double`=1.0,`col_date`=2021-11-12;"
+ "`col_bool`=true,`col_int`=10,`col_int64`=100,`col_double`=1.0,`col_date`=2021-11-12"
assert(expectVertexUpdate.equals(updateVertexStatement))
}
@@ -209,7 +209,7 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
val expectEdgeUpdate =
"UPDATE EDGE ON `friend` \"source\"->\"target\"@0 SET `col_string`=\"name\"," +
"`col_fixed_string`=\"name\",`col_bool`=true,`col_int`=10,`col_int64`=100," +
- "`col_double`=1.0,`col_date`=2021-11-12;"
+ "`col_double`=1.0,`col_date`=2021-11-12"
assert(expectEdgeUpdate.equals(updateEdgeStatement))
}
}