Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Review cleanups: Py2 support, PEP8 clean
  • Loading branch information
edenhill committed Aug 8, 2016
commit 20bba1cc9c6586eaf84cf96bd938ff4e78f966f3
28 changes: 15 additions & 13 deletions tests/test_threads.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
#!/usr/bin/env python

from confluent_kafka import Producer, KafkaError, KafkaException
import threading, time, queue
import threading
import time
try:
from queue import Queue, Empty
except:
from Queue import Queue, Empty


class IntendedException (Exception):
pass

def thread_run (myid,p,q):
def do_crash (err, msg):

def thread_run(myid, p, q):
def do_crash(err, msg):
raise IntendedException()

for i in range(1, 3):
Expand All @@ -23,45 +29,41 @@ def do_crash (err, msg):
except IntendedException:
print(myid, "Intentional callback crash: ok")
continue
except:
raise

print(myid, 'Done')
q.put(myid)



def test_thread_safety():
""" Basic thread safety tests. """

q = queue.Queue()
p = Producer({'socket.timeout.ms':10,
q = Queue()
p = Producer({'socket.timeout.ms': 10,
'socket.blocking.max.ms': 10,
'default.topic.config': {'message.timeout.ms': 10}})

threads = list()
for i in range(1, 5):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the settings you have here (5 threads, 3 messages each) enough to reliably trigger an error before the patch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, fails immediately and repeatedly on master:

$ python tests/t.py 
Exception in thread 1:
NoneType

Fatal Python error: PyEval_RestoreThread: NULL tstate

thr = threading.Thread(target=thread_run, name=str(i), args=[i,p,q])
thr = threading.Thread(target=thread_run, name=str(i), args=[i, p, q])
thr.start()
threads.append(thr)

for thr in threads:
thr.join()


# Count the number of threads that exited cleanly
cnt = 0
try:
for x in iter(q.get_nowait, None):
cnt += 1
except queue.Empty:
except Empty:
pass

if cnt != len(threads):
raise Exception('Only %d/%d threads succeeded' % (cnt, len(threads)))

print('Done')

if __name__=='__main__':

if __name__ == '__main__':
test_thread_safety()