Skip to content

Commit b367bf8

Browse files
committed
xapi_xenops: processing xenops events in per-VM threads
Signed-off-by: Jonathan Davies <[email protected]>
1 parent 4dffe91 commit b367bf8

File tree

2 files changed

+416
-8
lines changed

2 files changed

+416
-8
lines changed

ocaml/xapi/lib_worker.ml

Lines changed: 388 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,388 @@
1+
(*
2+
* Copyright (C) Citrix Systems Inc.
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
open Stdext.Listext
16+
open Stdext.Xstringext
17+
open Stdext.Threadext
18+
19+
(* Module-scoped logger; [open D] brings [debug]/[info]/[error] etc. into
   scope for the rest of this file. *)
module D = Debug.Make(struct let name = "lib_worker" end)
open D
21+
22+
(* Persistent map keyed by string. Using [Map.Make(String)] picks up
   [String.compare] directly instead of polymorphic [compare]; for string
   keys the ordering is identical, but the monomorphic comparison is the
   idiomatic (and faster) choice. *)
module StringMap = Map.Make(String)
23+
24+
(* Append [item] to [queue] and then rebuild the queue keeping only the
   elements [x] for which [should_keep x prev] is true, where [prev] is the
   list of elements queued before [x] (most recently queued first). This
   lets callers coalesce redundant entries, e.g. collapse runs of
   duplicate events. *)
let push_with_coalesce should_keep item queue =
  (* Pair each list element with every element that precedes it, newest
     first. Note: [prev] reflects the pre-filter contents of the list, not
     just the survivors of earlier [should_keep] decisions. *)
  let rec annotate prev = function
    | [] -> []
    | x :: rest -> (x, prev) :: annotate (x :: prev) rest
  in
  let contents = List.rev (Queue.fold (fun acc x -> x :: acc) [] queue) in
  let survivors =
    annotate [] (contents @ [item])
    |> List.filter (fun (x, prev) -> should_keep x prev)
    |> List.map fst
  in
  Queue.clear queue;
  List.iter (fun x -> Queue.push x survivors_push_target) survivors
[@@@warning "-32"]
45+
46+
module Queues = struct
  (** A set of queues where 'pop' operates on each queue in a round-robin fashion *)

  type tag = string
  (** Each distinct 'tag' value creates a separate virtual queue *)

  (* All mutable fields are protected by [m]; [c] is signalled whenever an
     item is pushed (or a tag transferred in) so a blocked [pop] can wake. *)
  type 'a t = {
    mutable qs: 'a Queue.t StringMap.t;
    mutable last_tag: string;
    m: Mutex.t;
    c: Condition.t;
  }

  (* Create an empty queue set. The empty [last_tag] sorts before every
     real tag, so the first [pop] starts from the smallest tag. *)
  let create () = {
    qs = StringMap.empty;
    last_tag = "";
    m = Mutex.create ();
    c = Condition.create ();
  }

  (* Return the queue currently registered for [tag].
     NOTE(review): when [tag] is absent this returns a fresh, UNREGISTERED
     Queue.t -- pushes onto that queue are invisible to [pop]; confirm all
     callers use the result read-only. *)
  let get tag qs =
    Mutex.execute qs.m
      (fun () ->
         if StringMap.mem tag qs.qs then StringMap.find tag qs.qs else Queue.create ()
      )

  (* List every tag that currently has a registered queue. *)
  let tags qs =
    Mutex.execute qs.m
      (fun () ->
         StringMap.fold (fun x _ acc -> x :: acc) qs.qs []
      )

  (* The tag most recently returned by [pop]; used for round-robin fairness. *)
  let get_last_tag qs =
    Mutex.execute qs.m
      (fun () ->
         qs.last_tag
      )

  (* Push [item] onto the virtual queue for [tag], applying the
     [should_keep] coalescing policy to that queue's contents, registering
     the per-tag queue on first use, and waking one waiter. *)
  let push_with_coalesce should_keep tag item qs =
    Mutex.execute qs.m
      (fun () ->
         let q = if StringMap.mem tag qs.qs then StringMap.find tag qs.qs else Queue.create () in
         push_with_coalesce should_keep item q;
         qs.qs <- StringMap.add tag q qs.qs;
         Condition.signal qs.c
      )

  (* Block until some queue is non-empty, then pop one item from the first
     tag strictly after [last_tag] in tag order (wrapping around to the
     smallest tag), giving round-robin service across tags. Emptied queues
     are pruned so the "map is empty" wait condition stays accurate. *)
  let pop qs =
    Mutex.execute qs.m
      (fun () ->
         (* the while-loop guards against spurious condition wakeups *)
         while StringMap.is_empty qs.qs do
           Condition.wait qs.c qs.m;
         done;
         (* partition based on last_tag *)
         let before, after = StringMap.partition (fun x _ -> x <= qs.last_tag) qs.qs in
         (* the min_binding in the 'after' is the next queue *)
         let last_tag, q = StringMap.min_binding (if StringMap.is_empty after then before else after) in
         qs.last_tag <- last_tag;
         let item = Queue.pop q in
         (* remove empty queues from the whole mapping *)
         qs.qs <- if Queue.is_empty q then StringMap.remove last_tag qs.qs else qs.qs;
         last_tag, item
      )

  (* Move the entire queue registered under [tag] from [a] to [b] (no-op if
     absent) and wake a waiter on [b].
     NOTE(review): this acquires [a.m] then [b.m]; callers must use a
     consistent lock order to avoid deadlock (here calls appear serialised
     under Redirector's own mutex -- confirm if used elsewhere). Also, an
     existing queue for [tag] in [b] would be silently replaced, dropping
     its items; callers must guarantee [b] has no queue for [tag]. *)
  let transfer_tag tag a b =
    Mutex.execute a.m
      (fun () ->
         Mutex.execute b.m
           (fun () ->
              if StringMap.mem tag a.qs then begin
                b.qs <- StringMap.add tag (StringMap.find tag a.qs) b.qs;
                a.qs <- StringMap.remove tag a.qs;
                Condition.signal b.c
              end
           )
      )
end
123+
124+
(* A unit of work: a human-readable label (used only for logging) paired
   with the closure a worker thread runs to process it. *)
type item = string * (unit -> unit)
125+
126+
module Redirector = struct
  (** Routes items to per-tag queues and, while a worker is actively
      processing a tag, redirects further items for that tag onto the
      worker's private queue. This ensures at most one worker operates on a
      given tag (e.g. one VM) at a time while other tags run in parallel. *)

  type t = { queues: item Queues.t; mutex: Mutex.t }

  (* When a thread is not actively processing a queue, items are placed here: *)
  let default = { queues = Queues.create (); mutex = Mutex.create () }

  (* We create another queue only for Parallel atoms so as to avoid a situation where
     Parallel atoms can not progress because all the workers available for the
     default queue are used up by other operations depending on further Parallel
     atoms, creating a deadlock. *)
  let parallel_queues = { queues = Queues.create (); mutex = Mutex.create () }

  (* When a thread is actively processing a queue, items are redirected to a
     thread-private queue, keyed by tag: *)
  let overrides = ref StringMap.empty
  (* [m] protects [overrides] and serialises push/redirect/finish. *)
  let m = Mutex.create ()

  (* Coalescing policy: keep every queued item. The parameters carry a
     leading underscore to suppress unused-variable warnings. *)
  let should_keep _item _prev = true

  (* Enqueue [item] under [tag]: onto the active worker's override queue if
     one currently owns [tag] (so the same thread processes it), otherwise
     onto [t]'s shared queues. *)
  let push t tag item =
    Debug.with_thread_associated "queue"
      (fun () ->
         Mutex.execute m
           (fun () ->
              let q, redirected =
                if StringMap.mem tag !overrides
                then StringMap.find tag !overrides, true
                else t.queues, false
              in
              (*debug "Queue.push %s onto %s%s:[ %s ]" (string_of_int item) (if redirected then "redirected " else "") tag (String.concat ", " (List.rev (Queue.fold (fun acc item -> string_of_int item :: acc) [] (Queues.get tag q))));*)
              let label, _ = item in
              debug "Queue.push onto %s%s: %s" (if redirected then "redirected " else "") tag label;
              Queues.push_with_coalesce should_keep tag item q
           )
      ) ()

  (* Pop the next (tag, item), installing a private override queue for the
     tag before releasing [m]. Returns the tag, the override queue (to hand
     back via [finished]) and the item. *)
  let pop t () =
    (* We must prevent worker threads all calling Queues.pop before we've
       successfully put the redirection in place. Otherwise we end up with
       parallel threads operating on the same VM. *)
    Mutex.execute t.mutex
      (fun () ->
         let tag, item = Queues.pop t.queues in
         Mutex.execute m
           (fun () ->
              let q = Queues.create () in
              Queues.transfer_tag tag t.queues q;
              overrides := StringMap.add tag q !overrides;
              (* All items with [tag] will enter queue [q] *)
              tag, q, item
           )
      )

  (* Tear down the redirection installed by [pop]: move any items that
     accumulated on the override [queue] back onto the shared queues and
     make future pushes for [tag] go there again. *)
  let finished t tag queue =
    Mutex.execute m
      (fun () ->
         Queues.transfer_tag tag queue t.queues;
         overrides := StringMap.remove tag !overrides
         (* All items with [tag] will enter the shared queues again *)
      )

  (*
  module Dump = struct
    type q = {
      tag: string;
      items: operation list
    } with rpc
    type t = q list with rpc

    let make () =
      Mutex.execute m
        (fun () ->
           let one queue =
             List.map
               (fun t ->
                  { tag = t; items = List.rev (Queue.fold (fun acc (b, _) -> b :: acc) [] (Queues.get t queue)) }
               ) (Queues.tags queue) in
           List.concat (List.map one (default.queues :: parallel_queues.queues :: (List.map snd (StringMap.bindings !overrides))))
        )
  end
  *)
end
204+
205+
module Worker = struct
  (* A single worker thread that repeatedly pulls items from a Redirector
     and executes them. *)

  type state =
    | Idle                 (* waiting in Redirector.pop for an item *)
    | Processing of item   (* currently executing an item's closure *)
    | Shutdown_requested   (* asked to stop; exits after the current item *)
    | Shutdown             (* the thread's loop has exited *)
  (* Mutable fields are protected by [m].
     NOTE(review): [c] is never used in the code visible here -- possibly
     vestigial; confirm before removing. *)
  type t = {
    mutable state: state;
    mutable shutdown_requested: bool;
    m: Mutex.t;
    c: Condition.t;
    mutable t: Thread.t option;
    redirector: Redirector.t;
  }

  (* Caller must hold [t.m]. A pending shutdown request masks the real
     state.
     NOTE(review): once the thread has exited, [shutdown_requested] remains
     true, so this reports Shutdown_requested rather than Shutdown -- verify
     that WorkerPool.gc can ever observe Shutdown via [get_state]. *)
  let get_state_locked t =
    if t.shutdown_requested
    then Shutdown_requested
    else t.state

  (* Thread-safe wrapper around [get_state_locked]. *)
  let get_state t =
    Mutex.execute t.m
      (fun () ->
         get_state_locked t
      )

  (* Wait for the worker's thread to terminate. Precondition (asserted):
     the loop has already recorded [Shutdown]. Note Thread.join runs while
     holding [t.m]; the exiting thread no longer needs the lock by then. *)
  let join t =
    Mutex.execute t.m
      (fun () ->
         assert (t.state = Shutdown);
         Stdext.Opt.iter Thread.join t.t
      )

  (* A worker is "active" if it will keep servicing items (i.e. no shutdown
     has been requested or completed). *)
  let is_active t =
    Mutex.execute t.m
      (fun () ->
         match get_state_locked t with
         | Idle | Processing _ -> true
         | Shutdown_requested | Shutdown -> false
      )

  (* Request shutdown. Returns true on success, false if a shutdown was
     already requested. The thread notices the flag at its next loop check. *)
  let shutdown t =
    Mutex.execute t.m
      (fun () ->
         if not t.shutdown_requested then begin
           t.shutdown_requested <- true;
           true (* success *)
         end else false
      )

  (* Cancel a pending shutdown request, provided the thread has not already
     exited. Returns true on success. *)
  let restart t =
    Mutex.execute t.m
      (fun () ->
         if t.shutdown_requested && t.state <> Shutdown then begin
           t.shutdown_requested <- false;
           true (* success *)
         end else false
      )

  (* Spawn a new worker bound to [redirector]. The thread loops:
     mark Idle -> blocking pop -> mark Processing -> run the item (any
     exception is logged and swallowed so the worker survives) -> hand the
     override queue back via Redirector.finished. The transition to
     [Shutdown] happens inside the loop-condition check under [t.m]. *)
  let create redirector =
    let t = {
      state = Idle;
      shutdown_requested = false;
      m = Mutex.create ();
      c = Condition.create ();
      t = None;
      redirector = redirector;
    } in
    let thread = Thread.create
        (fun () ->
           while not(Mutex.execute t.m (fun () ->
               if t.shutdown_requested then t.state <- Shutdown;
               t.shutdown_requested
             )) do
             Mutex.execute t.m (fun () -> t.state <- Idle);
             let tag, queue, item = Redirector.pop redirector () in (* blocks here *)
             debug "Queue.pop returned an item with tag %s" tag;
             Mutex.execute t.m (fun () -> t.state <- Processing item);
             begin
               try
                 (* TODO execute the item -- treat it as a function *)
                 match item with (label, item) ->
                   debug "executing item with label '%s'" label;
                   item ()
               with e ->
                 (* swallow and log: one failing item must not kill the worker *)
                 debug "Queue caught: %s" (Printexc.to_string e)
             end;
             Redirector.finished redirector tag queue;
             (* TODO check outcome, do any follow-up *)
           done
        ) () in
    t.t <- Some thread;
    t
end
299+
300+
module WorkerPool = struct
  (** A pool of Worker threads shared across the default and parallel
      redirectors. Note: the [incr]/[decr] below shadow Stdlib.incr/decr
      within this module. *)

  (* Store references to Worker.t values here; protected by [m]. *)
  let pool = ref []
  let m = Mutex.create ()

  module Dump = struct
    (* Serialisable snapshot of one worker's state, for diagnostics. *)
    type w = {
      state: string;
      task: int option;
    } [@@deriving rpc]
    type t = w list [@@deriving rpc]

    (* Snapshot every worker currently in the pool. *)
    let make () =
      Mutex.execute m
        (fun () ->
           List.map
             (fun t ->
                match Worker.get_state t with
                (* The item payload is not yet serialised, hence task = None
                   everywhere. (The redundant Printf.sprintf and unused
                   binding in the Processing arm were removed.) *)
                | Worker.Idle -> { state = "Idle"; task = None }
                | Worker.Processing _ -> { state = "Processing item"; task = None }
                | Worker.Shutdown_requested -> { state = "Shutdown_requested"; task = None }
                | Worker.Shutdown -> { state = "Shutdown"; task = None }
             ) !pool
        )
  end

  (* Compute the number of active threads ie those which will continue to operate *)
  let count_active queues =
    Mutex.execute m
      (fun () ->
         (* we do not want to use = when comparing queues: queues can contain (uncomparable) functions, and we
            are only interested in comparing the equality of their static references
         *)
         List.map (fun w -> w.Worker.redirector == queues && Worker.is_active w) !pool
         |> List.filter (fun x -> x)
         |> List.length
      )

  (* Return true iff [f] succeeds on some pool member bound to [queues].
     Because of (||) short-circuiting, [f] stops being applied after the
     first success. *)
  let find_one queues f = List.fold_left (fun acc x -> acc || (x.Worker.redirector == queues && (f x))) false

  (* Clean up any shutdown threads and remove them from the master list.
     NOTE(review): Worker.get_state reports Shutdown_requested (not
     Shutdown) while shutdown_requested is still set, so confirm this
     branch is ever taken for fully-exited workers. *)
  let gc queues pool =
    List.fold_left
      (fun acc w ->
         (* we do not want to use = when comparing queues: queues can contain (uncomparable) functions, and we
            are only interested in comparing the equality of their static references
         *)
         if w.Worker.redirector == queues && Worker.get_state w = Worker.Shutdown then begin
           Worker.join w;
           acc
         end else w :: acc) [] pool

  (* Grow the pool for [queues] by one: prefer restarting a worker with a
     pending shutdown request; otherwise spawn a fresh thread. *)
  let incr queues =
    debug "Adding a new worker to the thread pool";
    Mutex.execute m
      (fun () ->
         pool := gc queues !pool;
         if not(find_one queues Worker.restart !pool)
         then pool := (Worker.create queues) :: !pool
      )

  (* Shrink the pool for [queues] by one by requesting a shutdown on some
     worker that hasn't been asked to stop yet. *)
  let decr queues =
    debug "Removing a worker from the thread pool";
    Mutex.execute m
      (fun () ->
         pool := gc queues !pool;
         if not(find_one queues Worker.shutdown !pool)
         then debug "There are no worker threads left to shutdown."
      )

  (* Spawn [size] workers for each redirector. ([_] avoids an unused-index
     warning.) *)
  let start size =
    for _ = 1 to size do
      incr Redirector.default;
      incr Redirector.parallel_queues
    done

  (* Adjust both pools towards [size] active workers each, adding or
     shutting down workers as needed. *)
  let set_size size =
    let inner queues =
      let active = count_active queues in
      (* replaced a leftover "XXX" marker with a descriptive message *)
      debug "Worker pool has %d active worker(s)" active;
      for _ = 1 to max 0 (size - active) do
        incr queues
      done;
      for _ = 1 to max 0 (active - size) do
        decr queues
      done
    in
    inner Redirector.default;
    inner Redirector.parallel_queues
end

0 commit comments

Comments
 (0)