3030(* * Inside the tar we divide each VDI into small 'chunk_size' blocks: *)
3131let chunk_size = Int64. mul 1024L 1024L (* 1 MiB *)
3232
33+ (* * Maximum range for sparseness query *)
34+ let max_sparseness_size = Int64. mul 10240L chunk_size
35+
3336let checksum_extension = " .checksum"
3437
3538type vdi = string (* directory prefix in tar file *) * API .ref_VDI * Int64 .t (* size to send/recieve *)
@@ -41,7 +44,7 @@ let with_open_vdi __context rpc session_id vdi_ref mode flags perms f =
4144 (fun dom0_path ->
4245 debug " with_open_vdi opening: %s" dom0_path;
4346 let ofd = Unix. openfile dom0_path flags perms in
44- Pervasiveext. finally (fun () -> f ofd) (fun () -> Unix. close ofd))
47+ Pervasiveext. finally (fun () -> f ofd dom0_path ) (fun () -> Unix. close ofd))
4548
4649(* * Used to sort VDI prefixes into a canonical order for streaming. Currently lexicographic
4750 sort on the externalised reference (used as a 'directory name') *)
@@ -100,6 +103,105 @@ let write_block ~__context filename buffer ofd len =
100103 then raise (Api_errors. Server_error (Api_errors. client_error, [ExnHelper. string_of_exn e]))
101104 else raise e
102105
106+ let get_device_numbers path =
107+ let rdev = (Unix.LargeFile. stat path).Unix.LargeFile. st_rdev in
108+ let major = rdev / 256 and minor = rdev mod 256 in
109+ (major, minor)
110+
111+ let is_nbd_device path =
112+ let nbd_device_num = 43 in
113+ let (major, _) = get_device_numbers path in
114+ major = nbd_device_num
115+
116+ type nbd_connect_info =
117+ { path : string
118+ ; exportname : string
119+ } [@@ deriving rpc ]
120+
121+ let get_nbd_device path =
122+ let nbd_device_prefix = " /dev/nbd" in
123+ if Astring.String. is_prefix ~affix: nbd_device_prefix path && is_nbd_device path then begin
124+ let nbd_number =
125+ Astring.String. with_range ~first: (String. length nbd_device_prefix) path
126+ in
127+ let { path; exportname } =
128+ (* persistent_nbd_info_dir is written from nbd_client_manager.py as part of VBD plug*)
129+ let persistent_nbd_info_dir = " /var/run/nonpersistent/nbd" in
130+ let filename = persistent_nbd_info_dir ^ " /" ^ nbd_number in
131+ Xapi_stdext_unix.Unixext. string_of_file filename
132+ |> Jsonrpc. of_string
133+ |> nbd_connect_info_of_rpc
134+ in
135+ Some (path, exportname)
136+ end else None
137+
138+ type extent = {
139+ flags : int32 ;
140+ length : int64 ;
141+ } [@@ deriving rpc ]
142+
143+ type extent_list = extent list [@@ deriving rpc ]
144+
145+ (* Flags for extents returned for the base:allocation NBD metadata context,
146+ documented at https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md *)
147+ let flag_hole = 1l
148+ let flag_zero = 2l
149+
150+ (* * [descriptor_list] should be a list of non-overlapping extents, ordered from
151+ lowest offset to highest.
152+ [offset] is the current offset needed to translate from relative extents to
153+ absolute chunk numbers *)
154+ let get_chunk_numbers_in_increasing_order descriptor_list offset =
155+ (* Output increasing range includes start and end points *)
156+ let rec range acc start_chunk end_chunk =
157+ if end_chunk < start_chunk then acc
158+ else range (end_chunk::acc) start_chunk Int64. (add end_chunk minus_one)
159+ in
160+
161+ let is_empty e =
162+ let has_flag flag =
163+ Int32. logand e.flags flag = flag
164+ in
165+ (* We assume the destination is prezeroed, so we do not have to copy zeroed extents *)
166+ (has_flag flag_hole) || (has_flag flag_zero)
167+ in
168+
169+ (* Output increasing chunks numbers covered by this descriptor if it is not empty *)
170+ let get_non_empty_chunks descriptor offset increment =
171+ match (is_empty descriptor) with
172+ | false ->
173+ let start_chunk = Int64. div offset increment in
174+ (* If the chunk ends at the boundary don't include the next chunk *)
175+ let end_chunk = Int64. (div (add (add descriptor.length offset) minus_one) increment) in
176+ let chunks = range [] start_chunk end_chunk in
177+ chunks, descriptor.length
178+ | true -> [] , descriptor.length
179+ in
180+
181+ (* Only compare to most recent chunk added *)
182+ let add_new acc b =
183+ match acc with
184+ | hd :: _ when Int64. equal b hd -> acc
185+ | _ -> b::acc
186+ in
187+
188+ (* Output decreasing chunk numbers, x should be increasing and acc should be decreasing *)
189+ let rec add_unique_chunks acc = function
190+ | [] -> acc
191+ | x ::xs -> add_unique_chunks (add_new acc x) xs
192+ in
193+
194+ (* This works in reverse order *)
195+ let rec process acc offset = function
196+ | [] -> acc
197+ | x ::xs ->
198+ let chunks, add_offset = get_non_empty_chunks x offset chunk_size in
199+ process (add_unique_chunks acc chunks) (Int64. add offset add_offset) xs
200+ in
201+
202+ let chunks = process [] offset descriptor_list in
203+ List. rev chunks
204+
103205
104206(* * Stream a set of VDIs split into chunks in a tar format in a defined order. Return an
105207 association list mapping tar filename -> string (containing the SHA1 checksums) *)
@@ -108,58 +210,98 @@ let send_all refresh_session ofd ~__context rpc session_id (prefix_vdis: vdi lis
108210
109211 let progress = new_progress_record __context prefix_vdis in
110212
111- (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *)
112- let last_transmission_time = ref 0. in
113-
114213 let send_one ofd (__context :Context.t ) (prefix , vdi_ref , size ) =
115214 let size = Db.VDI. get_virtual_size ~__context ~self: vdi_ref in
215+ let reusable_buffer = Bytes. make (Int64. to_int chunk_size) '\000' in
116216
117217 with_open_vdi __context rpc session_id vdi_ref `RO [Unix. O_RDONLY ] 0o644
118- (fun ifd ->
119-
120- let reusable_buffer = Bytes. make (Int64. to_int chunk_size) '\000' in
121-
122- (* NB. It used to be that chunks could be larger than a native int *)
123- (* could handle, but this is no longer the case! Ensure all chunks *)
124- (* are strictly less than 2^30 bytes *)
125- let rec stream_from (chunk_no : int ) (offset : int64 ) =
126- refresh_session () ;
127- let remaining = Int64. sub size offset in
128- if remaining > 0L
129- then
130- begin
131- let this_chunk = (min remaining chunk_size) in
132- let last_chunk = this_chunk = remaining in
133- let this_chunk = Int64. to_int this_chunk in
134- let filename = Printf. sprintf " %s/%08d" prefix chunk_no in
135-
136- let now = Unix. gettimeofday () in
137- let time_since_transmission = now -. ! last_transmission_time in
138-
139- (* We always include the first and last blocks *)
140- let first_or_last = chunk_no = 0 || last_chunk in
141-
142- if time_since_transmission > 5. && not first_or_last then begin
143- last_transmission_time := now;
144- write_block ~__context filename Bytes. empty ofd 0 ;
145- (* no progress has been made *)
146- stream_from (chunk_no + 1 ) offset
147- end else begin
148- let buffer = if (Int64. of_int this_chunk) = chunk_size
149- then reusable_buffer
150- else Bytes. make this_chunk '\000'
151- in
152- Unixext. really_read ifd buffer 0 this_chunk;
153- if not (Zerocheck. is_all_zeros (Bytes. unsafe_to_string buffer) this_chunk) || first_or_last then begin
154- last_transmission_time := now;
155- write_block ~__context filename buffer ofd this_chunk;
156- end ;
157- made_progress __context progress (Int64. of_int this_chunk);
158- stream_from (chunk_no + 1 ) (Int64. add offset chunk_size);
159- end
160- end
161- in
162- stream_from 0 0L );
218+ (fun ifd dom0_path ->
219+ (match get_nbd_device dom0_path with
220+ | None ->
221+ (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *)
222+ let last_transmission_time = ref 0. in
223+ (* NB. It used to be that chunks could be larger than a native int *)
224+ (* could handle, but this is no longer the case! Ensure all chunks *)
225+ (* are strictly less than 2^30 bytes *)
226+ let rec stream_from (chunk_no : int ) (offset : int64 ) =
227+ refresh_session () ;
228+ let remaining = Int64. sub size offset in
229+ if remaining > 0L
230+ then
231+ begin
232+ let this_chunk = (min remaining chunk_size) in
233+ let last_chunk = this_chunk = remaining in
234+ let this_chunk = Int64. to_int this_chunk in
235+ let filename = Printf. sprintf " %s/%08d" prefix chunk_no in
236+
237+ let now = Unix. gettimeofday () in
238+ let time_since_transmission = now -. ! last_transmission_time in
239+
240+ (* We always include the first and last blocks *)
241+ let first_or_last = chunk_no = 0 || last_chunk in
242+
243+ if time_since_transmission > 5. && not first_or_last then begin
244+ last_transmission_time := now;
245+ write_block ~__context filename Bytes. empty ofd 0 ;
246+ (* no progress has been made *)
247+ stream_from (chunk_no + 1 ) offset
248+ end else begin
249+ let buffer = if (Int64. of_int this_chunk) = chunk_size
250+ then reusable_buffer
251+ else Bytes. make this_chunk '\000'
252+ in
253+ Unixext. really_read ifd buffer 0 this_chunk;
254+ if not (Zerocheck. is_all_zeros (Bytes. unsafe_to_string buffer) this_chunk) || first_or_last then begin
255+ last_transmission_time := now;
256+ write_block ~__context filename buffer ofd this_chunk;
257+ end ;
258+ made_progress __context progress (Int64. of_int this_chunk);
259+ stream_from (chunk_no + 1 ) (Int64. add offset chunk_size);
260+ end
261+ end
262+ in
263+ stream_from 0 0L
264+ | Some (path , exportname ) ->
265+ let last_transmission_time = ref 0L in
266+ let actually_write_chunk (this_chunk_no : int ) (this_chunk_size : int ) =
267+ let buffer = if this_chunk_size = Int64. to_int chunk_size
268+ then reusable_buffer
269+ else Bytes. make this_chunk_size '\000'
270+ in
271+ let filename = Printf. sprintf " %s/%08d" prefix this_chunk_no in
272+ Unix.LargeFile. lseek ifd (Int64. mul (Int64. of_int this_chunk_no) chunk_size) Unix. SEEK_SET |> ignore;
273+ Unixext. really_read ifd buffer 0 this_chunk_size;
274+ last_transmission_time := Mtime_clock. now_ns () ;
275+ write_block ~__context filename buffer ofd this_chunk_size;
276+ made_progress __context progress (Int64. of_int this_chunk_size);
277+ in
278+ let rec stream_from_offset (offset : int64 ) =
279+ let remaining = Int64. sub size offset in
280+ if remaining > 0L then begin
281+ let this_chunk_size = min (Int64. to_int chunk_size) (Int64. to_int remaining) in
282+ let this_chunk_no = Int64. div offset chunk_size in
283+ let now = Mtime_clock. now_ns () in
284+ let time_since_transmission = Int64. sub now ! last_transmission_time in
285+ if offset = 0L || remaining < = chunk_size || time_since_transmission > 5000000000L then begin
286+ actually_write_chunk (Int64. to_int this_chunk_no) this_chunk_size;
287+ stream_from_offset (Int64. add offset (Int64. of_int this_chunk_size))
288+ end else begin
289+ let remaining = Int64. mul (Int64. div (Int64. sub remaining 1L ) chunk_size) chunk_size in
290+ (* Get sparseness for next 10GB or until the end rounded by last chunk, whichever is smaller *)
291+ let sparseness_size = min max_sparseness_size remaining in
292+ let output, _ = Forkhelpers. execute_command_get_output Xapi_globs. get_nbd_extents [" --path" ; path; " --exportname" ; exportname; " --offset" ; Int64. to_string offset; " --length" ; Int64. to_string sparseness_size] in
293+ let extents = extent_list_of_rpc (Jsonrpc. of_string output) in
294+ let chunks = get_chunk_numbers_in_increasing_order extents offset in
295+ List. iter (
296+ fun chunk ->
297+ actually_write_chunk (Int64. to_int chunk) (Int64. to_int chunk_size);
298+ ) chunks;
299+ stream_from_offset (Int64. add offset sparseness_size)
300+ end
301+ end
302+ in
303+ stream_from_offset 0L )
304+ );
163305 debug " Finished streaming VDI" in
164306 for_each_vdi __context (send_one ofd __context) prefix_vdis
165307
@@ -209,7 +351,7 @@ let recv_all_vdi refresh_session ifd (__context:Context.t) rpc session_id ~has_i
209351 debug " begun import of VDI%s preserving sparseness" (if vdi_skip_zeros then " " else " NOT" );
210352
211353 with_open_vdi __context rpc session_id vdi_ref `RW [Unix. O_WRONLY ] 0o644
212- (fun ofd ->
354+ (fun ofd dom0_path ->
213355 let reusable_buffer = Bytes. make (Int64. to_int chunk_size) '\000' in
214356
215357 let rec stream_from (last_suffix : string ) (offset : int64 ) =
@@ -315,7 +457,7 @@ let recv_all_zurich refresh_session ifd (__context:Context.t) rpc session_id pre
315457 (* Open this VDI and stream in all the blocks. Return when hdr represents
316458 a chunk which is not part of this VDI or the end of stream is reached. *)
317459 with_open_vdi __context rpc session_id vdi_ref `RW [Unix. O_WRONLY ] 0o644
318- (fun ofd ->
460+ (fun ofd dom0_path ->
319461 let rec stream_from (last_suffix : string ) = match ! hdr with
320462 | Some hdr ->
321463 refresh_session () ;
0 commit comments