Skip to content

Commit 08f7033

Browse files
authored
fix(vespa): handle continuation tokens in selection-based bulk deletes (#1733)
* fix(vespa): propagate delete errors instead of swallowing them

* fix(vespa): use 'contains' instead of '=' for collection_id in fast-delete YQL

  The fast-delete path constructs a YQL query to resolve Vespa doc IDs before issuing parallel direct DELETEs. The query used `= '{uuid}'` for the collection_id filter, but in Vespa YQL `=` is the numeric equality operator — it fails on UUID strings with HTTP 400 "not an int item expression: Illegal embedded sign character" (the `-` in UUIDs).

  This caused every fast-delete to fall back to the visitor-based selection scan (5 schemas × 120s timeout each ≈ 300s per batch), producing 1,100+ "update_delete_VespaDestination slow: 300.Xs" warnings per hour in production.

  Fix: change `=` to `contains` (the correct YQL string-match operator), consistent with query_builder.py and vespa_client.py which already use `contains` for the same field.

* fix(vespa): handle continuation token in selection-based bulk deletes

  Vespa visitor-based DELETE returns a continuation token when the result set is too large for a single pass. The previous implementation made a single request and ignored the token, silently leaving undeleted documents behind on large collections.

  Now delete_by_selection loops: issue DELETE, check for continuation token in response, re-issue with continuation param until Vespa stops returning one. This affects delete_by_sync_id, delete_by_collection_id, and the entity-level fallback path.

* style: fix ruff format for bulk delete response assignment
1 parent 9dad3da commit 08f7033

2 files changed

Lines changed: 178 additions & 13 deletions

File tree

backend/airweave/platform/destinations/vespa/client.py

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,9 @@ async def delete_by_selection(self, schema: str, selection: str) -> DeleteResult
188188
"""Delete documents using Vespa's selection-based bulk delete API (visitor scan).
189189
190190
Uses a visitor that walks all buckets evaluating the selection expression.
191-
Appropriate for broad deletions (by sync_id, collection_id) but slow for
192-
targeted deletions -- use _delete_by_doc_ids for those.
191+
For large result sets Vespa returns a continuation token; this method
192+
loops until no more tokens are returned so all matching documents are
193+
deleted.
193194
194195
Uses: DELETE /document/v1/{namespace}/{doctype}/docid?selection={expr}&cluster={cluster}
195196
@@ -200,26 +201,51 @@ async def delete_by_selection(self, schema: str, selection: str) -> DeleteResult
200201
Returns:
201202
DeleteResult with count of deleted documents
202203
"""
203-
url = self._build_bulk_delete_url(schema, selection)
204+
base_url = self._build_bulk_delete_url(schema, selection)
204205
self._logger.debug(f"[VespaClient] Bulk delete from {schema} with selection: {selection}")
205206

206207
deleted_count = 0
208+
continuation: Optional[str] = None
209+
pass_num = 0
210+
207211
try:
208212
async with httpx.AsyncClient(timeout=settings.VESPA_TIMEOUT) as client:
209-
async with client.stream("DELETE", url) as response:
210-
if response.status_code == 200:
211-
deleted_count = await self._parse_bulk_delete_response(response)
212-
else:
213-
await self._log_delete_error(response)
213+
while True:
214+
pass_num += 1
215+
url = base_url
216+
if continuation:
217+
url += f"&continuation={quote(continuation, safe='')}"
218+
219+
async with client.stream("DELETE", url) as response:
220+
if response.status_code == 200:
221+
batch_count, continuation = await self._parse_bulk_delete_response(
222+
response
223+
)
224+
deleted_count += batch_count
225+
else:
226+
await self._log_delete_error(response)
227+
break
228+
229+
if not continuation:
230+
break
231+
232+
self._logger.debug(
233+
f"[VespaClient] Bulk delete pass {pass_num}: "
234+
f"{deleted_count} deleted so far, continuing..."
235+
)
214236
except httpx.TimeoutException:
215237
self._logger.error(
216-
f"[VespaClient] Bulk delete timed out after {settings.VESPA_TIMEOUT}s"
238+
f"[VespaClient] Bulk delete timed out after {settings.VESPA_TIMEOUT}s "
239+
f"(pass {pass_num}, {deleted_count} deleted before timeout)"
217240
)
218241
except Exception as e:
219242
self._logger.error(f"[VespaClient] Bulk delete error: {e}")
220243

221244
if deleted_count > 0:
222-
self._logger.info(f"[VespaClient] Deleted {deleted_count} documents from {schema}")
245+
self._logger.info(
246+
f"[VespaClient] Deleted {deleted_count} documents from {schema} "
247+
f"in {pass_num} pass(es)"
248+
)
223249
else:
224250
self._logger.debug(f"[VespaClient] No documents to delete from {schema}")
225251

@@ -477,14 +503,24 @@ def _build_bulk_delete_url(self, schema: str, selection: str) -> str:
477503
f"&cluster={settings.VESPA_CLUSTER}"
478504
)
479505

480-
async def _parse_bulk_delete_response(self, response: httpx.Response) -> int:
481-
"""Parse streaming response from Vespa bulk delete."""
506+
async def _parse_bulk_delete_response(
507+
self, response: httpx.Response
508+
) -> Tuple[int, Optional[str]]:
509+
"""Parse streaming response from Vespa bulk delete.
510+
511+
Returns:
512+
(deleted_count, continuation_token) — token is None when the
513+
visitor has finished and there are no more documents to process.
514+
"""
482515
count = 0
516+
continuation: Optional[str] = None
483517
async for line in response.aiter_lines():
484518
if not line.strip():
485519
continue
486520
try:
487521
result = json.loads(line)
522+
if "continuation" in result:
523+
continuation = result["continuation"]
488524
if "documentCount" in result:
489525
count += result["documentCount"]
490526
elif "sessionStats" in result:
@@ -494,7 +530,7 @@ async def _parse_bulk_delete_response(self, response: httpx.Response) -> int:
494530
count += 1
495531
except json.JSONDecodeError:
496532
pass
497-
return count
533+
return count, continuation
498534

499535
async def _log_delete_error(self, response: httpx.Response) -> None:
500536
"""Log error from failed bulk delete response."""

backend/tests/unit/platform/destinations/vespa/test_client.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
"""Unit tests for VespaClient (with mocked I/O)."""
22

3+
import json
4+
from contextlib import asynccontextmanager
5+
6+
import httpx
37
import pytest
48
from unittest.mock import AsyncMock, MagicMock, patch
59
from uuid import UUID
@@ -123,6 +127,131 @@ async def test_delete_by_selection_builds_url_correctly(self, client):
123127
assert result.deleted_count == 5
124128
assert result.schema_name == schema
125129

130+
@pytest.mark.asyncio
131+
async def test_delete_by_selection_follows_continuation_token(self, client):
132+
"""Test that delete_by_selection loops when Vespa returns a continuation token.
133+
134+
Vespa's visitor-based delete returns a continuation token for large result
135+
sets. The client must re-issue the DELETE with the token until Vespa stops
136+
returning one.
137+
"""
138+
captured_urls: list[str] = []
139+
140+
def _make_stream_response(body_lines: list[str], status: int = 200):
141+
"""Build a fake async streaming response."""
142+
@asynccontextmanager
143+
async def _stream(method, url, **kwargs):
144+
captured_urls.append(url)
145+
resp = MagicMock()
146+
resp.status_code = status
147+
148+
async def aiter_lines():
149+
for line in body_lines:
150+
yield line
151+
152+
resp.aiter_lines = aiter_lines
153+
yield resp
154+
return _stream
155+
156+
pass_1_body = [
157+
json.dumps({"documentCount": 500, "continuation": "AAAABB=="}),
158+
]
159+
pass_2_body = [
160+
json.dumps({"documentCount": 300, "continuation": "CCCCDD=="}),
161+
]
162+
pass_3_body = [
163+
json.dumps({"documentCount": 200}),
164+
]
165+
166+
call_count = 0
167+
168+
@asynccontextmanager
169+
async def mock_async_client(**kwargs):
170+
class FakeClient:
171+
def stream(self, method, url, **kw):
172+
nonlocal call_count
173+
call_count += 1
174+
if call_count == 1:
175+
return _make_stream_response(pass_1_body)(method, url)
176+
elif call_count == 2:
177+
return _make_stream_response(pass_2_body)(method, url)
178+
else:
179+
return _make_stream_response(pass_3_body)(method, url)
180+
yield FakeClient()
181+
182+
with patch("httpx.AsyncClient", side_effect=lambda **kw: mock_async_client(**kw)):
183+
result = await client.delete_by_selection("base_entity", "field=='value'")
184+
185+
assert result.deleted_count == 1000
186+
assert call_count == 3
187+
assert "continuation=" not in captured_urls[0]
188+
assert "continuation=" in captured_urls[1]
189+
assert "AAAABB" in captured_urls[1]
190+
assert "continuation=" in captured_urls[2]
191+
assert "CCCCDD" in captured_urls[2]
192+
193+
@pytest.mark.asyncio
194+
async def test_delete_by_selection_no_continuation_single_pass(self, client):
195+
"""Test that delete_by_selection completes in one pass when no continuation token."""
196+
call_count = 0
197+
198+
@asynccontextmanager
199+
async def mock_async_client(**kwargs):
200+
class FakeClient:
201+
def stream(self, method, url, **kw):
202+
nonlocal call_count
203+
call_count += 1
204+
205+
@asynccontextmanager
206+
async def _ctx(m, u):
207+
resp = MagicMock()
208+
resp.status_code = 200
209+
210+
async def aiter_lines():
211+
yield json.dumps({"documentCount": 42})
212+
213+
resp.aiter_lines = aiter_lines
214+
yield resp
215+
216+
return _ctx(method, url)
217+
yield FakeClient()
218+
219+
with patch("httpx.AsyncClient", side_effect=lambda **kw: mock_async_client(**kw)):
220+
result = await client.delete_by_selection("base_entity", "field=='value'")
221+
222+
assert result.deleted_count == 42
223+
assert call_count == 1
224+
225+
@pytest.mark.asyncio
226+
async def test_parse_bulk_delete_response_extracts_continuation(self, client):
227+
"""Test _parse_bulk_delete_response returns the continuation token."""
228+
response = MagicMock()
229+
230+
async def aiter_lines():
231+
yield json.dumps({"documentCount": 150, "continuation": "TOKEN123"})
232+
233+
response.aiter_lines = aiter_lines
234+
235+
count, token = await client._parse_bulk_delete_response(response)
236+
237+
assert count == 150
238+
assert token == "TOKEN123"
239+
240+
@pytest.mark.asyncio
241+
async def test_parse_bulk_delete_response_none_when_no_continuation(self, client):
242+
"""Test _parse_bulk_delete_response returns None when no continuation."""
243+
response = MagicMock()
244+
245+
async def aiter_lines():
246+
yield json.dumps({"documentCount": 50})
247+
248+
response.aiter_lines = aiter_lines
249+
250+
count, token = await client._parse_bulk_delete_response(response)
251+
252+
assert count == 50
253+
assert token is None
254+
126255
@pytest.mark.asyncio
127256
async def test_delete_by_sync_id(self, client):
128257
"""Test delete by sync ID across all schemas."""

0 commit comments

Comments
 (0)