From 006df5c341bce616bf9f6853958254c17a73704c Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Tue, 14 Jun 2022 23:00:44 +0800 Subject: [PATCH] Logs: Add clickhouse node info when error ocurrs (#129) --- .../exception/ClickHouseException.scala | 21 ++++++++++++------- .../clickhouse/grpc/GrpcNodeClient.scala | 6 +++--- .../clickhouse/write/ClickHouseWriter.scala | 4 ++-- .../clickhouse/write/ClickHouseWriter.scala | 4 ++-- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/clickhouse-core/src/main/scala/xenon/clickhouse/exception/ClickHouseException.scala b/clickhouse-core/src/main/scala/xenon/clickhouse/exception/ClickHouseException.scala index 39aa8547..b3cc0f73 100644 --- a/clickhouse-core/src/main/scala/xenon/clickhouse/exception/ClickHouseException.scala +++ b/clickhouse-core/src/main/scala/xenon/clickhouse/exception/ClickHouseException.scala @@ -14,17 +14,24 @@ package xenon.clickhouse.exception +import xenon.clickhouse.spec.NodeSpec import xenon.protocol.grpc.{Exception => GRPCException} -abstract class ClickHouseException(code: Int, reason: String) extends RuntimeException(s"[$code] $reason") +abstract class ClickHouseException(code: Int, reason: String, node: Option[NodeSpec]) + extends RuntimeException(s"[$code]${node.getOrElse("")} $reason") -case class ClickHouseServerException(code: Int, reason: String) extends ClickHouseException(code, reason) { - def this(exception: GRPCException) = this(exception.getCode, exception.getDisplayText) +case class ClickHouseServerException(code: Int, reason: String, node: Option[NodeSpec]) + extends ClickHouseException(code, reason, node) { + + def this(exception: GRPCException, node: Option[NodeSpec] = None) = + this(exception.getCode, exception.getDisplayText, node) } -case class ClickHouseClientException(reason: String) - extends ClickHouseException(ClickHouseErrCode.CLIENT_ERROR.code(), reason) +case class ClickHouseClientException(reason: String, node: Option[NodeSpec] = None) + extends ClickHouseException(ClickHouseErrCode.CLIENT_ERROR.code(), reason, node) + +case class RetryableClickHouseException(code: Int, reason: String, node: Option[NodeSpec]) + extends ClickHouseException(code, reason, node) { -case class RetryableClickHouseException(code: Int, reason: String) extends ClickHouseException(code, reason) { - def this(exception: GRPCException) = this(exception.getCode, exception.getDisplayText) + def this(exception: GRPCException, node: Option[NodeSpec]) = this(exception.getCode, exception.getDisplayText, node) } diff --git a/clickhouse-core/src/main/scala/xenon/clickhouse/grpc/GrpcNodeClient.scala b/clickhouse-core/src/main/scala/xenon/clickhouse/grpc/GrpcNodeClient.scala index b6ab27d2..164631ee 100644 --- a/clickhouse-core/src/main/scala/xenon/clickhouse/grpc/GrpcNodeClient.scala +++ b/clickhouse-core/src/main/scala/xenon/clickhouse/grpc/GrpcNodeClient.scala @@ -161,7 +161,7 @@ class GrpcNodeClient(val node: NodeSpec) extends AutoCloseable with Logging { deserializer: ByteString => SimpleOutput[OUT], settings: Map[String, String] ): SimpleOutput[OUT] = syncQuery[OUT](sql, outputFormat, deserializer, settings) match { - case Left(exception) => throw new ClickHouseServerException(exception) + case Left(exception) => throw new ClickHouseServerException(exception, Some(node)) case Right(output) => output } @@ -212,7 +212,7 @@ class GrpcNodeClient(val node: NodeSpec) extends AutoCloseable with Logging { if (result.getException.getCode == OK.code) Right(result.getOutput) else Left(result.getException) } .map { - case Left(gRPCException) => throw new ClickHouseServerException(gRPCException) + case Left(gRPCException) => throw new ClickHouseServerException(gRPCException, Some(node)) case Right(output) => output } outputStreamDeserializer(outputIterator) @@ -245,6 +245,6 @@ class GrpcNodeClient(val node: NodeSpec) extends AutoCloseable with Logging { } } if (throwException && result.getException.getCode != OK.code) - throw new ClickHouseServerException(result.getException) + throw new ClickHouseServerException(result.getException, Some(node)) } } diff --git a/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala b/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala index 5aeb5a9f..95b471dd 100644 --- a/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala +++ b/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala @@ -136,8 +136,8 @@ class ClickHouseAppendWriter(writeJob: WriteJobDescription) ) match { case Right(_) => buf.clear case Left(retryable) if writeJob.writeOptions.retryableErrorCodes.contains(retryable.getCode) => - throw new RetryableClickHouseException(retryable) - case Left(rethrow) => throw new ClickHouseServerException(rethrow) + throw new RetryableClickHouseException(retryable, Some(client.node)) + case Left(rethrow) => throw new ClickHouseServerException(rethrow, Some(client.node)) } } match { case Success(_) => log.info(s"Job[${writeJob.queryId}]: flush batch completed") diff --git a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala index 5aeb5a9f..95b471dd 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala @@ -136,8 +136,8 @@ class ClickHouseAppendWriter(writeJob: WriteJobDescription) ) match { case Right(_) => buf.clear case Left(retryable) if writeJob.writeOptions.retryableErrorCodes.contains(retryable.getCode) => - throw new RetryableClickHouseException(retryable) - case Left(rethrow) => throw new ClickHouseServerException(rethrow) + throw new RetryableClickHouseException(retryable, Some(client.node)) + case Left(rethrow) => throw new ClickHouseServerException(rethrow, Some(client.node)) } } match { case Success(_) => log.info(s"Job[${writeJob.queryId}]: flush batch completed")