diff --git a/python_logging_rabbitmq/compat.py b/python_logging_rabbitmq/compat.py index 4a654ed..d3dcab6 100644 --- a/python_logging_rabbitmq/compat.py +++ b/python_logging_rabbitmq/compat.py @@ -4,10 +4,10 @@ if sys.version_info[0] == 2: text_type = unicode - from Queue import Queue as Queue + from Queue import Queue as Queue, Empty as QueueEmpty else: text_type = str - from queue import Queue as Queue + from queue import Queue as Queue, Empty as QueueEmpty try: import ujson as json # noqa: F401 diff --git a/python_logging_rabbitmq/handlers_oneway.py b/python_logging_rabbitmq/handlers_oneway.py index e760585..de96542 100644 --- a/python_logging_rabbitmq/handlers_oneway.py +++ b/python_logging_rabbitmq/handlers_oneway.py @@ -7,7 +7,7 @@ import pika from pika import credentials -from .compat import Queue +from .compat import Queue, QueueEmpty from .filters import FieldFilter from .formatters import JSONFormatter from .compat import ExceptionReporter @@ -107,6 +107,9 @@ def open_connection(self): handler = logging.StreamHandler() handler.setFormatter(self.formatter) rabbitmq_logger = logging.getLogger('pika') + rabbitmq_logger_propagate = rabbitmq_logger.propagate + rabbitmq_logger_level = rabbitmq_logger.level + rabbitmq_logger.addHandler(handler) rabbitmq_logger.propagate = False rabbitmq_logger.setLevel(logging.WARNING) @@ -123,6 +126,8 @@ def open_connection(self): self.exchange_declared = True # Manually remove logger to avoid shutdown message. + rabbitmq_logger.propagate = rabbitmq_logger_propagate + rabbitmq_logger.setlevel(rabbitmq_logger_level) rabbitmq_logger.removeHandler(handler) def close_connection(self): @@ -151,22 +156,26 @@ def message_worker(self): while not self.stopping.is_set(): try: record, routing_key = self.queue.get(block=True, timeout=10) - - if not self.connection or self.connection.is_closed or not self.channel or self.channel.is_closed: - self.open_connection() - - res = self.channel.basic_publish( - exchange=self.exchange, - routing_key=routing_key, - body=record, - properties=pika.BasicProperties( - delivery_mode=2, - headers=self.message_headers, - content_type=self.content_type + + try: + if not self.connection or self.connection.is_closed or not self.channel or self.channel.is_closed: + self.open_connection() + + res = self.channel.basic_publish( + exchange=self.exchange, + routing_key=routing_key, + body=record, + properties=pika.BasicProperties( + delivery_mode=2, + headers=self.message_headers, + content_type=self.content_type + ) ) - ) + finally: + if not hasattr(self, 'queue'): + self.queue.task_done() - except Queue.Empty: + except QueueEmpty: continue except Exception: self.channel, self.connection = None, None @@ -175,7 +184,6 @@ def message_worker(self): if self.stopping.is_set(): self.stopped.set() break - self.queue.task_done() if self.close_after_emit: self.close_connection() self.stopped.set()