Skip to content

Commit

Permalink
compute the vertex/edge amount from ngql (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 authored Nov 6, 2023
1 parent 9432d3f commit 7b1a3f1
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.util.LongAccumulator

import java.util.regex.Pattern
import scala.collection.mutable.ArrayBuffer

class ReloadProcessor(data: DataFrame,
config: Configs,
batchSuccess: LongAccumulator,
batchFailure: LongAccumulator)
batchFailure: LongAccumulator,
recordSuccess: LongAccumulator)
extends Processor {
@transient
private[this] lazy val LOG = Logger.getLogger(this.getClass)
Expand All @@ -46,9 +48,12 @@ class ReloadProcessor(data: DataFrame,
// batch write
val startTime = System.currentTimeMillis
iterator.foreach { row =>
val ngql = row.getString(0)
val recordSize = computeRecordNumber(ngql)
val failStatement = writer.writeNgql(row.getString(0))
if (failStatement == null) {
batchSuccess.add(1)
recordSuccess.add(recordSize)
} else {
errorBuffer.append(failStatement)
batchFailure.add(1)
Expand All @@ -66,4 +71,22 @@ class ReloadProcessor(data: DataFrame,
writer.close()
graphProvider.close()
}

/**
* compute the record amount of ngql
* @param ngql nebula insert ngql
*/
private def computeRecordNumber(ngql: String): Int = {
val substring = ": ("
var count = 0
var index = 0
while (index != -1) {
count += 1
index = ngql.indexOf(substring, index)
if (index != (-1)) {
index += substring.length
}
}
count
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,17 @@ object Exchange {

// reload for failed import tasks
if (c.reload.nonEmpty) {
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload")
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload")
val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")

val start = System.currentTimeMillis()
val data = spark.read.text(c.reload)
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}")
LOG.info(
s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
.formatted("%.2f")}s")
Expand Down Expand Up @@ -293,18 +295,25 @@ object Exchange {
if (failures > 0 && ErrorHandler.existError(errorPath)) {
spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")

val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
val data = spark.read.text(errorPath)
val start = System.currentTimeMillis()
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")
val data = spark.read.text(errorPath)
val start = System.currentTimeMillis()
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
LOG.info(s">>>>> reimport ngql cost time: ${costTime}")
LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reimport: ${recordSuccess.value}")
totalClientBatchSuccess += batchSuccess.value
totalClientBatchFailure -= batchSuccess.value
totalClientRecordSuccess += recordSuccess.value
totalClientRecordFailure -= recordSuccess.value
if (totalClientRecordFailure < 0) {
totalClientRecordFailure = 0
}
}
spark.close()
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,17 @@ object Exchange {

// reload for failed import tasks
if (c.reload.nonEmpty) {
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload")
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload")
val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")

val start = System.currentTimeMillis()
val data = spark.read.text(c.reload)
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}")
LOG.info(
s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
.formatted("%.2f")}s")
Expand Down Expand Up @@ -290,18 +292,25 @@ object Exchange {
val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
if (failures > 0 && ErrorHandler.existError(errorPath)) {
spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")
val start = System.currentTimeMillis()
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
val data = spark.read.text(errorPath)
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
val start = System.currentTimeMillis()
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")
val data = spark.read.text(errorPath)
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
LOG.info(s">>>>> reimport ngql cost time: ${costTime}")
LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reimport: ${recordSuccess.value}")
totalClientBatchSuccess += batchSuccess.value
totalClientBatchFailure -= batchSuccess.value
totalClientRecordSuccess += recordSuccess.value
totalClientRecordFailure -= recordSuccess.value
if (totalClientRecordFailure < 0) {
totalClientRecordFailure = 0
}
}
spark.close()
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,17 @@ object Exchange {

// reload for failed import tasks
if (c.reload.nonEmpty) {
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload")
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload")
val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")

val start = System.currentTimeMillis()
val data = spark.read.text(c.reload)
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}")
LOG.info(
s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
.formatted("%.2f")}s")
Expand Down Expand Up @@ -291,18 +291,25 @@ object Exchange {
val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
if (failures > 0 && ErrorHandler.existError(errorPath)) {
spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")
val start = System.currentTimeMillis()
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
val data = spark.read.text(errorPath)
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
val start = System.currentTimeMillis()
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")
val data = spark.read.text(errorPath)
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
LOG.info(s">>>>> reimport ngql cost time: ${costTime}")
LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reimport: ${recordSuccess.value}")
totalClientBatchSuccess += batchSuccess.value
totalClientBatchFailure -= batchSuccess.value
totalClientRecordSuccess += recordSuccess.value
totalClientRecordFailure -= recordSuccess.value
if (totalClientRecordFailure < 0) {
totalClientRecordFailure = 0
}
}
spark.close()
LOG.info(
Expand Down

0 comments on commit 7b1a3f1

Please sign in to comment.