add algorithms in summer project #3

Merged · 15 commits · Nov 4, 2021
5 changes: 5 additions & 0 deletions nebula-algorithm/pom.xml
@@ -40,6 +40,11 @@
      <artifactId>spark-graphx_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>com.vesoft</groupId>
      <artifactId>nebula-spark-connector</artifactId>
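The new spark-mllib dependency is presumably what backs the Node2vec step added below: mllib ships the Word2Vec trainer that node2vec-style pipelines run over their random-walk sequences. A minimal sketch of that mllib API, assuming the walks arrive as sequences of vertex-id tokens; trainEmbeddings and its parameters are illustrative, none of this is the PR's own code:

import org.apache.spark.mllib.feature.Word2Vec
import org.apache.spark.rdd.RDD

// walks: one entry per random walk, each a sequence of vertex-id tokens.
def trainEmbeddings(walks: RDD[Seq[String]],
                    dim: Int, window: Int, lr: Double, maxIter: Int, partitions: Int) =
  new Word2Vec()
    .setVectorSize(dim)        // embedding dimension ("dim" in application.conf)
    .setWindowSize(window)     // context window ("window")
    .setLearningRate(lr)       // initial learning rate ("lr")
    .setNumIterations(maxIter) // training epochs ("maxIter")
    .setNumPartitions(partitions)
    .fit(walks)                // returns a Word2VecModel, one vector per vertex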
33 changes: 33 additions & 0 deletions nebula-algorithm/src/main/resources/application.conf
@@ -125,5 +125,38 @@
    betweenness: {
      maxIter: 5
    }

    # SingleSourceShortestPathAlgo parameter
    singlesourceshortestpath: {
      sourceid: "1"
    }

    # ClosenessAlgo parameter
    closeness: {}

    # HanpAlgo parameter
    hanp: {
      hopAttenuation: 0.1
      maxIter: 10
      preference: 1.0
    }

    # Node2vecAlgo parameter
    node2vec: {
      maxIter: 10,
      lr: 0.025,
      dataNumPartition: 10,
      modelNumPartition: 10,
      dim: 10,
      window: 3,
      walkLength: 5,
      numWalks: 3,
      p: 1.0,
      q: 1.0,
      directed: false,
      degree: 30,
      embSeparate: ",",
      modelPath: "hdfs://127.0.0.1:9000/model"
    }
  }
}
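For context on the two least obvious keys: p and q are the standard node2vec walk-bias parameters. Having just walked t -> v, the unnormalized weight of stepping on to a neighbor x depends on how far x is from t; with p = q = 1.0, as configured above, the walk degenerates to a uniform random walk. A sketch of that standard bias rule (the helper name is illustrative, and this is the published definition, not code from this PR):

// distTX: graph distance from the previous vertex t to the candidate next vertex x (0, 1, or 2).
def node2vecBias(distTX: Int, p: Double, q: Double, edgeWeight: Double): Double =
  distTX match {
    case 0 => edgeWeight / p // x == t: return parameter, p > 1 discourages backtracking
    case 1 => edgeWeight     // x is also a neighbor of t
    case _ => edgeWeight / q // x moves away from t: q > 1 keeps walks local, q < 1 pushes outward
  }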
nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
@@ -9,25 +9,29 @@ package com.vesoft.nebula.algorithm
import com.vesoft.nebula.algorithm.config.Configs.Argument
import com.vesoft.nebula.algorithm.config.{
  AlgoConfig,
  AlgoConstants,
  BetweennessConfig,
  CcConfig,
  Configs,
  HanpConfig,
  KCoreConfig,
  LPAConfig,
  LouvainConfig,
  Node2vecConfig,
  PRConfig,
  ShortestPathConfig,
  SparkConfig
}
import com.vesoft.nebula.algorithm.lib.{
  BetweennessCentralityAlgo,
  ClosenessAlgo,
  ConnectedComponentsAlgo,
  DegreeStaticAlgo,
  GraphTriangleCountAlgo,
  HanpAlgo,
  KCoreAlgo,
  LabelPropagationAlgo,
  LouvainAlgo,
  Node2vecAlgo,
  PageRankAlgo,
  ShortestPathAlgo,
  StronglyConnectedComponentsAlgo,
@@ -37,7 +41,6 @@ import com.vesoft.nebula.algorithm.reader.{CsvReader, JsonReader, NebulaReader}
import com.vesoft.nebula.algorithm.writer.{CsvWriter, NebulaWriter, TextWriter}
import org.apache.commons.math3.ode.UnknownParameterException
import org.apache.log4j.Logger
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
@@ -166,6 +169,17 @@ object Main {
case "graphtrianglecount" => {
GraphTriangleCountAlgo(spark, dataSet)
}
case "closeness" => {
ClosenessAlgo(spark, dataSet, hasWeight)
}
case "hanp" => {
val hanpConfig = HanpConfig.getHanpConfig(configs)
HanpAlgo(spark, dataSet, hanpConfig, hasWeight)
}
case "node2vec" => {
val node2vecConfig = Node2vecConfig.getNode2vecConfig(configs)
Node2vecAlgo(spark, dataSet, node2vecConfig, hasWeight)
}
case _ => throw new UnknownParameterException("unknown executeAlgo name.")
}
}
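With these cases wired in, the three new algorithms are selected the same way as the existing ones: by the executeAlgo value in application.conf, which is the string this match dispatches on (the key name is confirmed by the exception message above). For example:

algorithm: {
  # which algorithm Main runs: closeness, hanp, node2vec, ...
  executeAlgo: closeness
}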
nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala
@@ -150,6 +150,89 @@ object BetweennessConfig {
}
}

/**
  * Hanp
  */
case class HanpConfig(hopAttenuation: Double, maxIter: Int, preference: Double)

object HanpConfig {
  var hopAttenuation: Double = _
  var maxIter: Int = _
  var preference: Double = 1.0

  def getHanpConfig(configs: Configs): HanpConfig = {
    val hanpConfig = configs.algorithmConfig.map
    hopAttenuation = hanpConfig("algorithm.hanp.hopAttenuation").toDouble
    maxIter = hanpConfig("algorithm.hanp.maxIter").toInt
    preference = hanpConfig("algorithm.hanp.preference").toDouble
    HanpConfig(hopAttenuation, maxIter, preference)
  }
}

/**
  * Node2vec
  */
case class Node2vecConfig(maxIter: Int,
                          lr: Double,
                          dataNumPartition: Int,
                          modelNumPartition: Int,
                          dim: Int,
                          window: Int,
                          walkLength: Int,
                          numWalks: Int,
                          p: Double,
                          q: Double,
                          directed: Boolean,
                          degree: Int,
                          embSeparate: String,
                          modelPath: String)

object Node2vecConfig {
  var maxIter: Int = _
  var lr: Double = _
  var dataNumPartition: Int = _
  var modelNumPartition: Int = _
  var dim: Int = _
  var window: Int = _
  var walkLength: Int = _
  var numWalks: Int = _
  var p: Double = _
  var q: Double = _
  var directed: Boolean = _
  var degree: Int = _
  var embSeparate: String = _
  var modelPath: String = _

  def getNode2vecConfig(configs: Configs): Node2vecConfig = {
    val node2vecConfig = configs.algorithmConfig.map
    maxIter = node2vecConfig("algorithm.node2vec.maxIter").toInt
    lr = node2vecConfig("algorithm.node2vec.lr").toDouble
    dataNumPartition = node2vecConfig("algorithm.node2vec.dataNumPartition").toInt
    modelNumPartition = node2vecConfig("algorithm.node2vec.modelNumPartition").toInt
    dim = node2vecConfig("algorithm.node2vec.dim").toInt
    window = node2vecConfig("algorithm.node2vec.window").toInt
    walkLength = node2vecConfig("algorithm.node2vec.walkLength").toInt
    numWalks = node2vecConfig("algorithm.node2vec.numWalks").toInt
    p = node2vecConfig("algorithm.node2vec.p").toDouble
    q = node2vecConfig("algorithm.node2vec.q").toDouble
    directed = node2vecConfig("algorithm.node2vec.directed").toBoolean
    degree = node2vecConfig("algorithm.node2vec.degree").toInt
    embSeparate = node2vecConfig("algorithm.node2vec.embSeparate")
    modelPath = node2vecConfig("algorithm.node2vec.modelPath")
    Node2vecConfig(maxIter,
                   lr,
                   dataNumPartition,
                   modelNumPartition,
                   dim,
                   window,
                   walkLength,
                   numWalks,
                   p,
                   q,
                   directed,
                   degree,
                   embSeparate,
                   modelPath)
  }
}

case class AlgoConfig(configs: Configs)

object AlgoConfig {
@@ -362,4 +362,7 @@ object AlgoConstants {
val INDEGREE_RESULT_COL: String = "inDegree"
val OUTDEGREE_RESULT_COL: String = "outDegree"
val TRIANGLECOUNT_RESULT_COL: String = "tranglecount"
val CLOSENESS_RESULT_COL: String = "closeness"
val HANP_RESULT_COL: String = "hanp"
val NODE2VEC_RESULT_COL: String = "node2vec"
}
nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala
@@ -0,0 +1,78 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

package com.vesoft.nebula.algorithm.lib

import com.vesoft.nebula.algorithm.config.AlgoConstants
import com.vesoft.nebula.algorithm.utils.NebulaUtil
import org.apache.log4j.Logger
import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ClosenessAlgo {
  private val LOGGER = Logger.getLogger(this.getClass)
  val ALGORITHM: String = "Closeness"

  // Per-vertex map from reachable vertex id to the shortest known distance.
  type SPMap = Map[VertexId, Double]

  private def makeMap(x: (VertexId, Double)*) = Map(x: _*)

  // Extend every known distance by the weight of the edge being traversed.
  private def addMap(spmap: SPMap, weight: Double): SPMap =
    spmap.map { case (v, d) => v -> (d + weight) }

  // Merge two distance maps, keeping the smaller distance per destination.
  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
    (spmap1.keySet ++ spmap2.keySet).map { k =>
      k -> math.min(spmap1.getOrElse(k, Double.MaxValue), spmap2.getOrElse(k, Double.MaxValue))
    }(collection.breakOut)
  }

  /**
    * run the Closeness algorithm for nebula graph
    */
  def apply(spark: SparkSession, dataset: Dataset[Row], hasWeight: Boolean): DataFrame = {
    val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
    val closenessRDD = execute(graph)
    val schema = StructType(
      List(
        StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
        StructField(AlgoConstants.CLOSENESS_RESULT_COL, DoubleType, nullable = true)
      ))
    val algoResult = spark.sqlContext.createDataFrame(closenessRDD, schema)
    algoResult
  }

  /**
    * execute Closeness algorithm
    */
  def execute(graph: Graph[None.type, Double]): RDD[Row] = {
    // Every vertex starts out knowing only the zero-distance path to itself.
    val spGraph = graph.mapVertices((vid, _) => makeMap(vid -> 0.0))

    val initialMessage = makeMap()

    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }

    // Push the destination's distance map back to the source, so each vertex
    // accumulates shortest distances to everything reachable via outgoing edges.
    def sendMessage(edge: EdgeTriplet[SPMap, Double]): Iterator[(VertexId, SPMap)] = {
      val newAttr = addMap(edge.dstAttr, edge.attr)
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }

    val spsGraph = Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
    val closenessRDD = spsGraph.vertices.map { vertex =>
      var dstNum = 0
      var dstDistanceSum = 0.0
      for (distance <- vertex._2.values) {
        dstNum += 1
        dstDistanceSum += distance
      }
      // closeness = (number of reachable vertices - 1) / (sum of shortest distances);
      // a vertex that reaches nothing but itself would yield 0 / 0.0 = NaN, so emit 0.0 instead.
      if (dstDistanceSum == 0.0) Row(vertex._1, 0.0)
      else Row(vertex._1, (dstNum - 1) / dstDistanceSum)
    }
    closenessRDD
  }
}
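Net effect: after the Pregel rounds each vertex holds shortest distances to everything reachable from it, and closeness is (reachable count - 1) divided by the sum of those distances. A hedged usage sketch against an in-memory edge list; the three-column src/dst/weight layout is an assumption about what NebulaUtil.loadInitGraph expects:

import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.algorithm.lib.ClosenessAlgo

val spark = SparkSession.builder().master("local").appName("closeness-demo").getOrCreate()

// A tiny weighted digraph: 1 -> 2 -> 3, plus a heavier shortcut 1 -> 3.
val edges = spark
  .createDataFrame(Seq((1L, 2L, 1.0), (2L, 3L, 1.0), (1L, 3L, 5.0)))
  .toDF("src", "dst", "weight")

// One row per vertex: its id and closeness score. With hasWeight = false,
// every edge would count as weight 1.0 instead.
ClosenessAlgo(spark, edges, hasWeight = true).show()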
nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala
@@ -0,0 +1,87 @@
/*
 * Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

package com.vesoft.nebula.algorithm.lib

import com.vesoft.nebula.algorithm.config.{AlgoConstants, HanpConfig}
import com.vesoft.nebula.algorithm.utils.NebulaUtil
import org.apache.spark.graphx.{EdgeTriplet, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object HanpAlgo {
  val ALGORITHM: String = "Hanp"

  /**
    * run the Hanp algorithm for nebula graph
    */
  def apply(spark: SparkSession,
            dataset: Dataset[Row],
            hanpConfig: HanpConfig,
            hasWeight: Boolean,
            preferences: RDD[(VertexId, Double)] = null): DataFrame = {
    val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
    val hanpResultRDD =
      execute(graph, hanpConfig.hopAttenuation, hanpConfig.maxIter, hanpConfig.preference, preferences)
    val schema = StructType(
      List(
        StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
        StructField(AlgoConstants.HANP_RESULT_COL, LongType, nullable = true)
      ))
    val algoResult = spark.sqlContext.createDataFrame(hanpResultRDD, schema)
    algoResult
  }

  /**
    * execute Hanp algorithm
    */
  def execute(graph: Graph[None.type, Double],
              hopAttenuation: Double,
              maxIter: Int,
              preference: Double = 1.0,
              preferences: RDD[(VertexId, Double)] = null): RDD[Row] = {
    // Vertex attribute is (label, preference, score): every vertex starts in its
    // own community, using the supplied per-vertex preference when available.
    var hanpGraph: Graph[(VertexId, Double, Double), Double] = null
    if (preferences == null) {
      hanpGraph = graph.mapVertices((vertexId, _) => (vertexId, preference, 1.0))
    } else {
      hanpGraph = graph.outerJoinVertices(preferences)((vertexId, _, vertexPreference) =>
        (vertexId, vertexPreference.getOrElse(preference), 1.0))
    }

    // A vertex whose label still has a positive score advertises it to its
    // neighbors, scaled by preference * score * edge weight.
    def sendMessage(e: EdgeTriplet[(VertexId, Double, Double), Double])
      : Iterator[(VertexId, Map[VertexId, (Double, Double)])] = {
      if (e.srcAttr._3 > 0 && e.dstAttr._3 > 0) {
        Iterator(
          (e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3, e.srcAttr._2 * e.srcAttr._3 * e.attr))),
          (e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3, e.dstAttr._2 * e.dstAttr._3 * e.attr)))
        )
      } else if (e.srcAttr._3 > 0) {
        Iterator((e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3, e.srcAttr._2 * e.srcAttr._3 * e.attr))))
      } else if (e.dstAttr._3 > 0) {
        Iterator((e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3, e.dstAttr._2 * e.dstAttr._3 * e.attr))))
      } else {
        Iterator.empty
      }
    }

    // Per incoming label, keep the maximum score and sum up the weights.
    def mergeMessage(count1: Map[VertexId, (Double, Double)],
                     count2: Map[VertexId, (Double, Double)]): Map[VertexId, (Double, Double)] = {
      (count1.keySet ++ count2.keySet).map { i =>
        val count1Val = count1.getOrElse(i, (0.0, 0.0))
        val count2Val = count2.getOrElse(i, (0.0, 0.0))
        i -> (Math.max(count1Val._1, count2Val._1), count1Val._2 + count2Val._2)
      }(collection.breakOut)
    }

    // Adopt the label with the largest aggregated weight; its score decays by
    // hopAttenuation, so a label dies out as it travels across hops.
    def vertexProgram(vid: VertexId,
                      attr: (VertexId, Double, Double),
                      message: Map[VertexId, (Double, Double)]): (VertexId, Double, Double) = {
      if (message.isEmpty) {
        attr
      } else {
        val maxMessage = message.maxBy(_._2._2)
        (maxMessage._1, attr._2, maxMessage._2._1 - hopAttenuation)
      }
    }

    val initialMessage = Map[VertexId, (Double, Double)]()
    val hanpResultGraph = hanpGraph.pregel(initialMessage, maxIter)(vertexProgram, sendMessage, mergeMessage)
    hanpResultGraph.vertices.map(vertex => Row(vertex._1, vertex._2._1))
  }
}
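HANP (Hop Attenuation and Node Preference) is label propagation with the two twists visible above: a label's score decays by hopAttenuation on every hop, so no label can spread forever, and each vertex scales the labels it advertises by its preference. A hedged usage sketch exercising the optional preferences RDD, with the edge-column layout assumed as in the Closeness example:

import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.algorithm.config.HanpConfig
import com.vesoft.nebula.algorithm.lib.HanpAlgo

val spark = SparkSession.builder().master("local").appName("hanp-demo").getOrCreate()

val edges = spark
  .createDataFrame(Seq((1L, 2L, 1.0), (2L, 3L, 1.0), (4L, 5L, 1.0)))
  .toDF("src", "dst", "weight")

val config = HanpConfig(hopAttenuation = 0.1, maxIter = 10, preference = 1.0)

// Optional: seed vertex 1 with a higher preference so its label spreads farther;
// omit the argument (null default) to give every vertex the same preference.
val prefs = spark.sparkContext.parallelize(Seq((1L, 2.0)))

// One row per vertex: its id and the community label it converged to.
HanpAlgo(spark, edges, config, hasWeight = true, prefs).show()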