Skip to content

Commit c43f45b

Browse files
committed
mango: prevent occasional duplication of paginated text results
When an interleaved update happens to a `text` index while it is being queried, the affected documents might appear duplicated in the collected results. That is because `search` results are paginated where the boundaries might move due to the updates and the document might show up once more on subsequent queries. Mitigate the situation by tracking and losing duplicated documents while the results are being streamed to the user.
1 parent 06699b1 commit c43f45b

File tree

3 files changed

+248
-76
lines changed

3 files changed

+248
-76
lines changed

src/mango/src/mango_cursor_text.erl

Lines changed: 99 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
user_fun,
3838
user_acc,
3939
fields,
40-
execution_stats
40+
execution_stats,
41+
documents_seen
4142
}).
4243

4344
-spec create(Db, {Indexes, Trace}, Selector, Options) -> {ok, #cursor{}} | no_return() when
@@ -118,7 +119,8 @@ execute(Cursor, UserFun, UserAcc) ->
118119
user_fun = UserFun,
119120
user_acc = UserAcc,
120121
fields = Cursor#cursor.fields,
121-
execution_stats = mango_execution_stats:log_start(Stats)
122+
execution_stats = mango_execution_stats:log_start(Stats),
123+
documents_seen = sets:new([{version, 2}])
122124
},
123125
try
124126
case Query of
@@ -197,26 +199,40 @@ handle_hit(CAcc0, Sort, Doc) ->
197199
#cacc{
198200
limit = Limit,
199201
skip = Skip,
200-
execution_stats = Stats
202+
execution_stats = Stats,
203+
documents_seen = Seen
201204
} = CAcc0,
202205
CAcc1 = update_bookmark(CAcc0, Sort),
203206
Stats1 = mango_execution_stats:incr_docs_examined(Stats),
204207
couch_stats:increment_counter([mango, docs_examined]),
205208
CAcc2 = CAcc1#cacc{execution_stats = Stats1},
206209
case mango_selector:match(CAcc2#cacc.selector, Doc) of
207-
true when Skip > 0 ->
208-
CAcc2#cacc{skip = Skip - 1};
209-
true when Limit == 0 ->
210-
% We hit this case if the user spcified with a
211-
% zero limit. Notice that in this case we need
212-
% to return the bookmark from before this match
213-
throw({stop, CAcc0});
214-
true when Limit == 1 ->
215-
NewCAcc = apply_user_fun(CAcc2, Doc),
216-
throw({stop, NewCAcc});
217-
true when Limit > 1 ->
218-
NewCAcc = apply_user_fun(CAcc2, Doc),
219-
NewCAcc#cacc{limit = Limit - 1};
210+
true ->
211+
DocId = mango_fields:extract(Doc, [<<"_id">>]),
212+
case sets:is_element(DocId, Seen) of
213+
true ->
214+
CAcc2;
215+
false ->
216+
CAcc3 = CAcc2#cacc{
217+
documents_seen = sets:add_element(DocId, Seen)
218+
},
219+
if
220+
Skip > 0 ->
221+
CAcc3#cacc{skip = Skip - 1};
222+
Limit == 0 ->
223+
% We hit this case if the user specified
224+
% with a zero limit. Notice that in this
225+
% case we need to return the bookmark from
226+
% before this match.
227+
throw({stop, CAcc0});
228+
Limit == 1 ->
229+
CAcc4 = apply_user_fun(CAcc3, Doc),
230+
throw({stop, CAcc4});
231+
Limit > 1 ->
232+
CAcc4 = apply_user_fun(CAcc3, Doc),
233+
CAcc4#cacc{limit = Limit - 1}
234+
end
235+
end;
220236
false ->
221237
CAcc2
222238
end.
@@ -522,7 +538,16 @@ execute_test_() ->
522538
append_sort_type,
523539
fun(RawField, selector) -> <<RawField/binary, "<type>">> end
524540
),
525-
meck:expect(mango_fields, extract, fun({doc, N}, fields) -> {final_doc, N} end),
541+
meck:expect(
542+
mango_fields,
543+
extract,
544+
fun({doc, N}, Fields) ->
545+
case Fields of
546+
fields -> {final_doc, N};
547+
[<<"_id">>] -> N
548+
end
549+
end
550+
),
526551
meck:expect(
527552
foo,
528553
add_key_only,
@@ -549,8 +574,8 @@ execute_test_() ->
549574
?TDEF_FE(t_execute_more_results, 10),
550575
?TDEF_FE(t_execute_unique_results, 10),
551576
?TDEF_FE(t_execute_limit_cutoff, 10),
577+
?TDEF_FE(t_execute_limit_cutoff_unique, 10),
552578
?TDEF_FE(t_execute_limit_zero, 10),
553-
?TDEF_FE(t_execute_limit_unique, 10),
554579
?TDEF_FE(t_execute_skip, 10),
555580
?TDEF_FE(t_execute_skip_unique, 10),
556581
?TDEF_FE(t_execute_no_matches, 10),
@@ -753,54 +778,12 @@ t_execute_limit_cutoff(_) ->
753778
meck:expect(mango_selector_text, convert, [selector], meck:val(query)),
754779
meck:expect(mango_selector, match, fun(selector, {doc, _N}) -> true end),
755780
?assertEqual({ok, {acc, 5}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
756-
?assertEqual(2, meck:num_calls(couch_stats, increment_counter, 1)),
757-
?assertEqual(2, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)),
758-
?assertEqual(Limit, meck:num_calls(mango_execution_stats, incr_results_returned, 1)).
759-
760-
t_execute_limit_zero(_) ->
761-
Limit = 0,
762-
Skip = 0,
763-
IdxName = <<"index">>,
764-
Options = [{partition, <<>>}, {sort, {[]}}, {bookmark, [bookmark, 0]}],
765-
Cursor = #cursor{
766-
db = db,
767-
index = #idx{ddoc = <<"_design/ddoc">>, name = IdxName},
768-
limit = Limit,
769-
skip = Skip,
770-
fields = fields,
771-
selector = selector,
772-
opts = Options,
773-
execution_stats = stats
774-
},
775-
QueryArgs =
776-
#index_query_args{
777-
q = query,
778-
partition = nil,
779-
limit = Skip + Limit,
780-
bookmark = [bookmark, 0],
781-
sort = relevance,
782-
raw_bookmark = true
783-
},
784-
Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}},
785-
Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}},
786-
Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}},
787-
Hits = [Hit1, Hit2, Hit3],
788-
meck:expect(
789-
dreyfus_fabric_search,
790-
go,
791-
[db_name, <<"ddoc">>, IdxName, QueryArgs],
792-
meck:val({ok, [bookmark, 1], undefined, Hits, undefined, undefined})
793-
),
794-
meck:expect(mango_selector_text, convert, [selector], meck:val(query)),
795-
meck:expect(mango_selector, match, fun(selector, {doc, _N}) -> true end),
796-
?assertEqual({ok, {acc, 3}}, execute(Cursor, fun foo:add_key_only/2, {acc, 0})),
797-
?assertEqual(1, meck:num_calls(couch_stats, increment_counter, 1)),
798-
?assertEqual(1, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)),
781+
?assertEqual(Limit, meck:num_calls(couch_stats, increment_counter, 1)),
782+
?assertEqual(Limit, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)),
799783
?assertEqual(Limit, meck:num_calls(mango_execution_stats, incr_results_returned, 1)).
800784

801-
t_execute_limit_unique(_) ->
802-
Limit = 5,
803-
UniqueHits = 3,
785+
t_execute_limit_cutoff_unique(_) ->
786+
Limit = 2,
804787
Options = [{partition, partition}, {sort, {[]}}, {bookmark, []}],
805788
Cursor = #cursor{
806789
db = db,
@@ -835,17 +818,60 @@ t_execute_limit_unique(_) ->
835818
Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}},
836819
Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}},
837820
Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}},
838-
{[bookmark, 4], [Hit3, Hit2, Hit1]}
821+
{[bookmark, 4], [Hit3, Hit2, Hit1]};
822+
[bookmark, 7] ->
823+
{[bookmark, 8], []}
839824
end,
840825
{ok, Bookmark, undefined, Hits, undefined, undefined}
841826
end
842827
),
843828
meck:expect(mango_selector_text, convert, [selector], meck:val(query)),
844829
meck:expect(mango_selector, match, fun(selector, {doc, _N}) -> true end),
845-
?assertEqual({ok, {acc, 6}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
830+
?assertEqual({ok, {acc, 5}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
846831
?assertEqual(Limit, meck:num_calls(couch_stats, increment_counter, 1)),
847832
?assertEqual(Limit, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)),
848-
?assertEqual(UniqueHits, meck:num_calls(mango_execution_stats, incr_results_returned, 1)).
833+
?assertEqual(Limit, meck:num_calls(mango_execution_stats, incr_results_returned, 1)).
834+
835+
t_execute_limit_zero(_) ->
836+
Limit = 0,
837+
Skip = 0,
838+
IdxName = <<"index">>,
839+
Options = [{partition, <<>>}, {sort, {[]}}, {bookmark, [bookmark, 0]}],
840+
Cursor = #cursor{
841+
db = db,
842+
index = #idx{ddoc = <<"_design/ddoc">>, name = IdxName},
843+
limit = Limit,
844+
skip = Skip,
845+
fields = fields,
846+
selector = selector,
847+
opts = Options,
848+
execution_stats = stats
849+
},
850+
QueryArgs =
851+
#index_query_args{
852+
q = query,
853+
partition = nil,
854+
limit = Skip + Limit,
855+
bookmark = [bookmark, 0],
856+
sort = relevance,
857+
raw_bookmark = true
858+
},
859+
Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}},
860+
Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}},
861+
Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}},
862+
Hits = [Hit1, Hit2, Hit3],
863+
meck:expect(
864+
dreyfus_fabric_search,
865+
go,
866+
[db_name, <<"ddoc">>, IdxName, QueryArgs],
867+
meck:val({ok, [bookmark, 1], undefined, Hits, undefined, undefined})
868+
),
869+
meck:expect(mango_selector_text, convert, [selector], meck:val(query)),
870+
meck:expect(mango_selector, match, fun(selector, {doc, _N}) -> true end),
871+
?assertEqual({ok, {acc, 3}}, execute(Cursor, fun foo:add_key_only/2, {acc, 0})),
872+
?assertEqual(1, meck:num_calls(couch_stats, increment_counter, 1)),
873+
?assertEqual(1, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)),
874+
?assertEqual(Limit, meck:num_calls(mango_execution_stats, incr_results_returned, 1)).
849875

850876
t_execute_skip(_) ->
851877
UniqueHits = 3,
@@ -898,12 +924,13 @@ t_execute_skip(_) ->
898924
t_execute_skip_unique(_) ->
899925
UniqueHits = 3,
900926
AllHits = 6,
927+
Skip = 2,
901928
Options = [{partition, partition}, {sort, {[]}}, {bookmark, []}],
902929
Cursor = #cursor{
903930
db = db,
904931
index = #idx{ddoc = <<"ddoc">>, name = <<"index">>},
905932
limit = 10,
906-
skip = 2,
933+
skip = Skip,
907934
fields = fields,
908935
selector = selector,
909936
opts = Options,
@@ -941,10 +968,12 @@ t_execute_skip_unique(_) ->
941968
),
942969
meck:expect(mango_selector_text, convert, [selector], meck:val(query)),
943970
meck:expect(mango_selector, match, fun(selector, {doc, _N}) -> true end),
944-
?assertEqual({ok, {acc, 6}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
971+
?assertEqual({ok, {acc, 4}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
945972
?assertEqual(AllHits, meck:num_calls(couch_stats, increment_counter, 1)),
946973
?assertEqual(AllHits, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)),
947-
?assertEqual(UniqueHits, meck:num_calls(mango_execution_stats, incr_results_returned, 1)).
974+
?assertEqual(
975+
UniqueHits - Skip, meck:num_calls(mango_execution_stats, incr_results_returned, 1)
976+
).
948977

949978
t_execute_no_matches(_) ->
950979
UniqueHits = 3,
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
2+
# use this file except in compliance with the License. You may obtain a copy of
3+
# the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10+
# License for the specific language governing permissions and limitations under
11+
# the License.
12+
13+
14+
import mango
15+
import unittest
16+
import time
17+
18+
19+
@unittest.skipUnless(mango.has_text_service(), "requires text service")
20+
class PaginatedResultsTest(mango.DbPerClass):
21+
# Great enough to make faster systems busy while running the
22+
# query.
23+
NUM_DOCS = 10000
24+
UPDATES = 25
25+
26+
def setUp(self):
27+
self.db.recreate()
28+
self.db.create_text_index(
29+
analyzer="keyword",
30+
default_field={},
31+
selector={},
32+
fields=[
33+
{"name": "_id", "type": "string"},
34+
{"name": "name", "type": "string"},
35+
],
36+
index_array_lengths=True,
37+
)
38+
docs = [
39+
{"_id": f"{doc_id:08X}", "name": mango.random_string(32)}
40+
for doc_id in range(self.NUM_DOCS)
41+
]
42+
self.db.save_docs(docs)
43+
44+
def test_query_with_lot_of_results(self):
45+
# 200 is the maximum for `text` searches.
46+
docs = self.db.find(selector={"_id": {"$lte": f"{1000:08X}"}}, limit=200)
47+
assert len(docs) == 200
48+
49+
def do_query(self, delay, find_args):
50+
time.sleep(delay)
51+
return self.db.find(*find_args)
52+
53+
def do_updates(self, pause, doc_id):
54+
for i in range(self.UPDATES):
55+
doc = self.db.open_doc(doc_id)
56+
updated_doc = {
57+
"_id": doc_id,
58+
"_rev": doc["_rev"],
59+
"update": i,
60+
"name": "foobar",
61+
}
62+
self.db.save_doc(updated_doc)
63+
time.sleep(pause)
64+
65+
def test_no_duplicates_on_interleaved_updates(self):
66+
# Give ~500 ms head start for the updates before running the
67+
# query.
68+
query = mango.Concurrently(self.do_query, (0.5, ({"name": "foobar"},)))
69+
# Keep updating the target document in every 200 ms.
70+
updates = mango.Concurrently(self.do_updates, (0.2, f"{2:08X}"))
71+
docs = query.get_result()
72+
updates.join()
73+
assert len(docs) == 1
74+
75+
def test_no_duplicates_on_interleaved_updates_heavy(self):
76+
query = mango.Concurrently(self.do_query, (0.5, ({"name": "foobar"},)))
77+
updates = [
78+
mango.Concurrently(self.do_updates, (0.05, f"{2:08X}")),
79+
mango.Concurrently(self.do_updates, (0.2, f"{3:08X}")),
80+
mango.Concurrently(self.do_updates, (0.3, f"{4:08X}")),
81+
mango.Concurrently(self.do_updates, (0.15, f"{5:08X}")),
82+
mango.Concurrently(self.do_updates, (0.1, f"{6:08X}")),
83+
]
84+
docs = query.get_result()
85+
for ref in updates:
86+
ref.join()
87+
ids = list(map(lambda d: d["_id"], docs))
88+
assert sorted(ids) == [
89+
f"{2:08X}",
90+
f"{3:08X}",
91+
f"{4:08X}",
92+
f"{5:08X}",
93+
f"{6:08X}",
94+
]
95+
96+
def test_no_duplicates_on_interleaved_updates_with_limit_skip(self):
97+
query = mango.Concurrently(self.do_query, (0.5, ({"name": "foobar"}, 1, 3)))
98+
updates = [
99+
mango.Concurrently(self.do_updates, (0.05, f"{2:08X}")),
100+
mango.Concurrently(self.do_updates, (0.2, f"{3:08X}")),
101+
mango.Concurrently(self.do_updates, (0.3, f"{4:08X}")),
102+
mango.Concurrently(self.do_updates, (0.15, f"{5:08X}")),
103+
mango.Concurrently(self.do_updates, (0.1, f"{6:08X}")),
104+
]
105+
docs = query.get_result()
106+
for ref in updates:
107+
ref.join()
108+
assert len(docs) == 1

0 commit comments

Comments
 (0)