77import pika
88from pika import credentials
99
10- from .compat import Queue
10+ from .compat import Queue , QueueEmpty
1111from .filters import FieldFilter
1212from .formatters import JSONFormatter
1313from .compat import ExceptionReporter
@@ -107,6 +107,9 @@ def open_connection(self):
107107 handler = logging .StreamHandler ()
108108 handler .setFormatter (self .formatter )
109109 rabbitmq_logger = logging .getLogger ('pika' )
110+ rabbitmq_logger_propagate = rabbitmq_logger .propagate
111+ rabbitmq_logger_level = rabbitmq_logger .level
112+
110113 rabbitmq_logger .addHandler (handler )
111114 rabbitmq_logger .propagate = False
112115 rabbitmq_logger .setLevel (logging .WARNING )
@@ -123,6 +126,8 @@ def open_connection(self):
123126 self .exchange_declared = True
124127
125128 # Manually remove logger to avoid shutdown message.
129+ rabbitmq_logger .propagate = rabbitmq_logger_propagate
130+ rabbitmq_logger .setlevel (rabbitmq_logger_level )
126131 rabbitmq_logger .removeHandler (handler )
127132
128133 def close_connection (self ):
@@ -151,22 +156,26 @@ def message_worker(self):
151156 while not self .stopping .is_set ():
152157 try :
153158 record , routing_key = self .queue .get (block = True , timeout = 10 )
154-
155- if not self .connection or self .connection .is_closed or not self .channel or self .channel .is_closed :
156- self .open_connection ()
157-
158- res = self .channel .basic_publish (
159- exchange = self .exchange ,
160- routing_key = routing_key ,
161- body = record ,
162- properties = pika .BasicProperties (
163- delivery_mode = 2 ,
164- headers = self .message_headers ,
165- content_type = self .content_type
159+
160+ try :
161+ if not self .connection or self .connection .is_closed or not self .channel or self .channel .is_closed :
162+ self .open_connection ()
163+
164+ res = self .channel .basic_publish (
165+ exchange = self .exchange ,
166+ routing_key = routing_key ,
167+ body = record ,
168+ properties = pika .BasicProperties (
169+ delivery_mode = 2 ,
170+ headers = self .message_headers ,
171+ content_type = self .content_type
172+ )
166173 )
167- )
174+ finally :
175+ if not hasattr (self , 'queue' ):
176+ self .queue .task_done ()
168177
169- except Queue . Empty :
178+ except QueueEmpty :
170179 continue
171180 except Exception :
172181 self .channel , self .connection = None , None
@@ -175,7 +184,6 @@ def message_worker(self):
175184 if self .stopping .is_set ():
176185 self .stopped .set ()
177186 break
178- self .queue .task_done ()
179187 if self .close_after_emit :
180188 self .close_connection ()
181189 self .stopped .set ()
0 commit comments