Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improve logs and variable name
  • Loading branch information
liamaharon committed Apr 18, 2023
commit f9ff3adecedbb4d0458471d75267ef16ca433ef5
45 changes: 19 additions & 26 deletions utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,7 @@ where
/// # Arguments
///
/// * `client` - An `Arc` wrapped `HttpClient` used for making the requests.
/// * `remaining_payloads` - A vector of tuples containing a JSONRPC method name and
/// `ArrayParams`
/// * `payloads` - A vector of tuples containing a JSONRPC method name and `ArrayParams`
/// * `batch_size` - The initial batch size to use for the request. The batch size will be
/// adjusted dynamically in case of failure.
///
Expand Down Expand Up @@ -448,27 +447,26 @@ where
/// }
/// }
/// ```

#[async_recursion]
async fn get_storage_data_dynamic_batch_size(
client: &Arc<HttpClient>,
remaining_payloads: Vec<(String, ArrayParams)>,
payloads: Vec<(String, ArrayParams)>,
batch_size: usize,
) -> Result<Vec<Option<StorageData>>, String> {
// All payloads have been processed
if remaining_payloads.is_empty() {
if payloads.is_empty() {
return Ok(vec![])
};

log::debug!(
target: LOG_TARGET,
"Remaining payloads: {} Batch request size: {}",
remaining_payloads.len(),
payloads.len(),
batch_size,
);

// Payloads to attempt to process this batch
let page = remaining_payloads.iter().take(batch_size).cloned().collect::<Vec<_>>();
let page = payloads.iter().take(batch_size).cloned().collect::<Vec<_>>();

// Build the batch request
let mut batch = BatchRequestBuilder::new();
Expand All @@ -493,7 +491,7 @@ where

return Self::get_storage_data_dynamic_batch_size(
client,
remaining_payloads,
payloads,
(batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize,
)
.await
Expand All @@ -510,11 +508,10 @@ where
}

// Return this data joined with the remaining keys
let remaining_payloads =
remaining_payloads.iter().skip(batch_size).cloned().collect::<Vec<_>>();
let payloads = payloads.iter().skip(batch_size).cloned().collect::<Vec<_>>();
let mut rest = Self::get_storage_data_dynamic_batch_size(
client,
remaining_payloads,
payloads,
// + 1 to ensure we always increase by at least 1
(batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize + 1,
)
Expand Down Expand Up @@ -618,21 +615,6 @@ where
batch_kv.push((key.clone().0, data.0));
},
};

if thread_key_chunk_values.len() % (thread_keys_chunk.len() / 10).max(1) ==
0
{
let ratio: f64 = thread_key_chunk_values.len() as f64 /
thread_keys_chunk.len() as f64;
log::debug!(
target: LOG_TARGET,
"[thread = {:?}] progress = {:.2} [{} / {}]",
std::thread::current().id(),
ratio,
thread_key_chunk_values.len(),
thread_keys_chunk.len(),
);
}
}

// Send this chunk to the main thread to start inserting.
Expand All @@ -651,10 +633,21 @@ where
// `pending_ext`.
let mut terminated = 0usize;
let mut batch_failed = false;
let mut processed = 0usize;
loop {
match rx.next().await.unwrap() {
Message::Batch(kv) => {
for (k, v) in kv {
processed += 1;
if processed % 50_000 == 0 || processed == keys.len() || processed == 1 {
log::info!(
target: LOG_TARGET,
"inserting keys progress = {:.0}% [{} / {}]",
(processed as f32 / keys.len() as f32) * 100f32,
processed,
keys.len(),
);
}
// skip writing the child root data.
if is_default_child_storage_key(k.as_ref()) {
continue
Expand Down