Skip to content
This repository has been archived by the owner on Oct 18, 2021. It is now read-only.

Commit

Permalink
Merge pull request #114 from Nicole00/improve
Browse files Browse the repository at this point in the history
remove useless repository & add close for writer
  • Loading branch information
HarrisChu authored Aug 3, 2021
2 parents b5f60c1 + 947fd09 commit dae9337
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 23 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ jobs:
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-maven-

- name: download neo4j-contrib & graphframes dependency
- name: download neo4j-contrib & graphframes & pulsar-spark-connector dependency
run: |
wget https://oss-cdn.nebula-graph.com.cn/jar-packages/neo4j-contrib.zip
wget https://oss-cdn.nebula-graph.com.cn/jar-packages/graphframes.zip
wget https://oss-cdn.nebula-graph.com.cn/jar-packages/streamnative.zip
unzip -o -d ~/.m2/repository/ neo4j-contrib.zip
unzip -o -d ~/.m2/repository/ graphframes.zip
rm -rf ~/.m2/repository/io/streamnative
unzip -o -d ~/.m2/repository/io/ streamnative.zip
- name: Install nebula-graph
run: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ object NebulaSparkWriterExample {
.config(sparkConf)
.getOrCreate()

// writeVertex(spark)
// writeEdge(spark)
writeVertex(spark)
writeEdge(spark)

updateVertex(spark)
//updateEdge(spark)
updateEdge(spark)

spark.close()
sys.exit()
}

/**
Expand Down
11 changes: 4 additions & 7 deletions nebula-exchange/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@
<exclude>javax.inject:javax.inject</exclude>
<exclude>org.spark-project.hive:hive-exec</exclude>
<exclude>stax:stax-api</exclude>
<exclude>org.glassfish.hk2.external:aopalliance-repackaged</exclude>
<exclude>org.glassfish.hk2.external:aopalliance-repackaged
</exclude>
</excludes>
</artifactSet>
<filters>
Expand Down Expand Up @@ -209,7 +210,8 @@
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<excludePackageNames>com.facebook.thrift:com.facebook.thrift.*</excludePackageNames>
<excludePackageNames>com.facebook.thrift:com.facebook.thrift.*
</excludePackageNames>
</configuration>
<executions>
<execution>
Expand Down Expand Up @@ -721,11 +723,6 @@
<id>SparkPackagesRepo</id>
<url>https://repos.spark-packages.org</url>
</repository>
<repository>
<id>bintray-streamnative-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/streamnative/maven</url>
</repository>
<repository>
<id>snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ object Exchange {
LOG.info(s"batchFailure.reimport: ${batchFailure.value}")
}
spark.close()
sys.exit(0)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class GraphProvider(addresses: List[HostAndPort], timeout: Int)
pool.close()
}

def switchSpace(session: Session, space: String): Boolean = {
def switchSpace(session: Session, space: String): ResultSet = {
val switchStatment = s"use $space"
LOG.info(s"switch space $space")
val result = submit(session, switchStatment)
result.isSucceeded
result
}

def submit(session: Session, statement: String): ResultSet = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,

def prepare(): Unit = {
val switchResult = graphProvider.switchSpace(session, dataBaseConfigEntry.space)
if (!switchResult) {
if (!switchResult.isSucceeded) {
this.close()
throw new RuntimeException("Switch Failed")
throw new RuntimeException("Switch Failed for " + switchResult.getErrorMessage)
}

LOG.info(s"Connection to ${dataBaseConfigEntry.graphAddress}")
Expand Down
32 changes: 28 additions & 4 deletions nebula-spark-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@
<exclude>javax.inject:javax.inject</exclude>
<exclude>org.spark-project.hive:hive-exec</exclude>
<exclude>stax:stax-api</exclude>
<exclude>org.glassfish.hk2.external:aopalliance-repackaged</exclude>
<exclude>org.glassfish.hk2.external:aopalliance-repackaged
</exclude>
</excludes>
</artifactSet>
<filters>
Expand Down Expand Up @@ -150,13 +151,36 @@
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>2.0.0</version>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<excludePackageNames>com.facebook.thrift:com.facebook.thrift.*</excludePackageNames>
<excludePackageNames>com.facebook.thrift:com.facebook.thrift.*
</excludePackageNames>
</configuration>
<executions>
<execution>
Expand All @@ -179,4 +203,4 @@
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ object WriteNebulaVertexConfig {
|| vidPolicy.equalsIgnoreCase(KeyPolicy.UUID.toString),
"config vidPolicy is illegal, please don't set vidPolicy or set vidPolicy \"HASH\" or \"UUID\""
)
assert(user != null && !user.isEmpty, "user is empty")
assert(passwd != null && !passwd.isEmpty, "passwd is empty")
try {
WriteMode.withName(writeMode.toLowerCase())
} catch {
Expand Down Expand Up @@ -442,6 +444,8 @@ object WriteNebulaEdgeConfig {
"config dstPolicy is illegal, please don't set dstPolicy or set dstPolicy \"HASH\" or \"UUID\""
)
assert(batch > 0, s"config batch must be positive, your batch is $batch.")
assert(user != null && !user.isEmpty, "user is empty")
assert(passwd != null && !passwd.isEmpty, "passwd is empty")
try {
WriteMode.withName(writeMode.toLowerCase)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class NebulaEdgeWriter(nebulaOptions: NebulaOptions,
if (edges.nonEmpty) {
execute()
}
graphProvider.close()
NebulaCommitMessage.apply(failedExecs.toList)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class NebulaVertexWriter(nebulaOptions: NebulaOptions, vertexIndex: Int, schema:
if (vertices.nonEmpty) {
execute()
}
graphProvider.close()
NebulaCommitMessage(failedExecs.toList)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
NebulaExecutor.toUpdateExecuteStatement("person", propNames, nebulaVertex)
val expectVertexUpdate =
"UPDATE VERTEX ON `person` \"vid1\" SET `col_string`=\"name\",`col_fixed_string`=\"name\"," +
"`col_bool`=true,`col_int`=10,`col_int64`=100,`col_double`=1.0,`col_date`=2021-11-12;"
"`col_bool`=true,`col_int`=10,`col_int64`=100,`col_double`=1.0,`col_date`=2021-11-12"
assert(expectVertexUpdate.equals(updateVertexStatement))
}

Expand All @@ -209,7 +209,7 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
val expectEdgeUpdate =
"UPDATE EDGE ON `friend` \"source\"->\"target\"@0 SET `col_string`=\"name\"," +
"`col_fixed_string`=\"name\",`col_bool`=true,`col_int`=10,`col_int64`=100," +
"`col_double`=1.0,`col_date`=2021-11-12;"
"`col_double`=1.0,`col_date`=2021-11-12"
assert(expectEdgeUpdate.equals(updateEdgeStatement))
}
}

0 comments on commit dae9337

Please sign in to comment.