Commit

change config to list
darionyaphet committed Dec 25, 2019
1 parent 7da8a04 commit 429a0b9
Showing 2 changed files with 95 additions and 46 deletions.
@@ -30,13 +30,19 @@
execution {
retry: 3
}

error: {
max: 32
output: /tmp/errors
}
}

# Processing tags
tags: {
tags: [

# Loading tag from HDFS and data type is parquet
tag-name-0: {
{
name: tag-name-0
type: parquet
path: hdfs tag path 0
fields: {
@@ -48,7 +54,8 @@
}

# Loading from Hive
tag-name-1: {
{
name: tag-name-1
type: hive
exec: "select hive-field0, hive-field1, hive-field2 from database.table"
fields: {
Expand All @@ -59,12 +66,13 @@
vertex: hive-field-0
partition: 32
}
}
]

# Processing edges
edges: {
edges: [
# Loading tag from HDFS and data type is parquet
edge-name-0: {
{
name: edge-name-0
type: json
path: hdfs edge path 0
fields: {
@@ -77,19 +85,20 @@
ranking: hive-field-2
partition: 32
}
}

# Loading from Hive
edge-name-1: {
type: hive
exec: "select hive-field0, hive-field1, hive-field2 from database.table"
fields: {
hive-field-0: nebula-field-0,
hive-field-1: nebula-field-1,
hive-field-2: nebula-field-2
# Loading from Hive
{
name: edge-name-1
type: hive
exec: "select hive-field0, hive-field1, hive-field2 from database.table"
fields: {
hive-field-0: nebula-field-0,
hive-field-1: nebula-field-1,
hive-field-2: nebula-field-2
}
source: hive-field-0
target: hive-field-1
partition: 32
}
source: hive-field-0
target: hive-field-1
partition: 32
}
]
}
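The change to the first file switches tags and edges from named objects (tag-name-0: { ... }) to lists of objects, each carrying an explicit name field. Below is a minimal sketch of how such a list-style block can be read with the Typesafe Config library this project already uses; the tag names and fields are illustrative placeholders, not the importer's real schema.

import com.typesafe.config.{Config, ConfigFactory}
import scala.collection.JavaConverters._

object ListConfigSketch {
  def main(args: Array[String]): Unit = {
    // A small list-style config mirroring the new format above (values are placeholders).
    val config: Config = ConfigFactory.parseString(
      """
        |tags: [
        |  { name: tag-name-0, type: parquet, partition: 32 }
        |  { name: tag-name-1, type: hive, partition: 32 }
        |]
      """.stripMargin)

    // getConfigList returns java.util.List[_ <: Config]; each element is one tag block.
    for (tagConfig <- config.getConfigList("tags").asScala) {
      // With a list there is no surrounding map key, so the name comes from inside each block.
      val name = tagConfig.getString("name")
      println(s"tag ${name}: type=${tagConfig.getString("type")}")
    }
  }
}

One practical effect of the list form is that block order is preserved, so the loader processes tags and edges in the order they appear in the file.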
@@ -6,7 +6,7 @@

package com.vesoft.nebula.tools.generator.v2

import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
import org.apache.spark.sql.{DataFrame, Encoders, Row, SaveMode, SparkSession}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.udf
@@ -20,13 +20,16 @@ import org.apache.log4j.Logger
import org.apache.spark.sql.types._

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import util.control.Breaks._

case class Argument(config: File = new File("application.conf"),
hive: Boolean = false,
directly: Boolean = false,
dry: Boolean = false)
final case class Argument(config: File = new File("application.conf"),
hive: Boolean = false,
directly: Boolean = false,
dry: Boolean = false)

final case class TooManyErrorException(private val message: String) extends Exception(message)

/**
* SparkClientGenerator is a simple spark job used to write data into Nebula Graph parallel.
@@ -41,13 +44,15 @@ object SparkClientGenerator {
private[this] val EDGE_VALUE_TEMPLATE = "%d->%d@%d: (%s)"
private[this] val USE_TEMPLATE = "USE %s"

private[this] val DEFAULT_BATCH = 2
private[this] val DEFAULT_PARTITION = -1
private[this] val DEFAULT_CONNECTION_TIMEOUT = 3000
private[this] val DEFAULT_CONNECTION_RETRY = 3
private[this] val DEFAULT_EXECUTION_RETRY = 3
private[this] val DEFAULT_EXECUTION_INTERVAL = 3000
private[this] val DEFAULT_EDGE_RANKING = 0L
private[this] val DEFAULT_BATCH = 2
private[this] val DEFAULT_PARTITION = -1
private[this] val DEFAULT_CONNECTION_TIMEOUT = 3000
private[this] val DEFAULT_CONNECTION_RETRY = 3
private[this] val DEFAULT_EXECUTION_RETRY = 3
private[this] val DEFAULT_EXECUTION_INTERVAL = 3000
private[this] val DEFAULT_EDGE_RANKING = 0L
private[this] val DEFAULT_ERROR_OUTPUT_PATH = "/tmp/nebula.writer.errors/"
private[this] val DEFAULT_ERROR_MAX_BATCH_SIZE = Int.MaxValue

// GEO default config
private[this] val DEFAULT_MIN_CELL_LEVEL = 5
@@ -108,9 +113,13 @@ object SparkClientGenerator {
val executionInterval =
getOrElse(nebulaConfig, "execution.interval", DEFAULT_EXECUTION_INTERVAL)

val errorPath = getOrElse(nebulaConfig, "error.output", DEFAULT_ERROR_OUTPUT_PATH)
val errorMaxSize = getOrElse(nebulaConfig, "error.max", DEFAULT_ERROR_MAX_BATCH_SIZE)

LOG.info(s"Nebula Addresses ${addresses} for ${user}:${pswd}")
LOG.info(s"Connection Timeout ${connectionTimeout} Retry ${connectionRetry}")
LOG.info(s"Execution Retry ${executionRetry} Interval Base ${executionInterval}")
LOG.info(s"Error Path ${errorPath}")
LOG.info(s"Switch to ${space}")

val session = SparkSession
@@ -143,17 +152,18 @@
if (c.hive) session.enableHiveSupport().getOrCreate()
else session.getOrCreate()

val tagConfigs =
if (config.hasPath("tags"))
Some(config.getObject("tags"))
else None

val tagConfigs = getConfigOrNone(config, "tags")
if (tagConfigs.isDefined) {
for (tagName <- tagConfigs.get.unwrapped.keySet.asScala) {
for (tagConfig <- tagConfigs.get.asScala) {
if (!tagConfig.hasPath("name")) {
LOG.error("The `name` must be specified")
break()
}

val tagName = tagConfig.getString("name")
LOG.info(s"Processing Tag ${tagName}")
val tagConfig = config.getConfig(s"tags.${tagName}")
if (!tagConfig.hasPath("type")) {
LOG.error("The type must be specified")
LOG.error("The `type` must be specified")
break()
}

@@ -204,6 +214,7 @@ object SparkClientGenerator {
executionRetry)

if (isSuccessfully(client.connect(user, pswd))) {
val errorBuffer = ArrayBuffer[String]()
if (isSuccessfully(client.execute(USE_TEMPLATE.format(space)))) {
iterator.grouped(batch).foreach { tags =>
val exec = BATCH_INSERT_TEMPLATE.format(
@@ -225,8 +236,21 @@
time * executionInterval + Random.nextInt(10) * 100L)(exec)) {
break
}
LOG.debug("Save the error execution sentence into buffer")
errorBuffer += exec

if (errorBuffer.size == errorMaxSize) {
throw TooManyErrorException(s"Too Many Error ${errorMaxSize}")
}
}
}
spark
.createDataset(errorBuffer)(Encoders.STRING)
.repartition(1)
.write
.mode(SaveMode.Append)
.text(s"${errorPath}/${tagName}")

} else {
LOG.error(s"Switch ${space} Failed")
}
@@ -242,20 +266,22 @@
}

val edgeConfigs = getConfigOrNone(config, "edges")

if (edgeConfigs.isDefined) {
for (edgeName <- edgeConfigs.get.unwrapped.keySet.asScala) {
LOG.info(s"Processing Edge ${edgeName}")
val edgeConfig = config.getConfig(s"edges.${edgeName}")
for (edgeConfig <- edgeConfigs.get.asScala) {
if (!edgeConfig.hasPath("name")) {
LOG.error("The `name` must be specified")
break()
}

val edgeName = edgeConfig.getString("name")
if (!edgeConfig.hasPath("type")) {
LOG.error("The type must be specified")
LOG.error("The `type` must be specified")
break()
}

val pathOpt = if (edgeConfig.hasPath("path")) {
Some(edgeConfig.getString("path"))
} else {
LOG.warn("The path is not setting")
None
}

@@ -336,6 +362,7 @@
connectionRetry,
executionRetry)
if (isSuccessfully(client.connect(user, pswd))) {
val errorBuffer = ArrayBuffer[String]()
if (isSuccessfully(client.execute(USE_TEMPLATE.format(space)))) {
iterator.grouped(batch).foreach { edges =>
val values =
@@ -373,8 +400,21 @@
time * executionInterval + Random.nextInt(10) * 100L)(exec)) {
break
}
LOG.debug("Save the error execution sentence into buffer")
errorBuffer += exec

if (errorBuffer.size == errorMaxSize) {
throw TooManyErrorException(s"Too Many Error ${errorMaxSize}")
}
}
}

spark
.createDataset(errorBuffer)(Encoders.STRING)
.repartition(1)
.write
.mode(SaveMode.Append)
.text(s"${errorPath}/${edgeName}")
} else {
LOG.error(s"Switch ${space} Failed")
}
Expand Down Expand Up @@ -620,7 +660,7 @@ object SparkClientGenerator {
*/
private[this] def getConfigOrNone(config: Config, path: String) = {
if (config.hasPath(path)) {
Some(config.getObject(path))
Some(config.getConfigList(path))
} else {
None
}
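The Scala changes pair the retry loop with an error buffer: statements that still fail after all retries are accumulated in an ArrayBuffer, the job aborts with TooManyErrorException once the buffer reaches error.max, and whatever was collected is appended as plain text under error.output so it can be inspected or replayed later. Below is a simplified, self-contained sketch of that pattern; the path, limit, and failed statements are placeholders rather than the importer's real values.

import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
import scala.collection.mutable.ArrayBuffer

final case class TooManyErrorException(message: String) extends Exception(message)

object ErrorBufferSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("error-buffer-sketch")
      .master("local[*]")
      .getOrCreate()

    val errorPath    = "/tmp/nebula.writer.errors"  // error.output in the config
    val errorMaxSize = 32                           // error.max in the config
    val errorBuffer  = ArrayBuffer[String]()

    // Pretend these INSERT statements failed even after the retries.
    val failedStatements = Seq("INSERT VERTEX ... VALUES ...", "INSERT EDGE ... VALUES ...")
    for (exec <- failedStatements) {
      errorBuffer += exec
      if (errorBuffer.size == errorMaxSize) {
        // Give up once too many statements have failed.
        throw TooManyErrorException(s"too many errors: ${errorMaxSize}")
      }
    }

    // Persist the failed statements as text, one output directory per tag or edge.
    spark
      .createDataset(errorBuffer)(Encoders.STRING)
      .repartition(1)
      .write
      .mode(SaveMode.Append)
      .text(s"${errorPath}/tag-name-0")

    spark.stop()
  }
}

Because the write uses SaveMode.Append, repeated runs keep adding failure files under the same directory instead of overwriting earlier ones.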
