Skip to content

Commit ba91507

Browse files
committed
Implement empty-queue timeout to avoid taxing the CPU.
1 parent 33e367f commit ba91507

File tree

2 files changed

+71
-3
lines changed

2 files changed

+71
-3
lines changed

fluent/asyncsender.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
_global_sender = None
1515

16+
DEFAULT_QUEUE_TIMEOUT = 0.05
17+
1618

1719
def _set_global_sender(sender):
1820
""" [For testing] Function to set global sender directly
@@ -29,6 +31,7 @@ def setup(tag, **kwargs):
2931
def get_global_sender():
3032
return _global_sender
3133

34+
3235
def close():
3336
get_global_sender().close()
3437

@@ -42,10 +45,12 @@ def __init__(self, tag,
4245
verbose=False,
4346
buffer_overflow_handler=None,
4447
nanosecond_precision=False,
45-
msgpack_kwargs=None, *args, **kwargs):
48+
msgpack_kwargs=None,
49+
queue_timeout=DEFAULT_QUEUE_TIMEOUT, *args, **kwargs):
4650
super(CommunicatorThread, self).__init__(**kwargs)
4751
self._queue = Queue()
4852
self._do_run = True
53+
self._queue_timeout = queue_timeout
4954
self._conn_close_lock = threading.Lock()
5055
self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
5156
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
@@ -61,7 +66,7 @@ def send(self, bytes_):
6166
def run(self):
6267
while self._do_run:
6368
try:
64-
bytes_ = self._queue.get(block=False)
69+
bytes_ = self._queue.get(block=True, timeout=self._queue_timeout)
6570
except Empty:
6671
continue
6772
self._conn_close_lock.acquire()
@@ -101,6 +106,14 @@ def last_error(self, err):
101106
def clear_last_error(self, _thread_id = None):
102107
self._sender.clear_last_error(_thread_id=_thread_id)
103108

109+
@property
110+
def queue_timeout(self):
111+
return self._queue_timeout
112+
113+
@queue_timeout.setter
114+
def queue_timeout(self, value):
115+
self._queue_timeout = value
116+
104117
def __enter__(self):
105118
return self
106119

@@ -119,14 +132,16 @@ def __init__(self,
119132
buffer_overflow_handler=None,
120133
nanosecond_precision=False,
121134
msgpack_kwargs=None,
135+
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
122136
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
123137
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
124138
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
125139
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
126140
**kwargs)
127141
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
128142
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
129-
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)
143+
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
144+
queue_timeout=queue_timeout)
130145
self._communicator.start()
131146

132147
def _send(self, bytes_):
@@ -163,6 +178,14 @@ def last_error(self, err):
163178
def clear_last_error(self, _thread_id = None):
164179
self._communicator.clear_last_error(_thread_id=_thread_id)
165180

181+
@property
182+
def queue_timeout(self):
183+
return self._communicator.queue_timeout
184+
185+
@queue_timeout.setter
186+
def queue_timeout(self, value):
187+
self._communicator.queue_timeout = value
188+
166189
def __enter__(self):
167190
return self
168191

tests/test_asyncsender.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,51 @@ def test_connect_exception_during_sender_init(self, mock_socket):
145145
self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG)
146146

147147

148+
class TestSenderWithTimeout(unittest.TestCase):
149+
def setUp(self):
150+
super(TestSenderWithTimeout, self).setUp()
151+
self._server = mockserver.MockRecvServer('localhost')
152+
self._sender = fluent.asyncsender.FluentSender(tag='test',
153+
port=self._server.port,
154+
queue_timeout=0.04)
155+
156+
def tearDown(self):
157+
self._sender.close()
158+
159+
def get_data(self):
160+
return self._server.get_recieved()
161+
162+
def test_simple(self):
163+
sender = self._sender
164+
sender.emit('foo', {'bar': 'baz'})
165+
time.sleep(0.5)
166+
sender._close()
167+
data = self.get_data()
168+
eq = self.assertEqual
169+
eq(1, len(data))
170+
eq(3, len(data[0]))
171+
eq('test.foo', data[0][0])
172+
eq({'bar': 'baz'}, data[0][2])
173+
self.assertTrue(data[0][1])
174+
self.assertTrue(isinstance(data[0][1], int))
175+
176+
def test_simple_with_timeout_props(self):
177+
sender = self._sender
178+
sender.queue_timeout = 0.06
179+
assert sender.queue_timeout == 0.06
180+
sender.emit('foo', {'bar': 'baz'})
181+
time.sleep(0.5)
182+
sender._close()
183+
data = self.get_data()
184+
eq = self.assertEqual
185+
eq(1, len(data))
186+
eq(3, len(data[0]))
187+
eq('test.foo', data[0][0])
188+
eq({'bar': 'baz'}, data[0][2])
189+
self.assertTrue(data[0][1])
190+
self.assertTrue(isinstance(data[0][1], int))
191+
192+
148193
class TestEventTime(unittest.TestCase):
149194
def test_event_time(self):
150195
time = fluent.asyncsender.EventTime(1490061367.8616468906402588)

0 commit comments

Comments
 (0)