diff --git a/nebula-algorithm/pom.xml b/nebula-algorithm/pom.xml
index 9f07775..5b8c091 100644
--- a/nebula-algorithm/pom.xml
+++ b/nebula-algorithm/pom.xml
@@ -40,6 +40,11 @@
             <artifactId>spark-graphx_2.11</artifactId>
             <version>${spark.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.11</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.vesoft</groupId>
             <artifactId>nebula-spark-connector</artifactId>
diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf
index 4d2bdc2..8536f9a 100644
--- a/nebula-algorithm/src/main/resources/application.conf
+++ b/nebula-algorithm/src/main/resources/application.conf
@@ -125,5 +125,38 @@
    betweenness:{
        maxIter:5
    }
+
+   # SingleSourceShortestPathAlgo parameter
+   singlesourceshortestpath:{
+       sourceid:"1"
+   }
+
+   # ClosenessAlgo parameter
+   closeness:{}
+
+   # HanpAlgo parameter
+   hanp:{
+       hopAttenuation:0.1
+       maxIter:10
+       preference:1.0
+   }
+
+   # Node2vecAlgo parameter
+   node2vec:{
+       maxIter: 10,
+       lr: 0.025,
+       dataNumPartition: 10,
+       modelNumPartition: 10,
+       dim: 10,
+       window: 3,
+       walkLength: 5,
+       numWalks: 3,
+       p: 1.0,
+       q: 1.0,
+       directed: false,
+       degree: 30,
+       embSeparate: ",",
+       modelPath: "hdfs://127.0.0.1:9000/model"
+   }
 }
}
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
index 96cf9b3..287fdcb 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
@@ -9,25 +9,29 @@ package com.vesoft.nebula.algorithm
 import com.vesoft.nebula.algorithm.config.Configs.Argument
 import com.vesoft.nebula.algorithm.config.{
   AlgoConfig,
-  AlgoConstants,
   BetweennessConfig,
   CcConfig,
   Configs,
+  HanpConfig,
   KCoreConfig,
   LPAConfig,
   LouvainConfig,
+  Node2vecConfig,
   PRConfig,
   ShortestPathConfig,
   SparkConfig
 }
 import com.vesoft.nebula.algorithm.lib.{
   BetweennessCentralityAlgo,
+  ClosenessAlgo,
   ConnectedComponentsAlgo,
   DegreeStaticAlgo,
   GraphTriangleCountAlgo,
+  HanpAlgo,
   KCoreAlgo,
   LabelPropagationAlgo,
   LouvainAlgo,
+  Node2vecAlgo,
   PageRankAlgo,
   ShortestPathAlgo,
   StronglyConnectedComponentsAlgo,
@@ -37,7 +41,6 @@ import com.vesoft.nebula.algorithm.reader.{CsvReader, JsonReader, NebulaReader}
 import com.vesoft.nebula.algorithm.writer.{CsvWriter, NebulaWriter, TextWriter}
 import org.apache.commons.math3.ode.UnknownParameterException
 import org.apache.log4j.Logger
-import org.apache.spark.sql.types.{LongType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 /**
@@ -166,6 +169,17 @@ object Main {
       case "graphtrianglecount" => {
         GraphTriangleCountAlgo(spark, dataSet)
       }
+      case "closeness" => {
+        ClosenessAlgo(spark, dataSet, hasWeight)
+      }
+      case "hanp" => {
+        val hanpConfig = HanpConfig.getHanpConfig(configs)
+        HanpAlgo(spark, dataSet, hanpConfig, hasWeight)
+      }
+      case "node2vec" => {
+        val node2vecConfig = Node2vecConfig.getNode2vecConfig(configs)
+        Node2vecAlgo(spark, dataSet, node2vecConfig, hasWeight)
+      }
       case _ => throw new UnknownParameterException("unknown executeAlgo name.")
     }
   }
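Note: besides the `executeAlgo` values "closeness", "hanp" and "node2vec" dispatched above, the three algorithms can be called directly as a library. A minimal sketch, not part of this patch; `edgeDf` and the output path are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.algorithm.config.{HanpConfig, Node2vecConfig}
import com.vesoft.nebula.algorithm.lib.{ClosenessAlgo, HanpAlgo, Node2vecAlgo}

val spark = SparkSession.builder().master("local").getOrCreate()
// an edge DataFrame of (src, dst[, weight]) rows, e.g. the test fixture
val edgeDf = spark.read.option("header", true).csv("src/test/resources/edge.csv")

val closeness = ClosenessAlgo(spark, edgeDf, hasWeight = true)
val hanp      = HanpAlgo(spark, edgeDf, HanpConfig(0.1, 10, 1.0), hasWeight = false)
val node2vec = Node2vecAlgo(
  spark,
  edgeDf,
  Node2vecConfig(10, 0.025, 10, 10, 10, 3, 5, 3, 1.0, 1.0, false, 30, ",", "/tmp/node2vec-model"),
  hasWeight = true)
```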
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala
index 3077214..3662404 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala
@@ -150,6 +150,89 @@ object BetweennessConfig {
   }
 }
 
+/**
+  * Hanp
+  */
+case class HanpConfig(hopAttenuation: Double, maxIter: Int, preference: Double)
+
+object HanpConfig {
+  var hopAttenuation: Double = _
+  var maxIter: Int           = _
+  var preference: Double     = 1.0
+  def getHanpConfig(configs: Configs): HanpConfig = {
+    val hanpConfig = configs.algorithmConfig.map
+    hopAttenuation = hanpConfig("algorithm.hanp.hopAttenuation").toDouble
+    maxIter = hanpConfig("algorithm.hanp.maxIter").toInt
+    preference = hanpConfig("algorithm.hanp.preference").toDouble
+    HanpConfig(hopAttenuation, maxIter, preference)
+  }
+}
+
+/**
+  * Node2vec
+  */
+case class Node2vecConfig(maxIter: Int,
+                          lr: Double,
+                          dataNumPartition: Int,
+                          modelNumPartition: Int,
+                          dim: Int,
+                          window: Int,
+                          walkLength: Int,
+                          numWalks: Int,
+                          p: Double,
+                          q: Double,
+                          directed: Boolean,
+                          degree: Int,
+                          embSeparate: String,
+                          modelPath: String)
+object Node2vecConfig {
+  var maxIter: Int           = _
+  var lr: Double             = _
+  var dataNumPartition: Int  = _
+  var modelNumPartition: Int = _
+  var dim: Int               = _
+  var window: Int            = _
+  var walkLength: Int        = _
+  var numWalks: Int          = _
+  var p: Double              = _
+  var q: Double              = _
+  var directed: Boolean      = _
+  var degree: Int            = _
+  var embSeparate: String    = _
+  var modelPath: String      = _
+  def getNode2vecConfig(configs: Configs): Node2vecConfig = {
+    val node2vecConfig = configs.algorithmConfig.map
+    maxIter = node2vecConfig("algorithm.node2vec.maxIter").toInt
+    lr = node2vecConfig("algorithm.node2vec.lr").toDouble
+    dataNumPartition = node2vecConfig("algorithm.node2vec.dataNumPartition").toInt
+    modelNumPartition = node2vecConfig("algorithm.node2vec.modelNumPartition").toInt
+    dim = node2vecConfig("algorithm.node2vec.dim").toInt
+    window = node2vecConfig("algorithm.node2vec.window").toInt
+    walkLength = node2vecConfig("algorithm.node2vec.walkLength").toInt
+    numWalks = node2vecConfig("algorithm.node2vec.numWalks").toInt
+    p = node2vecConfig("algorithm.node2vec.p").toDouble
+    q = node2vecConfig("algorithm.node2vec.q").toDouble
+    directed = node2vecConfig("algorithm.node2vec.directed").toBoolean
+    degree = node2vecConfig("algorithm.node2vec.degree").toInt
+    embSeparate = node2vecConfig("algorithm.node2vec.embSeparate")
+    modelPath = node2vecConfig("algorithm.node2vec.modelPath")
+    Node2vecConfig(maxIter,
+                   lr,
+                   dataNumPartition,
+                   modelNumPartition,
+                   dim,
+                   window,
+                   walkLength,
+                   numWalks,
+                   p,
+                   q,
+                   directed,
+                   degree,
+                   embSeparate,
+                   modelPath)
+  }
+}
+
 case class AlgoConfig(configs: Configs)
 
 object AlgoConfig {
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
index addfd3d..99b1706 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
@@ -362,4 +362,7 @@ object AlgoConstants {
   val INDEGREE_RESULT_COL: String      = "inDegree"
   val OUTDEGREE_RESULT_COL: String     = "outDegree"
   val TRIANGLECOUNT_RESULT_COL: String = "tranglecount"
+  val CLOSENESS_RESULT_COL: String     = "closeness"
+  val HANP_RESULT_COL: String          = "hanp"
+  val NODE2VEC_RESULT_COL: String      = "node2vec"
 }
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala
new file mode 100644
index 0000000..2b0f8c1
--- /dev/null
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala
@@ -0,0 +1,78 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.AlgoConstants
+import com.vesoft.nebula.algorithm.utils.NebulaUtil
+import org.apache.log4j.Logger
+import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+object ClosenessAlgo {
+  private val LOGGER    = Logger.getLogger(this.getClass)
+  val ALGORITHM: String = "Closeness"
+  // per-vertex map of shortest distances to the vertices it can reach
+  type SPMap = Map[VertexId, Double]
+
+  private def makeMap(x: (VertexId, Double)*) = Map(x: _*)
+
+  // shift every known distance by the weight of the edge being traversed
+  private def addMap(spmap: SPMap, weight: Double): SPMap =
+    spmap.map { case (v, d) => v -> (d + weight) }
+
+  // merge two distance maps, keeping the smaller distance per vertex
+  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
+    (spmap1.keySet ++ spmap2.keySet).map { k =>
+      k -> math.min(spmap1.getOrElse(k, Double.MaxValue), spmap2.getOrElse(k, Double.MaxValue))
+    }(collection.breakOut)
+  }
+
+  /**
+    * run the Closeness algorithm for nebula graph
+    */
+  def apply(spark: SparkSession, dataset: Dataset[Row], hasWeight: Boolean): DataFrame = {
+    val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
+    val closenessRDD                    = execute(graph)
+    val schema = StructType(
+      List(
+        StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
+        StructField(AlgoConstants.CLOSENESS_RESULT_COL, DoubleType, nullable = true)
+      ))
+    val algoResult = spark.sqlContext.createDataFrame(closenessRDD, schema)
+    algoResult
+  }
+
+  /**
+    * execute Closeness algorithm
+    */
+  def execute(graph: Graph[None.type, Double]): RDD[Row] = {
+    // every vertex starts out knowing only itself, at distance 0
+    val spGraph = graph.mapVertices((vid, _) => makeMap(vid -> 0.0))
+
+    val initialMessage = makeMap()
+
+    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
+      addMaps(attr, msg)
+    }
+
+    def sendMessage(edge: EdgeTriplet[SPMap, Double]): Iterator[(VertexId, SPMap)] = {
+      val newAttr = addMap(edge.dstAttr, edge.attr)
+      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
+      else Iterator.empty
+    }
+    // Pregel converges once no vertex can improve any distance
+    val spsGraph = Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
+    val closenessRDD = spsGraph.vertices.map(vertex => {
+      var dstNum         = 0
+      var dstDistanceSum = 0.0
+      for (distance <- vertex._2.values) {
+        dstNum += 1
+        dstDistanceSum += distance
+      }
+      // dstNum - 1 discounts the vertex itself, which sits in its own map at distance 0
+      Row(vertex._1, (dstNum - 1) / dstDistanceSum)
+    })
+    closenessRDD
+  }
+}
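Note, for reference: the score emitted above is closeness(v) = (number of reachable vertices) / (sum of shortest-path distances), since each vertex's map includes itself at distance 0 and `dstNum - 1` discounts it. Worked example: with edges 1→2 and 2→3 at weight 1.0, vertex 1 converges to {1: 0.0, 2: 1.0, 3: 2.0} and scores (3 - 1) / 3.0 ≈ 0.667, while vertex 3, which reaches nothing, scores 0 / 0.0 = Double.NaN.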
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala
new file mode 100644
index 0000000..fa5b067
--- /dev/null
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2021. vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.{AlgoConstants, HanpConfig}
+import com.vesoft.nebula.algorithm.utils.NebulaUtil
+import org.apache.spark.graphx.{EdgeTriplet, Graph, VertexId}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+object HanpAlgo {
+  val ALGORITHM: String = "Hanp"
+
+  /**
+    * run the Hanp algorithm for nebula graph
+    */
+  def apply(spark: SparkSession,
+            dataset: Dataset[Row],
+            hanpConfig: HanpConfig,
+            hasWeight: Boolean,
+            preferences: RDD[(VertexId, Double)] = null): DataFrame = {
+    val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
+    val hanpResultRDD =
+      execute(graph, hanpConfig.hopAttenuation, hanpConfig.maxIter, hanpConfig.preference, preferences)
+    val schema = StructType(
+      List(
+        StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
+        StructField(AlgoConstants.HANP_RESULT_COL, LongType, nullable = true)
+      ))
+    val algoResult = spark.sqlContext.createDataFrame(hanpResultRDD, schema)
+    algoResult
+  }
+
+  /**
+    * execute Hanp algorithm
+    */
+  def execute(graph: Graph[None.type, Double],
+              hopAttenuation: Double,
+              maxIter: Int,
+              preference: Double = 1.0,
+              preferences: RDD[(VertexId, Double)] = null): RDD[Row] = {
+    // vertex attribute: (label, preference, score); every vertex starts with
+    // its own id as label and full score 1.0
+    var hanpGraph: Graph[(VertexId, Double, Double), Double] = null
+    if (preferences == null) {
+      hanpGraph = graph.mapVertices((vertexId, _) => (vertexId, preference, 1.0))
+    } else {
+      hanpGraph = graph.outerJoinVertices(preferences)((vertexId, _, vertexPreference) => {
+        (vertexId, vertexPreference.getOrElse(preference), 1.0)
+      })
+    }
+    // a vertex only keeps propagating its label while its score is positive
+    def sendMessage(e: EdgeTriplet[(VertexId, Double, Double), Double])
+      : Iterator[(VertexId, Map[VertexId, (Double, Double)])] = {
+      if (e.srcAttr._3 > 0 && e.dstAttr._3 > 0) {
+        Iterator(
+          (e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3, e.srcAttr._2 * e.srcAttr._3 * e.attr))),
+          (e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3, e.dstAttr._2 * e.dstAttr._3 * e.attr)))
+        )
+      } else if (e.srcAttr._3 > 0) {
+        Iterator((e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3, e.srcAttr._2 * e.srcAttr._3 * e.attr))))
+      } else if (e.dstAttr._3 > 0) {
+        Iterator((e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3, e.dstAttr._2 * e.dstAttr._3 * e.attr))))
+      } else {
+        Iterator.empty
+      }
+    }
+    // per candidate label keep the max score and sum the weighted votes
+    def mergeMessage(count1: Map[VertexId, (Double, Double)],
+                     count2: Map[VertexId, (Double, Double)]): Map[VertexId, (Double, Double)] = {
+      (count1.keySet ++ count2.keySet).map { i =>
+        val count1Val = count1.getOrElse(i, (0.0, 0.0))
+        val count2Val = count2.getOrElse(i, (0.0, 0.0))
+        i -> (Math.max(count1Val._1, count2Val._1), count1Val._2 + count2Val._2)
+      }(collection.breakOut)
+    }
+    def vertexProgram(vid: VertexId,
+                      attr: (VertexId, Double, Double),
+                      message: Map[VertexId, (Double, Double)]): (VertexId, Double, Double) = {
+      if (message.isEmpty) {
+        attr
+      } else {
+        // adopt the best-scoring label; its score decays by hopAttenuation per hop
+        val maxMessage = message.maxBy(_._2._2)
+        (maxMessage._1, attr._2, maxMessage._2._1 - hopAttenuation)
+      }
+    }
+    val initialMessage = Map[VertexId, (Double, Double)]()
+    val hanpResultGraph =
+      hanpGraph.pregel(initialMessage, maxIter)(vertexProgram, sendMessage, mergeMessage)
+    hanpResultGraph.vertices.map(vertex => Row(vertex._1, vertex._2._1))
+  }
+}
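Note: the optional `preferences` parameter is not exercised anywhere else in this patch. A minimal sketch of supplying per-vertex preferences (`spark` and `edgeDf` as in the earlier sketch; the values are illustrative):

```scala
import com.vesoft.nebula.algorithm.config.HanpConfig
import com.vesoft.nebula.algorithm.lib.HanpAlgo
import org.apache.spark.graphx.VertexId
import org.apache.spark.rdd.RDD

// vertex 1 pushes its label twice as hard; vertices absent from this RDD
// fall back to hanpConfig.preference via getOrElse
val prefs: RDD[(VertexId, Double)] =
  spark.sparkContext.parallelize(Seq(1L -> 2.0, 2L -> 1.0, 3L -> 1.0))
val communities = HanpAlgo(spark, edgeDf, HanpConfig(0.1, 10, 1.0), hasWeight = false, prefs)
```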
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala
new file mode 100644
index 0000000..05aca2d
--- /dev/null
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala
@@ -0,0 +1,238 @@
+/*
+ * Copyright (c) 2021. vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.{AlgoConstants, Node2vecConfig}
+import com.vesoft.nebula.algorithm.utils.NebulaUtil
+import org.apache.spark.SparkContext
+import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, VertexId}
+import org.apache.spark.mllib.feature.Word2Vec
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+import java.io.Serializable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+case class NodeAttr(var neighbors: Array[(Long, Double)] = Array.empty[(Long, Double)],
+                    var path: Array[Long] = Array.empty[Long])
+    extends Serializable
+
+case class EdgeAttr(var dstNeighbors: Array[Long] = Array.empty[Long],
+                    var J: Array[Int] = Array.empty[Int],
+                    var q: Array[Double] = Array.empty[Double])
+    extends Serializable
+
+object Node2vecAlgo {
+  val ALGORITHM: String                               = "Node2vec"
+  var node2vecConfig: Node2vecConfig                  = _
+  var context: SparkContext                           = _
+  var indexedEdges: RDD[Edge[EdgeAttr]]               = _
+  var indexedNodes: RDD[(VertexId, NodeAttr)]         = _
+  var graph: Graph[NodeAttr, EdgeAttr]                = _
+  var randomWalkPaths: RDD[(Long, ArrayBuffer[Long])] = null
+  var vectors: Map[String, Array[Float]]              = _
+
+  lazy val createUndirectedEdge = (srcId: Long, dstId: Long, weight: Double) => {
+    Array(
+      (srcId, Array((dstId, weight))),
+      (dstId, Array((srcId, weight)))
+    )
+  }
+  lazy val createDirectedEdge = (srcId: Long, dstId: Long, weight: Double) => {
+    Array(
+      (srcId, Array((dstId, weight)))
+    )
+  }
+
+  // Walker's alias method: O(K) setup so each draw is O(1)
+  def setupAlias(nodeWeights: Array[(Long, Double)]): (Array[Int], Array[Double]) = {
+    val K = nodeWeights.length
+    val J = Array.fill(K)(0)
+    val q = Array.fill(K)(0.0)
+
+    val smaller = new ArrayBuffer[Int]()
+    val larger  = new ArrayBuffer[Int]()
+
+    val sum = nodeWeights.map(_._2).sum
+    nodeWeights.zipWithIndex.foreach { case ((nodeId, weight), i) =>
+      q(i) = K * weight / sum
+      if (q(i) < 1.0) {
+        smaller.append(i)
+      } else {
+        larger.append(i)
+      }
+    }
+
+    while (smaller.nonEmpty && larger.nonEmpty) {
+      val small = smaller.remove(smaller.length - 1)
+      val large = larger.remove(larger.length - 1)
+
+      J(small) = large
+      q(large) = q(large) + q(small) - 1.0
+      if (q(large) < 1.0) smaller.append(large)
+      else larger.append(large)
+    }
+
+    (J, q)
+  }
+
+  // bias transition probabilities with node2vec's return (p) and in-out (q) parameters
+  def setupEdgeAlias(p: Double = 1.0, q: Double = 1.0)(
+      srcId: Long,
+      srcNeighbors: Array[(Long, Double)],
+      dstNeighbors: Array[(Long, Double)]): (Array[Int], Array[Double]) = {
+    val neighbors_ = dstNeighbors.map { case (dstNeighborId, weight) =>
+      var unnormProb = weight / q
+      if (srcId == dstNeighborId) unnormProb = weight / p
+      else if (srcNeighbors.exists(_._1 == dstNeighborId)) unnormProb = weight
+
+      (dstNeighborId, unnormProb)
+    }
+
+    setupAlias(neighbors_)
+  }
+
+  def drawAlias(J: Array[Int], q: Array[Double]): Int = {
+    val K  = J.length
+    val kk = math.floor(math.random * K).toInt
+
+    if (math.random < q(kk)) kk
+    else J(kk)
+  }
+
+  def load(graph: Graph[None.type, Double]): this.type = {
+    val bcMaxDegree = context.broadcast(node2vecConfig.degree)
+    val bcEdgeCreator = node2vecConfig.directed match {
+      case true  => context.broadcast(createDirectedEdge)
+      case false => context.broadcast(createUndirectedEdge)
+    }
+
+    indexedNodes = graph.edges.flatMap { row =>
+      bcEdgeCreator.value.apply(row.srcId, row.dstId, row.attr)
+    }.reduceByKey(_ ++ _).map { case (nodeId, neighbors: Array[(VertexId, Double)]) =>
+      var neighbors_ = neighbors
+      if (neighbors_.length > bcMaxDegree.value) {
+        // keep only the `degree` heaviest neighbors
+        neighbors_ = neighbors.sortWith { case (left, right) => left._2 > right._2 }.slice(0, bcMaxDegree.value)
+      }
+
+      (nodeId, NodeAttr(neighbors = neighbors_.distinct))
+    }.repartition(node2vecConfig.dataNumPartition).cache
+
+    indexedEdges = indexedNodes.flatMap { case (srcId, clickNode) =>
+      clickNode.neighbors.map { case (dstId, weight) =>
+        Edge(srcId, dstId, EdgeAttr())
+      }
+    }.repartition(node2vecConfig.dataNumPartition).cache
+    this
+  }
+
+  def initTransitionProb(): this.type = {
+    val bcP = context.broadcast(node2vecConfig.p)
+    val bcQ = context.broadcast(node2vecConfig.q)
+
+    graph = Graph(indexedNodes, indexedEdges)
+      .mapVertices[NodeAttr] { case (vertexId, clickNode) =>
+        val (j, q)        = this.setupAlias(clickNode.neighbors)
+        val nextNodeIndex = this.drawAlias(j, q)
+        clickNode.path = Array(vertexId, clickNode.neighbors(nextNodeIndex)._1)
+
+        clickNode
+      }
+      .mapTriplets { edgeTriplet: EdgeTriplet[NodeAttr, EdgeAttr] =>
+        val (j, q) = this.setupEdgeAlias(bcP.value, bcQ.value)(edgeTriplet.srcId,
+                                                               edgeTriplet.srcAttr.neighbors,
+                                                               edgeTriplet.dstAttr.neighbors)
+        edgeTriplet.attr.J = j
+        edgeTriplet.attr.q = q
+        edgeTriplet.attr.dstNeighbors = edgeTriplet.dstAttr.neighbors.map(_._1)
+
+        edgeTriplet.attr
+      }
+      .cache
+
+    this
+  }
+
+  def randomWalk(): this.type = {
+    val edge2attr = graph.triplets.map { edgeTriplet =>
+      (s"${edgeTriplet.srcId}${edgeTriplet.dstId}", edgeTriplet.attr)
+    }.repartition(node2vecConfig.dataNumPartition).cache
+    edge2attr.first
+
+    for (iter <- 0 until node2vecConfig.numWalks) {
+      var prevWalk: RDD[(Long, ArrayBuffer[Long])] = null
+      var randomWalk = graph.vertices.map { case (nodeId, clickNode) =>
+        val pathBuffer = new ArrayBuffer[Long]()
+        pathBuffer.append(clickNode.path: _*)
+        (nodeId, pathBuffer)
+      }.cache
+      var activeWalks = randomWalk.first
+      graph.unpersist(blocking = false)
+      graph.edges.unpersist(blocking = false)
+      for (walkCount <- 0 until node2vecConfig.walkLength) {
+        prevWalk = randomWalk
+        randomWalk = randomWalk.map { case (srcNodeId, pathBuffer) =>
+          val prevNodeId    = pathBuffer(pathBuffer.length - 2)
+          val currentNodeId = pathBuffer.last
+
+          (s"$prevNodeId$currentNodeId", (srcNodeId, pathBuffer))
+        }.join(edge2attr).map { case (edge, ((srcNodeId, pathBuffer), attr)) =>
+          try {
+            // extend the walk by one alias-sampled neighbor of the current edge
+            val nextNodeIndex = this.drawAlias(attr.J, attr.q)
+            val nextNodeId    = attr.dstNeighbors(nextNodeIndex)
+            pathBuffer.append(nextNodeId)
+
+            (srcNodeId, pathBuffer)
+          } catch {
+            case e: Exception => throw new RuntimeException(e.getMessage)
+          }
+        }.cache
+
+        activeWalks = randomWalk.first()
+        prevWalk.unpersist(blocking = false)
+      }
+
+      if (randomWalkPaths != null) {
+        val prevRandomWalkPaths = randomWalkPaths
+        randomWalkPaths = randomWalkPaths.union(randomWalk).cache()
+        randomWalkPaths.first
+        prevRandomWalkPaths.unpersist(blocking = false)
+      } else {
+        randomWalkPaths = randomWalk
+      }
+    }
+
+    this
+  }
+
+  def embedding(): this.type = {
+    val randomPaths = randomWalkPaths.map { case (vertexId, pathBuffer) =>
+      Try(pathBuffer.map(_.toString).toIterable).getOrElse(null)
+    }.filter(_ != null)
+    val word2vec = new Word2Vec()
+    word2vec
+      .setLearningRate(node2vecConfig.lr)
+      .setNumIterations(node2vecConfig.maxIter)
+      .setNumPartitions(node2vecConfig.modelNumPartition)
+      .setVectorSize(node2vecConfig.dim)
+      .setWindowSize(node2vecConfig.window)
+    val model = word2vec.fit(randomPaths)
+    model.save(context, node2vecConfig.modelPath) // use Word2VecModel.load(context, path) to load the model
+    this.vectors = model.getVectors
+    this
+  }
+
+  /**
+    * run the Node2vec algorithm for nebula graph
+    */
+  def apply(spark: SparkSession,
+            dataset: Dataset[Row],
+            node2vecConfig: Node2vecConfig,
+            hasWeight: Boolean): DataFrame = {
+    val inputGraph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
+    this.context = spark.sparkContext
+    this.node2vecConfig = node2vecConfig
+    val node2vecResult: Map[String, Array[Float]] = this
+      .load(inputGraph)
+      .initTransitionProb()
+      .randomWalk()
+      .embedding()
+      .vectors
+    val node2vecRDD: RDD[Row] = this.context
+      .parallelize(node2vecResult.toList)
+      .map(row => Row(row._1, row._2.mkString(node2vecConfig.embSeparate)))
+    val schema = StructType(
+      List(
+        StructField(AlgoConstants.ALGO_ID_COL, StringType, nullable = false),
+        StructField(AlgoConstants.NODE2VEC_RESULT_COL, StringType, nullable = true)
+      ))
+    val algoResult = spark.sqlContext.createDataFrame(node2vecRDD, schema)
+    algoResult
+  }
+}
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala
index e4a5771..efe906d 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala
@@ -6,15 +6,9 @@
 
 package com.vesoft.nebula.algorithm.lib
 
-import com.vesoft.nebula.algorithm.utils.NebulaUtil
 import com.vesoft.nebula.algorithm.config.{
   AlgoConstants,
-  CcConfig,
-  Configs,
-  NebulaConfig,
-  PRConfig,
-  ShortestPathConfig,
-  SparkConfig
+  ShortestPathConfig
 }
 import org.apache.log4j.Logger
 import org.apache.spark.graphx.{Graph, VertexId, VertexRDD}
diff --git a/nebula-algorithm/src/test/resources/application.conf b/nebula-algorithm/src/test/resources/application.conf
index a17fb11..62a0efd 100644
--- a/nebula-algorithm/src/test/resources/application.conf
+++ b/nebula-algorithm/src/test/resources/application.conf
@@ -122,5 +122,38 @@
    betweenness:{
        maxIter:5
    }
+
+   # SingleSourceShortestPathAlgo parameter
+   singlesourceshortestpath:{
+       sourceid:"1"
+   }
+
+   # ClosenessAlgo parameter
+   closeness:{}
+
+   # HanpAlgo parameter
+   hanp:{
+       hopAttenuation:0.1
+       maxIter:10
+       preference:1.0
+   }
+
+   # Node2vecAlgo parameter
+   node2vec:{
+       maxIter: 10,
+       lr: 0.025,
+       dataNumPartition: 10,
+       modelNumPartition: 10,
+       dim: 10,
+       window: 3,
+       walkLength: 5,
+       numWalks: 3,
+       p: 1.0,
+       q: 1.0,
+       directed: false,
+       degree: 30,
+       embSeparate: ",",
+       modelPath: "hdfs://127.0.0.1:9000/model"
+   }
 }
}
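Two notes on the implementation above. First, `setupAlias`/`drawAlias` implement Walker's alias method: O(K) preprocessing per neighbor list, after which every walk step samples a neighbor in O(1). They can be exercised in isolation (the weights below are illustrative):

```scala
// three neighbors with weights 0.5 / 0.3 / 0.2; drawAlias returns index 0, 1 or 2
// with those probabilities
val (jTable, qTable) = Node2vecAlgo.setupAlias(Array((11L, 0.5), (22L, 0.3), (33L, 0.2)))
val sampledIndex     = Node2vecAlgo.drawAlias(jTable, qTable)
```

Second, as the comment in `embedding()` notes, the persisted model can be reloaded later through the mllib API (the path is illustrative):

```scala
import org.apache.spark.mllib.feature.Word2VecModel

val model   = Word2VecModel.load(spark.sparkContext, "/tmp/node2vec-model")
val vectors = model.getVectors // Map[String, Array[Float]], keyed by vertex id string
```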
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgoSuite.scala
new file mode 100644
index 0000000..f28e717
--- /dev/null
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgoSuite.scala
@@ -0,0 +1,21 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+class ClosenessAlgoSuite {
+  @Test
+  def closenessAlgoSuite(): Unit = {
+    val spark  = SparkSession.builder().master("local").getOrCreate()
+    val data   = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+    val result = ClosenessAlgo.apply(spark, data, true)
+    assert(result.count() == 4)
+  }
+}
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/HanpSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/HanpSuite.scala
new file mode 100644
index 0000000..dfaf0c0
--- /dev/null
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/HanpSuite.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2021. vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.HanpConfig
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+class HanpSuite {
+  @Test
+  def hanpSuite(): Unit = {
+    val spark      = SparkSession.builder().master("local").getOrCreate()
+    val data       = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+    val hanpConfig = new HanpConfig(0.1, 10, 1.0)
+    val result     = HanpAlgo.apply(spark, data, hanpConfig, false)
+    assert(result.count() == 4)
+  }
+}
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgoSuite.scala
new file mode 100644
index 0000000..4041eed
--- /dev/null
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgoSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021. vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.Node2vecConfig
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+class Node2vecAlgoSuite {
+  @Test
+  def node2vecAlgoSuite(): Unit = {
+    val spark = SparkSession.builder().master("local").getOrCreate()
+    val data  = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+    val node2vecConfig = new Node2vecConfig(10,
+                                            0.025,
+                                            10,
+                                            10,
+                                            10,
+                                            3,
+                                            5,
+                                            3,
+                                            1.0,
+                                            1.0,
+                                            false,
+                                            10,
+                                            ",",
+                                            "src/test/resources/model"
+    )
+    val result = Node2vecAlgo.apply(spark, data, node2vecConfig, true)
+    assert(result.count() == 4)
+  }
+}
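Note: `model.save` in `embedding()` writes with Spark's default error-if-exists semantics, so re-running `Node2vecAlgoSuite` against a leftover `src/test/resources/model` directory is likely to fail. A hedged cleanup sketch before a re-run:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path("src/test/resources/model"), true) // recursive delete of the fixture model
```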
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/SingleSourceShortestPathAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/SingleSourceShortestPathAlgoSuite.scala
new file mode 100644
index 0000000..57797ef
--- /dev/null
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/SingleSourceShortestPathAlgoSuite.scala
@@ -0,0 +1,22 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.SingleSourceShortestPathConfig
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+class SingleSourceShortestPathAlgoSuite {
+  @Test
+  def singleSourceShortestPathAlgoSuite(): Unit = {
+    val spark = SparkSession.builder().master("local").getOrCreate()
+    val data  = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+    val singleSourceShortestPathConfig = new SingleSourceShortestPathConfig(1)
+    val result =
+      SingleSourceShortestPathAlgo.apply(spark, data, singleSourceShortestPathConfig, true)
+    assert(result.count() == 4)
+  }
+}
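Each algorithm returns a plain DataFrame, so results from a local run can be persisted with stock Spark writers. A minimal sketch (`spark` and `edgeDf` as in the first sketch; the output path is illustrative):

```scala
val result = ClosenessAlgo(spark, edgeDf, hasWeight = true)
result.write.option("header", true).csv("file:///tmp/closeness-result")
```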