11# Based on the Worker class from Python in a nutshell, by Alex Martelli
2+ import django
23import logging
34import threading
45import queue as Queue
78import time
89from django_queue_manager .task_manager import TaskManager
910from django .conf import settings
11+
1012SAVE_SUCCESS_TASKS = getattr (settings , "SAVE_SUCCESS_TASKS" , True )
1113
1214from django_queue_manager .utilities .loggers import get_default_logger
15+
1316logger = get_default_logger (__name__ )
1417
1518
@@ -74,9 +77,15 @@ def run(self):
7477 # The code above will run indefinitely except when a thread _stopevent.isSet(), it will try to empty
7578 # the queue and move the task into success/failed tasks in base of successful or not execution.
7679 self .logger .info ('Worker Thread Starts' )
80+ django .setup ()
7781 while not self ._stopevent .isSet ():
82+ if not self .worker_queue .empty ():
7883 try :
7984 task = self .worker_queue .get ()
85+ if task is None :
86+ break
87+
88+ django .db .connection .close ()
8089
8190 self .logger .info ('Consuming Task Id: {db_id}' .format (
8291 name = task .task_function_name ,
@@ -89,9 +98,6 @@ def run(self):
8998 self .logger .info (
9099 'Task Id {db_id} success!' .format (name = task .task_function_name ,
91100 db_id = task .db_id ))
92- except Queue .Empty :
93- continue
94-
95101 except Exception as e :
96102 # Save it on the failed table
97103 TaskManager .save_task_failed (task , e )
@@ -108,11 +114,7 @@ def run(self):
108114 # In any case, it will dequeue the task form the queued tasks
109115 self .dequeue_task (task = task )
110116
111- self .worker_queue .task_done ()
112-
113- # Close the connection, in order to prevent (2006, 'MySQL server has gone away') timeout error
114- from django .db import connection
115- connection .close ()
117+ django .db .connection .close ()
116118
117119 self .worker_queue = None
118120 self .logger .warning ('Worker Thread stopped, {0} tasks handled' .format (self .tasks_counter ))
0 commit comments