Skip to content

Commit fdf4436

Browse files
committed
PYTHON-1332 - Send lsid with all commands
1 parent 6fa2e40 commit fdf4436

File tree

4 files changed

+43
-48
lines changed

4 files changed

+43
-48
lines changed

pymongo/change_stream.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,14 @@ class ChangeStream(object):
4848
per batch.
4949
- `collation` (optional): The :class:`~pymongo.collation.Collation`
5050
to use for the aggregation.
51+
- `session` (optional): a
52+
:class:`~pymongo.client_session.ClientSession`.
5153
5254
.. versionadded: 3.6
5355
"""
5456
def __init__(self, collection, pipeline, full_document,
5557
resume_after=None, max_await_time_ms=None, batch_size=None,
56-
collation=None):
58+
collation=None, session=None):
5759
self._codec_options = collection.codec_options
5860
self._collection = collection.with_options(
5961
codec_options=DEFAULT_RAW_BSON_OPTIONS)
@@ -63,6 +65,7 @@ def __init__(self, collection, pipeline, full_document,
6365
self._max_await_time_ms = max_await_time_ms
6466
self._batch_size = batch_size
6567
self._collation = collation
68+
self._session = session
6669
self._cursor = self._create_cursor()
6770

6871
def _full_pipeline(self):
@@ -79,7 +82,7 @@ def _full_pipeline(self):
7982
def _create_cursor(self):
8083
"""Initialize the cursor or raise a fatal error"""
8184
return self._collection.aggregate(
82-
self._full_pipeline(), batchSize=self._batch_size,
85+
self._full_pipeline(), self._session, batchSize=self._batch_size,
8386
collation=self._collation, maxAwaitTimeMS=self._max_await_time_ms)
8487

8588
def close(self):

pymongo/client_session.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ def return_server_session(self, server_session, session_timeout_minutes):
169169
if not server_session.timed_out(session_timeout_minutes):
170170
self.appendleft(server_session)
171171

172+
def return_server_session_no_lock(self, server_session):
173+
self.appendleft(server_session)
174+
172175
def _clear_stale(self, session_timeout_minutes):
173176
# Clear stale sessions. The least recently used are on the right.
174177
while self:

pymongo/collection.py

Lines changed: 34 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,19 +1468,18 @@ def parallel_scan(self, num_cursors, session=None, **kwargs):
14681468
cmd.update(kwargs)
14691469

14701470
with self._socket_for_reads() as (sock_info, slave_ok):
1471-
# Avoid auto-injecting a session.
1472-
result = sock_info.command(
1473-
self.__database.name,
1474-
cmd,
1475-
slave_ok,
1476-
self.read_preference,
1477-
self.codec_options,
1478-
read_concern=self.read_concern,
1479-
session=session)
1471+
result = self._command(sock_info, cmd, slave_ok,
1472+
read_concern=self.read_concern,
1473+
session=session)
1474+
1475+
cursors = []
1476+
for cursor in result['cursors']:
1477+
s = self.__database.client._ensure_session(session)
1478+
cursors.append(CommandCursor(
1479+
self, cursor['cursor'], sock_info.address,
1480+
session=s, explicit_session=session is not None))
14801481

1481-
return [CommandCursor(self, cursor['cursor'], sock_info.address,
1482-
session=session, explicit_session=True)
1483-
for cursor in result['cursors']]
1482+
return cursors
14841483

14851484
def _count(self, cmd, collation=None, session=None):
14861485
"""Internal count helper."""
@@ -2041,38 +2040,25 @@ def _aggregate(self, pipeline, cursor_class, first_batch_size, session,
20412040
cmd.update(kwargs)
20422041
# Apply this Collection's read concern if $out is not in the
20432042
# pipeline.
2044-
if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd:
2045-
if dollar_out:
2046-
# Avoid auto-injecting a session.
2047-
result = sock_info.command(
2048-
self.__database.name,
2049-
cmd,
2050-
slave_ok,
2051-
self.read_preference,
2052-
self.codec_options,
2053-
parse_write_concern_error=True,
2054-
collation=collation,
2055-
session=session)
2056-
else:
2057-
result = sock_info.command(
2058-
self.__database.name,
2059-
cmd,
2060-
slave_ok,
2061-
ReadPreference.PRIMARY,
2062-
self.codec_options,
2063-
read_concern=self.read_concern,
2064-
collation=collation,
2065-
session=session)
2043+
if (sock_info.max_wire_version >= 4
2044+
and 'readConcern' not in cmd
2045+
and not dollar_out):
2046+
read_concern = self.read_concern
20662047
else:
2067-
result = sock_info.command(
2068-
self.__database.name,
2069-
cmd,
2070-
slave_ok,
2071-
self.read_preference,
2072-
self.codec_options,
2073-
parse_write_concern_error=dollar_out,
2074-
collation=collation,
2075-
session=session)
2048+
read_concern = DEFAULT_READ_CONCERN
2049+
2050+
# Avoid auto-injecting a session: aggregate() passes a session,
2051+
# aggregate_raw_batches() passes none.
2052+
result = sock_info.command(
2053+
self.__database.name,
2054+
cmd,
2055+
slave_ok,
2056+
self.read_preference,
2057+
self.codec_options,
2058+
parse_write_concern_error=dollar_out,
2059+
read_concern=read_concern,
2060+
collation=collation,
2061+
session=session)
20762062

20772063
if "cursor" in result:
20782064
cursor = result["cursor"]
@@ -2202,7 +2188,8 @@ def aggregate_raw_batches(self, pipeline, **kwargs):
22022188
None, False, **kwargs)
22032189

22042190
def watch(self, pipeline=None, full_document='default', resume_after=None,
2205-
max_await_time_ms=None, batch_size=None, collation=None):
2191+
max_await_time_ms=None, batch_size=None, collation=None,
2192+
session=None):
22062193
"""Watch changes on this collection.
22072194
22082195
Performs an aggregation with an implicit initial
@@ -2266,6 +2253,8 @@ def watch(self, pipeline=None, full_document='default', resume_after=None,
22662253
per batch.
22672254
- `collation` (optional): The :class:`~pymongo.collation.Collation`
22682255
to use for the aggregation.
2256+
- `session` (optional): a
2257+
:class:`~pymongo.client_session.ClientSession`.
22692258
22702259
:Returns:
22712260
A :class:`~pymongo.change_stream.ChangeStream` cursor.
@@ -2287,7 +2276,7 @@ def watch(self, pipeline=None, full_document='default', resume_after=None,
22872276
common.validate_string_or_none('full_document', full_document)
22882277

22892278
return ChangeStream(self, pipeline, full_document, resume_after,
2290-
max_await_time_ms, batch_size, collation)
2279+
max_await_time_ms, batch_size, collation, session)
22912280

22922281
def group(self, key, condition, initial, reduce, finalize=None, **kwargs):
22932282
"""Perform a query similar to an SQL *group by* operation.

pymongo/topology.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ def return_server_session(self, server_session, lock):
400400
session_timeout)
401401
else:
402402
# Called from a __del__ method, can't use a lock.
403-
self._session_pool.append(server_session)
403+
self._session_pool.return_server_session_no_lock(server_session)
404404

405405
def _new_selection(self):
406406
"""A Selection object, initially including all known servers.

0 commit comments

Comments
 (0)