Skip to content

Commit fe80c21

Browse files
committed
mango: rolling execution statistics
For map-reduce views, the view callback (at the shard) is not guaranteed to receive the `complete` message when a `stop` is issued during the aggregation (at the coordinator). As a result, internally collected shard-level statistics may never be fed back to the coordinator, which causes data loss and therefore inaccuracy in the overall execution statistics. Address this issue by switching to a "rolling" model where row-level statistics are immediately streamed back to the coordinator. Support mixed-version cluster upgrades by activating this model only if it is requested through the map-reduce arguments and the given shard supports it. Fixes #4560
1 parent 57f1b4b commit fe80c21

File tree

3 files changed

+193
-40
lines changed

3 files changed

+193
-40
lines changed

src/fabric/src/fabric_view_row.erl

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212

1313
-module(fabric_view_row).
1414
-export([from_props/2]).
15-
-export([get_id/1, get_key/1, get_value/1, get_doc/1, get_worker/1]).
16-
-export([set_key/2, set_doc/2, set_worker/2]).
15+
-export([get_id/1, get_key/1, get_value/1, get_doc/1, get_worker/1, get_stats/1]).
16+
-export([set_key/2, set_doc/2, set_worker/2, set_stats/2]).
1717
-export([transform/1, merge/3]).
1818

1919
-include_lib("fabric/include/fabric.hrl").
@@ -87,6 +87,14 @@ set_worker(#view_row{} = Row, Worker) ->
8787
set_worker({view_row, #{} = Row}, Worker) ->
8888
{view_row, Row#{worker => Worker}}.
8989

90+
get_stats({view_row, #{stats := Stats}}) ->
91+
Stats;
92+
get_stats({view_row, #{}}) ->
93+
undefined.
94+
95+
set_stats({view_row, #{} = Row}, Stats) ->
96+
{view_row, Row#{stats => Stats}}.
97+
9098
transform(#view_row{value = {[{reduce_overflow_error, Msg}]}}) ->
9199
{row, [{key, null}, {id, error}, {value, reduce_overflow_error}, {reason, Msg}]};
92100
transform(#view_row{key = Key, id = reduced, value = Value}) ->
@@ -105,8 +113,13 @@ transform({view_row, #{} = Row0}) ->
105113
Value = maps:get(value, Row0, undefined),
106114
Doc = maps:get(doc, Row0, undefined),
107115
Worker = maps:get(worker, Row0, undefined),
116+
Stats = maps:get(stats, Row0, undefined),
108117
Row = #view_row{id = Id, key = Key, value = Value, doc = Doc, worker = Worker},
109-
transform(Row).
118+
{row, Props} = RowProps = transform(Row),
119+
case Stats of
120+
undefined -> RowProps;
121+
#{} -> {row, [{stats, Stats} | Props]}
122+
end.
110123

111124
merge(Dir, Row, Rows) ->
112125
lists:merge(

src/mango/src/mango_cursor_view.erl

Lines changed: 109 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@
5555
covering_index => 'maybe'(#idx{})
5656
}.
5757

58+
-type mrargs_extra_item() ::
59+
{callback, {atom(), atom()}}
60+
| {selector, any()}
61+
| {callback_args, viewcbargs()}
62+
| {ignore_partition_query_limit, boolean()}
63+
| {execution_stats_map, boolean()}
64+
| {execution_stats_rolling, boolean()}.
65+
-type mrargs_extra() :: [mrargs_extra_item()].
66+
5867
-spec viewcbargs_new(Selector, Fields, CoveringIndex) -> ViewCBArgs when
5968
Selector :: selector(),
6069
Fields :: fields(),
@@ -204,7 +213,9 @@ base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) -
204213
% - Return execution statistics in a map
205214
{execution_stats_map, true},
206215
% - Return view rows in a map
207-
{view_row_map, true}
216+
{view_row_map, true},
217+
% - Stream execution statistics
218+
{execution_stats_rolling, true}
208219
]
209220
}.
210221

@@ -338,6 +349,43 @@ choose_best_index(IndexRanges) ->
338349
{SelectedIndex, SelectedIndexRanges, _} = hd(SortedIndexRanges),
339350
{{SelectedIndex, SelectedIndexRanges}, SortedIndexRanges}.
340351

352+
-spec format_stats(RawStats, Options) -> FormattedStats when
353+
RawStats :: shard_stats_v2(),
354+
Options :: mrargs_extra(),
355+
FormattedStats :: shard_stats_v1() | shard_stats_v2().
356+
format_stats(Stats, Options) when is_list(Options) ->
357+
case couch_util:get_value(execution_stats_map, Options, false) of
358+
true ->
359+
Stats;
360+
false ->
361+
#{docs_examined := DocsExamined} = Stats,
362+
{docs_examined, DocsExamined}
363+
end.
364+
365+
-spec submit_stats(Options) -> ok when
366+
Options :: mrargs_extra().
367+
submit_stats(Options) when is_list(Options) ->
368+
ShardStats = mango_execution_stats:shard_get_stats(),
369+
Stats = format_stats(ShardStats, Options),
370+
% Send execution stats in batch (shard-level)
371+
ok = rexi:stream2({execution_stats, Stats}).
372+
373+
-spec roll_stats(ViewRow, Options) -> ViewRow when
374+
ViewRow :: view_row(),
375+
Options :: mrargs_extra().
376+
roll_stats(ViewRow0, Options) when is_list(Options) ->
377+
ViewRowMap = couch_util:get_value(view_row_map, Options, false),
378+
RollingStats = couch_util:get_value(execution_stats_rolling, Options, false),
379+
case ViewRowMap andalso RollingStats of
380+
true ->
381+
ShardStats = mango_execution_stats:shard_get_stats(),
382+
mango_execution_stats:shard_init(),
383+
Stats = format_stats(ShardStats, Options),
384+
fabric_view_row:set_stats(ViewRow0, Stats);
385+
false ->
386+
ViewRow0
387+
end.
388+
341389
-spec view_cb
342390
(Message, #mrargs{}) -> Response when
343391
Message :: {meta, any()} | {row, row_properties()} | complete,
@@ -378,7 +426,8 @@ view_cb({row, Props}, #mrargs{extra = Options} = Acc) ->
378426
% However, this oddness is confined to being visible in this module.
379427
case match_and_extract_doc(Doc, Selector, Fields) of
380428
{match, FinalDoc} ->
381-
ViewRow = fabric_view_row:set_doc(ViewRow0, FinalDoc),
429+
ViewRow1 = fabric_view_row:set_doc(ViewRow0, FinalDoc),
430+
ViewRow = roll_stats(ViewRow1, Options),
382431
ok = rexi:stream2(ViewRow),
383432
set_mango_msg_timestamp();
384433
{no_match, undefined} ->
@@ -394,7 +443,8 @@ view_cb({row, Props}, #mrargs{extra = Options} = Acc) ->
394443
Process(Doc);
395444
{undefined, _} ->
396445
% include_docs=false. Use quorum fetch at coordinator
397-
ok = rexi:stream2(ViewRow0),
446+
ViewRow = roll_stats(ViewRow0, Options),
447+
ok = rexi:stream2(ViewRow),
398448
set_mango_msg_timestamp();
399449
{Doc, _} ->
400450
mango_execution_stats:shard_incr_docs_examined(),
@@ -403,17 +453,7 @@ view_cb({row, Props}, #mrargs{extra = Options} = Acc) ->
403453
end,
404454
{ok, Acc};
405455
view_cb(complete, #mrargs{extra = Options} = Acc) ->
406-
ShardStats = mango_execution_stats:shard_get_stats(),
407-
Stats =
408-
case couch_util:get_value(execution_stats_map, Options, false) of
409-
true ->
410-
ShardStats;
411-
false ->
412-
DocsExamined = maps:get(docs_examined, ShardStats),
413-
{docs_examined, DocsExamined}
414-
end,
415-
% Send shard-level execution stats
416-
ok = rexi:stream2({execution_stats, Stats}),
456+
submit_stats(Options),
417457
% Finish view output
418458
ok = rexi:stream_last(complete),
419459
{ok, Acc};
@@ -469,6 +509,21 @@ maybe_send_mango_ping() ->
469509
set_mango_msg_timestamp() ->
470510
put(mango_last_msg_timestamp, os:timestamp()).
471511

512+
-spec add_shard_stats(#execution_stats{}, shard_stats()) -> #execution_stats{}.
513+
add_shard_stats(Stats0, {docs_examined, DocsExamined}) ->
514+
mango_execution_stats:incr_docs_examined(Stats0, DocsExamined);
515+
add_shard_stats(Stats0, #{} = ShardStats) ->
516+
DocsExamined = shard_stats_get(docs_examined, ShardStats),
517+
KeysExamined = shard_stats_get(keys_examined, ShardStats),
518+
Stats = mango_execution_stats:incr_docs_examined(Stats0, DocsExamined),
519+
mango_execution_stats:incr_keys_examined(Stats, KeysExamined).
520+
521+
-spec handle_execution_stats(#cursor{}, shard_stats()) -> {ok, #cursor{}}.
522+
handle_execution_stats(Cursor0, ShardStats) ->
523+
#cursor{execution_stats = Stats} = Cursor0,
524+
Cursor = Cursor0#cursor{execution_stats = add_shard_stats(Stats, ShardStats)},
525+
{ok, Cursor}.
526+
472527
-spec handle_message(message(), #cursor{}) -> Response when
473528
Response ::
474529
{ok, #cursor{}}
@@ -492,20 +547,10 @@ handle_message({row, Props}, Cursor) ->
492547
couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]),
493548
{ok, Cursor}
494549
end;
495-
handle_message({execution_stats, {docs_examined, DocsExamined}}, Cursor0) ->
496-
#cursor{execution_stats = Stats} = Cursor0,
497-
Cursor = Cursor0#cursor{
498-
execution_stats = mango_execution_stats:incr_docs_examined(Stats, DocsExamined)
499-
},
500-
{ok, Cursor};
550+
handle_message({execution_stats, {docs_examined, _} = ShardStats}, Cursor0) ->
551+
handle_execution_stats(Cursor0, ShardStats);
501552
handle_message({execution_stats, #{} = ShardStats}, Cursor0) ->
502-
DocsExamined = shard_stats_get(docs_examined, ShardStats),
503-
KeysExamined = shard_stats_get(keys_examined, ShardStats),
504-
#cursor{execution_stats = Stats0} = Cursor0,
505-
Stats1 = mango_execution_stats:incr_docs_examined(Stats0, DocsExamined),
506-
Stats = mango_execution_stats:incr_keys_examined(Stats1, KeysExamined),
507-
Cursor = Cursor0#cursor{execution_stats = Stats},
508-
{ok, Cursor};
553+
handle_execution_stats(Cursor0, ShardStats);
509554
handle_message(complete, Cursor) ->
510555
{ok, Cursor};
511556
handle_message({error, Reason}, _Cursor) ->
@@ -645,9 +690,14 @@ consider_index_coverage(Index, Fields, #mrargs{include_docs = IncludeDocs0} = Ar
645690
| {no_match, null, {execution_stats, shard_stats()}}
646691
| any().
647692
doc_member_and_extract(Cursor, RowProps) ->
648-
Db = Cursor#cursor.db,
649-
Opts = Cursor#cursor.opts,
650-
ExecutionStats = Cursor#cursor.execution_stats,
693+
#cursor{db = Db, opts = Opts, execution_stats = ExecutionStats0} = Cursor,
694+
ExecutionStats =
695+
case couch_util:get_value(stats, RowProps) of
696+
undefined ->
697+
ExecutionStats0;
698+
ShardStats ->
699+
add_shard_stats(ExecutionStats0, ShardStats)
700+
end,
651701
Selector = Cursor#cursor.selector,
652702
case couch_util:get_value(doc, RowProps) of
653703
{DocProps} ->
@@ -745,7 +795,8 @@ base_opts_test() ->
745795
}},
746796
{ignore_partition_query_limit, true},
747797
{execution_stats_map, true},
748-
{view_row_map, true}
798+
{view_row_map, true},
799+
{execution_stats_rolling, true}
749800
],
750801
MRArgs =
751802
#mrargs{
@@ -1069,7 +1120,8 @@ t_execute_ok_all_docs(_) ->
10691120
}},
10701121
{ignore_partition_query_limit, true},
10711122
{execution_stats_map, true},
1072-
{view_row_map, true}
1123+
{view_row_map, true},
1124+
{execution_stats_rolling, true}
10731125
],
10741126
Args =
10751127
#mrargs{
@@ -1153,7 +1205,8 @@ t_execute_ok_query_view(_) ->
11531205
}},
11541206
{ignore_partition_query_limit, true},
11551207
{execution_stats_map, true},
1156-
{view_row_map, true}
1208+
{view_row_map, true},
1209+
{execution_stats_rolling, true}
11571210
],
11581211
Args =
11591212
#mrargs{
@@ -1247,7 +1300,8 @@ t_execute_ok_all_docs_with_execution_stats(_) ->
12471300
}},
12481301
{ignore_partition_query_limit, true},
12491302
{execution_stats_map, true},
1250-
{view_row_map, true}
1303+
{view_row_map, true},
1304+
{execution_stats_rolling, true}
12511305
],
12521306
Args =
12531307
#mrargs{
@@ -1348,6 +1402,8 @@ t_view_cb_row_matching_regular_doc(_) ->
13481402
fields => all_fields,
13491403
covering_index => undefined
13501404
}},
1405+
{execution_stats_map, true},
1406+
{execution_stats_rolling, true},
13511407
{view_row_map, true}
13521408
]
13531409
},
@@ -1367,6 +1423,8 @@ t_view_cb_row_non_matching_regular_doc(_) ->
13671423
fields => all_fields,
13681424
covering_index => undefined
13691425
}},
1426+
{execution_stats_map, true},
1427+
{execution_stats_rolling, true},
13701428
{view_row_map, true}
13711429
]
13721430
},
@@ -1386,6 +1444,8 @@ t_view_cb_row_null_doc(_) ->
13861444
fields => all_fields,
13871445
covering_index => undefined
13881446
}},
1447+
{execution_stats_map, true},
1448+
{execution_stats_rolling, true},
13891449
{view_row_map, true}
13901450
]
13911451
},
@@ -1406,6 +1466,8 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch(_) ->
14061466
fields => all_fields,
14071467
covering_index => undefined
14081468
}},
1469+
{execution_stats_map, true},
1470+
{execution_stats_rolling, true},
14091471
{view_row_map, true}
14101472
]
14111473
},
@@ -1433,6 +1495,8 @@ t_view_cb_row_matching_covered_doc(_) ->
14331495
fields => Fields,
14341496
covering_index => Index
14351497
}},
1498+
{execution_stats_map, true},
1499+
{execution_stats_rolling, true},
14361500
{view_row_map, true}
14371501
]
14381502
},
@@ -1457,6 +1521,8 @@ t_view_cb_row_non_matching_covered_doc(_) ->
14571521
fields => Fields,
14581522
covering_index => Index
14591523
}},
1524+
{execution_stats_map, true},
1525+
{execution_stats_rolling, true},
14601526
{view_row_map, true}
14611527
]
14621528
},
@@ -1592,10 +1658,13 @@ t_handle_message_row_ok_above_limit(_) ->
15921658
user_acc = accumulator,
15931659
user_fun = fun foo:bar/2
15941660
},
1595-
Row = [{id, id}, {key, key}, {doc, Doc}],
1661+
ShardStats = #{keys_examined => 2, docs_examined => 3},
1662+
Row = [{id, id}, {key, key}, {doc, Doc}, {stats, ShardStats}],
15961663
Cursor1 =
15971664
Cursor#cursor{
1598-
execution_stats = #execution_stats{resultsReturned = 1},
1665+
execution_stats = #execution_stats{
1666+
resultsReturned = 1, totalKeysExamined = 2, totalDocsExamined = 3
1667+
},
15991668
limit = 8,
16001669
user_acc = updated_accumulator,
16011670
bookmark_docid = id,
@@ -1643,12 +1712,15 @@ t_handle_message_row_ok_triggers_quorum_fetch_match(_) ->
16431712
user_acc = accumulator,
16441713
limit = 1
16451714
},
1646-
Row = [{id, id}, {doc, undefined}],
1715+
ShardStats = #{keys_examined => 2, docs_examined => 3},
1716+
Row = [{id, id}, {doc, undefined}, {stats, ShardStats}],
16471717
Cursor1 =
16481718
Cursor#cursor{
16491719
execution_stats =
16501720
#execution_stats{
16511721
totalQuorumDocsExamined = 1,
1722+
totalKeysExamined = 2,
1723+
totalDocsExamined = 3,
16521724
resultsReturned = 1
16531725
},
16541726
user_acc = updated_accumulator,

0 commit comments

Comments
 (0)