Skip to content

Commit 0695c4d

Browse files
committed
rewrite how ClientSession handles _ServerSession
1 parent 38171e2 commit 0695c4d

File tree

5 files changed

+449
-275
lines changed

5 files changed

+449
-275
lines changed

pymongo/client_session.py

Lines changed: 45 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,11 @@ def __init__(self, client, server_session, options, authset):
117117
self._authset = authset
118118
self._cluster_time = None
119119
self._operation_time = None
120-
self._transaction_options = None # Current transaction's options.
121120
if self.options.auto_start_transaction:
122121
# TODO: Get transaction options from self.options.
123-
self._transaction_options = TransactionOptions()
124-
self._server_session.start_transaction()
122+
self._current_transaction_opts = TransactionOptions()
123+
else:
124+
self._current_transaction_opts = None
125125

126126
def end_session(self):
127127
"""Finish this session. If a transaction has started, abort it.
@@ -136,14 +136,19 @@ def end_session(self):
136136
def _end_session(self, lock, abort_txn):
137137
if self._server_session is not None:
138138
try:
139-
if self.in_transaction:
139+
if self._current_transaction_opts is not None:
140140
if abort_txn:
141-
self.abort_txn()
141+
self.abort_transaction()
142142
else:
143143
self.commit_transaction()
144144
finally:
145145
self._client._return_server_session(self._server_session, lock)
146146
self._server_session = None
147+
self._current_transaction_opts = None
148+
149+
def _check_ended(self):
150+
if self._server_session is None:
151+
raise InvalidOperation("Cannot use ended session")
147152

148153
def __enter__(self):
149154
return self
@@ -168,9 +173,7 @@ def options(self):
168173
@property
169174
def session_id(self):
170175
"""A BSON document, the opaque server session identifier."""
171-
if self._server_session is None:
172-
raise InvalidOperation("Cannot use ended session")
173-
176+
self._check_ended()
174177
return self._server_session.session_id
175178

176179
@property
@@ -195,27 +198,28 @@ def start_transaction(self, **kwargs):
195198
Do not use this method if the session is configured to automatically
196199
start a transaction.
197200
"""
198-
if self._server_session.in_transaction:
201+
self._check_ended()
202+
203+
if self._current_transaction_opts is not None:
199204
raise InvalidOperation("Transaction already in progress")
200205

201-
self._transaction_options = TransactionOptions(**kwargs)
202-
self._server_session.start_transaction()
206+
self._current_transaction_opts = TransactionOptions(**kwargs)
207+
self._server_session.statement_id = 0
203208

204209
def commit_transaction(self):
205210
"""Commit a multi-statement transaction."""
206211
self._finish_transaction("commitTransaction")
207212

208-
def abort_txn(self):
213+
def abort_transaction(self):
209214
"""Abort a multi-statement transaction."""
210215
self._finish_transaction("abortTransaction")
211216

212217
def _finish_transaction(self, command_name):
213-
if not self._server_session.in_transaction:
218+
if self._current_transaction_opts is None:
214219
raise InvalidOperation("No transaction in progress")
215220

216221
if self._server_session.statement_id == 0:
217222
# Not really started.
218-
self._server_session.reset_transaction()
219223
return
220224

221225
if command_name == 'abortTransaction':
@@ -228,11 +232,11 @@ def _finish_transaction(self, command_name):
228232
command_name,
229233
txnNumber=self._server_session.transaction_id,
230234
session=self,
231-
write_concern=self._transaction_options.write_concern,
235+
write_concern=self._current_transaction_opts.write_concern,
232236
parse_write_concern_error=True)
233237
finally:
234238
self._server_session.reset_transaction()
235-
self._transaction_options = None
239+
self._current_transaction_opts = None
236240

237241
def _advance_cluster_time(self, cluster_time):
238242
"""Internal cluster time helper."""
@@ -286,31 +290,42 @@ def has_ended(self):
286290
@property
287291
def in_transaction(self):
288292
"""True if this session has an active multi-statement transaction."""
289-
return (self._server_session is not None
290-
and self._server_session.in_transaction)
293+
return self._current_transaction_opts is not None
291294

292295
def _apply_to(self, command, is_retryable):
293-
# Internal function.
294-
if self._server_session is None:
295-
raise InvalidOperation("Cannot use ended session")
296+
self._check_ended()
296297

297298
if self.options.auto_start_transaction and not self.in_transaction:
298299
self.start_transaction()
299300

300-
self._server_session.apply_to(command, is_retryable)
301+
self._server_session.last_use = monotonic.time()
302+
command['lsid'] = self._server_session.session_id
301303

302-
def _advance_statement_id(self, n):
303-
# Internal function.
304-
if self._server_session is None:
305-
raise InvalidOperation("Cannot use ended session")
304+
if is_retryable:
305+
self._server_session._transaction_id += 1
306+
command['txnNumber'] = self._server_session.transaction_id
307+
return
308+
309+
if self._current_transaction_opts:
310+
if self._server_session.statement_id == 0:
311+
# First statement begins a new transaction.
312+
self._server_session._transaction_id += 1
313+
command['readConcern'] = {'level': 'snapshot'}
314+
command['autocommit'] = False
315+
316+
command['txnNumber'] = self._server_session.transaction_id
317+
# TODO: Allow stmtId for find/getMore, SERVER-33213.
318+
name = next(iter(command))
319+
if name not in ('find', 'getMore'):
320+
command['stmtId'] = self._server_session.statement_id
321+
self._server_session.statement_id += 1
306322

323+
def _advance_statement_id(self, n):
324+
self._check_ended()
307325
self._server_session.advance_statement_id(n)
308326

309327
def _retry_transaction_id(self):
310-
# Internal function.
311-
if self._server_session is None:
312-
raise InvalidOperation("Cannot use ended session")
313-
328+
self._check_ended()
314329
self._server_session.retry_transaction_id()
315330

316331

@@ -319,7 +334,6 @@ def __init__(self):
319334
# Ensure id is type 4, regardless of CodecOptions.uuid_representation.
320335
self.session_id = {'id': Binary(uuid.uuid4().bytes, 4)}
321336
self.last_use = monotonic.time()
322-
self.in_transaction = False
323337
self._transaction_id = 0
324338
self.statement_id = 0
325339

@@ -329,25 +343,6 @@ def timed_out(self, session_timeout_minutes):
329343
# Timed out if we have less than a minute to live.
330344
return idle_seconds > (session_timeout_minutes - 1) * 60
331345

332-
def apply_to(self, command, is_retryable):
333-
command['lsid'] = self.session_id
334-
335-
if is_retryable:
336-
self._transaction_id += 1
337-
command['txnNumber'] = self.transaction_id
338-
elif self.in_transaction:
339-
command['txnNumber'] = self.transaction_id
340-
# TODO: Allow stmtId for find/getMore, SERVER-33213.
341-
name = next(iter(command))
342-
if name not in ('find', 'getMore'):
343-
command['stmtId'] = self.statement_id
344-
if self.statement_id == 0:
345-
command['readConcern'] = {'level': 'snapshot'}
346-
command['autocommit'] = False
347-
self.statement_id += 1
348-
349-
self.last_use = monotonic.time()
350-
351346
def advance_statement_id(self, n):
352347
# Every command advances the statement id by 1 already.
353348
self.statement_id += (n - 1)
@@ -357,13 +352,7 @@ def transaction_id(self):
357352
"""Positive 64-bit integer."""
358353
return Int64(self._transaction_id)
359354

360-
def start_transaction(self):
361-
self._transaction_id += 1
362-
self.statement_id = 0
363-
self.in_transaction = True
364-
365355
def reset_transaction(self):
366-
self.in_transaction = False
367356
self.statement_id = 0
368357

369358
def retry_transaction_id(self):

test/transactions/abort.json

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,6 @@
279279
},
280280
{
281281
"description": "abort without start",
282-
"skipReason": "Server must implement abortTransaction",
283282
"operations": [
284283
{
285284
"name": "abortTransaction",
@@ -332,7 +331,7 @@
332331
"session": "session0"
333332
},
334333
"result": {
335-
"errorContains": "no such transaction"
334+
"errorContains": "no transaction in progress"
336335
}
337336
}
338337
],
@@ -368,18 +367,6 @@
368367
"command_name": "abortTransaction",
369368
"database_name": "admin"
370369
}
371-
},
372-
{
373-
"command_started_event": {
374-
"command": {
375-
"abortTransaction": 1,
376-
"lsid": "session0",
377-
"txnNumber": 1,
378-
"writeConcern": null
379-
},
380-
"command_name": "commitTransaction",
381-
"database_name": "admin"
382-
}
383370
}
384371
],
385372
"outcome": {

test/transactions/auto-start.json

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,6 @@
162162
},
163163
{
164164
"description": "explicit start",
165-
"skipReason": "Server must implement abortTransaction",
166165
"transactionOptions": {
167166
"session0": {
168167
"autoStartTransaction": true
@@ -185,18 +184,15 @@
185184
"name": "startTransaction",
186185
"arguments": {
187186
"session": "session0"
187+
},
188+
"result": {
189+
"errorContains": "transaction already in progress"
188190
}
189191
},
190192
{
191-
"name": "insertOne",
193+
"name": "commitTransaction",
192194
"arguments": {
193-
"document": {
194-
"_id": 2
195-
},
196195
"session": "session0"
197-
},
198-
"result": {
199-
"errorContains": "Cannot start"
200196
}
201197
}
202198
],
@@ -224,27 +220,23 @@
224220
{
225221
"command_started_event": {
226222
"command": {
227-
"insert": "test",
228-
"documents": [
229-
{
230-
"_id": 2
231-
}
232-
],
233-
"ordered": true,
223+
"commitTransaction": 1,
234224
"lsid": "session0",
235-
"txnNumber": 2,
236-
"stmtId": 0,
237-
"autocommit": false,
225+
"txnNumber": 1,
238226
"writeConcern": null
239227
},
240-
"command_name": "insert",
241-
"database_name": "transaction-tests"
228+
"command_name": "commitTransaction",
229+
"database_name": "admin"
242230
}
243231
}
244232
],
245233
"outcome": {
246234
"collection": {
247-
"data": []
235+
"data": [
236+
{
237+
"_id": 1
238+
}
239+
]
248240
}
249241
}
250242
},
@@ -424,7 +416,7 @@
424416
],
425417
"ordered": true,
426418
"lsid": "session0",
427-
"txnNumber": 2,
419+
"txnNumber": 1,
428420
"stmtId": 0,
429421
"autocommit": false,
430422
"writeConcern": null
@@ -438,7 +430,7 @@
438430
"command": {
439431
"commitTransaction": 1,
440432
"lsid": "session0",
441-
"txnNumber": 2,
433+
"txnNumber": 1,
442434
"writeConcern": null
443435
},
444436
"command_name": "commitTransaction",

0 commit comments

Comments
 (0)