Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 37 additions & 32 deletions ocaml/database/block_device_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -306,38 +306,43 @@ let accept_conn s latest_response_time =
(* Listen on [sock], accept a single connection and transfer all the data from
   it to [dest_fd]. Returns the result of Unixext.read_data_in_chunks (the
   length read).
   Raises Unixext.Timeout if no connection is accepted before
   [target_response_time] (CA-106403).
   Raises NotEnoughSpace if the next write would exceed [available_space]. *)
let transfer_data_from_sock_to_fd sock dest_fd available_space target_response_time =
  (* Run one step of socket teardown; log a failure instead of propagating it,
     so that a failing step never prevents the following one from running. *)
  let warn_on_exn f =
    try f ()
    with e -> R.warn "Exception %s while closing socket" (Printexc.to_string e)
  in

  (* Open the data channel *)
  let s = listen_on sock in
  (* The listening socket is only needed for this one accept. Close it exactly
     once, whether accept_conn returns or raises (Timeout or anything else).
     Scoping the cleanup to the accept avoids closing [s] a second time if a
     later step of the transfer raises: by then the descriptor number may
     already have been reused elsewhere in the process. *)
  let data_client = finally
    (fun () -> accept_conn s target_response_time)
    (fun () -> ignore_exn (fun () -> Unix.close s)) in
  R.info "Accepted connection on data socket";

  (* Read all the data from the data channel, writing it straight into the
     block device, keeping track of accumulated length *)
  let total_length = ref 0 in
  R.debug "Reading from data socket, writing to the block device...";
  let bytes_read = finally
    (fun () ->
      (* Read data from the client until EOF. Returns the length read. *)
      Unixext.read_data_in_chunks (fun chunk len ->
        (* Check that there's enough space *)
        if available_space - !total_length < len then raise NotEnoughSpace;
        (* Otherwise write it *)
        Unixext.time_limited_write dest_fd len chunk target_response_time;
        total_length := !total_length + len
      ) ~block_size:16384 data_client
    )
    (fun () ->
      (* Close the connection *)
      (* CA-42914: If there was an exception, note that we are forcibly closing
         the connection when possibly the client (xapi) is still trying to
         write data. This will cause it to see a 'connection reset by peer'
         error. *)
      R.info "Closing connection on data socket";
      (* shutdown may fail (e.g. if the peer has already gone away); the
         descriptor must still be closed afterwards or it is leaked. *)
      warn_on_exn (fun () -> Unix.shutdown data_client Unix.SHUTDOWN_ALL);
      warn_on_exn (fun () -> Unix.close data_client)
    ) in
  R.debug "Finished reading from data socket";
  bytes_read

let transfer_database_to_sock sock db_fn target_response_time =
(* Open the data channel *)
Expand Down
7 changes: 4 additions & 3 deletions ocaml/database/redo_log.ml
Original file line number Diff line number Diff line change
Expand Up @@ -747,13 +747,14 @@ let empty log =
(* Write the whole of [db] to [log] via write_db, using the generation recorded
   in the database's manifest. Returns the value of [log.currently_accessible]
   as read once write_db has returned. *)
let flush_db_to_redo_log db log =
  R.info "Flushing database to redo_log [%s]" log.name;
  let generation =
    Db_cache_types.Manifest.generation (Db_cache_types.Database.manifest db) in
  (* Serialiser handed to write_db: dumps [db] as XML onto the given fd *)
  let write_db_to_fd out_fd = Db_xml.To.fd out_fd db in
  write_db generation write_db_to_fd log;
  (* Report the accessibility flag so callers can tell whether the log is
     still usable after the attempt *)
  !(log.currently_accessible)

(* Write the given database to all active redo_logs *)
let flush_db_to_all_active_redo_logs db =
  R.info "Flushing database to all active redo-logs";
  with_active_redo_logs (fun log ->
    (* flush_db_to_redo_log returns the log's accessibility flag; it is not
       needed here, so discard it explicitly *)
    ignore (flush_db_to_redo_log db log : bool))

(* Write a delta to all active redo_logs *)
let database_callback event db =
Expand Down Expand Up @@ -783,7 +784,7 @@ let database_callback event db =
write_delta (Db_cache_types.Manifest.generation (Db_cache_types.Database.manifest db)) entry
(fun () ->
(* the function which will be invoked if a database write is required instead of a delta *)
flush_db_to_redo_log db log)
ignore(flush_db_to_redo_log db log))
log
)
) to_write
2 changes: 1 addition & 1 deletion ocaml/database/redo_log.mli
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ val empty : redo_log -> unit
(** Invalidate the block device. This means that subsequent attempts to read from the block device will not find anything.
This function is best-effort only and does not raise any exceptions in the case of error. *)

val flush_db_to_redo_log: Db_cache_types.Database.t -> redo_log -> bool
(** Immediately write the given database to the given redo_log instance.
    Returns [true] if the redo_log is marked as currently accessible once the
    write has been attempted, and [false] otherwise. *)

val flush_db_to_all_active_redo_logs: Db_cache_types.Database.t -> unit
Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ let sync_database ~__context =
(fun () ->
(* If HA is enabled I'll first try to flush to the LUN *)
let pool = Helpers.get_pool ~__context in
let flushed_to_vdi = Db.Pool.get_ha_enabled ~__context ~self:pool && (Xha_metadata_vdi.flush_database ~__context Xapi_ha.ha_redo_log) in
let flushed_to_vdi = Db.Pool.get_ha_enabled ~__context ~self:pool && (Db_lock.with_lock (fun () -> Xha_metadata_vdi.flush_database ~__context Xapi_ha.ha_redo_log)) in
if flushed_to_vdi
then debug "flushed database to metadata VDI: assuming this is sufficient."
else begin
Expand Down
3 changes: 1 addition & 2 deletions ocaml/xapi/xha_metadata_vdi.ml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,5 @@ open Pervasiveext
(** Attempt to flush the database to the metadata VDI.
    Returns the result of [Redo_log.flush_db_to_redo_log], or [false] if the
    attempt raised an exception. *)
let flush_database ~__context log =
  try
    Redo_log.flush_db_to_redo_log (Db_ref.get_database (Db_backend.make ())) log
  with _ ->
    (* Best-effort: any failure is reported to the caller as "not flushed" *)
    false