@@ -266,69 +266,101 @@ module VolumeManager = struct
266266 ) >> = fun () ->
267267 sync ()
268268 end
269-
269+
(** Converts an exception to an S-expression: a single atom holding the
    text produced by [Printexc.to_string]. *)
let sexp_of_exn exn =
  exn |> Printexc.to_string |> fun text -> Sexplib.Sexp.Atom text
271+
(** Progress of a host's connection, as recorded in [host_connections]
    by [connect]. *)
type connection_state =
  | Resuming_to_LVM
    (** Set once the host's three volumes have been found, before the
        ToLVM volume is connected and its queue resumed. *)
  | Resending_free_blocks
    (** Set when the FromLVM queue was found suspended and the free-block
        allocation is being pushed again. *)
  | Connected
    (** Set after both queues have been attached successfully. *)
  | Failed of exn
    (** The connection attempt ended with this exception. *)
with sexp_of
278+
(** Per-host connection state, keyed by host name. [connect] reads and
    updates this table. *)
let host_connections : (string, connection_state) Hashtbl.t =
  Hashtbl.create 7
280+
(** [connect name] attaches the ToLVM and FromLVM queues of host [name]
    and records progress in [host_connections].

    - If [host_connections] already holds a state other than [Failed] for
      [name], the state is logged and nothing else is done.
    - If there is no entry, or the previous attempt is recorded as
      [Failed], a (new) attempt is made.
    - If any of the host's three volumes cannot be found, the entry for
      [name] is removed and the returned thread fails with
      [Xenvm_interface.HostNotCreated].
    - Any exception during the attempt itself is logged and stored as
      [Failed e]; the returned thread then still completes with [()], so
      callers must consult [host_connections] to learn the outcome. *)
let connect name =
  myvg >>= fun vg ->
  info " Registering host %s" name;
  let toLVM = toLVM name in
  let fromLVM = fromLVM name in
  let freeLVM = freeLVM name in

  (* One table lookup instead of [mem] followed by [find]. *)
  let previous =
    try Some (Hashtbl.find host_connections name)
    with Not_found -> None in
  let try_again =
    match previous with
    | None -> true
    | Some (Failed exn) ->
      info " Connection to host %s has failed with %s: retrying" name (Printexc.to_string exn);
      true
    (* Constructors are listed explicitly so that adding a new state
       forces this decision to be revisited. *)
    | Some ((Resuming_to_LVM | Resending_free_blocks | Connected) as state) ->
      info " Connection to host %s in state %s" name (Sexplib.Sexp.to_string (sexp_of_connection_state state));
      false in

  if not try_again then
    return ()
  else begin
    match Vg_IO.find vg toLVM, Vg_IO.find vg fromLVM, Vg_IO.find vg freeLVM with
    | Some toLVM_id, Some fromLVM_id, Some freeLVM_id ->
      Hashtbl.replace host_connections name Resuming_to_LVM;
      (* The pipeline is a thunk so that it is started *inside*
         [Lwt.catch]: an exception raised synchronously by the first call
         is then recorded as [Failed] instead of escaping and leaving the
         host stuck in [Resuming_to_LVM]. *)
      let establish () =
        Vg_IO.Volume.connect toLVM_id
        >>= function
        | `Error _ -> fail (Failure (Printf.sprintf " Failed to open %s" toLVM))
        | `Ok disk ->
          ToLVM.attach ~name ~disk ()
          >>= fun toLVM_q ->
          ToLVM.state toLVM_q
          >>= fun state ->
          debug " ToLVM queue is currently %s" (match state with `Running -> " Running" | `Suspended -> " Suspended");
          ToLVM.resume toLVM_q
          >>= fun () ->
          Vg_IO.Volume.connect fromLVM_id
          >>= function
          | `Error _ -> fail (Failure (Printf.sprintf " Failed to open %s" fromLVM))
          | `Ok disk ->
            FromLVM.attach ~name ~disk ()
            >>= fun (initial_state, fromLVM_q) ->
            ( if initial_state = `Suspended then begin
                Hashtbl.replace host_connections name Resending_free_blocks;
                debug " The FromLVM queue was already suspended: resending the free blocks";
                let allocation = Lvm.Lv.to_allocation (Vg_IO.Volume.metadata_of freeLVM_id) in
                FromLVM.push fromLVM_q allocation
                >>= fun pos ->
                FromLVM.advance fromLVM_q pos
                >>= fun () ->
                debug " Free blocks pushed";
                return ()
              end else begin
                debug " The FromLVM queue was running: no need to resend the free blocks";
                return ()
              end )
            >>= fun () ->
            debug " querying state";
            FromLVM.state fromLVM_q
            >>= fun state ->
            debug " FromLVM queue is currently %s" (match state with `Running -> " Running" | `Suspended -> " Suspended");
            return (toLVM_q, fromLVM_q, freeLVM_id) in
      Lwt.catch
        (fun () ->
          establish ()
          >>= fun (toLVM_q, fromLVM_q, freeLVM_id) ->
          (* Publish the queues only after the whole pipeline succeeded. *)
          Hashtbl.replace host_connections name Connected;
          to_LVMs := (name, toLVM_q) :: !to_LVMs;
          from_LVMs := (name, fromLVM_q) :: !from_LVMs;
          let freeLVM_uuid = (Vg_IO.Volume.metadata_of freeLVM_id).Lvm.Lv.id in
          free_LVs := (name, (freeLVM, freeLVM_uuid)) :: !free_LVs;
          return ())
        (fun e ->
          error " Connecting to %s failed with: %s" name (Printexc.to_string e);
          Hashtbl.replace host_connections name (Failed e);
          return ())
    | _, _, _ ->
      info " At least one of host %s's volumes does not exist" name;
      Hashtbl.remove host_connections name;
      fail Xenvm_interface.HostNotCreated
  end
333365
334366 (* Hold this mutex when actively flushing from the ToLVM queues *)
0 commit comments