1212# implied. See the License for the specific language governing
1313# permissions and limitations under the License.
1414
15- """ChangeStream cursor to iterate over changes on a collection."""
15+ """Watch changes on a collection, a database, or the entire cluster ."""
1616
1717import copy
1818
@@ -41,14 +41,12 @@ class ChangeStream(object):
4141 """The internal abstract base class for change stream cursors.
4242
4343 Should not be called directly by application developers. Use
44- :meth:pymongo.collection.Collection.watch,
45- :meth:pymongo.database.Database.watch, or
46- :meth:pymongo.mongo_client.MongoClient.watch instead.
47-
48- Defines the interface for change streams. Should be subclassed to
49- implement the `ChangeStream._create_cursor` abstract method, and
50- the `ChangeStream._database`and ChangeStream._aggregation_target`
51- abstract properties.
44+ :meth:`pymongo.collection.Collection.watch`,
45+ :meth:`pymongo.database.Database.watch`, or
46+ :meth:`pymongo.mongo_client.MongoClient.watch` instead.
47+
48+ .. versionadded:: 3.6
49+ .. mongodoc:: changeStreams
5250 """
5351 def __init__ (self , target , pipeline , full_document , resume_after ,
5452 max_await_time_ms , batch_size , collation ,
@@ -175,34 +173,97 @@ def next(self):
175173 """Advance the cursor.
176174
177175 This method blocks until the next change document is returned or an
178- unrecoverable error is raised.
176+ unrecoverable error is raised. This method is used when iterating over
177+ all changes in the cursor. For example::
178+
179+ try:
180+ with db.collection.watch(
181+ [{'$match': {'operationType': 'insert'}}]) as stream:
182+ for insert_change in stream:
183+ print(insert_change)
184+ except pymongo.errors.PyMongoError:
185+ # The ChangeStream encountered an unrecoverable error or the
186+ # resume attempt failed to recreate the cursor.
187+ logging.error('...')
179188
180189 Raises :exc:`StopIteration` if this ChangeStream is closed.
181190 """
182- while True :
183- try :
184- change = self ._cursor .next ()
185- except ConnectionFailure :
186- self ._resume ()
187- continue
188- except OperationFailure as exc :
189- if exc .code in _NON_RESUMABLE_GETMORE_ERRORS :
190- raise
191- self ._resume ()
192- continue
193- try :
194- resume_token = change ['_id' ]
195- except KeyError :
196- self .close ()
197- raise InvalidOperation (
198- "Cannot provide resume functionality when the resume "
199- "token is missing." )
200- self ._resume_token = copy .copy (resume_token )
201- self ._start_at_operation_time = None
202- return change
191+ while self .alive :
192+ doc = self .try_next ()
193+ if doc is not None :
194+ return doc
195+
196+ raise StopIteration
203197
204198 __next__ = next
205199
200+ @property
201+ def alive (self ):
202+ """Does this cursor have the potential to return more data?
203+
204+ .. note:: Even if :attr:`alive` is ``True``, :meth:`next` can raise
205+ :exc:`StopIteration` and :meth:`try_next` can return ``None``.
206+
207+ .. versionadded:: 3.8
208+ """
209+ return self ._cursor .alive
210+
211+ def try_next (self ):
212+ """Advance the cursor without blocking indefinitely.
213+
214+ This method returns the next change document without waiting
215+ indefinitely for the next change. For example::
216+
217+ with db.collection.watch() as stream:
218+ while stream.alive:
219+ change = stream.try_next()
220+ if change is not None:
221+ print(change)
222+ elif stream.alive:
223+ # We end up here when there are no recent changes.
224+ # Sleep for a while to avoid flooding the server with
225+ # getMore requests when no changes are available.
226+ time.sleep(10)
227+
228+ If no change document is cached locally then this method runs a single
229+ getMore command. If the getMore yields any documents, the next
230+ document is returned, otherwise, if the getMore returns no documents
231+ (because there have been no changes) then ``None`` is returned.
232+
233+ :Returns:
234+ The next change document or ``None`` when no document is available
235+ after running a single getMore or when the cursor is closed.
236+
237+ .. versionadded:: 3.8
238+ """
239+ # Attempt to get the next change with at most one getMore and at most
240+ # one resume attempt.
241+ try :
242+ change = self ._cursor ._try_next (True )
243+ except ConnectionFailure :
244+ self ._resume ()
245+ change = self ._cursor ._try_next (False )
246+ except OperationFailure as exc :
247+ if exc .code in _NON_RESUMABLE_GETMORE_ERRORS :
248+ raise
249+ self ._resume ()
250+ change = self ._cursor ._try_next (False )
251+
252+ # No changes are available.
253+ if change is None :
254+ return None
255+
256+ try :
257+ resume_token = change ['_id' ]
258+ except KeyError :
259+ self .close ()
260+ raise InvalidOperation (
261+ "Cannot provide resume functionality when the resume "
262+ "token is missing." )
263+ self ._resume_token = copy .copy (resume_token )
264+ self ._start_at_operation_time = None
265+ return change
266+
206267 def __enter__ (self ):
207268 return self
208269
@@ -211,13 +272,12 @@ def __exit__(self, exc_type, exc_val, exc_tb):
211272
212273
213274class CollectionChangeStream (ChangeStream ):
214- """Class for creating a change stream on a collection.
275+ """A change stream that watches changes on a single collection.
215276
216277 Should not be called directly by application developers. Use
217278 helper method :meth:`pymongo.collection.Collection.watch` instead.
218279
219- .. versionadded: 3.6
220- .. mongodoc:: changeStreams
280+ .. versionadded:: 3.7
221281 """
222282 @property
223283 def _aggregation_target (self ):
@@ -229,13 +289,12 @@ def _database(self):
229289
230290
231291class DatabaseChangeStream (ChangeStream ):
232- """Class for creating a change stream on all collections in a database.
292+ """A change stream that watches changes on all collections in a database.
233293
234294 Should not be called directly by application developers. Use
235295 helper method :meth:`pymongo.database.Database.watch` instead.
236296
237- .. versionadded: 3.7
238- .. mongodoc:: changeStreams
297+ .. versionadded:: 3.7
239298 """
240299 @property
241300 def _aggregation_target (self ):
@@ -247,13 +306,12 @@ def _database(self):
247306
248307
249308class ClusterChangeStream (DatabaseChangeStream ):
250- """Class for creating a change stream on all collections on a cluster.
309+ """A change stream that watches changes on all collections in the cluster.
251310
252311 Should not be called directly by application developers. Use
253312 helper method :meth:`pymongo.mongo_client.MongoClient.watch` instead.
254313
255- .. versionadded: 3.7
256- .. mongodoc:: changeStreams
314+ .. versionadded:: 3.7
257315 """
258316 def _pipeline_options (self ):
259317 options = super (ClusterChangeStream , self )._pipeline_options ()
0 commit comments