diff --git a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
index ff8b2571..eb52417d 100644
--- a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
+++ b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
@@ -181,7 +181,10 @@ object NebulaSparkReaderExample {
       //.withNoColumn(true)
       .withReturnCols(List("degree"))
       // please make sure your ngql statement result is edge, connector does not check the statement.
-      .withNgql("match (v)-[e:friend]-(v2) return e")
+      // other examples of supported nGQL:
+      // - GET SUBGRAPH WITH PROP 3 STEPS FROM 2 YIELD EDGES AS relationships;
+      // - FIND ALL PATH WITH PROP FROM 2 TO 4 OVER friend YIELD path AS p;
+      .withNgql("match (v)-[e:friend]->(v2) return e")
       .build()
     val edge = spark.read.nebula(config, nebulaReadConfig).loadEdgesToDfByNgql()
     edge.printSchema()
diff --git a/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala b/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala
index c500ff20..c8993f7c 100644
--- a/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala
+++ b/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala
@@ -83,6 +83,12 @@ object NebulaConnectionConfig {
     /**
       * set connectionRetry, connectionRetry is optional
       */
+    @deprecated("use withConnectionRetry instead", "3.7.0")
+    def withConenctionRetry(connectionRetry: Int): ConfigBuilder = {
+      // misspelled variant kept for backward compatibility; delegates to the fixed name
+      withConnectionRetry(connectionRetry)
+    }
+
     def withConnectionRetry(connectionRetry: Int): ConfigBuilder = {
       this.connectionRetry = connectionRetry
       this
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala
index 7d59ed41..b1da17a7 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala
@@ -123,6 +123,7 @@ package object connector {
       val dfReader = reader
         .format(classOf[NebulaDataSource].getName)
         .option(NebulaOptions.TYPE, DataTypeEnum.EDGE.toString)
+        .option(NebulaOptions.OPERATE_TYPE, OperaType.READ.toString)
         .option(NebulaOptions.SPACE_NAME, readConfig.getSpace)
         .option(NebulaOptions.LABEL, readConfig.getLabel)
         .option(NebulaOptions.RETURN_COLS, readConfig.getReturnCols.mkString(","))
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaNgqlEdgePartitionReader.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaNgqlEdgePartitionReader.scala
index 40f99628..8878fad5 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaNgqlEdgePartitionReader.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaNgqlEdgePartitionReader.scala
@@ -70,12 +70,20 @@ class NebulaNgqlEdgePartitionReader extends InputPartitionReader[InternalRow] {
         val list: mutable.Buffer[ValueWrapper] = value.asList()
         edges.appendAll(
           list.toStream
-            .filter(e => checkLabel(e.asRelationship()))
+            .filter(e => e != null && e.isEdge() && checkLabel(e.asRelationship()))
            .map(e => convertToEdge(e.asRelationship(), properties))
         )
-      } else {
-        LOG.error(s"Exception convert edge type ${valueType} ")
-        throw new RuntimeException(" convert value type failed");
RuntimeException(" convert value type failed"); + } else if (valueType == Value.PVAL){ + val list: java.util.List[Relationship] = value.asPath().getRelationships() + edges.appendAll( + list.toStream + .filter(e => checkLabel(e)) + .map(e => convertToEdge(e, properties)) + ) + } else if (valueType != Value.NVAL && valueType != 0) { + LOG.error(s"Unexpected edge type encountered: ${valueType}. Only edge or path should be returned.") + throw new RuntimeException("Invalid nGQL return type. Value type conversion failed."); } } } diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala index a719d2ca..2fd8398c 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala @@ -337,4 +337,85 @@ class ReadSuite extends AnyFunSuite with BeforeAndAfterAll { })(Encoders.STRING) } + test("read edge from nGQL: MATCH ()-[e:friend]->() RETURN e LIMIT 1000") + { + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withGraphAddress("127.0.0.1:9669") + .withConnectionRetry(2) + .build() + val nebulaReadConfig: ReadNebulaConfig = ReadNebulaConfig + .builder() + .withSpace("test_int") + .withLabel("friend") + .withNoColumn(false) + .withReturnCols(List("col1")) + .withNgql("MATCH ()-[e:friend]->() RETURN e LIMIT 1000") + .build() + val edge = sparkSession.read.nebula(config, nebulaReadConfig).loadEdgesToDfByNgql() + edge.printSchema() + edge.show(truncate = false) + assert(edge.count() == 12) + assert(edge.schema.fields.length == 4) + edge.map(row => { + row.getAs[Long]("_srcId") match { + case 1L => { + assert(row.getAs[Long]("_dstId") == 2) + assert(row.getAs[Long]("_rank") == 0) + assert(row.getAs[String]("col1").equals("friend1")) + } + } + "" + })(Encoders.STRING) + } + + + test("read edge from nGQL: GET SUBGRAPH WITH PROP 3 STEPS FROM 2 YIELD EDGES AS relationships") + { + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withGraphAddress("127.0.0.1:9669") + .withConnectionRetry(2) + .build() + val nebulaReadConfig: ReadNebulaConfig = ReadNebulaConfig + .builder() + .withSpace("test_int") + .withNoColumn(false) + .withLabel("friend") + .withReturnCols(List("col1")) + .withNgql("GET SUBGRAPH WITH PROP 3 STEPS FROM 2 YIELD EDGES AS relationships") + .build() + val edge = sparkSession.read.nebula(config, nebulaReadConfig).loadEdgesToDfByNgql() + edge.printSchema() + edge.show(truncate = false) + assert(edge.count() == 6) + } + + test("read edge from nGQL: FIND ALL PATH WITH PROP FROM 2 TO 4 OVER friend YIELD path AS p") + { + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withGraphAddress("127.0.0.1:9669") + .withConnectionRetry(2) + .build() + val nebulaReadConfig: ReadNebulaConfig = ReadNebulaConfig + .builder() + .withSpace("test_int") + .withNoColumn(false) + .withLabel("friend") + .withReturnCols(List("col1")) + .withNgql("FIND ALL PATH WITH PROP FROM 2 TO 4 OVER friend YIELD path AS p") + .build() + val edge = sparkSession.read.nebula(config, nebulaReadConfig).loadEdgesToDfByNgql() + edge.printSchema() + edge.show(truncate = false) + assert(edge.count() == 2) + } + }