Skip to content

Commit

Permalink
Make it possible to customize exception handler
Browse files Browse the repository at this point in the history
Fixes #47.
  • Loading branch information
michaelklishin committed Jun 29, 2014
1 parent 1558a45 commit 2173a56
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 3 deletions.
51 changes: 49 additions & 2 deletions src/clojure/langohr/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
* http://clojurerabbitmq.info/articles/connecting.html
* http://clojurerabbitmq.info/articles/tls.html"
(:import [com.rabbitmq.client Connection Channel Address ConnectionFactory ShutdownListener BlockedListener]
(:import [com.rabbitmq.client Connection Channel Address
ConnectionFactory ShutdownListener BlockedListener
Consumer TopologyRecoveryException
ExceptionHandler]
[com.rabbitmq.client.impl DefaultExceptionHandler]
[com.novemberain.langohr Recoverable]
[clojure.lang IFn]
[com.rabbitmq.client.impl AMQConnection])
Expand Down Expand Up @@ -184,6 +188,47 @@
(^Thread newThread [this ^Runnable r]
(f r))))

(defn exception-handler
[&{:keys [handle-connection-exception-fn
handle-return-listener-exception-fn
handle-flow-listener-exception-fn
handle-confirm-listener-exception-fn
handle-blocked-listener-exception-fn
handle-consumer-exception-fn
handle-connection-recovery-exception-fn
handle-channel-recovery-exception-fn
handle-topology-recovery-exception-fn]}]
(proxy [DefaultExceptionHandler] []
(handleUnexpectedConnectionDriverException [^Connection conn ^Throwable t]
(when handle-connection-exception-fn
(handle-connection-exception-fn conn t)))
(handleReturnListenerException [^Channel ch ^Throwable t]
(when handle-return-listener-exception-fn
(handle-return-listener-exception-fn ch t)))
(handleFlowListenerException [^Channel ch ^Throwable t]
(when handle-flow-listener-exception-fn
(handle-flow-listener-exception-fn ch t)))
(handleConfirmListenerException [^Channel ch ^Throwable t]
(when handle-confirm-listener-exception-fn
(handle-confirm-listener-exception-fn ch t)))
(handleBlockedListenerException [^Connection conn ^Throwable t]
(when handle-blocked-listener-exception-fn
(handle-blocked-listener-exception-fn conn t)))
(handleConsumerException [^Channel ch ^Throwable t
^Consumer consumer ^String consumer-tag
^String method-name]
(when handle-consumer-exception-fn
(handle-consumer-exception-fn ch t consumer consumer-tag method-name)))
(handleConnectionRecoveryException [^Connection conn ^Throwable t]
(when handle-connection-recovery-exception-fn
(handle-connection-recovery-exception-fn conn t)))
(handleChannelRecoveryException [^Channel ch ^Throwable t]
(when handle-channel-recovery-exception-fn
(handle-channel-recovery-exception-fn )))
(handleTopologyRecoveryException [^Connection conn ^Channel ch
^TopologyRecoveryException t]
(when handle-topology-recovery-exception-fn
(handle-topology-recovery-exception-fn conn ch t)))))

;;
;; Implementation
Expand Down Expand Up @@ -219,7 +264,7 @@
[settings]
(let [{:keys [host port username password vhost
requested-heartbeat connection-timeout ssl ssl-context socket-factory sasl-config
requested-channel-max thread-factory]
requested-channel-max thread-factory exception-handler]
:or {requested-heartbeat ConnectionFactory/DEFAULT_HEARTBEAT
connection-timeout ConnectionFactory/DEFAULT_CONNECTION_TIMEOUT
requested-channel-max ConnectionFactory/DEFAULT_CHANNEL_MAX}} (normalize-settings settings)
Expand All @@ -246,5 +291,7 @@
(.useSslProtocol cf ^javax.net.ssl.SSLContext ssl-context))
(when thread-factory
(.setThreadFactory cf ^ThreadFactory thread-factory))
(when exception-handler
(.setExceptionHandler cf ^ExceptionHandler exception-handler))
cf))

20 changes: 19 additions & 1 deletion test/langohr/test/shutdown_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
[langohr.shutdown :as lh]
[clojure.test :refer :all])
(:import [com.rabbitmq.client Connection Channel Consumer ShutdownSignalException]
java.util.concurrent.TimeUnit))
[java.util.concurrent CountDownLatch TimeUnit]))


(deftest test-channel-of-with-a-channel-exception
Expand Down Expand Up @@ -110,3 +110,21 @@
(is (.await latch 700 TimeUnit/MILLISECONDS))
(is (not (lh/initiated-by-broker? @sse)))
(is (lh/initiated-by-application? @sse)))))

(deftest test-custom-exception-handler
(let [el (CountDownLatch. 1)
eh (lhc/exception-handler :handle-consumer-exception (fn [ch ex consumer
consumer-tag method-name]
(.countDown el)))]
(with-open [^Connection conn (lhc/connect {:exception-handler eh})]
(let [ch (lch/open conn)
q (lhq/declare-server-named ch)
cl (CountDownLatch. 1)
dhf (fn [ch metadata payload]
(.countDown cl)
(throw (RuntimeException. "the monster, it is out! Run for life!")))]
(lhcons/subscribe ch q dhf)
(Thread/sleep 50)
(lhb/publish ch "" q "message")
(is (.await cl 700 TimeUnit/MILLISECONDS))
(is (.await el 700 TimeUnit/MILLISECONDS))))))

0 comments on commit 2173a56

Please sign in to comment.