diff --git a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala index 301a3a70..b97c2a33 100644 --- a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala +++ b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala @@ -106,6 +106,7 @@ object NebulaSparkWriterExample { .withWriteMode(WriteMode.DELETE) .withVidAsProp(false) .withBatch(1000) + .withDisableWriteLog(true) .build() df.write.nebula(config, nebulaWriteVertexConfig).writeVertices() } @@ -133,6 +134,7 @@ object NebulaSparkWriterExample { .withDstAsProperty(false) .withRankAsProperty(false) .withBatch(1000) + .withDisableWriteLog(true) .build() df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges() } diff --git a/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala b/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala index 5ce02470..65a1c9b3 100644 --- a/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala +++ b/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala @@ -21,19 +21,29 @@ class NebulaConnectionConfig(metaAddress: String, signType: SSLSignType.Value, caSignParam: CASSLSignParams, selfSignParam: SelfSSLSignParams) - extends Serializable { - def getMetaAddress = metaAddress - def getGraphAddress = graphAddress - def getTimeout = timeout - def getConnectionRetry = connectionRetry - def getExecRetry = executeRetry - def getEnableMetaSSL = enableMetaSSL - def getEnableGraphSSL = enableGraphSSL + extends Serializable { + def getMetaAddress = metaAddress + + def getGraphAddress = graphAddress + + def getTimeout = timeout + + def getConnectionRetry = connectionRetry + + def getExecRetry = executeRetry + + def getEnableMetaSSL = enableMetaSSL + + def getEnableGraphSSL = enableGraphSSL + def getEnableStorageSSL = enableStorageSSL - def getSignType = signType.toString + + def getSignType = signType.toString + def getCaSignParam: String = { caSignParam.caCrtFilePath + "," + caSignParam.crtFilePath + "," + caSignParam.keyFilePath } + def getSelfSignParam: String = { selfSignParam.crtFilePath + "," + selfSignParam.keyFilePath + "," + selfSignParam.password } @@ -44,46 +54,46 @@ object NebulaConnectionConfig { class ConfigBuilder { private val LOG = LoggerFactory.getLogger(this.getClass) - protected var metaAddress: String = _ + protected var metaAddress: String = _ protected var graphAddress: String = _ - protected var timeout: Int = 6000 + protected var timeout: Int = 6000 protected var connectionRetry: Int = 1 - protected var executeRetry: Int = 1 + protected var executeRetry: Int = 1 - protected var enableMetaSSL: Boolean = false - protected var enableGraphSSL: Boolean = false - protected var enableStorageSSL: Boolean = false - protected var sslSignType: SSLSignType.Value = _ - protected var caSignParam: CASSLSignParams = null + protected var enableMetaSSL: Boolean = false + protected var enableGraphSSL: Boolean = false + protected var enableStorageSSL: Boolean = false + protected var sslSignType: SSLSignType.Value = _ + protected var caSignParam: CASSLSignParams = null protected var selfSignParam: SelfSSLSignParams = null /** - * set nebula meta server address, multi addresses is split by English comma - */ + * set nebula meta server address, multi addresses is split by English comma + */ def withMetaAddress(metaAddress: String): ConfigBuilder = { this.metaAddress = metaAddress this } /** - * set nebula graph server address, multi addresses is split by English comma - */ + * set nebula graph server address, multi addresses is split by English comma + */ def withGraphAddress(graphAddress: String): ConfigBuilder = { this.graphAddress = graphAddress this } /** - * set timeout, timeout is optional - */ + * set timeout, timeout is optional + */ def withTimeout(timeout: Int): ConfigBuilder = { this.timeout = timeout this } /** - * set connectionRetry, connectionRetry is optional - */ + * set connectionRetry, connectionRetry is optional + */ @deprecated("use withConnectionRetry instead", "3.7.0") def withConenctionRetry(connectionRetry: Int): ConfigBuilder = { this.connectionRetry = connectionRetry @@ -96,48 +106,48 @@ object NebulaConnectionConfig { } /** - * set executeRetry, executeRetry is optional - */ + * set executeRetry, executeRetry is optional + */ def withExecuteRetry(executeRetry: Int): ConfigBuilder = { this.executeRetry = executeRetry this } /** - * set enableMetaSSL, enableMetaSSL is optional - */ + * set enableMetaSSL, enableMetaSSL is optional + */ def withEnableMetaSSL(enableMetaSSL: Boolean): ConfigBuilder = { this.enableMetaSSL = enableMetaSSL this } /** - * set enableMetaSSL, enableMetaSSL is optional - */ + * set enableMetaSSL, enableMetaSSL is optional + */ def withEnableGraphSSL(enableGraphSSL: Boolean): ConfigBuilder = { this.enableGraphSSL = enableGraphSSL this } /** - * set enableStorageSSL, enableStorageSSL is optional - */ + * set enableStorageSSL, enableStorageSSL is optional + */ def withEnableStorageSSL(enableStorageSSL: Boolean): ConfigBuilder = { this.enableStorageSSL = enableStorageSSL this } /** - * set ssl sign type {@link SSLSignType} - */ + * set ssl sign type {@link SSLSignType} + */ def withSSLSignType(signType: SSLSignType.Value): ConfigBuilder = { this.sslSignType = signType this } /** - * set ca sign param for ssl - */ + * set ca sign param for ssl + */ def withCaSSLSignParam(caCrtFilePath: String, crtFilePath: String, keyFilePath: String): ConfigBuilder = { @@ -146,8 +156,8 @@ object NebulaConnectionConfig { } /** - * set self sign param for ssl - */ + * set self sign param for ssl + */ def withSelfSSLSignParam(crtFilePath: String, keyFilePath: String, password: String): ConfigBuilder = { @@ -156,8 +166,8 @@ object NebulaConnectionConfig { } /** - * check if the connection config is valid - */ + * check if the connection config is valid + */ def check(): Unit = { assert(metaAddress != null && !metaAddress.isEmpty, "config address is empty.") assert(timeout > 0, "timeout must be larger than 0") @@ -191,8 +201,8 @@ object NebulaConnectionConfig { } /** - * build NebulaConnectionConfig - */ + * build NebulaConnectionConfig + */ def build(): NebulaConnectionConfig = { check() new NebulaConnectionConfig( @@ -218,32 +228,40 @@ object NebulaConnectionConfig { } /** - * Base config needed when write dataframe into nebula graph - */ + * Base config needed when write dataframe into nebula graph + */ private[connector] class WriteNebulaConfig(space: String, user: String, passwd: String, batch: Int, writeMode: String, - overwrite: Boolean) - extends Serializable { - def getSpace = space - def getBatch = batch - def getUser = user - def getPasswd = passwd + overwrite: Boolean, + disableWriteLog: Boolean) + extends Serializable { + def getSpace = space + + def getBatch = batch + + def getUser = user + + def getPasswd = passwd + def getWriteMode = writeMode - def isOverwrite = overwrite + + def isOverwrite = overwrite + + def isDisableWriteLog: Boolean = disableWriteLog } /** - * subclass of WriteNebulaConfig to config vertex when write dataframe into nebula graph - * - * @param space: nebula space name - * @param tagName: tag name - * @param vidField: field in dataframe to indicate vertexId - * @param vidPolicy: not required, use hash to map your vertexId - * @param batch: amount of one batch when write into nebula graph - */ + * subclass of WriteNebulaConfig to config vertex when write dataframe into nebula graph + * + * @param space : nebula space name + * @param tagName : tag name + * @param vidField : field in dataframe to indicate vertexId + * @param vidPolicy : not required, use hash to map your vertexId + * @param batch : amount of one batch when write into nebula graph + */ class WriteNebulaVertexConfig(space: String, tagName: String, vidField: String, @@ -254,31 +272,36 @@ class WriteNebulaVertexConfig(space: String, passwd: String, writeMode: String, deleteEdge: Boolean, - overwrite: Boolean) - extends WriteNebulaConfig(space, user, passwd, batch, writeMode, overwrite) { - def getTagName = tagName - def getVidField = vidField - def getVidPolicy = if (vidPolicy == null) "" else vidPolicy - def getVidAsProp = vidAsProp + overwrite: Boolean, + disableWriteLog: Boolean) + extends WriteNebulaConfig(space, user, passwd, batch, writeMode, overwrite, disableWriteLog) { + def getTagName = tagName + + def getVidField = vidField + + def getVidPolicy = if (vidPolicy == null) "" else vidPolicy + + def getVidAsProp = vidAsProp + def getDeleteEdge = deleteEdge } /** - * object WriteNebulaVertexConfig - * */ + * object WriteNebulaVertexConfig + * */ object WriteNebulaVertexConfig { private val LOG: Logger = LoggerFactory.getLogger(this.getClass) class WriteVertexConfigBuilder { - var space: String = _ - var tagName: String = _ + var space: String = _ + var tagName: String = _ var vidPolicy: String = _ - var vidField: String = _ - var batch: Int = 512 - var user: String = "root" - var passwd: String = "nebula" + var vidField: String = _ + var batch: Int = 512 + var user: String = "root" + var passwd: String = "nebula" var writeMode: String = "insert" /** whether set vid as property */ @@ -290,111 +313,122 @@ object WriteNebulaVertexConfig { /** whether overwrite the exists vertex */ var overwrite: Boolean = true + var disableWriteLog: Boolean = true + /** - * set space name - */ + * set space name + */ def withSpace(space: String): WriteVertexConfigBuilder = { this.space = space this } /** - * set tag name - */ + * set tag name + */ def withTag(tagName: String): WriteVertexConfigBuilder = { this.tagName = tagName this } /** - * set which field in dataframe as nebula tag's id - */ + * set which field in dataframe as nebula tag's id + */ def withVidField(vidField: String): WriteVertexConfigBuilder = { this.vidField = vidField this } /** - * set vid policy, its optional - * only "hash" and "uuid" is validate - */ + * set vid policy, its optional + * only "hash" and "uuid" is validate + */ def withVidPolicy(vidPolicy: String): WriteVertexConfigBuilder = { this.vidPolicy = vidPolicy this } /** - * set data amount for one batch, default is 512 - */ + * set data amount for one batch, default is 512 + */ def withBatch(batch: Int): WriteVertexConfigBuilder = { this.batch = batch this } /** - * set whether vid as prop, default is false - */ + * set whether vid as prop, default is false + */ def withVidAsProp(vidAsProp: Boolean): WriteVertexConfigBuilder = { this.vidAsProp = vidAsProp this } /** - * set user name for nebula graph - */ + * set user name for nebula graph + */ def withUser(user: String): WriteVertexConfigBuilder = { this.user = user this } /** - * set password for nebula graph's user - */ + * set password for nebula graph's user + */ def withPasswd(passwd: String): WriteVertexConfigBuilder = { this.passwd = passwd this } /** - * set nebula write mode for nebula tag, INSERT or UPDATE - */ + * set nebula write mode for nebula tag, INSERT or UPDATE + */ def withWriteMode(writeMode: WriteMode.Value): WriteVertexConfigBuilder = { this.writeMode = writeMode.toString this } /** - * set whether delete related edges when delete vertex - */ + * set whether delete related edges when delete vertex + */ def withDeleteEdge(deleteEdge: Boolean): WriteVertexConfigBuilder = { this.deleteEdge = deleteEdge this } /** - * set whether overwrite the exists vertex - */ + * set whether overwrite the exists vertex + */ def withOverwrite(overwrite: Boolean): WriteVertexConfigBuilder = { this.overwrite = overwrite this; } /** - * check and get WriteNebulaVertexConfig - */ + * set whether disable the write log for failed data + */ + def withDisableWriteLog(disableWriteLog: Boolean): WriteVertexConfigBuilder = { + this.disableWriteLog = disableWriteLog + this + } + + /** + * check and get WriteNebulaVertexConfig + */ def build(): WriteNebulaVertexConfig = { check() new WriteNebulaVertexConfig(space, - tagName, - vidField, - vidPolicy, - batch, - vidAsProp, - user, - passwd, - writeMode, - deleteEdge, - overwrite) + tagName, + vidField, + vidPolicy, + batch, + vidAsProp, + user, + passwd, + writeMode, + deleteEdge, + overwrite, + disableWriteLog) } private def check(): Unit = { @@ -436,17 +470,17 @@ object WriteNebulaVertexConfig { } /** - * subclass of WriteNebulaConfig to config edge when write dataframe into nebula graph - * - * @param space: nebula space name - * @param edgeName: edge name - * @param srcFiled: field in dataframe to indicate src vertex id - * @param srcPolicy: not required, use hash to map your src vertex id - * @param dstField: field in dataframe to indicate dst vertex id - * @param dstPolicy: not required, use hash to map your dst vertex id - * @param rankField: not required, field in dataframe to indicate edge rank - * @param batch: amount of one batch when write into nebula graph - */ + * subclass of WriteNebulaConfig to config edge when write dataframe into nebula graph + * + * @param space : nebula space name + * @param edgeName : edge name + * @param srcFiled : field in dataframe to indicate src vertex id + * @param srcPolicy : not required, use hash to map your src vertex id + * @param dstField : field in dataframe to indicate dst vertex id + * @param dstPolicy : not required, use hash to map your dst vertex id + * @param rankField : not required, field in dataframe to indicate edge rank + * @param batch : amount of one batch when write into nebula graph + */ class WriteNebulaEdgeConfig(space: String, edgeName: String, srcFiled: String, @@ -461,44 +495,52 @@ class WriteNebulaEdgeConfig(space: String, user: String, passwd: String, writeMode: String, - overwrite: Boolean) - extends WriteNebulaConfig(space, user, passwd, batch, writeMode, overwrite) { - def getEdgeName = edgeName - def getSrcFiled = srcFiled + overwrite: Boolean, + disableWriteLog: Boolean) + extends WriteNebulaConfig(space, user, passwd, batch, writeMode, overwrite, disableWriteLog) { + def getEdgeName = edgeName + + def getSrcFiled = srcFiled + def getSrcPolicy = if (srcPolicy == null) "" else srcPolicy - def getDstField = dstField + + def getDstField = dstField + def getDstPolicy = if (dstPolicy == null) "" else dstPolicy + def getRankField = if (rankField == null) "" else rankField - def getSrcAsProp = srcAsProp - def getDstAsProp = dstAsProp + def getSrcAsProp = srcAsProp + + def getDstAsProp = dstAsProp + def getRankAsProp = rankAsProp } /** - * object WriteNebulaEdgeConfig - */ + * object WriteNebulaEdgeConfig + */ object WriteNebulaEdgeConfig { private val LOG: Logger = LoggerFactory.getLogger(WriteNebulaEdgeConfig.getClass) /** - * a builder to create {@link WriteNebulaEdgeConfig} - */ + * a builder to create {@link WriteNebulaEdgeConfig} + */ class WriteEdgeConfigBuilder { - var space: String = _ + var space: String = _ var edgeName: String = _ var srcIdField: String = _ - var srcPolicy: String = _ + var srcPolicy: String = _ var dstIdField: String = _ - var dstPolicy: String = _ - var rankField: String = _ - var batch: Int = 512 - var user: String = "root" - var passwd: String = "nebula" + var dstPolicy: String = _ + var rankField: String = _ + var batch: Int = 512 + var user: String = "root" + var passwd: String = "nebula" /** whether srcId as property */ var srcAsProp: Boolean = false @@ -515,147 +557,159 @@ object WriteNebulaEdgeConfig { /** whether overwrite the exists edge */ var overwrite: Boolean = true + var disableWriteLog: Boolean = true + /** - * set space name - */ + * set space name + */ def withSpace(space: String): WriteEdgeConfigBuilder = { this.space = space this } /** - * set edge type name - */ + * set edge type name + */ def withEdge(edgeName: String): WriteEdgeConfigBuilder = { this.edgeName = edgeName this } /** - * set rank field in dataframe - * it rankField is not set, then edge has default 0 rank value - * */ + * set rank field in dataframe + * it rankField is not set, then edge has default 0 rank value + * */ def withRankField(rankField: String): WriteEdgeConfigBuilder = { this.rankField = rankField this } /** - * set which field in dataframe as nebula edge's src id - */ + * set which field in dataframe as nebula edge's src id + */ def withSrcIdField(srcIdField: String): WriteEdgeConfigBuilder = { this.srcIdField = srcIdField this } /** - * set policy for edge src id, its optional - */ + * set policy for edge src id, its optional + */ def withSrcPolicy(srcPolicy: String): WriteEdgeConfigBuilder = { this.srcPolicy = srcPolicy this } /** - * set which field in dataframe as nebula edge's dst id - */ + * set which field in dataframe as nebula edge's dst id + */ def withDstIdField(dstIdField: String): WriteEdgeConfigBuilder = { this.dstIdField = dstIdField this } /** - * set policy for edge dst id, its optional - */ + * set policy for edge dst id, its optional + */ def withDstPolicy(dstPolicy: String): WriteEdgeConfigBuilder = { this.dstPolicy = dstPolicy this } /** - * set data amount for one batch, default is 512 - */ + * set data amount for one batch, default is 512 + */ def withBatch(batch: Int): WriteEdgeConfigBuilder = { this.batch = batch this } /** - * set whether src id as property - */ + * set whether src id as property + */ def withSrcAsProperty(srcAsProp: Boolean): WriteEdgeConfigBuilder = { this.srcAsProp = srcAsProp this } /** - * set whether dst id as property - */ + * set whether dst id as property + */ def withDstAsProperty(dstAsProp: Boolean): WriteEdgeConfigBuilder = { this.dstAsProp = dstAsProp this } /** - * set whether rank as property - */ + * set whether rank as property + */ def withRankAsProperty(rankAsProp: Boolean): WriteEdgeConfigBuilder = { this.rankAsProp = rankAsProp this } /** - * set user name for nebula graph - */ + * set user name for nebula graph + */ def withUser(user: String): WriteEdgeConfigBuilder = { this.user = user this } /** - * set password for nebula graph's user - */ + * set password for nebula graph's user + */ def withPasswd(passwd: String): WriteEdgeConfigBuilder = { this.passwd = passwd this } /** - * set write mode for nebula edge, INSERT or UPDATE - */ + * set write mode for nebula edge, INSERT or UPDATE + */ def withWriteMode(writeMode: WriteMode.Value): WriteEdgeConfigBuilder = { this.writeMode = writeMode.toString this } /** - * set whether overwrite the exists edge - */ + * set whether overwrite the exists edge + */ def withOverwrite(overwrite: Boolean): WriteEdgeConfigBuilder = { this.overwrite = overwrite this } /** - * check configs and get WriteNebulaEdgeConfig - */ + * set whether disable the write log for failed data + */ + def withDisableWriteLog(disableWriteLog: Boolean): WriteEdgeConfigBuilder = { + this.disableWriteLog = disableWriteLog + this + } + + + /** + * check configs and get WriteNebulaEdgeConfig + */ def build(): WriteNebulaEdgeConfig = { check() new WriteNebulaEdgeConfig(space, - edgeName, - srcIdField, - srcPolicy, - dstIdField, - dstPolicy, - rankField, - batch, - srcAsProp, - dstAsProp, - rankAsProp, - user, - passwd, - writeMode, - overwrite) + edgeName, + srcIdField, + srcPolicy, + dstIdField, + dstPolicy, + rankField, + batch, + srcAsProp, + dstAsProp, + rankAsProp, + user, + passwd, + writeMode, + overwrite, + disableWriteLog) } private def check(): Unit = { @@ -701,20 +755,21 @@ object WriteNebulaEdgeConfig { } /** - * config needed when read from nebula graph - * for read vertex or edge: - * you must need to set these configs: addresses/space/label - * you can set noColumn to true to read no vertex col, and you can set returnCols to read the specific cols, if the returnCols is empty, then read all the columns. - * you can set partitionNum to define spark partition nums to read nebula graph. - */ + * config needed when read from nebula graph + * for read vertex or edge: + * you must need to set these configs: addresses/space/label + * you can set noColumn to true to read no vertex col, and you can set returnCols to read the specific cols, if the returnCols is empty, then read all the columns. + * you can set partitionNum to define spark partition nums to read nebula graph. + */ class ReadNebulaConfig extends Serializable { - var getSpace: String = _ - var getLabel: String = _ + var getSpace: String = _ + var getLabel: String = _ var getReturnCols: List[String] = _ - var getNoColumn: Boolean = _ - var getPartitionNum: Int = _ - var getLimit: Int = _ - var getNgql: String = _ + var getNoColumn: Boolean = _ + var getPartitionNum: Int = _ + var getLimit: Int = _ + var getNgql: String = _ + // todo add filter def this(space: String, label: String, @@ -749,24 +804,25 @@ class ReadNebulaConfig extends Serializable { } /** - * object ReadNebulaConfig - */ + * object ReadNebulaConfig + */ object ReadNebulaConfig { private val LOG: Logger = LoggerFactory.getLogger(this.getClass) class ReadConfigBuilder { - var space: String = _ - var label: String = _ + var space: String = _ + var label: String = _ var returnCols: ListBuffer[String] = new ListBuffer[String] - var noColumn: Boolean = false - var partitionNum: Int = 100 - var limit: Int = 1000 - var ngql: String = _ + var noColumn: Boolean = false + var partitionNum: Int = 100 + var limit: Int = 1000 + var ngql: String = _ def withSpace(space: String): ReadConfigBuilder = { this.space = space this } + def withLabel(label: String): ReadConfigBuilder = { this.label = label this @@ -780,24 +836,24 @@ object ReadNebulaConfig { } /** - * if noColumn is set to true, then returnCols is no need and it will be invalidate even if configured - */ + * if noColumn is set to true, then returnCols is no need and it will be invalidate even if configured + */ def withNoColumn(noColumn: Boolean): ReadConfigBuilder = { this.noColumn = noColumn this } /** - * set partition num for spark, default is 100 - */ + * set partition num for spark, default is 100 + */ def withPartitionNum(partitionNum: Int): ReadConfigBuilder = { this.partitionNum = partitionNum this } /** - * set limit for scan nebula graph, default is 1000 - */ + * set limit for scan nebula graph, default is 1000 + */ def withLimit(limit: Int): ReadConfigBuilder = { this.limit = limit this @@ -822,7 +878,7 @@ object ReadNebulaConfig { assert(label != null && !label.isEmpty, s"config label is empty.") assert(limit > 0, s"config limit must be positive, your limit is $limit") assert(partitionNum > 0, - s"config partitionNum must be positive, your partitionNum is $partitionNum") + s"config partitionNum must be positive, your partitionNum is $partitionNum") if (noColumn && returnCols.nonEmpty) { LOG.warn( s"noColumn is true, returnCols will be invalidate " diff --git a/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala b/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala index e2bc2ee0..16fb58ef 100644 --- a/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala +++ b/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala @@ -138,6 +138,7 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String]) exten var writeMode: WriteMode.Value = _ var deleteEdge: Boolean = _ var overwrite: Boolean = _ + var disableWriteLog: Boolean = _ if (operaType == OperaType.WRITE) { require(parameters.isDefinedAt(GRAPH_ADDRESS), @@ -175,6 +176,7 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String]) exten WriteMode.withName(parameters.getOrElse(WRITE_MODE, DEFAULT_WRITE_MODE).toString.toLowerCase) deleteEdge = parameters.getOrElse(DELETE_EDGE, false).toString.toBoolean overwrite = parameters.getOrElse(OVERWRITE, true).toString.toBoolean + disableWriteLog = parameters.getOrElse(DISABLE_WRITE_LOG, true).toString.toBoolean } def getReturnCols: List[String] = { @@ -261,6 +263,7 @@ object NebulaOptions { val WRITE_MODE: String = "writeMode" val DELETE_EDGE: String = "deleteEdge" val OVERWRITE: String = "overwrite" + val DISABLE_WRITE_LOG = "disableWriteLog" val DEFAULT_TIMEOUT: Int = 3000 val DEFAULT_CONNECTION_TIMEOUT: Int = 3000 diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala index b1da17a7..b226e54e 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala @@ -250,6 +250,7 @@ package object connector { .option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode) .option(NebulaOptions.DELETE_EDGE, writeConfig.getDeleteEdge) .option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite) + .option(NebulaOptions.DISABLE_WRITE_LOG, writeConfig.isDisableWriteLog) .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress) .option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress) .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout) @@ -299,6 +300,7 @@ package object connector { .option(NebulaOptions.RANK_AS_PROP, writeConfig.getRankAsProp) .option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode) .option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite) + .option(NebulaOptions.DISABLE_WRITE_LOG, writeConfig.isDisableWriteLog) .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress) .option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress) .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout) diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala index 23008c8c..6afaf393 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala @@ -51,7 +51,11 @@ class NebulaWriter(nebulaOptions: NebulaOptions) extends Serializable { val result = graphProvider.submit(exec) if (!result.isSucceeded) { failedExecs.append(exec) - LOG.error(s"failed to write ${exec} for " + result.getErrorMessage) + if (nebulaOptions.disableWriteLog) { + LOG.error(s"write failed: " + result.getErrorMessage) + } else { + LOG.error(s"write failed: ${result.getErrorMessage} failed statement: \n ${exec}") + } } else { LOG.info(s"batch write succeed") LOG.debug(s"batch write succeed: ${exec}") diff --git a/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/package.scala b/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/package.scala index 6af7663f..1abd2e1f 100644 --- a/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/package.scala +++ b/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/package.scala @@ -311,6 +311,7 @@ package object connector { .option(NebulaOptions.VID_AS_PROP, writeConfig.getVidAsProp) .option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode) .option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite) + .option(NebulaOptions.DISABLE_WRITE_LOG, writeConfig.isDisableWriteLog) .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress) .option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress) .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout) @@ -360,6 +361,7 @@ package object connector { .option(NebulaOptions.RANK_AS_PROP, writeConfig.getRankAsProp) .option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode) .option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite) + .option(NebulaOptions.DISABLE_WRITE_LOG, writeConfig.isDisableWriteLog) .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress) .option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress) .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout) diff --git a/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala b/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala index 0551c0b9..0c3bab9f 100644 --- a/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala +++ b/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala @@ -23,7 +23,7 @@ abstract class NebulaWriter(nebulaOptions: NebulaOptions, schema: StructType) ex private val LOG = LoggerFactory.getLogger(this.getClass) protected val rowEncoder: ExpressionEncoder[Row] = RowEncoder(schema).resolveAndBind() - protected val failedExecs: ListBuffer[String] = new ListBuffer[String] + protected val failedExecs: ListBuffer[String] = new ListBuffer[String] val metaProvider = new MetaProvider( nebulaOptions.getMetaAddress, @@ -57,7 +57,11 @@ abstract class NebulaWriter(nebulaOptions: NebulaOptions, schema: StructType) ex val result = graphProvider.submit(exec) if (!result.isSucceeded) { failedExecs.append(exec) - LOG.error(s"failed to write ${exec} for " + result.getErrorMessage) + if (nebulaOptions.disableWriteLog) { + LOG.error(s"write failed: " + result.getErrorMessage) + } else { + LOG.error(s"write failed: ${result.getErrorMessage} failed statement: \n ${exec}") + } } else { LOG.info(s"batch write succeed") LOG.debug(s"batch write succeed: ${exec}") diff --git a/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/package.scala b/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/package.scala index 12fceb96..3a9cdf2c 100644 --- a/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/package.scala +++ b/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/package.scala @@ -210,6 +210,7 @@ package object connector { .option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode) .option(NebulaOptions.DELETE_EDGE, writeConfig.getDeleteEdge) .option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite) + .option(NebulaOptions.DISABLE_WRITE_LOG, writeConfig.isDisableWriteLog) .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress) .option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress) .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout) @@ -259,6 +260,7 @@ package object connector { .option(NebulaOptions.RANK_AS_PROP, writeConfig.getRankAsProp) .option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode) .option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite) + .option(NebulaOptions.DISABLE_WRITE_LOG, writeConfig.isDisableWriteLog) .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress) .option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress) .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout) diff --git a/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala b/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala index 23008c8c..6afaf393 100644 --- a/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala +++ b/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala @@ -51,7 +51,11 @@ class NebulaWriter(nebulaOptions: NebulaOptions) extends Serializable { val result = graphProvider.submit(exec) if (!result.isSucceeded) { failedExecs.append(exec) - LOG.error(s"failed to write ${exec} for " + result.getErrorMessage) + if (nebulaOptions.disableWriteLog) { + LOG.error(s"write failed: " + result.getErrorMessage) + } else { + LOG.error(s"write failed: ${result.getErrorMessage} failed statement: \n ${exec}") + } } else { LOG.info(s"batch write succeed") LOG.debug(s"batch write succeed: ${exec}")