diff --git a/src/mango/src/mango_cursor_nouveau.erl b/src/mango/src/mango_cursor_nouveau.erl index 26358409a70..2588e94d970 100644 --- a/src/mango/src/mango_cursor_nouveau.erl +++ b/src/mango/src/mango_cursor_nouveau.erl @@ -35,7 +35,8 @@ user_fun, user_acc, fields, - execution_stats + execution_stats, + documents_seen }). create(Db, {Indexes, Trace}, Selector, Opts) -> @@ -103,7 +104,8 @@ execute(Cursor, UserFun, UserAcc) -> user_fun = UserFun, user_acc = UserAcc, fields = Cursor#cursor.fields, - execution_stats = mango_execution_stats:log_start(Stats) + execution_stats = mango_execution_stats:log_start(Stats), + documents_seen = sets:new([{version, 2}]) }, try case Query of @@ -171,28 +173,41 @@ handle_hit(CAcc0, Hit, Doc) -> #cacc{ limit = Limit, skip = Skip, - execution_stats = Stats + execution_stats = Stats, + documents_seen = Seen } = CAcc0, - CAcc1 = update_bookmark(CAcc0, Hit), Stats1 = mango_execution_stats:incr_docs_examined(Stats), couch_stats:increment_counter([mango, docs_examined]), - CAcc2 = CAcc1#cacc{execution_stats = Stats1}, - case mango_selector:match(CAcc2#cacc.selector, Doc) of - true when Skip > 0 -> - CAcc2#cacc{skip = Skip - 1}; - true when Limit == 0 -> - % We hit this case if the user spcified with a - % zero limit. Notice that in this case we need - % to return the bookmark from before this match - throw({stop, CAcc0}); - true when Limit == 1 -> - NewCAcc = apply_user_fun(CAcc2, Doc), - throw({stop, NewCAcc}); - true when Limit > 1 -> - NewCAcc = apply_user_fun(CAcc2, Doc), - NewCAcc#cacc{limit = Limit - 1}; + CAcc1 = CAcc0#cacc{execution_stats = Stats1}, + case mango_selector:match(CAcc1#cacc.selector, Doc) of + true -> + DocId = mango_doc:get_field(Doc, <<"_id">>), + case sets:is_element(DocId, Seen) of + true -> + CAcc1; + false -> + CAcc2 = update_bookmark(CAcc1, Hit), + CAcc3 = CAcc2#cacc{ + documents_seen = sets:add_element(DocId, Seen) + }, + if + Skip > 0 -> + CAcc3#cacc{skip = Skip - 1}; + Limit == 0 -> + % We hit this case if the user specified with a + % zero limit. Notice that in this case we need + % to return the bookmark from before this match. + throw({stop, CAcc0}); + Limit == 1 -> + CAcc4 = apply_user_fun(CAcc3, Doc), + throw({stop, CAcc4}); + Limit > 1 -> + CAcc4 = apply_user_fun(CAcc3, Doc), + CAcc4#cacc{limit = Limit - 1} + end + end; false -> - CAcc2 + CAcc1 end. apply_user_fun(CAcc, Doc) -> diff --git a/src/mango/src/mango_cursor_text.erl b/src/mango/src/mango_cursor_text.erl index 46692a4a77b..09d878afd2c 100644 --- a/src/mango/src/mango_cursor_text.erl +++ b/src/mango/src/mango_cursor_text.erl @@ -37,7 +37,8 @@ user_fun, user_acc, fields, - execution_stats + execution_stats, + documents_seen }). create(Db, {Indexes, Trace}, Selector, Opts0) -> @@ -108,7 +109,8 @@ execute(Cursor, UserFun, UserAcc) -> user_fun = UserFun, user_acc = UserAcc, fields = Cursor#cursor.fields, - execution_stats = mango_execution_stats:log_start(Stats) + execution_stats = mango_execution_stats:log_start(Stats), + documents_seen = sets:new([{version, 2}]) }, try case Query of @@ -179,28 +181,42 @@ handle_hit(CAcc0, Sort, Doc) -> #cacc{ limit = Limit, skip = Skip, - execution_stats = Stats + execution_stats = Stats, + documents_seen = Seen } = CAcc0, - CAcc1 = update_bookmark(CAcc0, Sort), Stats1 = mango_execution_stats:incr_docs_examined(Stats), couch_stats:increment_counter([mango, docs_examined]), - CAcc2 = CAcc1#cacc{execution_stats = Stats1}, - case mango_selector:match(CAcc2#cacc.selector, Doc) of - true when Skip > 0 -> - CAcc2#cacc{skip = Skip - 1}; - true when Limit == 0 -> - % We hit this case if the user spcified with a - % zero limit. Notice that in this case we need - % to return the bookmark from before this match - throw({stop, CAcc0}); - true when Limit == 1 -> - NewCAcc = apply_user_fun(CAcc2, Doc), - throw({stop, NewCAcc}); - true when Limit > 1 -> - NewCAcc = apply_user_fun(CAcc2, Doc), - NewCAcc#cacc{limit = Limit - 1}; + CAcc1 = CAcc0#cacc{execution_stats = Stats1}, + case mango_selector:match(CAcc1#cacc.selector, Doc) of + true -> + DocId = mango_doc:get_field(Doc, <<"_id">>), + case sets:is_element(DocId, Seen) of + true -> + CAcc1; + false -> + CAcc2 = update_bookmark(CAcc1, Sort), + CAcc3 = CAcc2#cacc{ + documents_seen = sets:add_element(DocId, Seen) + }, + if + Skip > 0 -> + CAcc3#cacc{skip = Skip - 1}; + Limit == 0 -> + % We hit this case if the user specified + % with a zero limit. Notice that in this + % case we need to return the bookmark from + % before this match. + throw({stop, CAcc0}); + Limit == 1 -> + CAcc4 = apply_user_fun(CAcc3, Doc), + throw({stop, CAcc4}); + Limit > 1 -> + CAcc4 = apply_user_fun(CAcc3, Doc), + CAcc4#cacc{limit = Limit - 1} + end + end; false -> - CAcc2 + CAcc1 end. apply_user_fun(CAcc, Doc) -> @@ -334,4 +350,841 @@ get_json_docs(DbName, Hits) -> Hits ). +%%%%%%%% module tests %%%%%%%% + +-ifdef(TEST). +-include_lib("couch/include/couch_eunit.hrl"). + +% This behavior needs to be revisited and potentially fixed, the tests +% below are only to record the current version. + +create_test_() -> + { + foreach, + fun() -> meck:expect(couch_db, name, [db], meck:val(db_name)) end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_create_no_indexes), + ?TDEF_FE(t_create_multiple_indexes), + ?TDEF_FE(t_create_regular), + ?TDEF_FE(t_create_no_bookmark), + ?TDEF_FE(t_create_invalid_bookmark) + ] + }. + +t_create_no_indexes(_) -> + Exception = {mango_error, mango_cursor_text, multiple_text_indexes}, + ?assertThrow(Exception, create(db, {[], trace}, selector, options)). + +t_create_multiple_indexes(_) -> + Indexes = [index1, index2, index3], + Exception = {mango_error, mango_cursor_text, multiple_text_indexes}, + ?assertThrow(Exception, create(db, {Indexes, trace}, selector, options)). + +t_create_regular(_) -> + Index = #idx{type = <<"text">>}, + Indexes = [Index], + Trace = #{}, + Limit = 10, + Options = [{limit, Limit}, {skip, skip}, {fields, fields}, {bookmark, bookmark}], + Options1 = [{limit, Limit}, {skip, skip}, {fields, fields}, {bookmark, unpacked_bookmark}], + Cursor = #cursor{ + db = db, + index = Index, + ranges = null, + trace = Trace, + selector = selector, + opts = Options1, + limit = Limit, + skip = skip, + fields = fields + }, + meck:expect(dreyfus_bookmark, unpack, [db_name, bookmark], meck:val(unpacked_bookmark)), + ?assertEqual({ok, Cursor}, create(db, {Indexes, Trace}, selector, Options)). + +t_create_no_bookmark(_) -> + Limit = 99, + Options = [{limit, Limit}, {skip, skip}, {fields, fields}, {bookmark, nil}], + Options1 = [{limit, Limit}, {skip, skip}, {fields, fields}, {bookmark, []}], + Cursor = #cursor{ + db = db, + index = index, + ranges = null, + trace = trace, + selector = selector, + opts = Options1, + limit = Limit, + skip = skip, + fields = fields + }, + ?assertEqual({ok, Cursor}, create(db, {[index], trace}, selector, Options)). + +t_create_invalid_bookmark(_) -> + Options = [{bookmark, invalid}], + Exception = {mango_error, mango_cursor_text, {invalid_bookmark, invalid}}, + meck:expect(dreyfus_bookmark, unpack, [db_name, invalid], meck:raise(error, something)), + ?assertThrow(Exception, create(db, {[index], trace}, selector, Options)). + +execute_test_() -> + { + foreach, + fun() -> + meck:new(foo, [non_strict]), + meck:expect(couch_db, name, [db], meck:val(db_name)), + meck:expect(couch_stats, increment_counter, [[mango, docs_examined]], meck:val(ok)), + % Dummy mock functions to progressively update the + % respective states therefore their results could be + % asserted later on. + meck:expect( + dreyfus_bookmark, + pack, + fun(Bookmark) -> + case Bookmark of + nil -> null; + [bookmark, N] -> [bookmark, N] + end + end + ), + meck:expect( + dreyfus_bookmark, + update, + fun(_Sort, [bookmark, N], [#sortable{}]) -> [bookmark, N + 1] end + ), + meck:expect( + dreyfus_fabric, + get_json_docs, + fun(db_name, Ids) -> + IdDocs = lists:flatmap( + fun({id, N} = Id) -> + case N of + not_found -> []; + _ -> [{Id, {doc, {doc, N}}}] + end + end, + Ids + ), + {ok, IdDocs} + end + ), + meck:expect(mango_execution_stats, log_start, [stats], meck:val({stats, 0})), + meck:expect( + mango_execution_stats, + maybe_add_stats, + fun(_Options, _UserFun, {stats, N}, {acc, M}) -> {{acc, M + 1}, {stats, N + 1}} end + ), + meck:expect(mango_execution_stats, log_stats, [{stats, '_'}], meck:val(ok)), + meck:expect( + mango_cursor, + maybe_add_warning, + fun(_UserFun, _Cursor, {stats, _}, {acc, M}) -> {acc, M + 1} end + ), + meck:expect( + mango_execution_stats, + incr_docs_examined, + fun({stats, N}) -> {stats, N + 1} end + ), + meck:expect( + mango_execution_stats, + incr_results_returned, + fun({stats, N}) -> {stats, N + 1} end + ), + meck:expect( + mango_selector_text, + append_sort_type, + fun(RawField, selector) -> <">> end + ), + meck:expect(mango_doc, get_field, fun({doc, N}, <<"_id">>) -> N end), + meck:expect(mango_fields, extract, fun({doc, N}, fields) -> {final_doc, N} end), + meck:expect( + foo, + add_key_only, + fun({add_key, bookmark, [bookmark, _]}, {acc, N}) -> {ok, {acc, N + 1}} end + ), + meck:expect( + foo, + normal, + fun(Args, {acc, N}) -> + case Args of + {row, {final_doc, _}} -> ok; + {add_key, bookmark, [bookmark, _]} -> ok + end, + {ok, {acc, N + 1}} + end + ) + end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_execute_empty, 10), + ?TDEF_FE(t_execute_no_results, 10), + ?TDEF_FE(t_execute_more_results, 10), + ?TDEF_FE(t_execute_unique_results, 10), + ?TDEF_FE(t_execute_limit_cutoff, 10), + ?TDEF_FE(t_execute_limit_cutoff_unique, 10), + ?TDEF_FE(t_execute_limit_zero, 10), + ?TDEF_FE(t_execute_limit_unique, 10), + ?TDEF_FE(t_execute_skip, 10), + ?TDEF_FE(t_execute_skip_unique, 10), + ?TDEF_FE(t_execute_no_matches, 10), + ?TDEF_FE(t_execute_mixed_matches, 10), + ?TDEF_FE(t_execute_user_fun_returns_stop, 10), + ?TDEF_FE(t_execute_search_error, 10) + ] + }. + +t_execute_empty(_) -> + Options = [{partition, partition}, {sort, {[]}}, {bookmark, [bookmark, 0]}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = <<"ddoc">>}, + limit = limit, + skip = skip, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + meck:expect(mango_selector_text, convert, [selector], meck:val(<<>>)), + ?assertEqual({ok, {acc, 3}}, execute(Cursor, fun foo:add_key_only/2, {acc, 0})), + ?assertEqual(0, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(0, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(0, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +t_execute_no_results(_) -> + Limit = 10, + Skip = 0, + IdxDDoc = <<"ddoc">>, + IdxName = <<"index">>, + Sort = [<<"field1">>, <<"field2">>], + Options = [{partition, partition}, {sort, {Sort}}, {bookmark, [bookmark, 0]}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = IdxDDoc, name = IdxName}, + limit = Limit, + skip = Skip, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + QueryArgs = + #index_query_args{ + q = query, + partition = partition, + limit = Skip + Limit, + bookmark = [bookmark, 0], + sort = [<<"field1">>, <<"field2">>], + raw_bookmark = true + }, + meck:expect( + dreyfus_fabric_search, + go, + [db_name, IdxDDoc, IdxName, QueryArgs], + meck:val({ok, [bookmark, 1], undefined, [], undefined, undefined}) + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end), + ?assertEqual({ok, {acc, 3}}, execute(Cursor, fun foo:add_key_only/2, {acc, 0})), + ?assertEqual(0, meck:num_calls(dreyfus_fabric, get_json_docs, 2)), + ?assertEqual(0, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(0, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(0, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +t_execute_more_results(_) -> + Options = [{partition, partition}, {sort, {[]}}, {bookmark, [bookmark, 0]}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = <<"ddoc">>, name = <<"index">>}, + limit = 10, + skip = 0, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + meck:expect( + dreyfus_fabric_search, + go, + fun(db_name, <<"ddoc">>, <<"index">>, QueryArgs) -> + #index_query_args{ + q = query, + partition = partition, + bookmark = [bookmark, B], + sort = relevance, + raw_bookmark = true + } = QueryArgs, + Hits = + case B of + 0 -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, not_found}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, not_found}}]}}, + [Hit1, Hit2, Hit3]; + 4 -> + Hit4 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 4}}]}}, + Hit5 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 5}}]}}, + [Hit4, Hit5]; + _ -> + [] + end, + {ok, [bookmark, B + 1], undefined, Hits, undefined, undefined} + end + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end), + ?assertEqual({ok, {acc, 6}}, execute(Cursor, fun foo:normal/2, {acc, 0})), + ?assertEqual(3, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(3, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(3, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +t_execute_unique_results(_) -> + UniqueHits = 3, + Options = [{partition, partition}, {sort, {[]}}, {bookmark, []}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = <<"ddoc">>, name = <<"index">>}, + limit = 10, + skip = 0, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + meck:expect( + dreyfus_fabric_search, + go, + fun(db_name, <<"ddoc">>, <<"index">>, QueryArgs) -> + #index_query_args{ + q = query, + partition = partition, + bookmark = B, + sort = relevance, + raw_bookmark = true + } = QueryArgs, + {Bookmark, Hits} = + case B of + nil -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + {[bookmark, 0], [Hit1, Hit2, Hit3]}; + [bookmark, 3] -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + {[bookmark, 4], [Hit3, Hit2, Hit1]}; + [bookmark, N] -> + {[bookmark, N + 1], []} + end, + {ok, Bookmark, undefined, Hits, undefined, undefined} + end + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end), + ?assertEqual({ok, {acc, 6}}, execute(Cursor, fun foo:normal/2, {acc, 0})), + ?assertEqual(6, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(6, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(UniqueHits, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +t_execute_limit_cutoff(_) -> + Limit = 2, + Skip = 0, + IdxName = <<"index">>, + Sort = [{<<"field1">>, <<"desc">>}, {<<"field2">>, <<"asc">>}], + Options = [{partition, partition}, {sort, {Sort}}, {bookmark, []}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = <<"_design/ddoc">>, name = IdxName}, + limit = Limit, + skip = Skip, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + QueryArgs = + #index_query_args{ + q = query, + partition = partition, + limit = Skip + Limit, + bookmark = nil, + sort = [<<"-field1">>, <<"field2">>], + raw_bookmark = true + }, + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + Hits = [Hit1, Hit2, Hit3], + meck:expect( + dreyfus_fabric_search, + go, + [db_name, <<"ddoc">>, IdxName, QueryArgs], + meck:val({ok, [bookmark, 1], undefined, Hits, undefined, undefined}) + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end), + ?assertEqual({ok, {acc, 5}}, execute(Cursor, fun foo:normal/2, {acc, 0})), + ?assertEqual(Limit, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(Limit, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(Limit, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +t_execute_limit_cutoff_unique(_) -> + Limit = 4, + ActualHits = 3, + AllHits = 6, + Options = [{partition, partition}, {sort, {[]}}, {bookmark, []}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = <<"ddoc">>, name = <<"index">>}, + limit = Limit, + skip = 0, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + meck:expect( + dreyfus_fabric_search, + go, + fun(db_name, <<"ddoc">>, <<"index">>, QueryArgs) -> + #index_query_args{ + q = query, + partition = partition, + bookmark = B, + sort = relevance, + raw_bookmark = true + } = QueryArgs, + {Bookmark, Hits} = + case B of + nil -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + {[bookmark, 0], [Hit1, Hit2, Hit3]}; + [bookmark, 3] -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + {[bookmark, 4], [Hit3, Hit2, Hit1]}; + [bookmark, 4] -> + {[bookmark, 5], []} + end, + {ok, Bookmark, undefined, Hits, undefined, undefined} + end + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end), + ?assertEqual({ok, {acc, 6}}, execute(Cursor, fun foo:normal/2, {acc, 0})), + ?assertEqual(AllHits, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(AllHits, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(ActualHits, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +t_execute_limit_zero(_) -> + Limit = 0, + Skip = 0, + IdxName = <<"index">>, + Options = [{partition, <<>>}, {sort, {[]}}, {bookmark, [bookmark, 0]}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = <<"_design/ddoc">>, name = IdxName}, + limit = Limit, + skip = Skip, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + QueryArgs = + #index_query_args{ + q = query, + partition = nil, + limit = Skip + Limit, + bookmark = [bookmark, 0], + sort = relevance, + raw_bookmark = true + }, + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + Hits = [Hit1, Hit2, Hit3], + meck:expect( + dreyfus_fabric_search, + go, + [db_name, <<"ddoc">>, IdxName, QueryArgs], + meck:val({ok, [bookmark, 1], undefined, Hits, undefined, undefined}) + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end), + ?assertEqual({ok, {acc, 3}}, execute(Cursor, fun foo:add_key_only/2, {acc, 0})), + ?assertEqual(1, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(1, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(Limit, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +t_execute_limit_unique(_) -> + Limit = 5, + AllHits = 6, + UniqueHits = 3, + Options = [{partition, partition}, {sort, {[]}}, {bookmark, []}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = <<"ddoc">>, name = <<"index">>}, + limit = Limit, + skip = 0, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + meck:expect( + dreyfus_fabric_search, + go, + fun(db_name, <<"ddoc">>, <<"index">>, QueryArgs) -> + #index_query_args{ + q = query, + partition = partition, + bookmark = B, + sort = relevance, + raw_bookmark = true + } = QueryArgs, + {Bookmark, Hits} = + case B of + nil -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + {[bookmark, 0], [Hit1, Hit2, Hit3]}; + [bookmark, 3] -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + {[bookmark, 4], [Hit3, Hit2, Hit1]}; + [bookmark, 4] -> + {[bookmark, 5], []} + end, + {ok, Bookmark, undefined, Hits, undefined, undefined} + end + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end), + ?assertEqual({ok, {acc, 6}}, execute(Cursor, fun foo:normal/2, {acc, 0})), + ?assertEqual(AllHits, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(AllHits, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(UniqueHits, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +t_execute_skip(_) -> + UniqueHits = 3, + Skip = 2, + Options = [{partition, <<>>}, {sort, {[]}}, {bookmark, [bookmark, 0]}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = <<"_design/ddoc">>, name = <<"index">>}, + limit = 10, + skip = Skip, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + meck:expect( + dreyfus_fabric_search, + go, + fun(db_name, <<"ddoc">>, <<"index">>, QueryArgs) -> + #index_query_args{ + q = query, + partition = nil, + bookmark = [bookmark, B], + sort = relevance, + raw_bookmark = true + } = QueryArgs, + Hits = + case B of + 0 -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + [Hit1, Hit2, Hit3]; + _ -> + [] + end, + {ok, [bookmark, B + 1], undefined, Hits, undefined, undefined} + end + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end), + ?assertEqual({ok, {acc, 4}}, execute(Cursor, fun foo:normal/2, {acc, 0})), + ?assertEqual(UniqueHits, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(UniqueHits, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual( + UniqueHits - Skip, meck:num_calls(mango_execution_stats, incr_results_returned, 1) + ). + +t_execute_skip_unique(_) -> + AllHits = 6, + UniqueHits = 3, + Skip = 2, + Options = [{partition, partition}, {sort, {[]}}, {bookmark, []}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = <<"ddoc">>, name = <<"index">>}, + limit = 10, + skip = Skip, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + meck:expect( + dreyfus_fabric_search, + go, + fun(db_name, <<"ddoc">>, <<"index">>, QueryArgs) -> + #index_query_args{ + q = query, + partition = partition, + bookmark = B, + sort = relevance, + raw_bookmark = true + } = QueryArgs, + {Bookmark, Hits} = + case B of + nil -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + {[bookmark, 0], [Hit1, Hit2, Hit3]}; + [bookmark, 3] -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + {[bookmark, 4], [Hit3, Hit2, Hit1]}; + [bookmark, N] -> + {[bookmark, N + 1], []} + end, + {ok, Bookmark, undefined, Hits, undefined, undefined} + end + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end), + ?assertEqual({ok, {acc, 4}}, execute(Cursor, fun foo:normal/2, {acc, 0})), + ?assertEqual(AllHits, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(AllHits, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual( + UniqueHits - Skip, meck:num_calls(mango_execution_stats, incr_results_returned, 1) + ). + +t_execute_no_matches(_) -> + UniqueHits = 3, + Matches = 0, + Options = [{partition, partition}, {sort, {[]}}, {bookmark, [bookmark, 0]}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = <<"ddoc">>, name = <<"index">>}, + limit = 10, + skip = 0, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + meck:expect( + dreyfus_fabric_search, + go, + fun(db_name, <<"ddoc">>, <<"index">>, QueryArgs) -> + #index_query_args{ + q = query, + partition = partition, + bookmark = [bookmark, B], + sort = relevance, + raw_bookmark = true + } = QueryArgs, + Hits = + case B of + 0 -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + [Hit1, Hit2, Hit3]; + _ -> + [] + end, + {ok, [bookmark, B + 1], undefined, Hits, undefined, undefined} + end + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, _}) -> false end), + ?assertEqual({ok, {acc, 3}}, execute(Cursor, fun foo:add_key_only/2, {acc, 0})), + ?assertEqual(UniqueHits, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(UniqueHits, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(Matches, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +t_execute_mixed_matches(_) -> + UniqueHits = 3, + Matches = 1, + Options = [{partition, partition}, {sort, {[]}}, {bookmark, [bookmark, 0]}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = <<"ddoc">>, name = <<"index">>}, + limit = 10, + skip = 0, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + meck:expect( + dreyfus_fabric_search, + go, + fun(db_name, <<"ddoc">>, <<"index">>, QueryArgs) -> + #index_query_args{ + q = query, + partition = partition, + bookmark = [bookmark, B], + sort = relevance, + raw_bookmark = true + } = QueryArgs, + Hits = + case B of + 0 -> + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + [Hit1, Hit2, Hit3]; + _ -> + [] + end, + {ok, [bookmark, B + 1], undefined, Hits, undefined, undefined} + end + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, N}) -> N == 2 end), + ?assertEqual({ok, {acc, 4}}, execute(Cursor, fun foo:normal/2, {acc, 0})), + ?assertEqual(UniqueHits, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(UniqueHits, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(Matches, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +t_execute_user_fun_returns_stop(_) -> + UniqueHits = 3, + Limit = 10, + Skip = 0, + IdxDDoc = <<"ddoc">>, + IdxName = <<"index">>, + Options = [{partition, partition}, {sort, {[]}}, {bookmark, [bookmark, 0]}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = IdxDDoc, name = IdxName}, + limit = Limit, + skip = Skip, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + QueryArgs = + #index_query_args{ + q = query, + partition = partition, + limit = Skip + Limit, + bookmark = [bookmark, 0], + sort = relevance, + raw_bookmark = true + }, + Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 1}}]}}, + Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 2}}]}}, + Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, {id, 3}}]}}, + Hits = [Hit1, Hit2, Hit3], + meck:expect( + dreyfus_fabric_search, + go, + [db_name, IdxDDoc, IdxName, QueryArgs], + meck:val({ok, [bookmark, 1], undefined, Hits, undefined, undefined}) + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end), + meck:expect( + foo, + stops, + fun(Args, {acc, N}) -> + case Args of + {row, {final_doc, _}} -> ok; + {add_key, bookmark, [bookmark, _]} -> ok + end, + Status = + case N of + 2 -> stop; + _ -> ok + end, + {Status, {acc, N + 1}} + end + ), + ?assertEqual({ok, {acc, 6}}, execute(Cursor, fun foo:stops/2, {acc, 0})), + ?assertEqual(UniqueHits, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(UniqueHits, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(3, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +t_execute_search_error(_) -> + Limit = 10, + Skip = 0, + IdxDDoc = <<"ddoc">>, + IdxName = <<"index">>, + Options = [{partition, <<>>}, {sort, {[]}}, {bookmark, [bookmark, 0]}], + Cursor = #cursor{ + db = db, + index = #idx{ddoc = IdxDDoc, name = IdxName}, + limit = Limit, + skip = Skip, + fields = fields, + selector = selector, + opts = Options, + execution_stats = stats + }, + QueryArgs = + #index_query_args{ + q = query, + partition = nil, + limit = Skip + Limit, + bookmark = [bookmark, 0], + sort = relevance, + raw_bookmark = true + }, + meck:expect( + dreyfus_fabric_search, + go, + [db_name, IdxDDoc, IdxName, QueryArgs], + meck:val({error, reason}) + ), + meck:expect(mango_selector_text, convert, [selector], meck:val(query)), + Exception = {mango_error, mango_cursor_text, {text_search_error, {error, reason}}}, + ?assertThrow(Exception, execute(Cursor, fun foo:normal/2, {acc, 0})), + ?assertEqual(0, meck:num_calls(dreyfus_fabric, get_json_docs, 2)), + ?assertEqual(0, meck:num_calls(foo, normal, 2)), + ?assertEqual(0, meck:num_calls(couch_stats, increment_counter, 1)), + ?assertEqual(0, meck:num_calls(mango_execution_stats, incr_docs_examined, 1)), + ?assertEqual(0, meck:num_calls(mango_execution_stats, incr_results_returned, 1)). + +explain_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_explain) + ] + }. + +t_explain(_) -> + Options = [{partition, partition}, {sort, {[]}}], + Cursor = + #cursor{ + selector = selector, + opts = Options + }, + Response = + [ + {query, converted_selector}, + {partition, partition}, + {sort, relevance} + ], + meck:expect(mango_selector_text, convert, [selector], meck:val(converted_selector)), + ?assertEqual(Response, explain(Cursor)). + +-endif. + -endif. diff --git a/src/mango/test/24-text-paginated-test.py b/src/mango/test/24-text-paginated-test.py new file mode 100644 index 00000000000..84b3a2f4b2d --- /dev/null +++ b/src/mango/test/24-text-paginated-test.py @@ -0,0 +1,108 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + + +import mango +import unittest +import time + + +@unittest.skipUnless(mango.has_text_service(), "requires text service") +class PaginatedResultsTest(mango.DbPerClass): + # Great enough to make faster systems busy while running the + # query. + NUM_DOCS = 10000 + UPDATES = 25 + + def setUp(self): + self.db.recreate() + self.db.create_text_index( + analyzer="keyword", + default_field={}, + selector={}, + fields=[ + {"name": "_id", "type": "string"}, + {"name": "name", "type": "string"}, + ], + index_array_lengths=True, + ) + docs = [ + {"_id": f"{doc_id:08X}", "name": mango.random_string(32)} + for doc_id in range(self.NUM_DOCS) + ] + self.db.save_docs(docs) + + def test_query_with_lot_of_results(self): + # 200 is the maximum for `text` searches. + docs = self.db.find(selector={"_id": {"$lte": f"{1000:08X}"}}, limit=200) + assert len(docs) == 200 + + def do_query(self, delay, find_args): + time.sleep(delay) + return self.db.find(*find_args) + + def do_updates(self, pause, doc_id): + for i in range(self.UPDATES): + doc = self.db.open_doc(doc_id) + updated_doc = { + "_id": doc_id, + "_rev": doc["_rev"], + "update": i, + "name": "foobar", + } + self.db.save_doc(updated_doc) + time.sleep(pause) + + def test_no_duplicates_on_interleaved_updates(self): + # Give ~500 ms head start for the updates before running the + # query. + query = mango.Concurrently(self.do_query, (0.5, ({"name": "foobar"},))) + # Keep updating the target document in every 200 ms. + updates = mango.Concurrently(self.do_updates, (0.2, f"{2:08X}")) + docs = query.get_result() + updates.join() + assert len(docs) == 1 + + def test_no_duplicates_on_interleaved_updates_heavy(self): + query = mango.Concurrently(self.do_query, (0.5, ({"name": "foobar"},))) + updates = [ + mango.Concurrently(self.do_updates, (0.05, f"{2:08X}")), + mango.Concurrently(self.do_updates, (0.2, f"{3:08X}")), + mango.Concurrently(self.do_updates, (0.3, f"{4:08X}")), + mango.Concurrently(self.do_updates, (0.15, f"{5:08X}")), + mango.Concurrently(self.do_updates, (0.1, f"{6:08X}")), + ] + docs = query.get_result() + for ref in updates: + ref.join() + ids = list(map(lambda d: d["_id"], docs)) + assert sorted(ids) == [ + f"{2:08X}", + f"{3:08X}", + f"{4:08X}", + f"{5:08X}", + f"{6:08X}", + ] + + def test_no_duplicates_on_interleaved_updates_with_limit_skip(self): + query = mango.Concurrently(self.do_query, (0.5, ({"name": "foobar"}, 1, 3))) + updates = [ + mango.Concurrently(self.do_updates, (0.05, f"{2:08X}")), + mango.Concurrently(self.do_updates, (0.2, f"{3:08X}")), + mango.Concurrently(self.do_updates, (0.3, f"{4:08X}")), + mango.Concurrently(self.do_updates, (0.15, f"{5:08X}")), + mango.Concurrently(self.do_updates, (0.1, f"{6:08X}")), + ] + docs = query.get_result() + for ref in updates: + ref.join() + assert len(docs) == 1 diff --git a/src/mango/test/mango.py b/src/mango/test/mango.py index 2dff18f4006..b8a6cd9a5aa 100644 --- a/src/mango/test/mango.py +++ b/src/mango/test/mango.py @@ -11,10 +11,13 @@ # the License. import json +import random +import string import time import unittest import uuid import os +import threading import requests @@ -28,10 +31,18 @@ COUCH_PASS = os.environ.get("COUCH_PASS") +BULK_BATCH_SIZE = 500 + + def random_db_name(): return "mango_test_" + uuid.uuid4().hex +def random_string(n_max): + n = random.choice(range(n_max)) + return "".join(random.choice(string.ascii_letters) for _ in range(n)) + + def has_text_service(): features = requests.get(COUCH_HOST).json()["features"] return "search" in features @@ -47,6 +58,27 @@ def delay(n=5, t=0.5): time.sleep(t) +class Concurrently(object): + def __init__(self, thread, thread_args, start=True): + self.thread = threading.Thread(target=self.wrapper, args=(thread, thread_args)) + self.return_value = None + if start: + self.start() + + def wrapper(self, body, args): + self.return_value = body(*args) + + def start(self): + self.thread.start() + + def get_result(self): + self.thread.join() + return self.return_value + + def join(self): + self.thread.join() + + class Database(object): def __init__( self, @@ -103,12 +135,14 @@ def save_docs_with_conflicts(self, docs, **kwargs): r.raise_for_status() def save_docs(self, docs, **kwargs): - body = json.dumps({"docs": docs}) - r = self.sess.post(self.path("_bulk_docs"), data=body, params=kwargs) - r.raise_for_status() - for doc, result in zip(docs, r.json()): - doc["_id"] = result["id"] - doc["_rev"] = result["rev"] + for offset in range(0, len(docs), BULK_BATCH_SIZE): + chunk = docs[offset : (offset + BULK_BATCH_SIZE)] + body = {"docs": chunk} + r = self.sess.post(self.path("_bulk_docs"), json=body, params=kwargs) + r.raise_for_status() + for doc, result in zip(chunk, r.json()): + doc["_id"] = result["id"] + doc["_rev"] = result["rev"] def open_doc(self, docid): r = self.sess.get(self.path(docid))