Skip to content

Commit 699bd48

Browse files
committed
Introduce distinction between message-seq and delivery-seq.
This allows manual acking of lazy sequence contents. Fix an apparent bug without prefetching argument. Add delivery-contents, ack-delivery to pull contents out of Delivery objects.
1 parent ee3f14e commit 699bd48

File tree

1 file changed

+64
-18
lines changed

1 file changed

+64
-18
lines changed

src/org/clojars/rabbitmq.clj

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
AMQP
1010
ConnectionFactory
1111
Consumer
12-
QueueingConsumer)))
12+
QueueingConsumer)
13+
com.rabbitmq.client.QueueingConsumer$Delivery))
1314

1415

1516
;; abbreviatons:
@@ -56,37 +57,82 @@
5657
(.close ch)
5758
(.close conn))
5859

60+
(defn #^String delivery-contents
61+
"Return the contents of the Delivery as a String."
62+
[#^"QueueingConsumer$Delivery" d]
63+
(when d
64+
(String. (.getBody d))))
65+
66+
(defn ack-delivery [#^Channel ch
67+
#^"QueueingConsumer$Delivery" d]
68+
"Acknowledge a Delivery from the given Channel."
69+
(.basicAck ch (.. d getEnvelope getDeliveryTag) false))
70+
71+
(defn delivery-seq
72+
"Returns a lazy sequence of deliveries from the queue.
73+
The caller is responsible for acking the message and
74+
extracting its contents."
75+
[#^Channel ch
76+
#^QueueingConsumer q]
77+
(lazy-seq
78+
(let [d (.nextDelivery q)]
79+
(cons d (delivery-seq ch q)))))
80+
5981
;;;; AMQP Queue as a sequence
60-
(defn delivery-seq [#^Channel ch
61-
#^QueueingConsumer q]
82+
(defn message-seq
83+
"Return a lazy sequence of queue items. Automatically
84+
acks each message and returns the content as a string."
85+
[#^Channel ch
86+
#^QueueingConsumer q]
6287
(lazy-seq
6388
(let [d (.nextDelivery q)
64-
m (String. (.getBody d))]
65-
(.basicAck ch (.. d getEnvelope getDeliveryTag) false)
66-
(cons m (delivery-seq ch q)))))
89+
m (delivery-contents d)]
90+
(ack-delivery ch d)
91+
(cons m (message-seq ch q)))))
6792

6893
(defn #^QueueingConsumer
6994
declare-queue-and-consumer
7095
"Return a QueueingConsumer with the appropriate settings."
7196
[#^Channel ch queue prefetch]
7297
(.queueDeclare ch queue)
7398
(when prefetch
74-
(.basicQos ch prefetch)
75-
(QueueingConsumer. ch)))
76-
77-
(defn queue-seq
78-
"Return a sequence of the messages in queue with name queue-name"
79-
([#^Channel ch
80-
{:keys [queue prefetch]}]
81-
(let [consumer (declare-queue-and-consumer ch queue prefetch)]
82-
(.basicConsume ch queue consumer)
83-
(delivery-seq ch consumer)))
99+
(.basicQos ch prefetch))
100+
(QueueingConsumer. ch))
101+
102+
(defmacro with-declared-queue-and-consumer
103+
"Macro which establishes a declared queue and consumer, starting
104+
basicConsume, then executing the body.
105+
`consumer` is bound to the resulting consumer for the duration of the body."
106+
[[ch consumer options] & body]
107+
`(let [opts# ~options
108+
queue# (:queue opts#)
109+
prefetch# (:prefetch opts#)
110+
cch# ~ch]
111+
(let [~consumer (declare-queue-and-consumer cch# queue# prefetch#)]
112+
(.basicConsume cch# queue# ~consumer)
113+
~@body)))
114+
115+
(defn queue-delivery-seq
116+
"Return a sequence of the Delivery objects in the named queue."
117+
([#^Channel ch opts]
118+
(with-declared-queue-and-consumer
119+
[ch consumer opts]
120+
(delivery-seq ch consumer))))
121+
122+
(defn queue-message-seq
123+
"Return a sequence of the messages in queue with name `queue-name`."
124+
([#^Channel ch opts]
125+
(with-declared-queue-and-consumer
126+
[ch consumer opts]
127+
(message-seq ch consumer)))
84128

85129
([conn
86130
#^Channel ch
87131
c]
88-
(queue-seq ch c)))
89-
132+
(queue-message-seq ch c)))
133+
134+
(defn queue-seq [& args]
135+
(apply queue-message-seq args))
90136

91137
;;; consumer routines
92138
(defn consume-wait

0 commit comments

Comments
 (0)