From 2173a56173b626b55c4387381917b0a75f3e8a49 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sun, 29 Jun 2014 23:08:59 +0400 Subject: [PATCH] Make it possible to customize exception handler Fixes #47. --- src/clojure/langohr/core.clj | 51 +++++++++++++++++++++++++++-- test/langohr/test/shutdown_test.clj | 20 ++++++++++- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/clojure/langohr/core.clj b/src/clojure/langohr/core.clj index 258c89b..a9f24a2 100644 --- a/src/clojure/langohr/core.clj +++ b/src/clojure/langohr/core.clj @@ -15,7 +15,11 @@ * http://clojurerabbitmq.info/articles/connecting.html * http://clojurerabbitmq.info/articles/tls.html" - (:import [com.rabbitmq.client Connection Channel Address ConnectionFactory ShutdownListener BlockedListener] + (:import [com.rabbitmq.client Connection Channel Address + ConnectionFactory ShutdownListener BlockedListener + Consumer TopologyRecoveryException + ExceptionHandler] + [com.rabbitmq.client.impl DefaultExceptionHandler] [com.novemberain.langohr Recoverable] [clojure.lang IFn] [com.rabbitmq.client.impl AMQConnection]) @@ -184,6 +188,47 @@ (^Thread newThread [this ^Runnable r] (f r)))) +(defn exception-handler + [&{:keys [handle-connection-exception-fn + handle-return-listener-exception-fn + handle-flow-listener-exception-fn + handle-confirm-listener-exception-fn + handle-blocked-listener-exception-fn + handle-consumer-exception-fn + handle-connection-recovery-exception-fn + handle-channel-recovery-exception-fn + handle-topology-recovery-exception-fn]}] + (proxy [DefaultExceptionHandler] [] + (handleUnexpectedConnectionDriverException [^Connection conn ^Throwable t] + (when handle-connection-exception-fn + (handle-connection-exception-fn conn t))) + (handleReturnListenerException [^Channel ch ^Throwable t] + (when handle-return-listener-exception-fn + (handle-return-listener-exception-fn ch t))) + (handleFlowListenerException [^Channel ch ^Throwable t] + (when handle-flow-listener-exception-fn + (handle-flow-listener-exception-fn ch t))) + (handleConfirmListenerException [^Channel ch ^Throwable t] + (when handle-confirm-listener-exception-fn + (handle-confirm-listener-exception-fn ch t))) + (handleBlockedListenerException [^Connection conn ^Throwable t] + (when handle-blocked-listener-exception-fn + (handle-blocked-listener-exception-fn conn t))) + (handleConsumerException [^Channel ch ^Throwable t + ^Consumer consumer ^String consumer-tag + ^String method-name] + (when handle-consumer-exception-fn + (handle-consumer-exception-fn ch t consumer consumer-tag method-name))) + (handleConnectionRecoveryException [^Connection conn ^Throwable t] + (when handle-connection-recovery-exception-fn + (handle-connection-recovery-exception-fn conn t))) + (handleChannelRecoveryException [^Channel ch ^Throwable t] + (when handle-channel-recovery-exception-fn + (handle-channel-recovery-exception-fn ))) + (handleTopologyRecoveryException [^Connection conn ^Channel ch + ^TopologyRecoveryException t] + (when handle-topology-recovery-exception-fn + (handle-topology-recovery-exception-fn conn ch t))))) ;; ;; Implementation @@ -219,7 +264,7 @@ [settings] (let [{:keys [host port username password vhost requested-heartbeat connection-timeout ssl ssl-context socket-factory sasl-config - requested-channel-max thread-factory] + requested-channel-max thread-factory exception-handler] :or {requested-heartbeat ConnectionFactory/DEFAULT_HEARTBEAT connection-timeout ConnectionFactory/DEFAULT_CONNECTION_TIMEOUT requested-channel-max ConnectionFactory/DEFAULT_CHANNEL_MAX}} (normalize-settings settings) @@ -246,5 +291,7 @@ (.useSslProtocol cf ^javax.net.ssl.SSLContext ssl-context)) (when thread-factory (.setThreadFactory cf ^ThreadFactory thread-factory)) + (when exception-handler + (.setExceptionHandler cf ^ExceptionHandler exception-handler)) cf)) diff --git a/test/langohr/test/shutdown_test.clj b/test/langohr/test/shutdown_test.clj index 39d502d..7aabf79 100644 --- a/test/langohr/test/shutdown_test.clj +++ b/test/langohr/test/shutdown_test.clj @@ -16,7 +16,7 @@ [langohr.shutdown :as lh] [clojure.test :refer :all]) (:import [com.rabbitmq.client Connection Channel Consumer ShutdownSignalException] - java.util.concurrent.TimeUnit)) + [java.util.concurrent CountDownLatch TimeUnit])) (deftest test-channel-of-with-a-channel-exception @@ -110,3 +110,21 @@ (is (.await latch 700 TimeUnit/MILLISECONDS)) (is (not (lh/initiated-by-broker? @sse))) (is (lh/initiated-by-application? @sse))))) + +(deftest test-custom-exception-handler + (let [el (CountDownLatch. 1) + eh (lhc/exception-handler :handle-consumer-exception (fn [ch ex consumer + consumer-tag method-name] + (.countDown el)))] + (with-open [^Connection conn (lhc/connect {:exception-handler eh})] + (let [ch (lch/open conn) + q (lhq/declare-server-named ch) + cl (CountDownLatch. 1) + dhf (fn [ch metadata payload] + (.countDown cl) + (throw (RuntimeException. "the monster, it is out! Run for life!")))] + (lhcons/subscribe ch q dhf) + (Thread/sleep 50) + (lhb/publish ch "" q "message") + (is (.await cl 700 TimeUnit/MILLISECONDS)) + (is (.await el 700 TimeUnit/MILLISECONDS))))))