|
19 | 19 | import multiprocessing |
20 | 20 | import os |
21 | 21 | import socket |
| 22 | +import struct |
22 | 23 | import sys |
23 | 24 | import time |
24 | 25 | import traceback |
25 | 26 | import warnings |
26 | 27 |
|
27 | 28 | sys.path[0:0] = [""] |
28 | 29 |
|
| 30 | +from bson import BSON |
29 | 31 | from bson.py3compat import thread, u |
30 | 32 | from bson.son import SON |
31 | 33 | from bson.tz_util import utc |
|
42 | 44 | NetworkTimeout, |
43 | 45 | InvalidURI) |
44 | 46 | from pymongo.mongo_client import MongoClient |
| 47 | +from pymongo.pool import SocketInfo |
45 | 48 | from pymongo.read_preferences import ReadPreference |
46 | 49 | from pymongo.server_selectors import (any_server_selector, |
47 | 50 | writable_server_selector) |
@@ -872,26 +875,39 @@ def test_exhaust_getmore_server_error(self): |
872 | 875 | collection = client.pymongo_test.test |
873 | 876 | collection.remove() |
874 | 877 |
|
875 | | - # Enough data to ensure it streams down for a few milliseconds. |
876 | | - long_str = 'a' * (256 * 1024) |
877 | | - collection.insert([{'a': long_str} for _ in range(200)]) |
| 878 | + collection.insert([{} for _ in range(200)]) |
878 | 879 |
|
879 | | - pool = get_pool(client) |
880 | | - pool._check_interval_seconds = None # Never check. |
881 | | - sock_info = one(pool.sockets) |
| 880 | + try: |
| 881 | + pool = get_pool(client) |
| 882 | + pool._check_interval_seconds = None # Never check. |
| 883 | + sock_info = one(pool.sockets) |
882 | 884 |
|
883 | | - cursor = collection.find(cursor_type=EXHAUST) |
| 885 | + cursor = collection.find(cursor_type=EXHAUST) |
884 | 886 |
|
885 | | - # Initial query succeeds. |
886 | | - cursor.next() |
| 887 | + # Initial query succeeds. |
| 888 | + cursor.next() |
887 | 889 |
|
888 | | - # Cause a server error on getmore. |
889 | | - client_context.client.pymongo_test.test.drop() |
890 | | - self.assertRaises(OperationFailure, list, cursor) |
| 890 | + # Cause a server error on getmore. |
| 891 | + def receive_message(operation, request_id): |
| 892 | + # Discard the actual server response. |
| 893 | + SocketInfo.receive_message(sock_info, operation, request_id) |
891 | 894 |
|
892 | | - # The socket is still valid. |
893 | | - self.assertIn(sock_info, pool.sockets) |
894 | | - self.assertEqual(0, collection.count()) |
| 895 | + # responseFlags bit 1 is QueryFailure. |
| 896 | + msg = struct.pack('<iiiii', 1 << 1, 0, 0, 0, 0) |
| 897 | + msg += BSON.encode({'$err': 'mock err', 'code': 0}) |
| 898 | + return msg |
| 899 | + |
| 900 | + saved = sock_info.receive_message |
| 901 | + sock_info.receive_message = receive_message |
| 902 | + self.assertRaises(OperationFailure, list, cursor) |
| 903 | + sock_info.receive_message = saved |
| 904 | + |
| 905 | + # The socket is returned the pool and it still works. |
| 906 | + self.assertEqual(200, collection.count()) |
| 907 | + self.assertIn(sock_info, pool.sockets) |
| 908 | + |
| 909 | + finally: |
| 910 | + client_context.client.pymongo_test.test.drop() |
895 | 911 |
|
896 | 912 | def test_exhaust_query_network_error(self): |
897 | 913 | # When doing an exhaust query, the socket stays checked out on success |
|
0 commit comments