@@ -597,7 +597,9 @@ module Impl = struct
597597 let fail = Lwt. fail
598598 let handle_failure = Lwt. catch
599599
600- type context = unit
600+ type context = {
601+ stoppers : (unit Lwt .u ) list
602+ }
601603
602604 let get context () =
603605 fatal_error " get" (VolumeManager. read (fun x -> return (`Ok x)))
@@ -650,6 +652,7 @@ module Impl = struct
650652 VolumeManager. flush_all ()
651653
652654 let shutdown context () =
655+ List. iter (fun u -> Lwt. wakeup u () ) context.stoppers;
653656 VolumeManager. shutdown ()
654657 >> = fun () ->
655658 FreePool. shutdown ()
@@ -658,7 +661,7 @@ module Impl = struct
658661 Lwt_unix. sleep 1.
659662 >> = fun () ->
660663 exit 0 in
661- return ()
664+ return (Unix. getpid () )
662665
663666 module Host = struct
664667 let create context ~name = VolumeManager.Host. create name
@@ -674,64 +677,23 @@ module XenvmServer = Xenvm_interface.ServerM(Impl)
674677
675678open Cohttp_lwt_unix
676679
677- let handler ~info (ch ,conn ) req body =
680+ let handler ~info stoppers (ch ,conn ) req body =
678681 Cohttp_lwt_body. to_string body >> = fun bodystr ->
679- XenvmServer. process () (Jsonrpc. call_of_string bodystr) >> = fun result ->
682+ XenvmServer. process { Impl. stoppers} (Jsonrpc. call_of_string bodystr) >> = fun result ->
680683 Server. respond_string ~status: `OK ~body: (Jsonrpc. string_of_response result) ()
681684
682- let run port sock_path config daemon =
683- let config = Config. t_of_sexp (Sexplib.Sexp. load_sexp config) in
684- let config = { config with Config. listenPort = match port with None -> config.Config. listenPort | Some x -> Some x } in
685- let config = { config with Config. listenPath = match sock_path with None -> config.Config. listenPath | Some x -> Some x } in
686-
687- (* Ideally we would bind our sockets before daemonizing to avoid racing
688- with the next command, but the conduit API doesn't let us pass a socket
689- in. Instead we daemonize in a fork()ed child, and in the parent we wait
690- for a connect() to succeed. *)
691- if Unix. fork () <> 0 then begin
692- let started = ref false in
693- let rec wait remaining =
694- if remaining = 0 then begin
695- Printf. fprintf stderr " Failed to communicate with xenvmd: check the configuration and try again.\n %!" ;
696- exit 1 ;
697- end ;
698- begin match config.Config. listenPort with
699- | Some port ->
700- let s = Unix. socket Unix. PF_INET Unix. SOCK_STREAM 0 in
701- (try
702- Unix. connect s (Unix. ADDR_INET (Unix. inet_addr_of_string " 127.0.0.1" , port));
703- Unix. close s;
704- started := true
705- with _ ->
706- Unix. close s)
707- | None -> ()
708- end ;
709- begin match config.Config. listenPath with
710- | Some path ->
711- let s = Unix. socket Unix. PF_UNIX Unix. SOCK_STREAM 0 in
712- (try
713- Unix. connect s (Unix. ADDR_UNIX path);
714- Unix. close s;
715- started := true
716- with e ->
717- Unix. close s)
718- | None -> ()
719- end ;
720- if not ! started then begin
721- Unix. sleep 1 ;
722- wait (remaining - 1 )
723- end in
724- wait 30 ;
725- exit 0
726- end ;
727- if daemon then Lwt_daemon. daemonize () ;
728- ( match config.Config. listenPath with
729- | None ->
685+ let maybe_write_pid config =
686+ match config.Config. listenPath with
687+ | None ->
730688 (* don't need a lock file because we'll fail to bind to the port *)
731- ()
732- | Some path ->
733- info " Writing pidfile to %s" path;
734- Pidfile. write_pid (path ^ " .lock" ) );
689+ ()
690+ | Some path ->
691+ info " Writing pidfile to %s" path;
692+ Pidfile. write_pid (path ^ " .lock" )
693+
694+ let run port sock_path config =
695+ maybe_write_pid config;
696+
735697 let t =
736698 info " Started with configuration: %s" (Sexplib.Sexp. to_string_hum (Config. sexp_of_t config));
737699 VolumeManager. vgopen ~devices: config.Config. devices
@@ -756,7 +718,8 @@ let run port sock_path config daemon =
756718 >> = fun () ->
757719 service_queues () in
758720
759- let service_http mode =
721+ (* See below for a description of 'stoppers' and 'stop' *)
722+ let service_http stoppers mode stop =
760723 let ty = match mode with
761724 | `TCP (`Port x ) -> Printf. sprintf " TCP port %d" x
762725 | `Unix_domain_socket (`File p ) -> Printf. sprintf " Unix domain socket '%s'" p
@@ -765,10 +728,10 @@ let run port sock_path config daemon =
765728 Printf. printf " Listening for HTTP request on: %s\n " ty;
766729 let info = Printf. sprintf " Served by Cohttp/Lwt listening on %s" ty in
767730 let conn_closed (ch ,conn ) = () in
768- let callback = handler ~info in
731+ let callback = handler ~info stoppers in
769732 let c = Server. make ~callback ~conn_closed () in
770733 (* Listen for regular API calls *)
771- Server. create ~mode c in
734+ Server. create ~mode ~stop c in
772735
773736
774737 let tcp_mode =
@@ -787,12 +750,74 @@ let run port sock_path config daemon =
787750 Lwt. return []
788751 end >> = fun unix_mode ->
789752
790- let threads = List. map service_http (tcp_mode @ unix_mode) in
753+ let services = tcp_mode @ unix_mode in
754+
755+ (* stoppers here is a list of type (unit Lwt.u) list, and 'stops'
756+ is a list of type (unit Lwt.t). Each of the listening Cohttp
757+ servers is given one of the 'stop' threads, and the whole
758+ 'stoppers' list is passed to every handler. When a 'shutdown'
759+ is issued, whichever server received the call to shutdown can
760+ use the 'stoppers' list to shutdown each of the listeners so
761+ they no longer react to API calls. *)
762+ let stops,stoppers = List. map (fun _ -> Lwt. wait () ) services |> List. split in
763+ let threads = List. map2 (service_http stoppers) (tcp_mode @ unix_mode) stops in
791764
792765 Lwt. join ((service_queues () )::threads) in
793766
794767 Lwt_main. run t
795768
769+ let daemonize config =
770+ (* Ideally we would bind our sockets before daemonizing to avoid racing
771+ with the next command, but the conduit API doesn't let us pass a socket
772+ in. Instead we daemonize in a fork()ed child, and in the parent we wait
773+ for a connect() to succeed. *)
774+ if Unix. fork () <> 0 then begin
775+ let started = ref false in
776+ let rec wait remaining =
777+ if remaining = 0 then begin
778+ Printf. fprintf stderr " Failed to communicate with xenvmd: check the configuration and try again.\n %!" ;
779+ exit 1 ;
780+ end ;
781+ begin match config.Config. listenPort with
782+ | Some port ->
783+ let s = Unix. socket Unix. PF_INET Unix. SOCK_STREAM 0 in
784+ (try
785+ Unix. connect s (Unix. ADDR_INET (Unix. inet_addr_of_string " 127.0.0.1" , port));
786+ Unix. close s;
787+ started := true
788+ with _ ->
789+ Unix. close s)
790+ | None -> ()
791+ end ;
792+ begin match config.Config. listenPath with
793+ | Some path ->
794+ let s = Unix. socket Unix. PF_UNIX Unix. SOCK_STREAM 0 in
795+ (try
796+ Unix. connect s (Unix. ADDR_UNIX path);
797+ Unix. close s;
798+ started := true
799+ with e ->
800+ Unix. close s)
801+ | None -> ()
802+ end ;
803+ if not ! started then begin
804+ Unix. sleep 1 ;
805+ wait (remaining - 1 )
806+ end in
807+ wait 30 ;
808+ exit 0
809+ end ;
810+ Lwt_daemon. daemonize ()
811+
812+ let main port sock_path config daemon =
813+ let config = Config. t_of_sexp (Sexplib.Sexp. load_sexp config) in
814+ let config = { config with Config. listenPort = match port with None -> config.Config. listenPort | Some x -> Some x } in
815+ let config = { config with Config. listenPath = match sock_path with None -> config.Config. listenPath | Some x -> Some x } in
816+
817+ if daemon then daemonize config;
818+
819+ run port sock_path config
820+
796821open Cmdliner
797822
798823let info =
@@ -826,7 +851,7 @@ let cmd =
826851 `S " EXAMPLES" ;
827852 `P " TODO" ;
828853 ] in
829- Term. (pure run $ port $ sock_path $ config $ daemon),
854+ Term. (pure main $ port $ sock_path $ config $ daemon),
830855 Term. info " xenvmd" ~version: " 0.1" ~doc ~man
831856
832857let _ =
0 commit comments