Skip to content

Commit 2625000

Browse files
committed
Merge pull request #126 from xapi-project/connect-async
Host.connect: now async and idempotent
2 parents 2915f6a + 83b408f commit 2625000

File tree

5 files changed

+141
-80
lines changed

5 files changed

+141
-80
lines changed

cleanup.sh

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
set -ex
22

33
if [ "$EUID" -ne "0" ]; then
4-
echo "Please run me as uid 0. I need to create a loop device and device mapper devices"
5-
exit 1
4+
echo "I am not running with EUID 0. I will use the mock device mapper interface"
5+
USE_MOCK=1
6+
MOCK_ARG="--mock-devmapper"
7+
else
8+
echo "I am running with EUID 0. I will use the real device mapper interface"
9+
USE_MOCK=0
10+
MOCK_ARG=""
611
fi
7-
LOOP=$(losetup -j bigdisk | cut -f 1 -d ':')
12+
813
./xenvm.native lvchange -an /dev/djstest/live || true
914
#./xenvm.native shutdown /dev/djstest
10-
killall xenvmd.native
11-
dmsetup remove_all
12-
dd if=/dev/zero of=$LOOP bs=1M count=128
13-
losetup -d $LOOP
14-
rm -f localJournal bigdisk *.out
15+
killall xenvmd.native || echo No killable xenvmd
16+
killall local_allocator.native || echo No killable local allocators
17+
18+
if [ "$USE_MOCK" -eq "0" ]; then
19+
dmsetup remove_all
20+
LOOP=$(losetup -j bigdisk | cut -f 1 -d ':')
21+
dd if=/dev/zero of=$LOOP bs=1M count=128
22+
losetup -d $LOOP
23+
fi
24+
25+
rm -f localJournal bigdisk *.out djstest-* dm-mock

idl/xenvm_interface.ml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
exception HostNotCreated
44

5+
exception HostStillConnecting of string
6+
57
let _journal_name = "xenvm_journal"
68

79
external get_lv: name:string -> (Vg_wrapper.t * Lv_wrapper.t) = ""
@@ -42,8 +44,15 @@ type queue = {
4244
suspended: bool;
4345
}
4446

47+
type connection_state =
48+
| Resuming_to_LVM
49+
| Resending_free_blocks
50+
| Connected
51+
| Failed of string
52+
4553
type host = {
4654
name: string;
55+
connection_state: connection_state option;
4756
fromLVM: queue;
4857
toLVM: queue;
4958
freeExtents: int64;

setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ sleep 2
5555
./xenvm.native host-create /dev/djstest host1 --configdir /tmp/xenvm.d $MOCK_ARG
5656
./xenvm.native host-connect /dev/djstest host1 --configdir /tmp/xenvm.d $MOCK_ARG
5757
cat test.local_allocator.conf.in | sed -r "s|@BIGDISK@|$LOOP|g" | sed -r "s|@HOST@|host1|g" > test.local_allocator.host1.conf
58-
./local_allocator.native --config ./test.local_allocator.host1.conf $MOCK_ARG > local_allocator.host2.log &
58+
./local_allocator.native --config ./test.local_allocator.host1.conf $MOCK_ARG > local_allocator.host1.log &
5959

6060
sleep 30 # the local allocator daemonizes too soon
6161

xenvm/xenvm.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,10 @@ let host_list copts (vg_name,_) =
114114
[ "suspended"; string_of_bool q.suspended ]
115115
] in
116116
let table_of_host h =
117+
let connection_state = [ "state"; match h.connection_state with Some x -> Jsonrpc.to_string (rpc_of_connection_state x) | None -> "None" ] in
117118
let fromLVM = add_prefix "fromLVM" (table_of_queue h.fromLVM) in
118119
let toLVM = add_prefix "toLVM" (table_of_queue h.toLVM) in
119-
fromLVM @ toLVM @ [ [ "freeExtents"; Int64.to_string h.freeExtents ] ] in
120+
[ connection_state ] @ fromLVM @ toLVM @ [ [ "freeExtents"; Int64.to_string h.freeExtents ] ] in
120121
List.map (fun h -> add_prefix h.name (table_of_host h)) hosts
121122
|> List.concat
122123
|> print_table true [ "key"; "value" ]

xenvmd/xenvmd.ml

Lines changed: 110 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -266,69 +266,101 @@ module VolumeManager = struct
266266
) >>= fun () ->
267267
sync ()
268268
end
269-
269+
270+
let sexp_of_exn e = Sexplib.Sexp.Atom (Printexc.to_string e)
271+
272+
let host_connections = Hashtbl.create 7
273+
270274
let connect name =
271275
myvg >>= fun vg ->
272276
info "Registering host %s" name;
273277
let toLVM = toLVM name in
274278
let fromLVM = fromLVM name in
275279
let freeLVM = freeLVM name in
276-
if List.mem_assoc name !to_LVMs then begin
277-
info "Host-specific volumes (%s, %s, %s) already connected" toLVM fromLVM freeLVM;
280+
281+
let try_again =
282+
if Hashtbl.mem host_connections name then begin
283+
match Hashtbl.find host_connections name with
284+
| Xenvm_interface.Failed msg ->
285+
info "Connection to host %s has failed with %s: retrying" name msg;
286+
true
287+
| x ->
288+
info "Connction to host %s in state %s" name (Jsonrpc.to_string (Xenvm_interface.rpc_of_connection_state x));
289+
false
290+
end else true in
291+
292+
if not try_again then begin
278293
return ()
279294
end else begin
280-
( try
281-
Lwt.return (Lvm.Vg.LVs.find_by_name freeLVM (Vg_IO.metadata_of vg).Lvm.Vg.lvs).Lvm.Lv.id
282-
with _ ->
283-
fail Xenvm_interface.HostNotCreated ) >>= fun freeLVMid ->
284-
( match Vg_IO.find vg toLVM with
285-
| Some lv -> return lv
286-
| None -> assert false ) >>= fun v ->
287-
Vg_IO.Volume.connect v
288-
>>= function
289-
| `Error _ -> fail (Failure (Printf.sprintf "Failed to open %s" toLVM))
290-
| `Ok disk ->
291-
ToLVM.attach ~name ~disk ()
292-
>>= fun to_LVM ->
293-
ToLVM.state to_LVM
294-
>>= fun state ->
295-
debug "ToLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended");
296-
ToLVM.resume to_LVM
297-
>>= fun () ->
298-
( match Vg_IO.find vg fromLVM with
299-
| Some lv -> return lv
300-
| None -> assert false ) >>= fun v ->
301-
Vg_IO.Volume.connect v
302-
>>= function
303-
| `Error _ -> fail (Failure (Printf.sprintf "Failed to open %s" fromLVM))
304-
| `Ok disk ->
305-
FromLVM.attach ~name ~disk ()
306-
>>= fun (initial_state, from_LVM) ->
307-
( if initial_state = `Suspended then begin
308-
debug "The FromLVM queue was already suspended: resending the free blocks";
309-
( match Vg_IO.find vg freeLVM with
310-
| Some lv -> return lv
311-
| None -> assert false ) >>= fun lv ->
312-
let allocation = Lvm.Lv.to_allocation (Vg_IO.Volume.metadata_of lv) in
313-
FromLVM.push from_LVM allocation
314-
>>= fun pos ->
315-
FromLVM.advance from_LVM pos
316-
>>= fun () ->
317-
debug "Free blocks pushed";
318-
return ()
319-
end else begin
320-
debug "The FromLVM queue was running: no need to resend the free blocks";
295+
match Vg_IO.find vg toLVM, Vg_IO.find vg fromLVM, Vg_IO.find vg freeLVM with
296+
| Some toLVM_id, Some fromLVM_id, Some freeLVM_id ->
297+
Hashtbl.replace host_connections name Xenvm_interface.Resuming_to_LVM;
298+
let background_t () =
299+
Vg_IO.Volume.connect toLVM_id
300+
>>= function
301+
| `Error _ -> fail (Failure (Printf.sprintf "Failed to open %s" toLVM))
302+
| `Ok disk ->
303+
ToLVM.attach ~name ~disk ()
304+
>>= fun toLVM_q ->
305+
ToLVM.state toLVM_q
306+
>>= fun state ->
307+
debug "ToLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended");
308+
ToLVM.resume toLVM_q
309+
>>= fun () ->
310+
311+
312+
Vg_IO.Volume.connect fromLVM_id
313+
>>= function
314+
| `Error _ -> fail (Failure (Printf.sprintf "Failed to open %s" fromLVM))
315+
| `Ok disk ->
316+
FromLVM.attach ~name ~disk ()
317+
>>= fun (initial_state, fromLVM_q) ->
318+
( if initial_state = `Suspended then begin
319+
Hashtbl.replace host_connections name Xenvm_interface.Resending_free_blocks;
320+
321+
debug "The FromLVM queue was already suspended: resending the free blocks";
322+
let allocation = Lvm.Lv.to_allocation (Vg_IO.Volume.metadata_of freeLVM_id) in
323+
FromLVM.push fromLVM_q allocation
324+
>>= fun pos ->
325+
FromLVM.advance fromLVM_q pos
326+
>>= fun () ->
327+
debug "Free blocks pushed";
328+
return ()
329+
end else begin
330+
debug "The FromLVM queue was running: no need to resend the free blocks";
331+
return ()
332+
end )
333+
>>= fun () ->
334+
debug "querying state";
335+
FromLVM.state fromLVM_q
336+
>>= fun state ->
337+
debug "FromLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended");
338+
return (toLVM_q, fromLVM_q, freeLVM_id) in
339+
340+
(* Run the blocking stuff in the background *)
341+
Lwt.async
342+
(fun () ->
343+
Lwt.catch
344+
(fun () ->
345+
background_t ()
346+
>>= fun (toLVM_q, fromLVM_q, freeLVM_id) ->
347+
Hashtbl.replace host_connections name Xenvm_interface.Connected;
348+
to_LVMs := (name, toLVM_q) :: !to_LVMs;
349+
from_LVMs := (name, fromLVM_q) :: !from_LVMs;
350+
let freeLVM_uuid = (Vg_IO.Volume.metadata_of freeLVM_id).Lvm.Lv.id in
351+
free_LVs := (name, (freeLVM,freeLVM_uuid)) :: !free_LVs;
352+
return ()
353+
) (fun e ->
354+
let msg = Printexc.to_string e in
355+
error "Connecting to %s failed with: %s" name msg;
356+
Hashtbl.replace host_connections name (Xenvm_interface.Failed msg);
357+
return ())
358+
);
321359
return ()
322-
end )
323-
>>= fun () ->
324-
debug "querying state";
325-
FromLVM.state from_LVM
326-
>>= fun state ->
327-
debug "FromLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended");
328-
to_LVMs := (name, to_LVM) :: !to_LVMs;
329-
from_LVMs := (name, from_LVM) :: !from_LVMs;
330-
free_LVs := (name, (freeLVM,freeLVMid)) :: !free_LVs;
331-
return ()
360+
| _, _, _ ->
361+
info "At least one of host %s's volumes does not exist" name;
362+
Hashtbl.remove host_connections name;
363+
fail Xenvm_interface.HostNotCreated
332364
end
333365

334366
(* Hold this mutex when actively flushing from the ToLVM queues *)
@@ -359,21 +391,25 @@ module VolumeManager = struct
359391
end
360392

361393
let disconnect name =
362-
if not(List.mem_assoc name !to_LVMs)
363-
then return () (* already disconnected *)
364-
else
365-
let to_lvm = List.assoc name !to_LVMs in
366-
debug "Suspending ToLVM queue for %s" name;
367-
ToLVM.suspend to_lvm
368-
>>= fun () ->
369-
(* There may still be updates in the ToLVM queue *)
370-
Lwt_mutex.with_lock flush_m (fun () -> flush_already_locked name)
371-
>>= fun () ->
372-
debug "ToLVM queue for %s has been suspended and flushed" name;
373-
to_LVMs := List.filter (fun (n, _) -> n <> name) !to_LVMs;
374-
from_LVMs := List.filter (fun (n, _) -> n <> name) !from_LVMs;
375-
free_LVs := List.filter (fun (n, _) -> n <> name) !free_LVs;
376-
return ()
394+
if Hashtbl.mem host_connections name then begin
395+
match Hashtbl.find host_connections name with
396+
| Xenvm_interface.Connected ->
397+
let to_lvm = List.assoc name !to_LVMs in
398+
debug "Suspending ToLVM queue for %s" name;
399+
ToLVM.suspend to_lvm
400+
>>= fun () ->
401+
(* There may still be updates in the ToLVM queue *)
402+
Lwt_mutex.with_lock flush_m (fun () -> flush_already_locked name)
403+
>>= fun () ->
404+
debug "ToLVM queue for %s has been suspended and flushed" name;
405+
to_LVMs := List.filter (fun (n, _) -> n <> name) !to_LVMs;
406+
from_LVMs := List.filter (fun (n, _) -> n <> name) !from_LVMs;
407+
free_LVs := List.filter (fun (n, _) -> n <> name) !free_LVs;
408+
Hashtbl.remove host_connections name;
409+
return ()
410+
| x ->
411+
fail (Xenvm_interface.(HostStillConnecting (Jsonrpc.to_string (rpc_of_connection_state x))))
412+
end else return ()
377413

378414
let destroy name =
379415
disconnect name
@@ -412,7 +448,11 @@ module VolumeManager = struct
412448
return (Lvm.Lv.size_in_extents lv)
413449
with Not_found -> return 0L
414450
) >>= fun freeExtents ->
415-
return { Xenvm_interface.name; fromLVM; toLVM; freeExtents }
451+
let connection_state =
452+
if Hashtbl.mem host_connections name
453+
then Some (Hashtbl.find host_connections name)
454+
else None in
455+
return { Xenvm_interface.name; connection_state; fromLVM; toLVM; freeExtents }
416456
) !to_LVMs
417457
end
418458

0 commit comments

Comments
 (0)