compute the vertex/edge amount from ngql #169

Merged · 1 commit · Nov 6, 2023
@@ -14,12 +14,14 @@ import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.util.LongAccumulator

+import java.util.regex.Pattern
import scala.collection.mutable.ArrayBuffer

class ReloadProcessor(data: DataFrame,
config: Configs,
batchSuccess: LongAccumulator,
-batchFailure: LongAccumulator)
+batchFailure: LongAccumulator,
+recordSuccess: LongAccumulator)
extends Processor {
@transient
private[this] lazy val LOG = Logger.getLogger(this.getClass)
@@ -46,9 +48,12 @@ class ReloadProcessor(data: DataFrame,
// batch write
val startTime = System.currentTimeMillis
iterator.foreach { row =>
+val ngql = row.getString(0)
+val recordSize = computeRecordNumber(ngql)
val failStatement = writer.writeNgql(row.getString(0))
if (failStatement == null) {
batchSuccess.add(1)
+recordSuccess.add(recordSize)
} else {
errorBuffer.append(failStatement)
batchFailure.add(1)
@@ -66,4 +71,22 @@ class ReloadProcessor(data: DataFrame,
writer.close()
graphProvider.close()
}

+  /**
+    * compute the record amount of ngql
+    * @param ngql nebula insert ngql
+    */
+  private def computeRecordNumber(ngql: String): Int = {
+    val substring = ": ("
+    var count     = 0
+    var index     = 0
+    while (index != -1) {
+      count += 1
+      index = ngql.indexOf(substring, index)
+      if (index != (-1)) {
+        index += substring.length
+      }
+    }
+    count
+  }
}
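
Each line that ReloadProcessor replays holds one INSERT statement, and a single statement can batch many vertices or edges, which is why batchSuccess alone undercounts imported records. The new computeRecordNumber estimates the record count by scanning the statement for the ": (" separator that precedes every value tuple. Below is a minimal standalone sketch of the same idea, not part of the patch: the object name and sample statement are invented, and this version bumps the counter only when a match is actually found, so a statement with N tuples counts N records.

object RecordCounter {

  // Count the value tuples in an nGQL INSERT statement by scanning for the
  // ": (" separator that precedes each tuple, e.g. `"vid": (prop1, prop2)`.
  def countRecords(ngql: String): Int = {
    val marker = ": ("
    var count  = 0
    var index  = ngql.indexOf(marker)
    while (index != -1) {
      count += 1
      index = ngql.indexOf(marker, index + marker.length)
    }
    count
  }

  def main(args: Array[String]): Unit = {
    // A hypothetical statement in the shape Exchange writes to its error
    // file: one batch statement carrying two vertex records.
    val ngql =
      """INSERT VERTEX `player`(`name`, `age`) VALUES "p1": ("Tom", 29), "p2": ("Amber", 25)"""
    println(countRecords(ngql)) // prints 2
  }
}
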
@@ -126,15 +126,17 @@ object Exchange {

// reload for failed import tasks
if (c.reload.nonEmpty) {
-val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
-val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload")
+val batchSuccess  = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
+val batchFailure  = spark.sparkContext.longAccumulator(s"batchFailure.reload")
+val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")

val start = System.currentTimeMillis()
val data = spark.read.text(c.reload)
-val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
+val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}")
LOG.info(
s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
.formatted("%.2f")}s")
@@ -295,18 +297,25 @@ object Exchange {
if (failures > 0 && ErrorHandler.existError(errorPath)) {
spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")

-val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
-val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
-val data = spark.read.text(errorPath)
-val start = System.currentTimeMillis()
-val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
+val batchSuccess  = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
+val batchFailure  = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
+val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")
+val data          = spark.read.text(errorPath)
+val start         = System.currentTimeMillis()
+val processor     = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
LOG.info(s">>>>> reimport ngql cost time: ${costTime}")
LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reimport: ${recordSuccess.value}")
totalClientBatchSuccess += batchSuccess.value
totalClientBatchFailure -= batchSuccess.value
+totalClientRecordSuccess += recordSuccess.value
+totalClientRecordFailure -= recordSuccess.value
+if (totalClientRecordFailure < 0) {
+  totalClientRecordFailure = 0
+}
}
spark.close()
LOG.info(
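
The bookkeeping after the reimport pass is plain arithmetic: batches and records that succeed on reload are added to the success totals and subtracted from the failure totals, and the record-failure total is clamped at zero because the per-record failure count is itself an estimate that the reload can overshoot. A small sketch with invented totals follows; the variable names mirror the patch, the values are made up.

object ReimportTotals {
  def main(args: Array[String]): Unit = {
    // Invented totals after the first import pass.
    var totalClientRecordSuccess = 900L
    var totalClientRecordFailure = 80L

    // Suppose the reimport pass recovers 100 records (recordSuccess.value).
    val recordSuccess = 100L

    // Same adjustment as the patch: move recovered records out of the
    // failure total and into the success total...
    totalClientRecordSuccess += recordSuccess
    totalClientRecordFailure -= recordSuccess

    // ...then clamp at zero, since the estimated failure total can be
    // smaller than what the reload actually recovered.
    if (totalClientRecordFailure < 0) {
      totalClientRecordFailure = 0
    }

    println(s"records: success=$totalClientRecordSuccess failure=$totalClientRecordFailure")
    // records: success=1000 failure=0
  }
}
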
@@ -126,15 +126,17 @@ object Exchange {

// reload for failed import tasks
if (c.reload.nonEmpty) {
-val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
-val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload")
+val batchSuccess  = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
+val batchFailure  = spark.sparkContext.longAccumulator(s"batchFailure.reload")
+val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")

val start = System.currentTimeMillis()
val data = spark.read.text(c.reload)
-val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
+val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}")
LOG.info(
s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
.formatted("%.2f")}s")
@@ -294,18 +296,25 @@ object Exchange {
val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
if (failures > 0 && ErrorHandler.existError(errorPath)) {
spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")
-val start = System.currentTimeMillis()
-val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
-val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
-val data = spark.read.text(errorPath)
-val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
+val start         = System.currentTimeMillis()
+val batchSuccess  = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
+val batchFailure  = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
+val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")
+val data          = spark.read.text(errorPath)
+val processor     = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
LOG.info(s">>>>> reimport ngql cost time: ${costTime}")
LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reimport: ${recordSuccess.value}")
totalClientBatchSuccess += batchSuccess.value
totalClientBatchFailure -= batchSuccess.value
+totalClientRecordSuccess += recordSuccess.value
+totalClientRecordFailure -= recordSuccess.value
+if (totalClientRecordFailure < 0) {
+  totalClientRecordFailure = 0
+}
}
spark.close()
LOG.info(
@@ -126,17 +126,17 @@ object Exchange {

// reload for failed import tasks
if (c.reload.nonEmpty) {
-val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
-val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload")
+val batchSuccess  = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
+val batchFailure  = spark.sparkContext.longAccumulator(s"batchFailure.reload")
+val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")

val start = System.currentTimeMillis()
val data = spark.read.text(c.reload)
-val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
+val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}")
LOG.info(
s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
.formatted("%.2f")}s")
@@ -295,18 +295,25 @@ object Exchange {
val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
if (failures > 0 && ErrorHandler.existError(errorPath)) {
spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")
-val start = System.currentTimeMillis()
-val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
-val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
-val data = spark.read.text(errorPath)
-val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
+val start         = System.currentTimeMillis()
+val batchSuccess  = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
+val batchFailure  = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
+val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")
+val data          = spark.read.text(errorPath)
+val processor     = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
processor.process()
val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
LOG.info(s">>>>> reimport ngql cost time: ${costTime}")
LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}")
LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}")
LOG.info(s">>>>> recordSuccess.reimport: ${recordSuccess.value}")
totalClientBatchSuccess += batchSuccess.value
totalClientBatchFailure -= batchSuccess.value
+totalClientRecordSuccess += recordSuccess.value
+totalClientRecordFailure -= recordSuccess.value
+if (totalClientRecordFailure < 0) {
+  totalClientRecordFailure = 0
+}
}
spark.close()
LOG.info(
@@ -361,7 +368,8 @@ object Exchange {
Some(reader.read())
case SourceCategory.KAFKA => {
val kafkaConfig = config.asInstanceOf[KafkaSourceConfigEntry]
LOG.info(s""">>>>> Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""")
LOG.info(
s""">>>>> Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""")
val reader = new KafkaReader(session, kafkaConfig, fields)
Some(reader.read())
}