Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improve function name
  • Loading branch information
liamaharon committed Apr 18, 2023
commit c4c30b0c8594b588243e4b98fcf7aff8c203964b
92 changes: 59 additions & 33 deletions utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;

const LOG_TARGET: &str = "remote-ext";
const DEFAULT_HTTP_ENDPOINT: &str = "https://rpc.polkadot.io:443";
// NOTE: increasing this value does not seem to impact speed all that much.
const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
/// The snapshot that we store on disk.
#[derive(Decode, Encode)]
struct Snapshot<B: BlockT> {
Expand Down Expand Up @@ -323,6 +321,8 @@ where
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
const INITIAL_BATCH_SIZE: usize = 5000;
// NOTE: increasing this value does not seem to impact speed all that much.
const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;

/// Get the number of threads to use.
fn threads() -> NonZeroUsize {
Expand Down Expand Up @@ -370,7 +370,7 @@ where
.rpc_client()
.storage_keys_paged(
Some(prefix.clone()),
DEFAULT_KEY_DOWNLOAD_PAGE,
Self::DEFAULT_KEY_DOWNLOAD_PAGE,
last_key.clone(),
Some(at),
)
Expand All @@ -383,7 +383,7 @@ where

all_keys.extend(page);

if page_len < DEFAULT_KEY_DOWNLOAD_PAGE as usize {
if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
break all_keys
} else {
Expand All @@ -402,35 +402,56 @@ where
Ok(keys)
}

/// Retrieves storage data from a list of storage keys using an
/// additive-increase/multiplicative-decrease (AIMD) algorithm to manage batch sizes.
/// Fetches storage data from a node using a dynamic batch size.
///
/// This function uses batch requests to query storage data from a given list of storage keys.
/// It utilizes an additive-increase/multiplicative-decrease (AIMD) algorithm to handle the
/// batch request sizes. The algorithm starts with a base batch size and linearly increases it
/// after each successful batch request, or exponentially decreases it when encountering an
/// error.
/// This function adjusts the batch size on the fly to help prevent overwhelming the node with
/// large batch requests, and stay within request size limits enforced by the node.
///
/// # Arguments
///
/// * thread_client - An Arc<HttpClient> representing the WebSocket client.
/// * remaining_keys - A Vec<StorageKey> containing the storage keys to be queried.
/// * batch_size - A usize representing the initial batch request size.
/// * at - A B::Hash representing the block hash at which the storage data should be queried.
/// * `client` - An `Arc` wrapped `HttpClient` used for making the requests.
/// * `remaining_payloads` - A vector of tuples containing a JSONRPC method name and
/// `ArrayParams`
/// * `batch_size` - The initial batch size to use for the request. The batch size will be
/// adjusted dynamically in case of failure.
///
/// # Returns
///
/// * Result<Vec<Option<StorageData>>, String> - A result containing a vector of
/// Option<StorageData> if successful,
/// where each item corresponds to the storage data for each key, or an error message if the
/// operation fails.
/// Returns a `Result` with a vector of `Option<StorageData>`, where each element corresponds to
/// the storage data for the given method and parameters. The result will be an `Err` with a
/// `String` error message if the request fails.
///
/// # Errors
///
/// This function returns an error message as a String in case of any failure during batch
/// request construction, batch request execution, or processing the batch response.
/// This function will return an error if:
/// * The batch request fails and the batch size is less than 2.
/// * There are invalid batch params.
/// * There is an error in the batch response.
///
/// # Example
///
/// ```no_run
/// use your_crate::{get_storage_data_dynamic_batch_size, HttpClient, ArrayParams};
/// use std::sync::Arc;
///
/// async fn example() {
/// let client = Arc::new(HttpClient::new());
/// let payloads = vec![
/// ("some_method".to_string(), ArrayParams::new(vec![])),
/// ("another_method".to_string(), ArrayParams::new(vec![])),
/// ];
/// let batch_size = 10;
///
/// let storage_data = get_storage_data_dynamic_batch_size(client, payloads, batch_size).await;
/// match storage_data {
/// Ok(data) => println!("Storage data: {:?}", data),
/// Err(e) => eprintln!("Error fetching storage data: {}", e),
/// }
/// }
/// ```

#[async_recursion]
async fn get_storage_data_aimd(
async fn get_storage_data_dynamic_batch_size(
client: Arc<HttpClient>,
remaining_payloads: Vec<(String, ArrayParams)>,
batch_size: usize,
Expand Down Expand Up @@ -471,7 +492,7 @@ where
e.to_string()
);

return Self::get_storage_data_aimd(
return Self::get_storage_data_dynamic_batch_size(
client,
remaining_payloads,
(batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize,
Expand All @@ -492,7 +513,7 @@ where
// Return this data joined with the remaining keys
let remaining_payloads =
remaining_payloads.iter().skip(batch_size).cloned().collect::<Vec<_>>();
let mut rest = Self::get_storage_data_aimd(
let mut rest = Self::get_storage_data_dynamic_batch_size(
client,
remaining_payloads,
// + 1 to ensure we always increase by at least 1
Expand Down Expand Up @@ -559,7 +580,7 @@ where
.collect::<Vec<_>>();

let rt = tokio::runtime::Runtime::new().unwrap();
let storage_data = match rt.block_on(Self::get_storage_data_aimd(
let storage_data = match rt.block_on(Self::get_storage_data_dynamic_batch_size(
thread_client,
payloads,
Self::INITIAL_BATCH_SIZE,
Expand Down Expand Up @@ -679,14 +700,19 @@ where
})
.collect::<Vec<_>>();

let storage_data =
match Self::get_storage_data_aimd(client, payloads, Self::INITIAL_BATCH_SIZE).await {
Ok(storage_data) => storage_data,
Err(e) => {
log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
return Err("batch processing failed")
},
};
let storage_data = match Self::get_storage_data_dynamic_batch_size(
client,
payloads,
Self::INITIAL_BATCH_SIZE,
)
.await
{
Ok(storage_data) => storage_data,
Err(e) => {
log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
return Err("batch processing failed")
},
};

assert_eq!(child_keys_len, storage_data.len());

Expand Down