Skip to content

Commit 86c7943

Browse files
authored
Merge pull request #311 from edwintorok/scheduler2
CA-338201/CA-337546: use single persistent pipe per scheduler and Mtime
2 parents 77515fc + afd48dc commit 86c7943

File tree

12 files changed

+165
-186
lines changed

12 files changed

+165
-186
lines changed

lib/dune

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
logs
1515
message-switch-core
1616
message-switch-unix
17+
mtime
18+
mtime.clock.os
1719
ppx_sexp_conv.runtime-lib
1820
re
1921
rpclib.core

lib/scheduler.ml

Lines changed: 136 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -12,107 +12,79 @@
1212
* GNU Lesser General Public License for more details.
1313
*)
1414

15-
let finally f g =
16-
try
17-
let result = f () in
18-
g () ; result
19-
with e -> g () ; raise e
20-
21-
let mutex_execute m f =
22-
Mutex.lock m ;
23-
finally f (fun () -> Mutex.unlock m)
15+
open Xapi_stdext_threads
2416

2517
module D = Debug.Make (struct let name = "scheduler" end)
2618

2719
open D
2820

29-
module Int64Map = Map.Make (struct
30-
type t = int64
31-
32-
let compare = Int64.compare
33-
end)
34-
35-
module Delay = struct
21+
module PipeDelay = struct
3622
(* Concrete type is the ends of a pipe *)
3723
type t = {
3824
(* A pipe is used to wake up a thread blocked in wait: *)
39-
mutable pipe_out: Unix.file_descr option
40-
; mutable pipe_in: Unix.file_descr option
41-
; (* Indicates that a signal arrived before a wait: *)
42-
mutable signalled: bool
43-
; m: Mutex.t
25+
pipe_out: Unix.file_descr
26+
; pipe_in: Unix.file_descr
4427
}
4528

4629
let make () =
47-
{pipe_out= None; pipe_in= None; signalled= false; m= Mutex.create ()}
48-
49-
exception Pre_signalled
30+
let pipe_out, pipe_in = Unix.pipe () in
31+
{pipe_out; pipe_in}
5032

5133
let wait (x : t) (seconds : float) =
5234
let timeout = if seconds < 0.0 then 0.0 else seconds in
53-
let to_close = ref [] in
54-
let close' fd =
55-
if List.mem fd !to_close then Unix.close fd ;
56-
to_close := List.filter (fun x -> fd <> x) !to_close
57-
in
58-
finally
59-
(fun () ->
60-
try
61-
let pipe_out =
62-
mutex_execute x.m (fun () ->
63-
if x.signalled then (
64-
x.signalled <- false ;
65-
raise Pre_signalled
66-
) ;
67-
let pipe_out, pipe_in = Unix.pipe () in
68-
(* these will be unconditionally closed on exit *)
69-
to_close := [pipe_out; pipe_in] ;
70-
x.pipe_out <- Some pipe_out ;
71-
x.pipe_in <- Some pipe_in ;
72-
x.signalled <- false ;
73-
pipe_out)
74-
in
75-
let r, _, _ = Unix.select [pipe_out] [] [] timeout in
76-
(* flush the single byte from the pipe *)
77-
if r <> [] then ignore (Unix.read pipe_out (Bytes.create 1) 0 1) ;
78-
(* return true if we waited the full length of time, false if we were
79-
woken *)
80-
r = []
81-
with Pre_signalled -> false)
82-
(fun () ->
83-
mutex_execute x.m (fun () ->
84-
x.pipe_out <- None ;
85-
x.pipe_in <- None ;
86-
List.iter close' !to_close))
35+
if Thread.wait_timed_read x.pipe_out timeout then
36+
(* flush the single byte from the pipe *)
37+
let (_ : int) = Unix.read x.pipe_out (Bytes.create 1) 0 1 in
38+
(* return false if we were woken *)
39+
false
40+
else
41+
(* return true if we waited the full length of time, false if we were woken *)
42+
true
8743

8844
let signal (x : t) =
89-
mutex_execute x.m (fun () ->
90-
match x.pipe_in with
91-
| Some fd ->
92-
ignore (Unix.write fd (Bytes.of_string "X") 0 1)
93-
| None ->
94-
x.signalled <- true
95-
(* If the wait hasn't happened yet then store up the signal *))
45+
let (_ : int) = Unix.write x.pipe_in (Bytes.of_string "X") 0 1 in
46+
()
9647
end
9748

98-
type item = {id: int; name: string; fn: unit -> unit}
49+
type handle = Mtime.span * int
50+
51+
type handle_compat = int64 * int [@@deriving rpc]
9952

100-
type handle = int64 * int [@@deriving rpc]
53+
let rpc_of_handle (s, id) = rpc_of_handle_compat (Mtime.Span.to_uint64_ns s, id)
54+
55+
let handle_of_rpc rpc =
56+
let i64, id = handle_compat_of_rpc rpc in
57+
(Mtime.Span.of_uint64_ns i64, id)
58+
59+
module HandleMap = Map.Make (struct
60+
type t = handle
61+
62+
let compare (x1, id1) (x2, id2) =
63+
let c = Mtime.Span.compare x1 x2 in
64+
if c = 0 then
65+
id2 - id1
66+
else
67+
c
68+
end)
69+
70+
type item = {id: int; name: string; fn: unit -> unit}
10171

10272
type t = {
103-
mutable schedule: item list Int64Map.t
104-
; mutable shutdown: bool
105-
; delay: Delay.t
73+
mutable schedule: item HandleMap.t
74+
; delay: PipeDelay.t
10675
; mutable next_id: int
107-
; mutable thread: Thread.t option
10876
; m: Mutex.t
10977
}
11078

111-
type time = Absolute of int64 | Delta of int
79+
type time = Delta of int
11280

11381
(*type t = int64 * int [@@deriving rpc]*)
11482

115-
let now () = Unix.gettimeofday () |> ceil |> Int64.of_float
83+
let time_of_span span = span |> Mtime.Span.to_s |> ceil |> Int64.of_float
84+
85+
let mtime_sub time now = Mtime.Span.abs_diff time now |> time_of_span
86+
87+
let now () = Mtime_clock.elapsed ()
11688

11789
module Dump = struct
11890
type u = {time: int64; thing: string} [@@deriving rpc]
@@ -121,65 +93,51 @@ module Dump = struct
12193

12294
let make s =
12395
let now = now () in
124-
mutex_execute s.m (fun () ->
125-
Int64Map.fold
126-
(fun time xs acc ->
127-
List.map (fun i -> {time= Int64.sub time now; thing= i.name}) xs
128-
@ acc)
96+
Threadext.Mutex.execute s.m (fun () ->
97+
HandleMap.fold
98+
(fun (time, _) i acc ->
99+
{time= mtime_sub time now; thing= i.name} :: acc)
129100
s.schedule [])
130101
end
131102

132-
let one_shot s time (name : string) f =
133-
let time =
134-
match time with
135-
| Absolute x ->
136-
x
137-
| Delta x ->
138-
Int64.(add (of_int x) (now ()))
139-
in
140-
let id =
141-
mutex_execute s.m (fun () ->
142-
let existing = try Int64Map.find time s.schedule with _ -> [] in
143-
let id = s.next_id in
144-
s.next_id <- s.next_id + 1 ;
145-
let item = {id; name; fn= f} in
146-
s.schedule <- Int64Map.add time (item :: existing) s.schedule ;
147-
Delay.signal s.delay ;
148-
id)
149-
in
150-
(time, id)
151-
152-
let cancel s (time, id) =
153-
mutex_execute s.m (fun () ->
154-
let existing =
155-
if Int64Map.mem time s.schedule then
156-
Int64Map.find time s.schedule
157-
else
158-
[]
159-
in
160-
s.schedule <-
161-
Int64Map.add time
162-
(List.filter (fun i -> i.id <> id) existing)
163-
s.schedule)
103+
let mtime_add x t =
104+
let dt = Mtime.(x *. Mtime.s_to_ns |> Int64.of_float |> Span.of_uint64_ns) in
105+
Mtime.Span.add dt t
106+
107+
let one_shot_f s dt (name : string) f =
108+
let time = mtime_add dt (now ()) in
109+
Threadext.Mutex.execute s.m (fun () ->
110+
let id = s.next_id in
111+
s.next_id <- s.next_id + 1 ;
112+
let item = {id; name; fn= f} in
113+
let handle = (time, id) in
114+
s.schedule <- HandleMap.add handle item s.schedule ;
115+
PipeDelay.signal s.delay ;
116+
handle)
117+
118+
let one_shot s (Delta x) name f = one_shot_f s (float x) name f
119+
120+
let cancel s handle =
121+
Threadext.Mutex.execute s.m (fun () ->
122+
s.schedule <- HandleMap.remove handle s.schedule)
164123

165124
let process_expired s =
166125
let t = now () in
167126
let expired =
168-
mutex_execute s.m (fun () ->
169-
let expired, unexpired =
170-
Int64Map.partition (fun t' _ -> t' <= t) s.schedule
171-
in
127+
Threadext.Mutex.execute s.m (fun () ->
128+
let expired, eq, unexpired = HandleMap.split (t, max_int) s.schedule in
129+
assert (eq = None) ;
172130
s.schedule <- unexpired ;
173-
Int64Map.fold (fun _ stuff acc -> acc @ stuff) expired [] |> List.rev)
131+
expired |> HandleMap.to_seq |> Seq.map snd)
174132
in
175133
(* This might take a while *)
176-
List.iter
134+
Seq.iter
177135
(fun i ->
178136
try i.fn ()
179137
with e ->
180138
debug "Scheduler ignoring exception: %s\n%!" (Printexc.to_string e))
181139
expired ;
182-
expired <> []
140+
expired () <> Seq.Nil
183141

184142
(* true if work was done *)
185143

@@ -188,36 +146,72 @@ let rec main_loop s =
188146
()
189147
done ;
190148
let sleep_until =
191-
mutex_execute s.m (fun () ->
192-
try Int64Map.min_binding s.schedule |> fst
193-
with Not_found -> Int64.add 3600L (now ()))
149+
Threadext.Mutex.execute s.m (fun () ->
150+
try HandleMap.min_binding s.schedule |> fst |> fst
151+
with Not_found -> mtime_add 3600. (now ()))
194152
in
195-
let seconds = Int64.sub sleep_until (now ()) in
196-
let (_ : bool) = Delay.wait s.delay (Int64.to_float seconds) in
197-
if s.shutdown then s.thread <- None else main_loop s
198-
199-
let start s =
200-
if s.shutdown then failwith "Scheduler was shutdown" ;
201-
s.thread <- Some (Thread.create main_loop s)
153+
let this = now () in
154+
let seconds =
155+
if Mtime.Span.compare sleep_until this > 0 then
156+
(* be careful that this is absolute difference,
157+
it is never negative! *)
158+
Mtime.Span.(abs_diff sleep_until this |> to_s)
159+
else
160+
0.
161+
in
162+
let (_ : bool) = PipeDelay.wait s.delay seconds in
163+
main_loop s
202164

203-
let make () =
165+
let make_scheduler () =
204166
let s =
205167
{
206-
schedule= Int64Map.empty
207-
; shutdown= false
208-
; delay= Delay.make ()
168+
schedule= HandleMap.empty
169+
; delay= PipeDelay.make ()
209170
; next_id= 0
210171
; m= Mutex.create ()
211-
; thread= None
212172
}
213173
in
214-
start s ; s
215-
216-
let shutdown s =
217-
match s.thread with
218-
| Some th ->
219-
s.shutdown <- true ;
220-
Delay.signal s.delay ;
221-
Thread.join th
222-
| None ->
223-
()
174+
let (_ : Thread.t) = Thread.create main_loop s in
175+
s
176+
177+
let make = make_scheduler
178+
179+
module Delay = struct
180+
type state = Signalled | Timedout
181+
182+
let s = make_scheduler ()
183+
184+
type t = {c: Condition.t; m: Mutex.t; mutable state: state option}
185+
186+
let make () = {c= Condition.create (); m= Mutex.create (); state= None}
187+
188+
let wait t seconds =
189+
Threadext.Mutex.execute t.m (fun () ->
190+
let handle =
191+
one_shot_f s seconds "Delay.wait" (fun () ->
192+
if t.state = None then
193+
t.state <- Some Timedout ;
194+
Condition.broadcast t.c)
195+
in
196+
let rec loop () =
197+
match t.state with
198+
| Some Timedout ->
199+
(* return true if we waited the full length of time *)
200+
true
201+
| Some Signalled ->
202+
(* return false if we were woken, or pre-signalled *)
203+
false
204+
| None ->
205+
(* initial wait or spurious wakeup *)
206+
Condition.wait t.c t.m ; loop ()
207+
in
208+
let result = loop () in
209+
cancel s handle ;
210+
t.state <- None ;
211+
result)
212+
213+
let signal t =
214+
Threadext.Mutex.execute t.m (fun () ->
215+
t.state <- Some Signalled ;
216+
Condition.broadcast t.c)
217+
end

lib/scheduler.mli

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,8 @@ val handle_of_rpc : Rpc.t -> handle
2626
val make : unit -> t
2727
(** Creates a scheduler *)
2828

29-
(** Items can be scheduled at an absolute time (measured in seconds since unix
30-
epoch) or as a delta measured in for seconds from now. *)
31-
type time = Absolute of int64 | Delta of int
32-
33-
val now : unit -> int64
34-
(** Useful for Absolutely scheduled items *)
29+
(** Items can be scheduled as a delta measured in seconds from now. *)
30+
type time = Delta of int
3531

3632
(** This module is for dumping the state of a scheduler *)
3733
module Dump : sig
@@ -51,7 +47,3 @@ val one_shot : t -> time -> string -> (unit -> unit) -> handle
5147

5248
val cancel : t -> handle -> unit
5349
(** Cancel an item *)
54-
55-
val shutdown : t -> unit
56-
(** shutdown a scheduler. Any item currently scheduled will not be executed. The
57-
scheduler cannot be restarted. *)

lib/task_server.ml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
(** @group Xenops *)
1616

1717
open Xapi_stdext_monadic
18-
1918
open Xapi_stdext_pervasives.Pervasiveext
2019
open Xapi_stdext_threads.Threadext
2120

0 commit comments

Comments
 (0)