Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add import status and records count #162

Merged
merged 2 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class GraphProvider(addresses: List[HostAddress], timeout: Int, sslConfigEntry:

def switchSpace(session: Session, space: String): ResultSet = {
val switchStatment = s"use $space"
LOG.info(s"switch space $space")
LOG.info(s">>>>>> switch space $space")
val result = submit(session, switchStatment)
result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ case class DataBaseConfigEntry(graphAddress: List[String],
"nebula.address.meta has wrong format,,please make sure the format is [\"ip1:port1\",\"ip2:port2\"]")
}

override def toString: String = super.toString
override def toString: String = {
s"DataBaseConfigEntry:{graphAddress:$graphAddress, space:$space, metaAddress:$metaAddresses}"
}

def getGraphAddress: List[HostAddress] = {
val hostAndPorts = new ListBuffer[HostAddress]
Expand Down Expand Up @@ -94,7 +96,8 @@ case class DataBaseConfigEntry(graphAddress: List[String],
case class UserConfigEntry(user: String, password: String) {
require(user.trim.nonEmpty && password.trim.nonEmpty)

override def toString: String = super.toString
override def toString: String =
s"UserConfigEntry{user:$user, password:xxxxx}"
}

/**
Expand All @@ -106,7 +109,7 @@ case class UserConfigEntry(user: String, password: String) {
case class ConnectionConfigEntry(timeout: Int, retry: Int) {
require(timeout > 0 && retry > 0)

override def toString: String = super.toString
override def toString: String = s"cConnectionConfigEntry:{timeout:$timeout, retry:$retry}"
}

/**
Expand All @@ -119,7 +122,7 @@ case class ConnectionConfigEntry(timeout: Int, retry: Int) {
case class ExecutionConfigEntry(timeout: Int, retry: Int, interval: Int) {
require(timeout > 0 && retry > 0 && interval > 0)

override def toString: String = super.toString
override def toString: String = s"ExecutionConfigEntry:{timeout:$timeout, retry:$retry}"
}

/**
Expand All @@ -131,7 +134,8 @@ case class ExecutionConfigEntry(timeout: Int, retry: Int, interval: Int) {
case class ErrorConfigEntry(errorPath: String, errorMaxSize: Int) {
require(errorPath.trim.nonEmpty && errorMaxSize > 0)

override def toString: String = super.toString
override def toString: String =
s"ErrorConfigEntry:{errorPath:$errorPath, errorMaxSize:$errorMaxSize}"
}

/**
Expand All @@ -143,7 +147,7 @@ case class ErrorConfigEntry(errorPath: String, errorMaxSize: Int) {
case class RateConfigEntry(limit: Int, timeout: Int) {
require(limit > 0 && timeout > 0)

override def toString: String = super.toString
override def toString: String = s"RateConfigEntry:{limit:$limit, timeout:$timeout}"
}

/**
Expand All @@ -168,7 +172,7 @@ case class SslConfigEntry(enableGraph: Boolean,
}
}

override def toString: String = super.toString
override def toString: String = s"SslConfigEntry:{enableGraph:$enableGraph, enableMeta:$enableMeta, signType:${signType.toString}}"
}

case class CaSignParam(caCrtFilePath: String, crtFilePath: String, keyFilePath: String)
Expand Down Expand Up @@ -915,7 +919,7 @@ object Configs {
)
}
case SourceCategory.CLICKHOUSE => {
val partition: String = getOrElse (config, "numPartition", "1")
val partition: String = getOrElse(config, "numPartition", "1")
ClickHouseConfigEntry(
SourceCategory.CLICKHOUSE,
config.getString("url"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class ReloadProcessor(data: DataFrame,
.getPartitionId()}")
errorBuffer.clear()
}
LOG.info(s"data reload in partition ${TaskContext
LOG.info(s">>>>> data reload in partition ${TaskContext
.getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms")
writer.close()
graphProvider.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ class NebulaSSTWriter(path: String) extends Writer {

try {
RocksDB.loadLibrary()
LOG.info("Loading RocksDB successfully")
LOG.info(">>>>> Loading RocksDB successfully")
} catch {
case _: Exception =>
LOG.error("Can't load RocksDB library!")
LOG.error(">>>>> Can't load RocksDB library!")
}

// TODO More Config ...
Expand Down Expand Up @@ -108,7 +108,7 @@ class GenerateSstFile extends Serializable {
}
} catch {
case e: Throwable => {
LOG.error("sst file write error,", e)
LOG.error(">>>>> sst file write error,", e)
batchFailure.add(1)
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
throw new RuntimeException("Switch Failed for " + switchResult.getErrorMessage)
}

LOG.info(s"Connection to ${dataBaseConfigEntry.graphAddress}")
LOG.info(s">>>>>> Connection to ${dataBaseConfigEntry.graphAddress}")
}

def execute(vertices: Vertices, writeMode: WriteMode.Mode): String = {
Expand Down Expand Up @@ -329,16 +329,16 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
val result = graphProvider.submit(session, statement)
if (result.isSucceeded) {
LOG.info(
s" write ${config.name}, batch size(${vertices.values.size}), latency(${result.getLatency})")
s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result.getLatency})")
return null
}
LOG.error(s"write vertex failed for ${result.getErrorMessage}")
LOG.error(s">>>>> write vertex failed for ${result.getErrorMessage} statement: \n $statement")
if (result.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) {
throw new RuntimeException(
s"write ${config.name} failed for E_BAD_PERMISSION: ${result.getErrorMessage}")
}
} else {
LOG.error(s"write vertex failed because write speed is too fast")
LOG.error(s">>>>>> write vertex failed because write speed is too fast")
}
statement
}
Expand All @@ -349,16 +349,16 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
val result = graphProvider.submit(session, statement)
if (result.isSucceeded) {
LOG.info(
s" write ${config.name}, batch size(${edges.values.size}), latency(${result.getLatency}us)")
s">>>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result.getLatency}us)")
return null
}
LOG.error(s"write edge failed for ${result.getErrorMessage}")
LOG.error(s">>>>>> write edge failed for ${result.getErrorMessage}")
if (result.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) {
throw new RuntimeException(
s"write ${config.name} failed for E_BAD_PERMISSION: ${result.getErrorMessage}")
}
} else {
LOG.error(s"write vertex failed because write speed is too fast")
LOG.error(s">>>>>> write vertex failed because write speed is too fast")
}
statement
}
Expand All @@ -369,9 +369,9 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
if (result.isSucceeded) {
return null
}
LOG.error(s"reimport ngql failed for ${result.getErrorMessage}")
LOG.error(s">>>>>> reimport ngql failed for ${result.getErrorMessage}")
} else {
LOG.error(s"reimport ngql failed because write speed is too fast")
LOG.error(s">>>>>> reimport ngql failed because write speed is too fast")
}
ngql
}
Expand Down
Loading