Skip to content

Commit de557b0

Browse files
committed
CA-361220: xenopsd: introduce TASK.destroy_on_finish
There are certain tasks that are run asynchronously without anyone waiting for the result (e.g. import_metadata_async). Allow setting a flag on these tasks, so that they are cleaned up when finished (either successfully or not). Needed to avoid space leaks due to an ever growing tasks/updates list. Signed-off-by: Edwin Török <[email protected]>
1 parent 9576302 commit de557b0

File tree

3 files changed

+56
-20
lines changed

3 files changed

+56
-20
lines changed

lib/task_server.ml

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ functor
9393
; test_cancel_at: int option (** index of the cancel point to trigger *)
9494
; mutable backtrace: Backtrace.t (** on error, a backtrace *)
9595
; mutable cancellable: bool
96+
; mutable destroy_on_finish: bool
9697
}
9798

9899
and tasks = {
@@ -149,26 +150,44 @@ functor
149150
)
150151
; backtrace= Backtrace.empty
151152
; cancellable= true
153+
; destroy_on_finish= false
152154
}
153155
in
154156
Mutex.execute tasks.m (fun () ->
155157
tasks.task_map := SMap.add t.id t !(tasks.task_map)) ;
156158
t
157159

160+
(* Remove the task from the id -> task mapping. NB any active thread will
161+
still continue. *)
162+
let destroy task =
163+
let tasks = task.tasks in
164+
Mutex.execute tasks.m (fun () ->
165+
tasks.task_map := SMap.remove task.id !(tasks.task_map)
166+
)
167+
168+
let task_finished item =
169+
if item.destroy_on_finish then (
170+
debug "Auto-destroying task %s" item.id ;
171+
destroy item
172+
)
173+
158174
(* [run t] executes the task body, updating the fields of [t] *)
159175
let run item =
160-
try
161-
let start = Unix.gettimeofday () in
162-
let result = item.f item in
163-
let duration = Unix.gettimeofday () -. start in
164-
item.state <- Interface.Task.Completed {Interface.Task.duration; result} ;
165-
debug "Task %s completed; duration = %.0f" item.id duration
166-
with e ->
167-
Backtrace.is_important e ;
168-
error "Task %s failed; %s" item.id (Printexc.to_string e) ;
169-
item.backtrace <- Backtrace.remove e ;
170-
let e = e |> Interface.marshal_exn in
171-
item.state <- Interface.Task.Failed e
176+
( try
177+
let start = Unix.gettimeofday () in
178+
let result = item.f item in
179+
let duration = Unix.gettimeofday () -. start in
180+
item.state <-
181+
Interface.Task.Completed {Interface.Task.duration; result} ;
182+
debug "Task %s completed; duration = %.0f" item.id duration
183+
with e ->
184+
Backtrace.is_important e ;
185+
error "Task %s failed; %s" item.id (Printexc.to_string e) ;
186+
item.backtrace <- Backtrace.remove e ;
187+
let e = e |> Interface.marshal_exn in
188+
item.state <- Interface.Task.Failed e
189+
) ;
190+
task_finished item
172191

173192
let find_locked tasks id =
174193
try SMap.find id !(tasks.task_map)
@@ -219,13 +238,6 @@ functor
219238
Mutex.execute tasks.m (fun () ->
220239
SMap.bindings !(tasks.task_map) |> List.map snd)
221240

222-
(* Remove the task from the id -> task mapping. NB any active thread will
223-
still continue. *)
224-
let destroy task =
225-
let tasks = task.tasks in
226-
Mutex.execute tasks.m (fun () ->
227-
tasks.task_map := SMap.remove task.id !(tasks.task_map))
228-
229241
let cancel task =
230242
let callbacks =
231243
Mutex.execute task.tm (fun () ->
@@ -286,5 +298,19 @@ functor
286298
(* If task is cancelling, just cancel it before setting it to not
287299
cancellable *)
288300
check_cancelling_locked task ;
289-
task.cancellable <- false)
301+
task.cancellable <- false
302+
)
303+
304+
let destroy_on_finish t =
305+
t.destroy_on_finish <- true ;
306+
let already_finished =
307+
Mutex.execute t.tm @@ fun () ->
308+
t.destroy_on_finish <- true ;
309+
match t.state with
310+
| Interface.Task.Pending _ ->
311+
false
312+
| Interface.Task.Completed _ | Interface.Task.Failed _ ->
313+
true
314+
in
315+
if already_finished then task_finished t
290316
end

lib/task_server.mli

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,9 @@ module Task : functor (Interface : INTERFACE) -> sig
113113

114114
(* Set a task not cancellable *)
115115
val prohibit_cancellation : task_handle -> unit
116+
117+
(* When the task finishes automatically destroy it to avoid resource leaks.
118+
Useful for asynchronous tasks that we don't wait for.
119+
*)
120+
val destroy_on_finish : task_handle -> unit
116121
end

xen/xenops_interface.ml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,11 @@ module XenopsAPI (R : RPC) = struct
551551
declare "Task.list"
552552
["List all the current tasks"]
553553
(debug_info_p @-> returning task_list_p err)
554+
555+
let destroy_on_finish =
556+
declare "Task.destroy_on_finish"
557+
["Ensures the task will be destroyed when it finishes"]
558+
(debug_info_p @-> task_id_p @-> returning unit_p err)
554559
end
555560

556561
module HOST = struct

0 commit comments

Comments
 (0)