3636from bson .py3compat import iteritems
3737from bson .raw_bson import DEFAULT_RAW_BSON_OPTIONS , RawBSONDocument
3838
39- from pymongo import monitoring
4039from pymongo .change_stream import _NON_RESUMABLE_GETMORE_ERRORS
4140from pymongo .command_cursor import CommandCursor
4241from pymongo .errors import (InvalidOperation , OperationFailure ,
4342 ServerSelectionTimeoutError )
4443from pymongo .message import _CursorAddress
4544from pymongo .read_concern import ReadConcern
45+ from pymongo .write_concern import WriteConcern
4646
4747from test import client_context , unittest , IntegrationTest
4848from test .utils import (
@@ -70,10 +70,13 @@ def kill_change_stream_cursor(self, change_stream):
7070 client ._close_cursor_now (cursor .cursor_id , address )
7171
7272 def test_try_next (self ):
73- coll = self .watched_collection ()
73+ # ChangeStreams only read majority committed data so use w:majority.
74+ coll = self .watched_collection ().with_options (
75+ write_concern = WriteConcern ("majority" ))
76+ coll .drop ()
7477 coll .insert_one ({})
7578 self .addCleanup (coll .drop )
76- with self .change_stream (max_await_time_ms = 100 ) as stream :
79+ with self .change_stream (max_await_time_ms = 250 ) as stream :
7780 self .assertIsNone (stream .try_next ())
7881 self .assertIsNone (stream ._resume_token )
7982 coll .insert_one ({})
@@ -88,14 +91,16 @@ def test_try_next_runs_one_getmore(self):
8891 # Connect to the cluster.
8992 client .admin .command ('ping' )
9093 listener .results .clear ()
91- coll = self .watched_collection ()
94+ # ChangeStreams only read majority committed data so use w:majority.
95+ coll = self .watched_collection ().with_options (
96+ write_concern = WriteConcern ("majority" ))
9297 coll .drop ()
9398 # Create the watched collection before starting the change stream to
9499 # skip any "create" events.
95100 coll .insert_one ({'_id' : 1 })
96101 self .addCleanup (coll .drop )
97102 with self .change_stream_with_client (
98- client , max_await_time_ms = 100 ) as stream :
103+ client , max_await_time_ms = 250 ) as stream :
99104 self .assertEqual (listener .started_command_names (), ["aggregate" ])
100105 listener .results .clear ()
101106
0 commit comments