Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/mango/src/mango.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@
-type selector() :: any().
-type ejson() :: {[{atom(), any()}]}.

-type shard_stats() :: {docs_examined, non_neg_integer()}.
-type shard_stats() :: shard_stats_v1() | shard_stats_v2().

-type shard_stats_v1() :: {docs_examined, non_neg_integer()}.
-type shard_stats_v2() ::
#{
docs_examined => non_neg_integer(),
keys_examined => non_neg_integer()
}.

-type row_property_key() :: id | key | value | doc.
-type row_properties() :: [{row_property_key(), any()}].
139 changes: 133 additions & 6 deletions src/mango/src/mango_cursor_view.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,26 @@ viewcbargs_get(fields, Args) when is_map(Args) ->
viewcbargs_get(covering_index, Args) when is_map(Args) ->
maps:get(covering_index, Args, undefined).

% This is not yet in use intentionally.
-spec shard_stats(DocsExamined, KeysExamined) -> ShardStats when
DocsExamined :: non_neg_integer(),
KeysExamined :: non_neg_integer(),
ShardStats :: shard_stats_v2().
shard_stats(DocsExamined, KeysExamined) ->
#{
docs_examined => DocsExamined,
keys_examined => KeysExamined
}.

-spec shard_stats_get(Key, Args) -> Stat when
Key :: docs_examined | keys_examined,
Args :: shard_stats_v2(),
Stat :: non_neg_integer().
shard_stats_get(docs_examined, Args) when is_map(Args) ->
maps:get(docs_examined, Args, 0);
shard_stats_get(keys_examined, Args) when is_map(Args) ->
maps:get(keys_examined, Args, 0).

-spec create(Db, Indexes, Selector, Options) -> {ok, #cursor{}} when
Db :: database(),
Indexes :: [#idx{}],
Expand Down Expand Up @@ -325,10 +345,13 @@ choose_best_index(IndexRanges) ->
view_cb({meta, Meta}, Acc) ->
% Map function starting
put(mango_docs_examined, 0),
put(mango_keys_examined, 0),
set_mango_msg_timestamp(),
ok = rexi:stream2({meta, Meta}),
{ok, Acc};
view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
put(mango_keys_examined, get(mango_keys_examined) + 1),
couch_stats:increment_counter([mango, keys_examined]),
ViewRow = #view_row{
id = couch_util:get_value(id, Row),
key = couch_util:get_value(key, Row),
Expand Down Expand Up @@ -457,12 +480,21 @@ handle_message({row, Props}, Cursor) ->
couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]),
{ok, Cursor}
end;
handle_message({execution_stats, ShardStats}, #cursor{execution_stats = Stats} = Cursor) ->
{docs_examined, DocsExamined} = ShardStats,
Cursor1 = Cursor#cursor{
% TODO: remove clause couchdb 4 -- mixed-version upgrade support
handle_message(
{execution_stats, {docs_examined, DocsExamined}}, #cursor{execution_stats = Stats} = Cursor0
Copy link
Contributor

@nickva nickva May 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tiny nit: It might look better to pick out Stats in a separate line after the function head, since with the {execution_stats, {docs_examined, DocsExamined}} the head is getting rather long.

Also, in the second clause it might be better to explicitly assert we're matching on a map.

handle_message({execution_stats, {docs_examined, DocsExamined}}, #cursor{} = Cursor0) ->
    #cursor{execution_stats = Stats} = Cursor0,
    ...
handle_message({execution_stats, #{} = ShardStats}, #cursor{} = Cursor0) ->
    #cursor{execution_stats = Stats} = Cursor0,
    ...  

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Nick for the comments! I have had the reasons for these choices, let me add them for clarification:

  • The {execution_stats, {docs_examined, DocsExamined}} pattern was embedded into the function head to avoid the potential overlap with other clause. My understanding is that heads are checked top-down and the body of the first matching one is going to be evaluated. If there is no direct pattern to match on the contents of the argument of execution_stats (i.e. the second element in the tuple) in the head, the clause will be skipped and missed.

  • For the second clause on execution_stats, I avoided to match on the type of ShardStats to keep it opaque. Ideally, mango_cursor_view:shard_stats_get/2 will handle the map check eventually. I am aware that might be too late, but there are no other alternatives present to be confused with. At the same time, changes to the format of shard stats is encapsulated into the implementation of the shard_stats/2 and shard_stats_get/2 functions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The {execution_stats, {docs_examined, DocsExamined}} pattern was embedded into the function head to avoid the potential overlap with other clause.

That's a good way to do it. The heads should go from the most specific one to the more general. The first comment wasn't about the order of clauses but about not having the line get too long. In the first head we're doing at least two unrelated things: match on {execution_stats, {docs_examined, DocsExamined}} and picking out Stats out of the #cursor. The idea of the cleanup is to pick out the Stats variable in a separate line.

I avoided to match on the type of ShardStats to keep it opaque.

I had noticed but it would still look better have an extra match since we're juggling two different formats. We're not breaking the opaqueness too much as we're not picking out individual keys from the map.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea of the cleanup is to pick out the Stats variable in a separate line.

Oh, you are right! Yes, that is a good idea.

We're not breaking the opaqueness too much as we're not picking out individual keys from the map.

That one still forces the map type on the container. That is true that checking only if that is map would not interfere with the actual keys, but what if we wanted to move away from the maps for a different representation?

Copy link
Contributor

@nickva nickva May 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a trade-off: preserve complete opaqueness or provide immediate visual and runtime feedback that the type is wrong in that clause. If we switch away from a map, we'd get an immediate failure in testing. In a strongly-typed language it might be better not to be "leaky", but in Erlang matching on things we expect as early as possible is more common. But that's fine, if you feel strongly about it. It's +1 either, way. If you rebase again, I'll merge (to get a clean rebase in main).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not have a strong feeling about it. As you have also put it, this might be kind of a habit that I acquired from strongly typed languages and I accept that the same may apply to Erlang. I can follow this idiom.

But please do not merge this PR for now. In response to this approach, I was suggested to handle the question of transition through configuration options and add support for both the sender and the receiver sides in a single change. I will file another PR for that alternative, and it is very likely that will be the winner.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads-up, let me know we'll take another look at it later.

) ->
Cursor = Cursor0#cursor{
execution_stats = mango_execution_stats:incr_docs_examined(Stats, DocsExamined)
},
{ok, Cursor1};
{ok, Cursor};
handle_message({execution_stats, ShardStats}, #cursor{execution_stats = Stats0} = Cursor0) ->
DocsExamined = shard_stats_get(docs_examined, ShardStats),
KeysExamined = shard_stats_get(keys_examined, ShardStats),
Stats1 = mango_execution_stats:incr_docs_examined(Stats0, DocsExamined),
Stats = mango_execution_stats:incr_keys_examined(Stats1, KeysExamined),
Cursor = Cursor0#cursor{execution_stats = Stats},
{ok, Cursor};
handle_message(complete, Cursor) ->
{ok, Cursor};
handle_message({error, Reason}, _Cursor) ->
Expand Down Expand Up @@ -669,6 +701,12 @@ viewcbargs_test() ->
?assertEqual(index, viewcbargs_get(covering_index, ViewCBArgs)),
?assertError(function_clause, viewcbargs_get(something_else, ViewCBArgs)).

shard_stats_test() ->
ShardStats = shard_stats(docs_examined, keys_examined),
?assertEqual(docs_examined, shard_stats_get(docs_examined, ShardStats)),
?assertEqual(keys_examined, shard_stats_get(keys_examined, ShardStats)),
?assertError(function_clause, shard_stats_get(something_else, ShardStats)).

maybe_replace_max_json_test() ->
?assertEqual([], maybe_replace_max_json([])),
?assertEqual(<<"<MAX>">>, maybe_replace_max_json(?MAX_STR)),
Expand Down Expand Up @@ -943,6 +981,7 @@ execute_test_() ->
[
?TDEF_FE(t_execute_empty),
?TDEF_FE(t_execute_ok_all_docs),
?TDEF_FE(t_execute_ok_all_docs_with_execution_stats),
?TDEF_FE(t_execute_ok_query_view),
?TDEF_FE(t_execute_error)
]
Expand Down Expand Up @@ -1082,6 +1121,78 @@ t_execute_ok_query_view(_) ->
?assertEqual({ok, updated_accumulator}, execute(Cursor, fun foo:bar/2, accumulator)),
?assert(meck:called(fabric, query_view, '_')).

t_execute_ok_all_docs_with_execution_stats(_) ->
Bookmark = bookmark,
Stats =
{[
{total_keys_examined, 0},
{total_docs_examined, 0},
{total_quorum_docs_examined, 0},
{results_returned, 0},
{execution_time_ms, '_'}
]},
UserFnDefinition =
[
{[{add_key, bookmark, Bookmark}, accumulator], {undefined, updated_accumulator1}},
{
[{add_key, execution_stats, Stats}, updated_accumulator1],
{undefined, updated_accumulator2}
}
],
meck:expect(foo, bar, UserFnDefinition),
Index = #idx{type = <<"json">>, def = all_docs},
Selector = {[]},
Fields = all_fields,
Cursor =
#cursor{
index = Index,
db = db,
selector = Selector,
fields = Fields,
ranges = [{'$gte', start_key, '$lte', end_key}],
opts = [{user_ctx, user_ctx}, {execution_stats, true}],
bookmark = nil
},
Cursor1 =
Cursor#cursor{
user_acc = accumulator,
user_fun = fun foo:bar/2,
execution_stats = '_'
},
Cursor2 =
Cursor1#cursor{
bookmark = Bookmark,
bookmark_docid = undefined,
bookmark_key = undefined,
execution_stats = #execution_stats{executionStartTime = {0, 0, 0}}
},
Extra =
[
{callback, {mango_cursor_view, view_cb}},
{selector, Selector},
{callback_args, #{
selector => Selector,
fields => Fields,
covering_index => undefined
}},
{ignore_partition_query_limit, true}
],
Args =
#mrargs{
view_type = map,
reduce = false,
start_key = [start_key],
end_key = [end_key, ?MAX_JSON_OBJ],
include_docs = true,
extra = Extra
},
Parameters = [
db, [{user_ctx, user_ctx}], fun mango_cursor_view:handle_all_docs_message/2, Cursor1, Args
],
meck:expect(fabric, all_docs, Parameters, meck:val({ok, Cursor2})),
?assertEqual({ok, updated_accumulator2}, execute(Cursor, fun foo:bar/2, accumulator)),
?assert(meck:called(fabric, all_docs, '_')).

t_execute_error(_) ->
Cursor =
#cursor{
Expand Down Expand Up @@ -1142,6 +1253,7 @@ t_view_cb_row_matching_regular_doc(_) ->
]
},
put(mango_docs_examined, 0),
put(mango_keys_examined, 0),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:called(rexi, stream2, '_')).

Expand All @@ -1160,6 +1272,7 @@ t_view_cb_row_non_matching_regular_doc(_) ->
]
},
put(mango_docs_examined, 0),
put(mango_keys_examined, 0),
put(mango_last_msg_timestamp, os:timestamp()),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assertNot(meck:called(rexi, stream2, '_')).
Expand All @@ -1177,6 +1290,7 @@ t_view_cb_row_null_doc(_) ->
}}
]
},
put(mango_keys_examined, 0),
put(mango_last_msg_timestamp, os:timestamp()),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assertNot(meck:called(rexi, stream2, '_')).
Expand All @@ -1195,6 +1309,7 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch(_) ->
}}
]
},
put(mango_keys_examined, 0),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:called(rexi, stream2, '_')).

Expand All @@ -1220,6 +1335,7 @@ t_view_cb_row_matching_covered_doc(_) ->
}}
]
},
put(mango_keys_examined, 0),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:called(rexi, stream2, '_')).

Expand All @@ -1242,6 +1358,7 @@ t_view_cb_row_non_matching_covered_doc(_) ->
}}
]
},
put(mango_keys_examined, 0),
put(mango_last_msg_timestamp, os:timestamp()),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assertNot(meck:called(rexi, stream2, '_')).
Expand All @@ -1250,6 +1367,7 @@ t_view_cb_row_backwards_compatible(_) ->
Row = [{id, id}, {key, key}, {doc, null}],
meck:expect(rexi, stream2, ['_'], undefined),
Accumulator = #mrargs{extra = [{selector, {[]}}]},
put(mango_keys_examined, 0),
put(mango_last_msg_timestamp, os:timestamp()),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assertNot(meck:called(rexi, stream2, '_')).
Expand Down Expand Up @@ -1321,7 +1439,8 @@ handle_message_test_() ->
?TDEF_FE(t_handle_message_row_ok_triggers_quorum_fetch_no_match),
?TDEF_FE(t_handle_message_row_no_match),
?TDEF_FE(t_handle_message_row_error),
?TDEF_FE(t_handle_message_execution_stats),
?TDEF_FE(t_handle_message_execution_stats_v1),
?TDEF_FE(t_handle_message_execution_stats_v2),
?TDEF_FE(t_handle_message_complete),
?TDEF_FE(t_handle_message_error)
]
Expand Down Expand Up @@ -1453,14 +1572,22 @@ t_handle_message_row_error(_) ->
meck:delete(mango_util, defer, 3),
meck:delete(couch_log, error, 2).

t_handle_message_execution_stats(_) ->
t_handle_message_execution_stats_v1(_) ->
ShardStats = {docs_examined, 42},
ExecutionStats = #execution_stats{totalDocsExamined = 11},
ExecutionStats1 = #execution_stats{totalDocsExamined = 53},
Cursor = #cursor{execution_stats = ExecutionStats},
Cursor1 = #cursor{execution_stats = ExecutionStats1},
?assertEqual({ok, Cursor1}, handle_message({execution_stats, ShardStats}, Cursor)).

t_handle_message_execution_stats_v2(_) ->
ShardStats = shard_stats(42, 53),
ExecutionStats = #execution_stats{totalDocsExamined = 11, totalKeysExamined = 22},
ExecutionStats1 = #execution_stats{totalDocsExamined = 53, totalKeysExamined = 75},
Cursor = #cursor{execution_stats = ExecutionStats},
Cursor1 = #cursor{execution_stats = ExecutionStats1},
?assertEqual({ok, Cursor1}, handle_message({execution_stats, ShardStats}, Cursor)).

t_handle_message_complete(_) ->
?assertEqual({ok, cursor}, handle_message(complete, cursor)).

Expand Down
6 changes: 3 additions & 3 deletions src/mango/src/mango_execution_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

-export([
to_json/1,
incr_keys_examined/1,
incr_keys_examined/2,
incr_docs_examined/1,
incr_docs_examined/2,
incr_quorum_docs_examined/1,
Expand All @@ -35,9 +35,9 @@ to_json(Stats) ->
{execution_time_ms, Stats#execution_stats.executionTimeMs}
]}.

incr_keys_examined(Stats) ->
incr_keys_examined(Stats, N) ->
Stats#execution_stats{
totalKeysExamined = Stats#execution_stats.totalKeysExamined + 1
totalKeysExamined = Stats#execution_stats.totalKeysExamined + N
}.

incr_docs_examined(Stats) ->
Expand Down