Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
clean up change feed logic from query pipeline
  • Loading branch information
annie-mac committed Sep 17, 2024
commit ddd598e91ba949b7f5aeb4e87c20105ae77ad644
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,6 @@ def _QueryChangeFeed(
options = {}
else:
options = dict(options)
options["changeFeed"] = True

resource_key_map = {"Documents": "docs"}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def __init__(self, client, options):
"""
self._client = client
self._options = options
self._is_change_feed = "changeFeed" in options and options["changeFeed"] is True
self._continuation = self._get_initial_continuation()
self._has_started = False
self._has_finished = False
Expand Down Expand Up @@ -117,10 +116,6 @@ async def _fetch_items_helper_no_retries(self, fetch_function):
fetched_items = []
new_options = copy.deepcopy(self._options)
while self._continuation or not self._has_started:
# Check if this is first fetch for read from specific time change feed.
# For read specific time the first fetch will return empty even if we have more pages.
is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started

new_options["continuation"] = self._continuation

response_headers = {}
Expand All @@ -129,13 +124,7 @@ async def _fetch_items_helper_no_retries(self, fetch_function):
self._has_started = True

continuation_key = http_constants.HttpHeaders.Continuation
# Use Etag as continuation token for change feed queries.
if self._is_change_feed:
continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
# there is any items in the response or not.
# No initial fetch for start time change feed, so we need to pass continuation token for first fetch
if not self._is_change_feed or fetched_items or is_s_time_first_fetch:
if fetched_items:
self._continuation = response_headers.get(continuation_key)
else:
self._continuation = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ def __init__(self, client, options):
"""
self._client = client
self._options = options
self._is_change_feed = "changeFeed" in options and options["changeFeed"] is True
self._continuation = self._get_initial_continuation()
self._has_started = False
self._has_finished = False
Expand Down Expand Up @@ -115,9 +114,6 @@ def _fetch_items_helper_no_retries(self, fetch_function):
fetched_items = []
new_options = copy.deepcopy(self._options)
while self._continuation or not self._has_started:
# Check if this is first fetch for read from specific time change feed.
# For read specific time the first fetch will return empty even if we have more pages.
is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started
if not self._has_started:
self._has_started = True
new_options["continuation"] = self._continuation
Expand All @@ -126,13 +122,7 @@ def _fetch_items_helper_no_retries(self, fetch_function):
(fetched_items, response_headers) = fetch_function(new_options)

continuation_key = http_constants.HttpHeaders.Continuation
# Use Etag as continuation token for change feed queries.
if self._is_change_feed:
continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
# there is any items in the response or not.
# For start time however we get no initial results, so we need to pass continuation token
if not self._is_change_feed or fetched_items or is_s_time_first_fetch:
if fetched_items:
self._continuation = response_headers.get(continuation_key)
else:
self._continuation = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2277,7 +2277,6 @@ def _QueryChangeFeed(
options = {}
else:
options = dict(options)
options["changeFeed"] = True

resource_key_map = {"Documents": "docs"}

Expand Down