Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
be84279
PoC couch_stats local resource usage tracking
chewbranca Jul 18, 2022
e5e4820
Instrument more stats and rudimentary delta accumulation
chewbranca Oct 23, 2023
ecda8f3
Add more usage tracking and core functionality
chewbranca Oct 30, 2023
73863a7
Embed worker usage deltas in rexi:ping
chewbranca Oct 31, 2023
12e0267
WIP: core delta aggregation working
chewbranca Nov 9, 2023
84be4af
TMP: rewire rexi to fail on unexpected messages
chewbranca Nov 10, 2023
19224af
Add baseline group by and sort by aggregations
chewbranca Nov 14, 2023
cafa512
Add watchdog scanning for unmonitored processes
chewbranca Nov 14, 2023
996c385
Set dbname context properly
chewbranca Nov 15, 2023
1bbf7a9
Add unsafe_foldl for perf testing
chewbranca Nov 16, 2023
e70ba4d
Rework csrt context setting
chewbranca Nov 18, 2023
715b524
WIP: ugly but working HTTP API around groupings
chewbranca Nov 18, 2023
be8ab04
Cleanup debug info
chewbranca Nov 20, 2023
4c05a0f
Make CSRT toggle-able and rework delta sending
chewbranca Nov 20, 2023
eb20b5d
Remove debug exit clause
chewbranca Nov 20, 2023
f4a712c
Declare missing metrics
chewbranca Nov 21, 2023
c758cb7
WIP: relax record constraints...
chewbranca Nov 21, 2023
4f00910
Use Default in config:get/3 on ets table error
chewbranca Nov 30, 2023
887a7f2
Switch to delta extraction from messages
chewbranca Nov 30, 2023
1a1a584
Revert "Use Default in config:get/3 on ets table error"
chewbranca Nov 30, 2023
db1f874
Flush chttpd_db monitor refs on demonitor
chewbranca Dec 9, 2023
50495bf
TMP Hack around issue #4909
chewbranca Dec 18, 2023
9578773
Avoid mem3_rpc:rexi_call selective receive
chewbranca Dec 18, 2023
f77583a
Pass tests and toggle csrt
chewbranca Dec 19, 2023
ac17510
Handle delta in fabric rpc tests
chewbranca Dec 19, 2023
c500f0f
Conditionally log reports
chewbranca Dec 19, 2023
d7e9bd9
Do not persist doc size test config settings
chewbranca Jan 5, 2024
f68005a
Configurable logging and no more io_lib:format
chewbranca Jan 8, 2024
0371589
Limit resource usage output and fix keys
chewbranca Feb 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Cleanup debug info
  • Loading branch information
chewbranca committed Nov 20, 2023
commit be8ab045d93b72aa1da4cbcb84d71054cf5d03ee
13 changes: 1 addition & 12 deletions src/chttpd/src/chttpd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,6 @@ handle_request_int(MochiReq) ->
]
},

io:format("CSRTZ PROCESSING: ~w~n", [RequestedPath]),

% put small token on heap to keep requests synced to backend calls
erlang:put(nonce, Nonce),

Expand All @@ -326,22 +324,15 @@ handle_request_int(MochiReq) ->
chttpd_util:mochiweb_client_req_set(MochiReq),

%% This is probably better in before_request, but having Path is nice
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is fine IMO

io:format("CSRTZ INITIATING CONTEXT: ~w~n", [Nonce]),
couch_stats_resource_tracker:create_coordinator_context(HttpReq0, Path),
io:format("CSRTZ INITIAL CONTEXT: ~w~n", [couch_stats_resource_tracker:get_resource()]),
Coord = self(),
PidRef = couch_stats_resource_tracker:get_pid_ref(),
spawn(fun() -> monitor(process, Coord), receive M -> io:format("CSRTZ PROCESS DOWN[~w]{~w}: ~w~n", [ Coord, M, couch_stats_resource_tracker:get_resource(PidRef)]) end end),

{HttpReq2, Response} =
case before_request(HttpReq0) of
{ok, HttpReq1} ->
io:format("CSRTZ BEFORE_REQUEST OK: ~w~n", [couch_stats_resource_tracker:get_resource()]),
process_request(HttpReq1);
{error, Response0} ->
io:format("CSRTZ BEFORE_REQUEST ERROR: ~w~n", [couch_stats_resource_tracker:get_resource()]),
{HttpReq0, Response0}
end,
io:format("CSRTZ AFTER PROCESS_REQUEST: ~w~n", [couch_stats_resource_tracker:get_resource()]),

chttpd_util:mochiweb_client_req_clean(),

Expand All @@ -357,10 +348,8 @@ handle_request_int(MochiReq) ->

case after_request(HttpReq2, HttpResp) of
#httpd_resp{status = ok, response = Resp} ->
io:format("CSRTZ AFTER REQUEST OK: ~w~n", [couch_stats_resource_tracker:get_resource()]),
{ok, Resp};
#httpd_resp{status = aborted, reason = Reason} ->
io:format("CSRTZ AFTER REQUEST ERROR: ~w~n", [couch_stats_resource_tracker:get_resource()]),
couch_log:error("Response abnormally terminated: ~w", [Reason]),
exit({shutdown, Reason})
end.
Expand Down
2 changes: 1 addition & 1 deletion src/chttpd/src/chttpd_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ handle_resource_status_req(#httpd{method = 'POST'} = Req) ->
end,
[]
]),
io:format("{CSRT}***** GOT RESP: ~p~n", [Resp]),
%%io:format("{CSRT}***** GOT RESP: ~p~n", [Resp]),
send_json(Req, {Resp});
handle_resource_status_req(#httpd{method = 'GET'} = Req) ->
ok = chttpd:verify_is_server_admin(Req),
Expand Down
2 changes: 1 addition & 1 deletion src/couch/priv/stats_descriptions.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@
{type, counter},
{desc, <<"number of JS filter invocations">>}
]}.
{[couchdb, query_server, js_filterered], [
{[couchdb, query_server, js_filtered_docs], [
{type, counter},
{desc, <<"number of docs filtered through JS invocations">>}
]}.
Expand Down
36 changes: 4 additions & 32 deletions src/couch_stats/src/couch_stats_resource_tracker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,7 @@ should_track(_Metric) ->
%%io:format("SKIPPING METRIC: ~p~n", [Metric]),
false.

%% TODO: update coordinator stats from worker deltas
accumulate_delta(Delta) ->
case get_resource() of
#rctx{type={coordinator,_,_}} = Rctx ->
io:format("Accumulating delta: ~w || ~w~n", [Delta, Rctx]);
_ ->
ok
end,
%% TODO: switch to creating a batch of updates to invoke a single
%% update_counter rather than sequentially invoking it for each field
maps:foreach(fun inc/2, Delta).
Expand Down Expand Up @@ -463,7 +456,6 @@ to_json(#rctx{}=Rctx) ->
changes_processed = ChangesProcessed,
changes_returned = ChangesReturned
} = Rctx,
%%io:format("TO_JSON_MFA: ~p~n", [MFA0]),
MFA = case MFA0 of
{M, F, A} ->
[M, F, A];
Expand Down Expand Up @@ -495,11 +487,9 @@ to_json(#rctx{}=Rctx) ->
#{
updated_at => term_to_json(TP),
started_at => term_to_json(TInit),
%%pid_ref => [pid_to_list(Pid), ref_to_list(Ref)],
pid_ref => term_to_json(PidRef),
mfa => term_to_json(MFA),
nonce => term_to_json(Nonce),
%%from => From,
from => term_to_json(From),
dbname => DbName,
username => UserName,
Expand All @@ -522,7 +512,6 @@ to_json(#rctx{}=Rctx) ->
term_to_json({Pid, Ref}) when is_pid(Pid), is_reference(Ref) ->
[?l2b(pid_to_list(Pid)), ?l2b(ref_to_list(Ref))];
term_to_json({type, {coordinator, _, _} = Type}) ->
%%io:format("SETTING JSON TYPE: ~p~n", [Type]),
?l2b(io_lib:format("~p", [Type]));
term_to_json({A, B, C}) ->
[A, B, C];
Expand All @@ -536,7 +525,6 @@ term_to_json(T) ->
T.

term_to_flat_json({type, {coordinator, _, _}}=Type) ->
%%io:format("SETTING FLAT JSON TYPE: ~p~n", [Type]),
?l2b(io_lib:format("~p", [Type]));
term_to_flat_json({coordinator, _, _}=Type) ->
?l2b(io_lib:format("~p", [Type]));
Expand Down Expand Up @@ -573,7 +561,6 @@ to_flat_json(#rctx{}=Rctx) ->
changes_returned = ChangesReturned,
ioq_calls = IoqCalls
} = Rctx,
%%io:format("TO_JSON_MFA: ~p~n", [MFA0]),
MFA = case MFA0 of
{_M, _F, _A} ->
?l2b(io_lib:format("~w", [MFA0]));
Expand All @@ -600,12 +587,9 @@ to_flat_json(#rctx{}=Rctx) ->
Nonce0 ->
list_to_binary(Nonce0)
end,
io:format("NONCE IS: ~p||~p~n", [Nonce0, Nonce]),
#{
%%updated_at => ?l2b(io_lib:format("~w", [TP])),
updated_at => term_to_flat_json(TP),
started_at => term_to_flat_json(TInit),
%%pid_ref => [pid_to_list(Pid), ref_to_list(Ref)],
pid_ref => ?l2b(io_lib:format("~w", [PidRef])),
mfa => MFA,
nonce => Nonce,
Expand Down Expand Up @@ -650,10 +634,8 @@ create_context(Pid) ->

%% add type to disnguish coordinator vs rpc_worker
create_context(From, {M,F,_A} = MFA, Nonce) ->
%%io:format("[~p] CREAT_CONTEXT MFA[~p]: {~p}: ~p~n", [self(), From, MFA, Nonce]),
PidRef = get_pid_ref(), %% this will instantiate a new PidRef
%%Rctx = make_record(self(), Ref),
%% TODO: extract user_ctx and db/shard from
%% TODO: extract user_ctx and db/shard from
Rctx = #rctx{
pid_ref = PidRef,
from = From,
Expand All @@ -670,18 +652,13 @@ create_coordinator_context(#httpd{path_parts=Parts} = Req) ->
create_coordinator_context(Req, io_lib:format("~p", [Parts])).

create_coordinator_context(#httpd{} = Req, Path) ->
%%io:format("CREATING COORDINATOR CONTEXT ON {~p}~n", [Path]),
#httpd{
method = Verb,
%%path_parts = Parts,
nonce = Nonce
} = Req,
PidRef = get_pid_ref(), %% this will instantiate a new PidRef
%%Rctx = make_record(self(), Ref),
%% TODO: extract user_ctx and db/shard from Req
Rctx = #rctx{
pid_ref = PidRef,
%%type = {cooridantor, Verb, Parts},
type = {coordinator, Verb, [$/ | Path]},
nonce = Nonce
},
Expand All @@ -704,7 +681,6 @@ set_context_dbname(DbName) ->
set_context_username(null) ->
ok;
set_context_username(UserName) ->
%%io:format("CSRT SETTING USERNAME CONTEXT: ~p~n", [UserName]),
case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.username, UserName}]) of
false ->
Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
Expand Down Expand Up @@ -755,7 +731,7 @@ make_delta() ->
%% Perhaps somewhat naughty we're incrementing stats from within
%% couch_stats itself? Might need to handle this differently
%% TODO: determine appropriate course of action here
io:format("~n**********MISSING STARTING DELTA************~n~n", []),
%% io:format("~n**********MISSING STARTING DELTA************~n~n", []),
couch_stats:increment_counter(
[couchdb, csrt, delta_missing_t0]),
%%[couch_stats_resource_tracker, delta_missing_t0]),
Expand All @@ -769,7 +745,6 @@ make_delta() ->
TA0 ->
TA0
end;
%%?RCTX{} = TA0 ->
#rctx{} = TA0 ->
TA0
end,
Expand Down Expand Up @@ -804,7 +779,6 @@ make_delta(#rctx{}, _) ->

make_delta_base() ->
Ref = make_ref(),
%%Rctx = make_record(self(), Ref),
%% TODO: extract user_ctx and db/shard from request
TA0 = #rctx{
pid_ref = {self(), Ref}
Expand Down Expand Up @@ -874,14 +848,12 @@ handle_cast(Msg, St) ->
{stop, {unknown_cast, Msg}, St}.

handle_info(scan, #st{tracking=AT0} = St0) ->
io:format("{CSRT} TRIGGERING SCAN: ~n", []),
Unmonitored = find_unmonitored(),
io:format("{CSRT} SCAN FOUND ~p UNMONITORED PIDS: ~n", [length(Unmonitored)]),
AT = maybe_track(Unmonitored, AT0),
_TimerRef = erlang:send_after(St0#st.scan_interval, self(), scan),
{noreply, St0#st{tracking=AT}};
handle_info({'DOWN', MonRef, Type, DPid, Reason}, #st{tracking=AT0} = St0) ->
io:format("CSRT:HI(~p)~n", [{'DOWN', MonRef, Type, DPid, Reason}]),
%% io:format("CSRT:HI(~p)~n", [{'DOWN', MonRef, Type, DPid, Reason}]),
St = case maps:get(MonRef, AT0, undefined) of
undefined ->
io:format("ERROR: UNEXPECTED MISSING MONITOR IN TRACKING TABLE: {~p, ~p}~n", [MonRef, DPid]),
Expand Down Expand Up @@ -937,7 +909,7 @@ log_process_lifetime_report(PidRef) ->
%% More safely assert this can't ever be undefined
#rctx{} = Rctx = get_resource(PidRef),
%% TODO: catch error out of here, report crashes on depth>1 json
io:format("CSRT RCTX: ~p~n", [to_flat_json(Rctx)]),
%%io:format("CSRT RCTX: ~p~n", [to_flat_json(Rctx)]),
couch_log:report("csrt-pid-usage-lifetime", to_flat_json(Rctx)).


Expand Down
1 change: 0 additions & 1 deletion src/fabric/src/fabric_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ set_purge_infos_limit(DbName, Limit, Options) ->
with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}).

open_doc(DbName, DocId, Options) ->
io:format("frpc:open_doc(~p, ~p, ~p)~n", [DbName, DocId, Options]),
with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).

open_revs(DbName, IdRevsOpts, Options) ->
Expand Down
2 changes: 0 additions & 2 deletions src/fabric/src/fabric_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) ->
try
receive
{Ref, {ok, Db}, {delta, Delta}} ->
io:format("[~p]GET SHARD GOT DELTA: ~p~n", [self(), Delta]),
couch_stats_resource_tracker:accumulate_delta(Delta),
{ok, Db};
{Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} ->
Expand All @@ -149,7 +148,6 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) ->
couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]),
get_shard(Rest, Opts, Timeout, Factor);
Other ->
io:format("GOT UNEXPECTED MESSAGE FORMAT: ~p~n", [Other]),
erlang:error(Other)
after Timeout ->
couch_log:debug("Failed to open shard ~p after: ~p", [Name, Timeout]),
Expand Down
6 changes: 0 additions & 6 deletions src/rexi/src/rexi_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
{timeout, TimeoutRef} ->
{timeout, Acc0};
{rexi, '$rexi_ping', {delta, Delta}} ->
io:format("[~p]GOT PING DELTA: ~p~n", [self(), Delta]),
couch_stats_resource_tracker:accumulate_delta(Delta),
{ok, Acc0};
{rexi, Ref, Msg} ->
Expand All @@ -86,7 +85,6 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
Fun(Msg, {Worker, From}, Acc0)
end;
{Ref, Msg, {delta, Delta}} ->
io:format("[~p]GOT DELTA: ~p -- ~p~n", [self(), Delta, Msg]),
couch_stats_resource_tracker:accumulate_delta(Delta),
case lists:keyfind(Ref, Keypos, RefList) of
false ->
Expand All @@ -96,7 +94,6 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
Fun(Msg, Worker, Acc0)
end;
{Ref, From, Msg, {delta, Delta}} ->
io:format("[~p]GOT DELTA: ~p -- ~p~n", [self(), Delta, Msg]),
couch_stats_resource_tracker:accumulate_delta(Delta),
case lists:keyfind(Ref, Keypos, RefList) of
false ->
Expand Down Expand Up @@ -125,7 +122,6 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
%% TODO: add stack trace to log entry
couch_log:debug("rexi_utils:process_message no delta: {Ref, Msg} => {~p, ~p}~n", [Ref, Msg]),
timer:sleep(100),
%%erlang:halt(enodelta),
erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, Msg} => {~w, ~w}~n", [Ref, Msg]))));
{Ref, From, rexi_STREAM_INIT = Msg} ->
case lists:keyfind(Ref, Keypos, RefList) of
Expand All @@ -135,11 +131,9 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
Fun(Msg, {Worker, From}, Acc0)
end;
{Ref, From, Msg} ->
%%io:format("GOT NON DELTA MSG: ~p~n", [Msg]),
%% TODO: add stack trace to log entry
couch_log:debug("rexi_utils:process_message no delta: {Ref, From, Msg} => {~p, ~p, ~p}~n", [Ref, From, Msg]),
timer:sleep(100),
%%erlang:halt(enodelta),
erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, From, Msg} => {~w, ~w, ~w}~n", [Ref, From, Msg]))));
{rexi_DOWN, _, _, _} = Msg ->
Fun(Msg, nil, Acc0)
Expand Down