diff --git a/nebula-algorithm/pom.xml b/nebula-algorithm/pom.xml
index 9f07775..5b8c091 100644
--- a/nebula-algorithm/pom.xml
+++ b/nebula-algorithm/pom.xml
@@ -40,6 +40,11 @@
             <artifactId>spark-graphx_2.11</artifactId>
             <version>${spark.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.11</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
         <dependency>
            <groupId>com.vesoft</groupId>
            <artifactId>nebula-spark-connector</artifactId>
diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf
index 4d2bdc2..8536f9a 100644
--- a/nebula-algorithm/src/main/resources/application.conf
+++ b/nebula-algorithm/src/main/resources/application.conf
@@ -125,5 +125,38 @@
betweenness:{
maxIter:5
}
+
+ # SingleSourceShortestPathAlgo parameter
+ singlesourceshortestpath:{
+ sourceid:"1"
+ }
+
+ # ClosenessAlgo parameter
+ closeness:{}
+
+ # HanpAlgo parameter
+ hanp:{
+ hopAttenuation:0.1
+ maxIter:10
+ preference:1.0
+ }
+
+  # Node2vecAlgo parameter
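+  # p: return parameter (higher p makes revisiting the previous node less likely)
+  # q: in-out parameter (q < 1 favors outward exploration, q > 1 keeps walks local)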
+ node2vec:{
+ maxIter: 10,
+ lr: 0.025,
+ dataNumPartition: 10,
+ modelNumPartition: 10,
+ dim: 10,
+ window: 3,
+ walkLength: 5,
+ numWalks: 3,
+ p: 1.0,
+ q: 1.0,
+ directed: false,
+ degree: 30,
+ embSeparate: ",",
+ modelPath: "hdfs://127.0.0.1:9000/model"
+ }
}
}
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
index 96cf9b3..287fdcb 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
@@ -9,25 +9,29 @@ package com.vesoft.nebula.algorithm
import com.vesoft.nebula.algorithm.config.Configs.Argument
import com.vesoft.nebula.algorithm.config.{
AlgoConfig,
- AlgoConstants,
BetweennessConfig,
CcConfig,
Configs,
+ HanpConfig,
KCoreConfig,
LPAConfig,
LouvainConfig,
+ Node2vecConfig,
PRConfig,
ShortestPathConfig,
SparkConfig
}
import com.vesoft.nebula.algorithm.lib.{
BetweennessCentralityAlgo,
+ ClosenessAlgo,
ConnectedComponentsAlgo,
DegreeStaticAlgo,
GraphTriangleCountAlgo,
+ HanpAlgo,
KCoreAlgo,
LabelPropagationAlgo,
LouvainAlgo,
+ Node2vecAlgo,
PageRankAlgo,
ShortestPathAlgo,
StronglyConnectedComponentsAlgo,
@@ -37,7 +41,6 @@ import com.vesoft.nebula.algorithm.reader.{CsvReader, JsonReader, NebulaReader}
import com.vesoft.nebula.algorithm.writer.{CsvWriter, NebulaWriter, TextWriter}
import org.apache.commons.math3.ode.UnknownParameterException
import org.apache.log4j.Logger
-import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
@@ -166,6 +169,17 @@ object Main {
case "graphtrianglecount" => {
GraphTriangleCountAlgo(spark, dataSet)
}
+ case "closeness" => {
+ ClosenessAlgo(spark, dataSet, hasWeight)
+ }
+ case "hanp" => {
+ val hanpConfig = HanpConfig.getHanpConfig(configs)
+ HanpAlgo(spark, dataSet, hanpConfig, hasWeight)
+ }
+ case "node2vec" => {
+ val node2vecConfig = Node2vecConfig.getNode2vecConfig(configs)
+ Node2vecAlgo(spark, dataSet, node2vecConfig, hasWeight)
+ }
case _ => throw new UnknownParameterException("unknown executeAlgo name.")
}
}
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala
index 3077214..3662404 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala
@@ -150,6 +150,89 @@ object BetweennessConfig {
}
}
+/**
+ * Hanp
+ */
+case class HanpConfig(hopAttenuation: Double, maxIter: Int, preference: Double)
+
+object HanpConfig {
+ var hopAttenuation: Double = _
+ var maxIter: Int = _
+ var preference: Double = 1.0
+ def getHanpConfig(configs: Configs): HanpConfig = {
+ val hanpConfig = configs.algorithmConfig.map
+ hopAttenuation = hanpConfig("algorithm.hanp.hopAttenuation").toDouble
+ maxIter = hanpConfig("algorithm.hanp.maxIter").toInt
+ preference = hanpConfig("algorithm.hanp.preference").toDouble
+ HanpConfig(hopAttenuation, maxIter, preference)
+ }
+}
+
+/**
+ * Node2vec
+ */
+case class Node2vecConfig(maxIter: Int,
+ lr: Double,
+ dataNumPartition: Int,
+ modelNumPartition: Int,
+ dim: Int,
+ window: Int,
+ walkLength: Int,
+ numWalks: Int,
+ p: Double,
+ q: Double,
+ directed: Boolean,
+ degree: Int,
+ embSeparate: String,
+ modelPath: String)
+object Node2vecConfig {
+ var maxIter: Int = _
+ var lr: Double = _
+ var dataNumPartition: Int = _
+ var modelNumPartition: Int = _
+ var dim: Int = _
+ var window: Int = _
+ var walkLength: Int = _
+ var numWalks: Int = _
+ var p: Double = _
+ var q: Double = _
+ var directed: Boolean = _
+ var degree: Int = _
+ var embSeparate: String = _
+ var modelPath: String = _
+ def getNode2vecConfig(configs: Configs): Node2vecConfig = {
+ val node2vecConfig = configs.algorithmConfig.map
+ maxIter = node2vecConfig("algorithm.node2vec.maxIter").toInt
+ lr = node2vecConfig("algorithm.node2vec.lr").toDouble
+ dataNumPartition = node2vecConfig("algorithm.node2vec.dataNumPartition").toInt
+ modelNumPartition = node2vecConfig("algorithm.node2vec.modelNumPartition").toInt
+ dim = node2vecConfig("algorithm.node2vec.dim").toInt
+ window = node2vecConfig("algorithm.node2vec.window").toInt
+ walkLength = node2vecConfig("algorithm.node2vec.walkLength").toInt
+ numWalks = node2vecConfig("algorithm.node2vec.numWalks").toInt
+ p = node2vecConfig("algorithm.node2vec.p").toDouble
+ q = node2vecConfig("algorithm.node2vec.q").toDouble
+ directed = node2vecConfig("algorithm.node2vec.directed").toBoolean
+ degree = node2vecConfig("algorithm.node2vec.degree").toInt
+ embSeparate = node2vecConfig("algorithm.node2vec.embSeparate")
+ modelPath = node2vecConfig("algorithm.node2vec.modelPath")
+ Node2vecConfig(maxIter,
+ lr,
+ dataNumPartition,
+ modelNumPartition,
+ dim,
+ window,
+ walkLength,
+ numWalks,
+ p,
+ q,
+ directed,
+ degree,
+ embSeparate,
+ modelPath)
+ }
+}
+
case class AlgoConfig(configs: Configs)
object AlgoConfig {
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
index addfd3d..99b1706 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
@@ -362,4 +362,7 @@ object AlgoConstants {
val INDEGREE_RESULT_COL: String = "inDegree"
val OUTDEGREE_RESULT_COL: String = "outDegree"
val TRIANGLECOUNT_RESULT_COL: String = "tranglecount"
+ val CLOSENESS_RESULT_COL: String = "closeness"
+ val HANP_RESULT_COL: String = "hanp"
+ val NODE2VEC_RESULT_COL: String = "node2vec"
}
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala
new file mode 100644
index 0000000..2b0f8c1
--- /dev/null
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala
@@ -0,0 +1,78 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.AlgoConstants
+import com.vesoft.nebula.algorithm.utils.NebulaUtil
+import org.apache.log4j.Logger
+import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+object ClosenessAlgo {
+ private val LOGGER = Logger.getLogger(this.getClass)
+ val ALGORITHM: String = "Closeness"
+ type SPMap = Map[VertexId, Double]
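+  // SPMap maps every vertex reachable from a given vertex to the currently known shortest distance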
+
+ private def makeMap(x: (VertexId, Double)*) = Map(x: _*)
+
+ private def addMap(spmap: SPMap, weight: Double): SPMap = spmap.map { case (v, d) => v -> (d + weight) }
+
+ private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
+ (spmap1.keySet ++ spmap2.keySet).map {
+ k => k -> math.min(spmap1.getOrElse(k, Double.MaxValue), spmap2.getOrElse(k, Double.MaxValue))
+ }(collection.breakOut)
+ }
+ /**
+ * run the Closeness algorithm for nebula graph
+ */
+  def apply(spark: SparkSession,
+            dataset: Dataset[Row],
+            hasWeight: Boolean): DataFrame = {
+ val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
+ val closenessRDD = execute(graph)
+ val schema = StructType(
+ List(
+ StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
+ StructField(AlgoConstants.CLOSENESS_RESULT_COL, DoubleType, nullable = true)
+ ))
+ val algoResult = spark.sqlContext.createDataFrame(closenessRDD, schema)
+ algoResult
+ }
+
+ /**
+ * execute Closeness algorithm
+ */
+  def execute(graph: Graph[None.type, Double]): RDD[Row] = {
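+    // initialize every vertex knowing only itself, at distance 0.0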
+ val spGraph = graph.mapVertices((vid, _) => makeMap(vid -> 0.0))
+
+ val initialMessage = makeMap()
+
+ def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
+ addMaps(attr, msg)
+ }
+
+ def sendMessage(edge: EdgeTriplet[SPMap, Double]): Iterator[(VertexId, SPMap)] = {
+ val newAttr = addMap(edge.dstAttr, edge.attr)
+ if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
+ else Iterator.empty
+ }
+    val spsGraph = Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
+    val closenessRDD = spsGraph.vertices.map(vertex => {
+      var dstNum = 0
+      var dstDistanceSum = 0.0
+      for (distance <- vertex._2.values) {
+        dstNum += 1
+        dstDistanceSum += distance
+      }
+      // closeness = (number of reachable vertices - 1) / sum of shortest-path distances;
+      // guard the isolated-vertex case, which would otherwise produce 0.0 / 0.0 = NaN
+      val closeness = if (dstDistanceSum > 0.0) (dstNum - 1) / dstDistanceSum else 0.0
+      Row(vertex._1, closeness)
+    })
+ closenessRDD
+ }
+}
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala
new file mode 100644
index 0000000..fa5b067
--- /dev/null
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2021. vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.{AlgoConstants, HanpConfig}
+import com.vesoft.nebula.algorithm.utils.NebulaUtil
+import org.apache.spark.graphx.{EdgeTriplet, Graph, VertexId}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+object HanpAlgo {
+ val ALGORITHM: String = "Hanp"
+
+ /**
+ * run the Hanp algorithm for nebula graph
+ */
+  def apply(spark: SparkSession,
+            dataset: Dataset[Row],
+            hanpConfig: HanpConfig,
+            hasWeight: Boolean,
+            preferences: RDD[(VertexId, Double)] = null): DataFrame = {
+    val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
+    val hanpResultRDD =
+      execute(graph, hanpConfig.hopAttenuation, hanpConfig.maxIter, hanpConfig.preference, preferences)
+ val schema = StructType(
+ List(
+ StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
+ StructField(AlgoConstants.HANP_RESULT_COL, LongType, nullable = true)
+ ))
+ val algoResult = spark.sqlContext.createDataFrame(hanpResultRDD, schema)
+ algoResult
+ }
+
+ /**
+ * execute Hanp algorithm
+ */
+  def execute(graph: Graph[None.type, Double],
+              hopAttenuation: Double,
+              maxIter: Int,
+              preference: Double = 1.0,
+              preferences: RDD[(VertexId, Double)] = null): RDD[Row] = {
+    var hanpGraph: Graph[(VertexId, Double, Double), Double] = null
+    if (preferences == null) {
+      hanpGraph = graph.mapVertices((vertexId, _) => (vertexId, preference, 1.0))
+    } else {
+      hanpGraph = graph.outerJoinVertices(preferences)((vertexId, _, vertexPreference) =>
+        (vertexId, vertexPreference.getOrElse(preference), 1.0))
+    }
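+    // vertex attribute is (label, preference, score): a vertex keeps broadcasting its label
+    // while its score stays positive, and the score decays by hopAttenuation at each hop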
+    def sendMessage(e: EdgeTriplet[(VertexId, Double, Double), Double])
+      : Iterator[(VertexId, Map[VertexId, (Double, Double)])] = {
+      if (e.srcAttr._3 > 0 && e.dstAttr._3 > 0) {
+        Iterator(
+          (e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3, e.srcAttr._2 * e.srcAttr._3 * e.attr))),
+          (e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3, e.dstAttr._2 * e.dstAttr._3 * e.attr)))
+        )
+      } else if (e.srcAttr._3 > 0) {
+        Iterator((e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3, e.srcAttr._2 * e.srcAttr._3 * e.attr))))
+      } else if (e.dstAttr._3 > 0) {
+        Iterator((e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3, e.dstAttr._2 * e.dstAttr._3 * e.attr))))
+      } else {
+        Iterator.empty
+      }
+    }
+    def mergeMessage(count1: Map[VertexId, (Double, Double)],
+                     count2: Map[VertexId, (Double, Double)]): Map[VertexId, (Double, Double)] = {
+      (count1.keySet ++ count2.keySet).map { i =>
+        val count1Val = count1.getOrElse(i, (0.0, 0.0))
+        val count2Val = count2.getOrElse(i, (0.0, 0.0))
+        i -> (Math.max(count1Val._1, count2Val._1), count1Val._2 + count2Val._2)
+      }(collection.breakOut)
+    }
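+    // a vertex adopts the label with the largest aggregated weight; the adopted label's
+    // score becomes the sender's score minus hopAttenuation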
+    def vertexProgram(vid: VertexId,
+                      attr: (VertexId, Double, Double),
+                      message: Map[VertexId, (Double, Double)]): (VertexId, Double, Double) = {
+      if (message.isEmpty) {
+        attr
+      } else {
+        val maxMessage = message.maxBy(_._2._2)
+        (maxMessage._1, attr._2, maxMessage._2._1 - hopAttenuation)
+      }
+    }
+    val initialMessage = Map[VertexId, (Double, Double)]()
+    val hanpResultGraph =
+      hanpGraph.pregel(initialMessage, maxIter)(vertexProgram, sendMessage, mergeMessage)
+    hanpResultGraph.vertices.map(vertex => Row(vertex._1, vertex._2._1))
+ }
+}
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala
new file mode 100644
index 0000000..05aca2d
--- /dev/null
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala
@@ -0,0 +1,238 @@
+/*
+ * Copyright (c) 2021. vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.{AlgoConstants, Node2vecConfig}
+import com.vesoft.nebula.algorithm.utils.NebulaUtil
+import org.apache.spark.SparkContext
+import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, VertexId}
+import org.apache.spark.mllib.feature.Word2Vec
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+import java.io.Serializable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+case class NodeAttr(var neighbors: Array[(Long, Double)] = Array.empty[(Long, Double)],
+ var path: Array[Long] = Array.empty[Long]) extends Serializable
+
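+// EdgeAttr.J (alias table) and EdgeAttr.q (acceptance probabilities) hold precomputed
+// alias-method tables so the next step of a biased walk can be sampled in O(1)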
+case class EdgeAttr(var dstNeighbors: Array[Long] = Array.empty[Long],
+ var J: Array[Int] = Array.empty[Int],
+ var q: Array[Double] = Array.empty[Double]) extends Serializable
+
+object Node2vecAlgo {
+ val ALGORITHM: String = "Node2vec"
+  var node2vecConfig: Node2vecConfig = _
+  var context: SparkContext = _
+  var indexedEdges: RDD[Edge[EdgeAttr]] = _
+  var indexedNodes: RDD[(VertexId, NodeAttr)] = _
+  var graph: Graph[NodeAttr, EdgeAttr] = _
+  var randomWalkPaths: RDD[(Long, ArrayBuffer[Long])] = null
+  var vectors: Map[String, Array[Float]] = _
+ lazy val createUndirectedEdge = (srcId: Long, dstId: Long, weight: Double) => {
+ Array(
+ (srcId, Array((dstId, weight))),
+ (dstId, Array((srcId, weight)))
+ )
+ }
+ lazy val createDirectedEdge = (srcId: Long, dstId: Long, weight: Double) => {
+ Array(
+ (srcId, Array((dstId, weight)))
+ )
+ }
+
+ def setupAlias(nodeWeights: Array[(Long, Double)]): (Array[Int], Array[Double]) = {
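+    // Walker's alias method: build J (alias indices) and q (acceptance probabilities)
+    // so each draw from the weighted distribution costs O(1)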
+ val K = nodeWeights.length
+ val J = Array.fill(K)(0)
+ val q = Array.fill(K)(0.0)
+
+ val smaller = new ArrayBuffer[Int]()
+ val larger = new ArrayBuffer[Int]()
+
+ val sum = nodeWeights.map(_._2).sum
+ nodeWeights.zipWithIndex.foreach { case ((nodeId, weight), i) =>
+ q(i) = K * weight / sum
+ if (q(i) < 1.0) {
+ smaller.append(i)
+ } else {
+ larger.append(i)
+ }
+ }
+
+ while (smaller.nonEmpty && larger.nonEmpty) {
+ val small = smaller.remove(smaller.length - 1)
+ val large = larger.remove(larger.length - 1)
+
+ J(small) = large
+ q(large) = q(large) + q(small) - 1.0
+ if (q(large) < 1.0) smaller.append(large)
+ else larger.append(large)
+ }
+
+ (J, q)
+ }
+  def setupEdgeAlias(p: Double = 1.0, q: Double = 1.0)(
+      srcId: Long,
+      srcNeighbors: Array[(Long, Double)],
+      dstNeighbors: Array[(Long, Double)]): (Array[Int], Array[Double]) = {
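+    // node2vec bias: returning to srcId is weighted by 1/p, staying within srcId's
+    // neighborhood keeps weight 1, and moving further out is weighted by 1/q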
+ val neighbors_ = dstNeighbors.map { case (dstNeighborId, weight) =>
+ var unnormProb = weight / q
+ if (srcId == dstNeighborId) unnormProb = weight / p
+ else if (srcNeighbors.exists(_._1 == dstNeighborId)) unnormProb = weight
+
+ (dstNeighborId, unnormProb)
+ }
+
+ setupAlias(neighbors_)
+ }
+ def drawAlias(J: Array[Int], q: Array[Double]): Int = {
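+    // pick a column uniformly, accept it with probability q(kk), otherwise return its alias J(kk)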
+ val K = J.length
+ val kk = math.floor(math.random * K).toInt
+
+ if (math.random < q(kk)) kk
+ else J(kk)
+ }
+  def load(graph: Graph[None.type, Double]): this.type = {
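+    // build adjacency lists, keeping at most node2vecConfig.degree highest-weight neighbors per vertex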
+ val bcMaxDegree = context.broadcast(node2vecConfig.degree)
+ val bcEdgeCreator = node2vecConfig.directed match {
+ case true => context.broadcast(createDirectedEdge)
+ case false => context.broadcast(createUndirectedEdge)
+ }
+
+    indexedNodes = graph.edges.flatMap { row =>
+      bcEdgeCreator.value.apply(row.srcId, row.dstId, row.attr)
+    }.reduceByKey(_ ++ _).map { case (nodeId, neighbors: Array[(VertexId, Double)]) =>
+      var neighbors_ = neighbors
+      if (neighbors_.length > bcMaxDegree.value) {
+        neighbors_ = neighbors.sortWith { case (left, right) => left._2 > right._2 }.slice(0, bcMaxDegree.value)
+      }
+
+      (nodeId, NodeAttr(neighbors = neighbors_.distinct))
+    }.repartition(node2vecConfig.dataNumPartition).cache
+
+ indexedEdges = indexedNodes.flatMap { case (srcId, clickNode) =>
+ clickNode.neighbors.map { case (dstId, weight) =>
+ Edge(srcId, dstId, EdgeAttr())
+ }
+ }.repartition(node2vecConfig.dataNumPartition).cache
+ this
+ }
+ def initTransitionProb(): this.type = {
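+    // attach alias tables: each vertex pre-samples its first step, and every edge gets
+    // (J, q) tables for the biased second-order steps of the walk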
+ val bcP = context.broadcast(node2vecConfig.p)
+ val bcQ = context.broadcast(node2vecConfig.q)
+
+ graph = Graph(indexedNodes, indexedEdges)
+ .mapVertices[NodeAttr] { case (vertexId, clickNode) =>
+ val (j, q) = this.setupAlias(clickNode.neighbors)
+ val nextNodeIndex = this.drawAlias(j, q)
+ clickNode.path = Array(vertexId, clickNode.neighbors(nextNodeIndex)._1)
+
+ clickNode
+ }
+ .mapTriplets { edgeTriplet: EdgeTriplet[NodeAttr, EdgeAttr] =>
+ val (j, q) = this.setupEdgeAlias(bcP.value, bcQ.value)(edgeTriplet.srcId, edgeTriplet.srcAttr.neighbors, edgeTriplet.dstAttr.neighbors)
+ edgeTriplet.attr.J = j
+ edgeTriplet.attr.q = q
+ edgeTriplet.attr.dstNeighbors = edgeTriplet.dstAttr.neighbors.map(_._1)
+
+ edgeTriplet.attr
+ }.cache
+
+ this
+ }
+ def randomWalk(): this.type = {
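+    // run numWalks truncated walks of length walkLength from every vertex, extending each
+    // path one hop at a time by alias-sampling on the (previous, current) edge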
+    // key edges as "src-dst"; plain concatenation (s"$srcId$dstId") would let edges like
+    // (1, 23) and (12, 3) collide under the same key
+    val edge2attr = graph.triplets.map { edgeTriplet =>
+      (s"${edgeTriplet.srcId}-${edgeTriplet.dstId}", edgeTriplet.attr)
+    }.repartition(node2vecConfig.dataNumPartition).cache
+    edge2attr.first
+
+ for (iter <- 0 until node2vecConfig.numWalks) {
+ var prevWalk: RDD[(Long, ArrayBuffer[Long])] = null
+ var randomWalk = graph.vertices.map { case (nodeId, clickNode) =>
+ val pathBuffer = new ArrayBuffer[Long]()
+ pathBuffer.append(clickNode.path:_*)
+ (nodeId, pathBuffer)
+ }.cache
+ var activeWalks = randomWalk.first
+ graph.unpersist(blocking = false)
+ graph.edges.unpersist(blocking = false)
+ for (walkCount <- 0 until node2vecConfig.walkLength) {
+ prevWalk = randomWalk
+        randomWalk = randomWalk.map { case (srcNodeId, pathBuffer) =>
+          val prevNodeId = pathBuffer(pathBuffer.length - 2)
+          val currentNodeId = pathBuffer.last
+
+          // must use the same "prev-current" key format as edge2attr above
+          (s"$prevNodeId-$currentNodeId", (srcNodeId, pathBuffer))
+        }.join(edge2attr).map { case (edge, ((srcNodeId, pathBuffer), attr)) =>
+ try {
+ val nextNodeIndex = this.drawAlias(attr.J, attr.q)
+ val nextNodeId = attr.dstNeighbors(nextNodeIndex)
+ pathBuffer.append(nextNodeId)
+
+ (srcNodeId, pathBuffer)
+ } catch {
+ case e: Exception => throw new RuntimeException(e.getMessage)
+ }
+ }.cache
+
+        activeWalks = randomWalk.first()
+        prevWalk.unpersist(blocking = false)
+      }
+
+ if (randomWalkPaths != null) {
+ val prevRandomWalkPaths = randomWalkPaths
+ randomWalkPaths = randomWalkPaths.union(randomWalk).cache()
+ randomWalkPaths.first
+ prevRandomWalkPaths.unpersist(blocking = false)
+ } else {
+ randomWalkPaths = randomWalk
+ }
+ }
+
+ this
+ }
+ def embedding(): this.type = {
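+    // treat each random walk as a "sentence" and learn vertex embeddings with skip-gram Word2Vec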
+    val randomPaths = randomWalkPaths.map { case (vertexId, pathBuffer) =>
+      Try(pathBuffer.map(_.toString).toIterable).getOrElse(null)
+    }.filter(_ != null)
+ val word2vec = new Word2Vec()
+ word2vec.setLearningRate(node2vecConfig.lr)
+ .setNumIterations(node2vecConfig.maxIter)
+ .setNumPartitions(node2vecConfig.modelNumPartition)
+ .setVectorSize(node2vecConfig.dim)
+ .setWindowSize(node2vecConfig.window)
+    val model = word2vec.fit(randomPaths)
+    // persist the trained model; reload it later with Word2VecModel.load(context, path)
+    model.save(context, node2vecConfig.modelPath)
+    this.vectors = model.getVectors
+ this
+ }
+ /**
+ * run the Node2vec algorithm for nebula graph
+ */
+  def apply(spark: SparkSession,
+            dataset: Dataset[Row],
+            node2vecConfig: Node2vecConfig,
+            hasWeight: Boolean): DataFrame = {
+    val inputGraph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
+    this.context = spark.sparkContext
+    this.node2vecConfig = node2vecConfig
+    val node2vecResult: Map[String, Array[Float]] = this.load(inputGraph)
+      .initTransitionProb()
+      .randomWalk()
+      .embedding()
+      .vectors
+    val node2vecRDD: RDD[Row] = this.context
+      .parallelize(node2vecResult.toList)
+      .map(row => Row(row._1, row._2.mkString(node2vecConfig.embSeparate)))
+ val schema = StructType(
+ List(
+ StructField(AlgoConstants.ALGO_ID_COL, StringType, nullable = false),
+ StructField(AlgoConstants.NODE2VEC_RESULT_COL, StringType, nullable = true)
+ ))
+    val algoResult = spark.sqlContext.createDataFrame(node2vecRDD, schema)
+ algoResult
+ }
+}
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala
index e4a5771..efe906d 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala
@@ -6,15 +6,9 @@
package com.vesoft.nebula.algorithm.lib
-import com.vesoft.nebula.algorithm.utils.NebulaUtil
import com.vesoft.nebula.algorithm.config.{
AlgoConstants,
- CcConfig,
- Configs,
- NebulaConfig,
- PRConfig,
- ShortestPathConfig,
- SparkConfig
+ ShortestPathConfig
}
import org.apache.log4j.Logger
import org.apache.spark.graphx.{Graph, VertexId, VertexRDD}
diff --git a/nebula-algorithm/src/test/resources/application.conf b/nebula-algorithm/src/test/resources/application.conf
index a17fb11..62a0efd 100644
--- a/nebula-algorithm/src/test/resources/application.conf
+++ b/nebula-algorithm/src/test/resources/application.conf
@@ -122,5 +122,38 @@
betweenness:{
maxIter:5
}
+
+ # SingleSourceShortestPathAlgo parameter
+ singlesourceshortestpath:{
+ sourceid:"1"
+ }
+
+ # ClosenessAlgo parameter
+ closeness:{}
+
+ # HanpAlgo parameter
+ hanp:{
+ hopAttenuation:0.1
+ maxIter:10
+ preference:1.0
+ }
+
+  # Node2vecAlgo parameter
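+  # p: return parameter (higher p makes revisiting the previous node less likely)
+  # q: in-out parameter (q < 1 favors outward exploration, q > 1 keeps walks local)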
+ node2vec:{
+ maxIter: 10,
+ lr: 0.025,
+ dataNumPartition: 10,
+ modelNumPartition: 10,
+ dim: 10,
+ window: 3,
+ walkLength: 5,
+ numWalks: 3,
+ p: 1.0,
+ q: 1.0,
+ directed: false,
+ degree: 30,
+ embSeparate: ",",
+ modelPath: "hdfs://127.0.0.1:9000/model"
+ }
}
}
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgoSuite.scala
new file mode 100644
index 0000000..f28e717
--- /dev/null
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgoSuite.scala
@@ -0,0 +1,21 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+class ClosenessAlgoSuite {
+ @Test
+  def closenessAlgoSuite(): Unit = {
+ val spark = SparkSession.builder().master("local").getOrCreate()
+ val data = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+ val result = ClosenessAlgo.apply(spark, data, true)
+ assert(result.count() == 4)
+ }
+}
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/HanpSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/HanpSuite.scala
new file mode 100644
index 0000000..dfaf0c0
--- /dev/null
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/HanpSuite.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2021. vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.HanpConfig
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+class HanpSuite {
+ @Test
+  def hanpSuite(): Unit = {
+ val spark = SparkSession.builder().master("local").getOrCreate()
+ val data = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+    val hanpConfig = new HanpConfig(0.1, 10, 1.0)
+ val result = HanpAlgo.apply(spark, data, hanpConfig, false)
+ assert(result.count() == 4)
+ }
+}
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgoSuite.scala
new file mode 100644
index 0000000..4041eed
--- /dev/null
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgoSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021. vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.Node2vecConfig
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+class Node2vecAlgoSuite {
+ @Test
+  def node2vecAlgoSuite(): Unit = {
+ val spark = SparkSession.builder().master("local").getOrCreate()
+ val data = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+ val node2vecConfig = new Node2vecConfig(10,
+ 0.025,
+ 10,
+ 10,
+ 10,
+ 3,
+ 5,
+ 3,
+ 1.0,
+ 1.0,
+ false,
+ 10,
+ ",",
+ "src/test/resources/model"
+ )
+ val result = Node2vecAlgo.apply(spark, data, node2vecConfig, true)
+ assert(result.count() == 4)
+ }
+}
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/SingleSourceShortestPathAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/SingleSourceShortestPathAlgoSuite.scala
new file mode 100644
index 0000000..57797ef
--- /dev/null
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/SingleSourceShortestPathAlgoSuite.scala
@@ -0,0 +1,22 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.SingleSourceShortestPathConfig
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+class SingleSourceShortestPathAlgoSuite {
+ @Test
+  def singleSourceShortestPathAlgoSuite(): Unit = {
+ val spark = SparkSession.builder().master("local").getOrCreate()
+ val data = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+ val singleSourceShortestPathConfig = new SingleSourceShortestPathConfig(1)
+ val result = SingleSourceShortestPathAlgo.apply(spark, data, singleSourceShortestPathConfig, true)
+ assert(result.count() == 4)
+ }
+}