From 764aa610761b9f12e0af941c8cc12f3155e8b83c Mon Sep 17 00:00:00 2001 From: Anqi Date: Thu, 25 Jan 2024 10:13:06 +0800 Subject: [PATCH] set job desc with algorithm name --- .../nebula/algorithm/lib/BetweennessCentralityAlgo.scala | 1 + .../scala/com/vesoft/nebula/algorithm/lib/BfsAlgo.scala | 2 ++ .../com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala | 2 ++ .../nebula/algorithm/lib/ClusteringCoefficientAlgo.scala | 1 + .../nebula/algorithm/lib/ConnectedComponentsAlgo.scala | 2 +- .../vesoft/nebula/algorithm/lib/DegreeStaticAlgo.scala | 2 ++ .../scala/com/vesoft/nebula/algorithm/lib/DfsAlgo.scala | 3 +++ .../nebula/algorithm/lib/GraphTriangleCountAlgo.scala | 2 ++ .../scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala | 1 + .../com/vesoft/nebula/algorithm/lib/JaccardAlgo.scala | 9 +++------ .../com/vesoft/nebula/algorithm/lib/KCoreAlgo.scala | 2 +- .../nebula/algorithm/lib/LabelPropagationAlgo.scala | 2 ++ .../com/vesoft/nebula/algorithm/lib/LouvainAlgo.scala | 1 + .../com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala | 2 ++ .../com/vesoft/nebula/algorithm/lib/PageRankAlgo.scala | 1 + .../vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala | 2 +- .../algorithm/lib/StronglyConnectedComponentsAlgo.scala | 1 + .../vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala | 1 + 18 files changed, 28 insertions(+), 9 deletions(-) diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BetweennessCentralityAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BetweennessCentralityAlgo.scala index fd5a37b..67a632d 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BetweennessCentralityAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BetweennessCentralityAlgo.scala @@ -28,6 +28,7 @@ object BetweennessCentralityAlgo { dataset: Dataset[Row], betweennessConfig: BetweennessConfig, hasWeight: Boolean): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") var encodeIdDf: DataFrame = null diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BfsAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BfsAlgo.scala index 96a3fe7..1356ab1 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BfsAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BfsAlgo.scala @@ -25,6 +25,8 @@ object BfsAlgo { * run the louvain algorithm for nebula graph */ def apply(spark: SparkSession, dataset: Dataset[Row], bfsConfig: BfsConfig): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") + var encodeIdDf: DataFrame = null var finalRoot: Long = 0 diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala index 27a1bab..df36e56 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala @@ -35,6 +35,8 @@ object ClosenessAlgo { * run the Closeness algorithm for nebula graph */ def apply(spark: SparkSession, dataset: Dataset[Row], hasWeight: Boolean): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight) val closenessRDD = execute(graph) val schema = StructType( diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClusteringCoefficientAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClusteringCoefficientAlgo.scala index 9496708..878e6f7 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClusteringCoefficientAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClusteringCoefficientAlgo.scala @@ -24,6 +24,7 @@ object ClusteringCoefficientAlgo { def apply(spark: SparkSession, dataset: Dataset[Row], coefficientConfig: CoefficientConfig): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") var encodeIdDf: DataFrame = null diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ConnectedComponentsAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ConnectedComponentsAlgo.scala index 16e6f32..19c6f23 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ConnectedComponentsAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ConnectedComponentsAlgo.scala @@ -10,7 +10,6 @@ import com.vesoft.nebula.algorithm.utils.{DecodeUtil, NebulaUtil} import org.apache.log4j.Logger import org.apache.spark.graphx.{Graph, VertexId, VertexRDD} import org.apache.spark.rdd.RDD -import com.vesoft.nebula.algorithm.utils.NebulaUtil import org.apache.spark.graphx.lib.ConnectedComponents import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} @@ -27,6 +26,7 @@ object ConnectedComponentsAlgo { dataset: Dataset[Row], ccConfig: CcConfig, hasWeight: Boolean): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") var encodeIdDf: DataFrame = null diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DegreeStaticAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DegreeStaticAlgo.scala index 7721ecc..7d24500 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DegreeStaticAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DegreeStaticAlgo.scala @@ -25,6 +25,8 @@ object DegreeStaticAlgo { def apply(spark: SparkSession, dataset: Dataset[Row], degreeConfig: DegreeStaticConfig = new DegreeStaticConfig): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") + var encodeIdDf: DataFrame = null val graph: Graph[None.type, Double] = if (degreeConfig.encodeId) { diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DfsAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DfsAlgo.scala index 1ee5d4f..efe6702 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DfsAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DfsAlgo.scala @@ -22,8 +22,11 @@ import scala.collection.mutable object DfsAlgo { var iterNums = 0 + val ALGORITHM = "dfs" def apply(spark: SparkSession, dataset: Dataset[Row], dfsConfig: DfsConfig): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") + var encodeIdDf: DataFrame = null var finalRoot: Long = 0 diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala index 4c749db..c2bd891 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala @@ -13,8 +13,10 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructTyp * compute all graph's triangle count */ object GraphTriangleCountAlgo { + val ALGORITHM = "graphTriangleCount" def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") val triangleCount = TriangleCountAlgo(spark, dataset) val count = triangleCount diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala index 208c7b6..047f5b5 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala @@ -28,6 +28,7 @@ object HanpAlgo { hanpConfig: HanpConfig, hasWeight: Boolean, preferences: RDD[(VertexId, Double)] = null): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") var encodeIdDf: DataFrame = null diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/JaccardAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/JaccardAlgo.scala index 8339d70..30877e5 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/JaccardAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/JaccardAlgo.scala @@ -6,15 +6,11 @@ package com.vesoft.nebula.algorithm.lib import com.vesoft.nebula.algorithm.config.JaccardConfig +import com.vesoft.nebula.algorithm.lib.HanpAlgo.ALGORITHM import com.vesoft.nebula.algorithm.utils.{DecodeUtil, NebulaUtil} import org.apache.log4j.Logger import org.apache.spark.graphx.Graph -import org.apache.spark.ml.feature.{ - CountVectorizer, - CountVectorizerModel, - MinHashLSH, - MinHashLSHModel -} +import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, MinHashLSH, MinHashLSHModel} import org.apache.spark.ml.linalg.SparseVector import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} @@ -30,6 +26,7 @@ object JaccardAlgo { * run the Jaccard algorithm for nebula graph */ def apply(spark: SparkSession, dataset: Dataset[Row], jaccardConfig: JaccardConfig): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") var encodeIdDf: DataFrame = null var data: DataFrame = dataset diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KCoreAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KCoreAlgo.scala index 62bb716..9e7ec36 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KCoreAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KCoreAlgo.scala @@ -21,7 +21,7 @@ object KCoreAlgo { * run the louvain algorithm for nebula graph */ def apply(spark: SparkSession, dataset: Dataset[Row], kCoreConfig: KCoreConfig): DataFrame = { - + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") var encodeIdDf: DataFrame = null val graph: Graph[None.type, Double] = if (kCoreConfig.encodeId) { diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LabelPropagationAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LabelPropagationAlgo.scala index 87563ed..4bfd932 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LabelPropagationAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LabelPropagationAlgo.scala @@ -27,6 +27,8 @@ object LabelPropagationAlgo { dataset: Dataset[Row], lpaConfig: LPAConfig, hasWeight: Boolean): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") + var encodeIdDf: DataFrame = null val graph: Graph[None.type, Double] = if (lpaConfig.encodeId) { diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LouvainAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LouvainAlgo.scala index 2b5c0fa..e350253 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LouvainAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LouvainAlgo.scala @@ -29,6 +29,7 @@ object LouvainAlgo { dataset: Dataset[Row], louvainConfig: LouvainConfig, hasWeight: Boolean): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") var encodeIdDf: DataFrame = null diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala index 36356c4..3e45d8a 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala @@ -270,6 +270,8 @@ object Node2vecAlgo { dataset: Dataset[Row], node2vecConfig: Node2vecConfig, hasWeight: Boolean): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") + val inputGraph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight) this.context = spark.sparkContext this.node2vecConfig = node2vecConfig diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/PageRankAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/PageRankAlgo.scala index 5a69017..12aaca0 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/PageRankAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/PageRankAlgo.scala @@ -26,6 +26,7 @@ object PageRankAlgo { dataset: Dataset[Row], pageRankConfig: PRConfig, hasWeight: Boolean): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") var encodeIdDf: DataFrame = null diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala index 650b0df..29c24ae 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala @@ -30,7 +30,7 @@ object ShortestPathAlgo { dataset: Dataset[Row], shortestPathConfig: ShortestPathConfig, hasWeight: Boolean): DataFrame = { - + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight) val prResultRDD = execute(graph, shortestPathConfig.landmarks) diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/StronglyConnectedComponentsAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/StronglyConnectedComponentsAlgo.scala index 5e8583b..08c2575 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/StronglyConnectedComponentsAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/StronglyConnectedComponentsAlgo.scala @@ -25,6 +25,7 @@ object StronglyConnectedComponentsAlgo { dataset: Dataset[Row], ccConfig: CcConfig, hasWeight: Boolean): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") var encodeIdDf: DataFrame = null diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala index 377c5a7..37f1a68 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala @@ -27,6 +27,7 @@ object TriangleCountAlgo { def apply(spark: SparkSession, dataset: Dataset[Row], triangleConfig: TriangleConfig = new TriangleConfig): DataFrame = { + spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM") var encodeIdDf: DataFrame = null