Skip to content

Commit

Permalink
Logs: Add clickhouse node info when error ocurrs (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
pan3793 committed Jun 14, 2022
1 parent 8cf297f commit 006df5c
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,24 @@

package xenon.clickhouse.exception

import xenon.clickhouse.spec.NodeSpec
import xenon.protocol.grpc.{Exception => GRPCException}

abstract class ClickHouseException(code: Int, reason: String) extends RuntimeException(s"[$code] $reason")
abstract class ClickHouseException(code: Int, reason: String, node: Option[NodeSpec])
extends RuntimeException(s"[$code]${node.getOrElse("")} $reason")

case class ClickHouseServerException(code: Int, reason: String) extends ClickHouseException(code, reason) {
def this(exception: GRPCException) = this(exception.getCode, exception.getDisplayText)
case class ClickHouseServerException(code: Int, reason: String, node: Option[NodeSpec])
extends ClickHouseException(code, reason, node) {

def this(exception: GRPCException, node: Option[NodeSpec] = None) =
this(exception.getCode, exception.getDisplayText, node)
}

case class ClickHouseClientException(reason: String)
extends ClickHouseException(ClickHouseErrCode.CLIENT_ERROR.code(), reason)
case class ClickHouseClientException(reason: String, node: Option[NodeSpec] = None)
extends ClickHouseException(ClickHouseErrCode.CLIENT_ERROR.code(), reason, node)

case class RetryableClickHouseException(code: Int, reason: String, node: Option[NodeSpec])
extends ClickHouseException(code, reason, node) {

case class RetryableClickHouseException(code: Int, reason: String) extends ClickHouseException(code, reason) {
def this(exception: GRPCException) = this(exception.getCode, exception.getDisplayText)
def this(exception: GRPCException, node: Option[NodeSpec]) = this(exception.getCode, exception.getDisplayText, node)
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class GrpcNodeClient(val node: NodeSpec) extends AutoCloseable with Logging {
deserializer: ByteString => SimpleOutput[OUT],
settings: Map[String, String]
): SimpleOutput[OUT] = syncQuery[OUT](sql, outputFormat, deserializer, settings) match {
case Left(exception) => throw new ClickHouseServerException(exception)
case Left(exception) => throw new ClickHouseServerException(exception, Some(node))
case Right(output) => output
}

Expand Down Expand Up @@ -212,7 +212,7 @@ class GrpcNodeClient(val node: NodeSpec) extends AutoCloseable with Logging {
if (result.getException.getCode == OK.code) Right(result.getOutput) else Left(result.getException)
}
.map {
case Left(gRPCException) => throw new ClickHouseServerException(gRPCException)
case Left(gRPCException) => throw new ClickHouseServerException(gRPCException, Some(node))
case Right(output) => output
}
outputStreamDeserializer(outputIterator)
Expand Down Expand Up @@ -245,6 +245,6 @@ class GrpcNodeClient(val node: NodeSpec) extends AutoCloseable with Logging {
}
}
if (throwException && result.getException.getCode != OK.code)
throw new ClickHouseServerException(result.getException)
throw new ClickHouseServerException(result.getException, Some(node))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ class ClickHouseAppendWriter(writeJob: WriteJobDescription)
) match {
case Right(_) => buf.clear
case Left(retryable) if writeJob.writeOptions.retryableErrorCodes.contains(retryable.getCode) =>
throw new RetryableClickHouseException(retryable)
case Left(rethrow) => throw new ClickHouseServerException(rethrow)
throw new RetryableClickHouseException(retryable, Some(client.node))
case Left(rethrow) => throw new ClickHouseServerException(rethrow, Some(client.node))
}
} match {
case Success(_) => log.info(s"Job[${writeJob.queryId}]: flush batch completed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ class ClickHouseAppendWriter(writeJob: WriteJobDescription)
) match {
case Right(_) => buf.clear
case Left(retryable) if writeJob.writeOptions.retryableErrorCodes.contains(retryable.getCode) =>
throw new RetryableClickHouseException(retryable)
case Left(rethrow) => throw new ClickHouseServerException(rethrow)
throw new RetryableClickHouseException(retryable, Some(client.node))
case Left(rethrow) => throw new ClickHouseServerException(rethrow, Some(client.node))
}
} match {
case Success(_) => log.info(s"Job[${writeJob.queryId}]: flush batch completed")
Expand Down

0 comments on commit 006df5c

Please sign in to comment.