diff --git a/Cargo.lock b/Cargo.lock
index c7f74a495d2fe..b316993185862 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -395,6 +395,17 @@ dependencies = [
  "futures-lite",
 ]
 
+[[package]]
+name = "async-recursion"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.14",
+]
+
 [[package]]
 name = "async-stream"
 version = "0.3.4"
@@ -2427,8 +2438,10 @@ dependencies = [
 name = "frame-remote-externalities"
 version = "0.10.0-dev"
 dependencies = [
+ "async-recursion",
  "frame-support",
  "futures",
+ "jsonrpsee",
  "log",
  "pallet-elections-phragmen",
  "parity-scale-codec",
@@ -3202,6 +3215,7 @@ dependencies = [
  "rustls-native-certs",
  "tokio",
  "tokio-rustls",
+ "webpki-roots",
 ]
 
 [[package]]
@@ -3468,6 +3482,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d291e3a5818a2384645fd9756362e6d89cf0541b0b916fa7702ea4a9833608e"
 dependencies = [
  "jsonrpsee-core",
+ "jsonrpsee-http-client",
  "jsonrpsee-proc-macros",
  "jsonrpsee-server",
  "jsonrpsee-types",
@@ -3524,6 +3539,25 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "jsonrpsee-http-client"
+version = "0.16.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc345b0a43c6bc49b947ebeb936e886a419ee3d894421790c969cc56040542ad"
+dependencies = [
+ "async-trait",
+ "hyper",
+ "hyper-rustls",
+ "jsonrpsee-core",
+ "jsonrpsee-types",
+ "rustc-hash",
+ "serde",
+ "serde_json",
+ "thiserror",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "jsonrpsee-proc-macros"
 version = "0.16.2"
diff --git a/utils/frame/remote-externalities/Cargo.toml b/utils/frame/remote-externalities/Cargo.toml
index 8611ae4980f12..d3909af34451b 100644
--- a/utils/frame/remote-externalities/Cargo.toml
+++ b/utils/frame/remote-externalities/Cargo.toml
@@ -12,6 +12,7 @@ description = "An externalities provided environment that can load itself from r
 targets = ["x86_64-unknown-linux-gnu"]
 
 [dependencies]
+jsonrpsee = { version = "0.16.2", features = ["http-client"] }
 codec = { package = "parity-scale-codec", version = "3.2.2" }
 log = "0.4.17"
 serde = "1.0.136"
@@ -22,6 +23,7 @@ sp-runtime = { version = "7.0.0", path = "../../../primitives/runtime" }
 tokio = { version = "1.22.0", features = ["macros", "rt-multi-thread"] }
 substrate-rpc-client = { path = "../rpc/client" }
 futures = "0.3"
+async-recursion = "1.0.4"
 
 [dev-dependencies]
 frame-support = { version = "4.0.0-dev", path = "../../../frame/support" }
diff --git a/utils/frame/remote-externalities/src/lib.rs b/utils/frame/remote-externalities/src/lib.rs
index ee342408828de..b60e2bc75d0db 100644
--- a/utils/frame/remote-externalities/src/lib.rs
+++ b/utils/frame/remote-externalities/src/lib.rs
@@ -20,8 +20,13 @@
 //! An equivalent of `sp_io::TestExternalities` that can load its state from a remote substrate
 //! based chain, or a local state snapshot file.
+use async_recursion::async_recursion;
 use codec::{Decode, Encode};
 use futures::{channel::mpsc, stream::StreamExt};
+use jsonrpsee::{
+	core::params::ArrayParams,
+	http_client::{HttpClient, HttpClientBuilder},
+};
 use log::*;
 use serde::de::DeserializeOwned;
 use sp_core::{
@@ -35,6 +40,7 @@ use sp_core::{
 pub use sp_io::TestExternalities;
 use sp_runtime::{traits::Block as BlockT, StateVersion};
 use std::{
+	cmp::max,
 	fs,
 	num::NonZeroUsize,
 	ops::{Deref, DerefMut},
@@ -42,19 +48,14 @@ use std::{
 	sync::Arc,
 	thread,
 };
-use substrate_rpc_client::{
-	rpc_params, ws_client, BatchRequestBuilder, ChainApi, ClientT, StateApi, WsClient,
-};
+use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
 
 type KeyValue = (StorageKey, StorageData);
 type TopKeyValues = Vec<KeyValue>;
 type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;
 
 const LOG_TARGET: &str = "remote-ext";
-const DEFAULT_WS_ENDPOINT: &str = "wss://rpc.polkadot.io:443";
-const DEFAULT_VALUE_DOWNLOAD_BATCH: usize = 4096;
-// NOTE: increasing this value does not seem to impact speed all that much.
-const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
+const DEFAULT_HTTP_ENDPOINT: &str = "https://rpc.polkadot.io:443";
 
 /// The snapshot that we store on disk.
 #[derive(Decode, Encode)]
 struct Snapshot<B: BlockT> {
@@ -117,36 +118,51 @@ pub struct OfflineConfig {
 pub enum Transport {
 	/// Use the `URI` to open a new WebSocket connection.
 	Uri(String),
-	/// Use existing WebSocket connection.
-	RemoteClient(Arc<WsClient>),
+	/// Use HTTP connection.
+	RemoteClient(Arc<HttpClient>),
 }
 
 impl Transport {
-	fn as_client(&self) -> Option<&WsClient> {
+	fn as_client(&self) -> Option<&HttpClient> {
 		match self {
 			Self::RemoteClient(client) => Some(client),
 			_ => None,
 		}
 	}
 
-	fn as_client_cloned(&self) -> Option<Arc<WsClient>> {
+	fn as_client_cloned(&self) -> Option<Arc<HttpClient>> {
 		match self {
 			Self::RemoteClient(client) => Some(client.clone()),
 			_ => None,
 		}
 	}
 
-	// Open a new WebSocket connection if it's not connected.
-	async fn map_uri(&mut self) -> Result<(), &'static str> {
+	// Build an HttpClient from a URI.
+	async fn init(&mut self) -> Result<(), &'static str> {
 		if let Self::Uri(uri) = self {
 			log::debug!(target: LOG_TARGET, "initializing remote client to {:?}", uri);
 
-			let ws_client = ws_client(uri).await.map_err(|e| {
+			// If we have a ws uri, try to convert it to an http uri.
+			// We use an HTTP client rather than WS because WS starts to choke with "accumulated
+			// message length exceeds maximum" errors after processing ~10k keys when fetching
+			// from a node running a default configuration.
+			let uri = if uri.starts_with("ws://") {
+				let uri = uri.replace("ws://", "http://");
+				log::info!(target: LOG_TARGET, "replacing ws:// in uri with http://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
+				uri
+			} else if uri.starts_with("wss://") {
+				let uri = uri.replace("wss://", "https://");
+				log::info!(target: LOG_TARGET, "replacing wss:// in uri with https://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
+				uri
+			} else {
+				uri.clone()
+			};
+			let http_client = HttpClientBuilder::default().build(uri).map_err(|e| {
 				log::error!(target: LOG_TARGET, "error: {:?}", e);
-				"failed to build ws client"
+				"failed to build http client"
 			})?;
 
-			*self = Self::RemoteClient(Arc::new(ws_client))
+			*self = Self::RemoteClient(Arc::new(http_client))
 		}
 
 		Ok(())
@@ -159,8 +175,8 @@ impl From<String> for Transport {
 	}
 }
 
-impl From<Arc<WsClient>> for Transport {
-	fn from(client: Arc<WsClient>) -> Self {
+impl From<Arc<HttpClient>> for Transport {
+	fn from(client: Arc<HttpClient>) -> Self {
 		Transport::RemoteClient(client)
 	}
 }
@@ -189,18 +205,18 @@ pub struct OnlineConfig<B: BlockT> {
 }
 
 impl<B: BlockT> OnlineConfig<B> {
-	/// Return rpc (ws) client reference.
-	fn rpc_client(&self) -> &WsClient {
+	/// Return rpc (http) client reference.
+	fn rpc_client(&self) -> &HttpClient {
 		self.transport
 			.as_client()
-			.expect("ws client must have been initialized by now; qed.")
+			.expect("http client must have been initialized by now; qed.")
 	}
 
-	/// Return a cloned rpc (ws) client, suitable for being moved to threads.
-	fn rpc_client_cloned(&self) -> Arc<WsClient> {
+	/// Return a cloned rpc (http) client, suitable for being moved to threads.
+	fn rpc_client_cloned(&self) -> Arc<HttpClient> {
 		self.transport
 			.as_client_cloned()
-			.expect("ws client must have been initialized by now; qed.")
+			.expect("http client must have been initialized by now; qed.")
 	}
 
 	fn at_expected(&self) -> B::Hash {
@@ -211,7 +227,7 @@ impl<B: BlockT> OnlineConfig<B> {
 impl<B: BlockT> Default for OnlineConfig<B> {
 	fn default() -> Self {
 		Self {
-			transport: Transport::from(DEFAULT_WS_ENDPOINT.to_owned()),
+			transport: Transport::from(DEFAULT_HTTP_ENDPOINT.to_owned()),
 			child_trie: true,
 			at: None,
 			state_snapshot: None,
@@ -307,10 +323,16 @@ where
 	B::Hash: DeserializeOwned,
 	B::Header: DeserializeOwned,
 {
+	const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
+	const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
+	const INITIAL_BATCH_SIZE: usize = 5000;
+	// NOTE: increasing this value does not seem to impact speed all that much.
+	const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
+
 	/// Get the number of threads to use.
 	fn threads() -> NonZeroUsize {
 		thread::available_parallelism()
-			.unwrap_or(NonZeroUsize::new(4usize).expect("4 is non-zero; qed"))
+			.unwrap_or(NonZeroUsize::new(1usize).expect("1 is non-zero; qed"))
 	}
 
 	async fn rpc_get_storage(
@@ -352,7 +374,7 @@ where
 			.rpc_client()
 			.storage_keys_paged(
 				Some(prefix.clone()),
-				DEFAULT_KEY_DOWNLOAD_PAGE,
+				Self::DEFAULT_KEY_DOWNLOAD_PAGE,
 				last_key.clone(),
 				Some(at),
 			)
@@ -365,7 +387,7 @@ where
 
 			all_keys.extend(page);
 
-			if page_len < DEFAULT_KEY_DOWNLOAD_PAGE as usize {
+			if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
 				log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
 				break all_keys
 			} else {
@@ -384,6 +406,123 @@ where
 		Ok(keys)
 	}
 
+	/// Fetches storage data from a node using a dynamic batch size.
+	///
+	/// This function adjusts the batch size on the fly to help prevent overwhelming the node with
+	/// large batch requests, and to stay within the request size limits enforced by the node.
+	///
+	/// # Arguments
+	///
+	/// * `client` - An `Arc` wrapped `HttpClient` used for making the requests.
+	/// * `payloads` - A vector of tuples containing a JSONRPC method name and `ArrayParams`.
+	/// * `batch_size` - The initial batch size to use for the request. The batch size will be
+	///   adjusted dynamically in case of failure.
+	///
+	/// # Returns
+	///
+	/// Returns a `Result` with a vector of `Option<StorageData>`, where each element corresponds
+	/// to the storage data for the given method and parameters. The result will be an `Err` with
+	/// a `String` error message if the request fails.
+	///
+	/// # Errors
+	///
+	/// This function will return an error if:
+	/// * The batch request fails and the batch size is less than 2.
+	/// * There are invalid batch params.
+	/// * There is an error in the batch response.
+	///
+	/// # Example
+	///
+	/// ```ignore
+	/// use your_crate::get_storage_data_dynamic_batch_size;
+	/// use jsonrpsee::{core::params::ArrayParams, http_client::HttpClientBuilder};
+	/// use std::sync::Arc;
+	///
+	/// async fn example() {
+	///     let client = Arc::new(HttpClientBuilder::default().build("http://localhost:9944").unwrap());
+	///     let payloads = vec![
+	///         ("some_method".to_string(), ArrayParams::new()),
+	///         ("another_method".to_string(), ArrayParams::new()),
+	///     ];
+	///     let initial_batch_size = 10;
+	///
+	///     let storage_data =
+	///         get_storage_data_dynamic_batch_size(&client, payloads, initial_batch_size).await;
+	///     match storage_data {
+	///         Ok(data) => println!("Storage data: {:?}", data),
+	///         Err(e) => eprintln!("Error fetching storage data: {}", e),
+	///     }
+	/// }
+	/// ```
+	#[async_recursion]
+	async fn get_storage_data_dynamic_batch_size(
+		client: &Arc<HttpClient>,
+		payloads: Vec<(String, ArrayParams)>,
+		batch_size: usize,
+	) -> Result<Vec<Option<StorageData>>, String> {
+		// All payloads have been processed
+		if payloads.is_empty() {
+			return Ok(vec![])
+		};
+
+		log::debug!(
+			target: LOG_TARGET,
+			"Remaining payloads: {} Batch request size: {}",
+			payloads.len(),
+			batch_size,
+		);
+
+		// Payloads to attempt to process this batch
+		let page = payloads.iter().take(batch_size).cloned().collect::<Vec<_>>();
+
+		// Build the batch request
+		let mut batch = BatchRequestBuilder::new();
+		for (method, params) in page.iter() {
+			batch
+				.insert(method, params.clone())
+				.map_err(|_| "Invalid batch method and/or params")?
+		}
+		let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
+			Ok(batch_response) => batch_response,
+			Err(e) => {
+				if batch_size < 2 {
+					return Err(e.to_string())
+				}
+
+				log::debug!(
+					target: LOG_TARGET,
+					"Batch request failed, trying again with smaller batch size. {}",
{}", + e.to_string() + ); + + return Self::get_storage_data_dynamic_batch_size( + client, + payloads, + max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize), + ) + .await + }, + }; + + // Collect the data from this batch + let mut data: Vec> = vec![]; + for item in batch_response.into_iter() { + match item { + Ok(x) => data.push(x), + Err(e) => return Err(e.message().to_string()), + } + } + + // Return this data joined with the remaining keys + let payloads = payloads.iter().skip(batch_size).cloned().collect::>(); + let mut rest = Self::get_storage_data_dynamic_batch_size( + client, + payloads, + max(batch_size + 1, (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize), + ) + .await?; + data.append(&mut rest); + Ok(data) + } + /// Synonym of `getPairs` that uses paged queries to first get the keys, and then /// map them to values one by one. /// @@ -428,81 +567,61 @@ where let (tx, mut rx) = mpsc::unbounded::(); for thread_keys in keys_chunked { - let thread_client = client.clone(); let thread_sender = tx.clone(); + let thread_client = client.clone(); let handle = std::thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().unwrap(); - let mut thread_key_values = Vec::with_capacity(thread_keys.len()); - - for chunk_keys in thread_keys.chunks(DEFAULT_VALUE_DOWNLOAD_BATCH) { - let mut batch = BatchRequestBuilder::new(); - - for key in chunk_keys.iter() { - batch - .insert("state_getStorage", rpc_params![key, at]) - .map_err(|_| "Invalid batch params") - .unwrap(); - } - - let batch_response = rt - .block_on(thread_client.batch_request::>(batch)) - .map_err(|e| { - log::error!( - target: LOG_TARGET, - "failed to execute batch: {:?}. Error: {:?}", - chunk_keys.iter().map(HexDisplay::from).collect::>(), - e - ); - "batch failed." - }) - .unwrap(); + // Process the payloads in chunks so each thread can pass kvs back to the main + // thread to start inserting before all of the data has been queried from the node. + // Inserting data takes a very long time, so the earlier it can start the better. + let mut thread_key_values = vec![]; + let chunk_size = thread_keys.len() / 1; + for thread_keys_chunk in thread_keys.chunks(chunk_size) { + let mut thread_key_chunk_values = Vec::with_capacity(thread_keys_chunk.len()); + + let payloads = thread_keys_chunk + .iter() + .map(|key| ("state_getStorage".to_string(), rpc_params!(key, at))) + .collect::>(); + + let rt = tokio::runtime::Runtime::new().unwrap(); + let storage_data = match rt.block_on(Self::get_storage_data_dynamic_batch_size( + &thread_client, + payloads, + Self::INITIAL_BATCH_SIZE, + )) { + Ok(storage_data) => storage_data, + Err(e) => { + thread_sender.unbounded_send(Message::BatchFailed(e)).unwrap(); + return Default::default() + }, + }; // Check if we got responses for all submitted requests. 
-					assert_eq!(chunk_keys.len(), batch_response.len());
+					assert_eq!(thread_keys_chunk.len(), storage_data.len());
 
-					let mut batch_kv = Vec::with_capacity(chunk_keys.len());
-					for (key, maybe_value) in chunk_keys.into_iter().zip(batch_response) {
+					let mut batch_kv = Vec::with_capacity(thread_keys_chunk.len());
+					for (key, maybe_value) in thread_keys_chunk.iter().zip(storage_data) {
 						match maybe_value {
-							Ok(Some(data)) => {
-								thread_key_values.push((key.clone(), data.clone()));
+							Some(data) => {
+								thread_key_chunk_values.push((key.clone(), data.clone()));
 								batch_kv.push((key.clone().0, data.0));
 							},
-							Ok(None) => {
+							None => {
 								log::warn!(
 									target: LOG_TARGET,
 									"key {:?} had none corresponding value.",
 									&key
 								);
 								let data = StorageData(vec![]);
-								thread_key_values.push((key.clone(), data.clone()));
+								thread_key_chunk_values.push((key.clone(), data.clone()));
 								batch_kv.push((key.clone().0, data.0));
 							},
-							Err(e) => {
-								let reason = format!("key {:?} failed: {:?}", &key, e);
-								log::error!(target: LOG_TARGET, "Reason: {}", reason);
-								// Signal failures to the main thread, stop aggregating (key, value)
-								// pairs and return immediately an error.
-								thread_sender.unbounded_send(Message::BatchFailed(reason)).unwrap();
-								return Default::default()
-							},
 						};
-
-						if thread_key_values.len() % (thread_keys.len() / 10).max(1) == 0 {
-							let ratio: f64 =
-								thread_key_values.len() as f64 / thread_keys.len() as f64;
-							log::debug!(
-								target: LOG_TARGET,
-								"[thread = {:?}] progress = {:.2} [{} / {}]",
-								std::thread::current().id(),
-								ratio,
-								thread_key_values.len(),
-								thread_keys.len(),
-							);
-						}
 					}
 
-					// Send this batch to the main thread to start inserting.
+					// Send this chunk to the main thread to start inserting.
 					thread_sender.unbounded_send(Message::Batch(batch_kv)).unwrap();
+					thread_key_values.extend(thread_key_chunk_values);
 				}
 
 				thread_sender.unbounded_send(Message::Terminated).unwrap();
@@ -516,10 +635,21 @@ where
 		// `pending_ext`.
 		let mut terminated = 0usize;
 		let mut batch_failed = false;
+		let mut processed = 0usize;
 		loop {
 			match rx.next().await.unwrap() {
 				Message::Batch(kv) => {
 					for (k, v) in kv {
+						processed += 1;
+						if processed % 50_000 == 0 || processed == keys.len() || processed == 1 {
+							log::info!(
+								target: LOG_TARGET,
+								"inserting keys progress = {:.0}% [{} / {}]",
+								(processed as f32 / keys.len() as f32) * 100f32,
+								processed,
+								keys.len(),
+							);
+						}
 						// skip writing the child root data.
 						if is_default_child_storage_key(k.as_ref()) {
 							continue
@@ -554,73 +684,58 @@ where
 
 	/// Get the values corresponding to `child_keys` at the given `prefixed_top_key`.
 	pub(crate) async fn rpc_child_get_storage_paged(
-		client: &WsClient,
+		client: &Arc<HttpClient>,
 		prefixed_top_key: &StorageKey,
 		child_keys: Vec<StorageKey>,
 		at: B::Hash,
 	) -> Result<Vec<KeyValue>, &'static str> {
-		let mut child_kv_inner = vec![];
-		let mut batch_success = true;
-
-		for batch_child_key in child_keys.chunks(DEFAULT_VALUE_DOWNLOAD_BATCH) {
-			let mut batch_request = BatchRequestBuilder::new();
-
-			for key in batch_child_key {
-				batch_request
-					.insert(
-						"childstate_getStorage",
-						rpc_params![
-							PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
-							key,
-							at
-						],
-					)
-					.map_err(|_| "Invalid batch params")?;
-			}
+		let child_keys_len = child_keys.len();
 
-			let batch_response =
-				client.batch_request::<Option<StorageData>>(batch_request).await.map_err(|e| {
-					log::error!(
-						target: LOG_TARGET,
-						"failed to execute batch: {:?}. Error: {:?}",
-						batch_child_key,
-						e
-					);
-					"batch failed."
-				})?;
+		let payloads = child_keys
+			.iter()
+			.map(|key| {
+				(
+					"childstate_getStorage".to_string(),
+					rpc_params![
+						PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
+						key,
+						at
+					],
+				)
+			})
+			.collect::<Vec<_>>();
 
-			assert_eq!(batch_child_key.len(), batch_response.len());
-
-			for (key, maybe_value) in batch_child_key.iter().zip(batch_response) {
-				match maybe_value {
-					Ok(Some(v)) => {
-						child_kv_inner.push((key.clone(), v));
-					},
-					Ok(None) => {
-						log::warn!(
-							target: LOG_TARGET,
-							"key {:?} had none corresponding value.",
-							&key
-						);
-						child_kv_inner.push((key.clone(), StorageData(vec![])));
-					},
-					Err(e) => {
-						log::error!(target: LOG_TARGET, "key {:?} failed: {:?}", &key, e);
-						batch_success = false;
-					},
-				};
-			}
-		}
+		let storage_data = match Self::get_storage_data_dynamic_batch_size(
+			client,
+			payloads,
+			Self::INITIAL_BATCH_SIZE,
+		)
+		.await
+		{
+			Ok(storage_data) => storage_data,
+			Err(e) => {
+				log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
+				return Err("batch processing failed")
+			},
+		};
 
-		if batch_success {
-			Ok(child_kv_inner)
-		} else {
-			Err("batch failed.")
-		}
+		assert_eq!(child_keys_len, storage_data.len());
+
+		Ok(child_keys
+			.iter()
+			.zip(storage_data)
+			.map(|(key, maybe_value)| match maybe_value {
+				Some(v) => (key.clone(), v),
+				None => {
+					log::warn!(target: LOG_TARGET, "key {:?} had no corresponding value.", &key);
+					(key.clone(), StorageData(vec![]))
+				},
+			})
+			.collect::<Vec<_>>())
 	}
 
 	pub(crate) async fn rpc_child_get_keys(
-		client: &WsClient,
+		client: &HttpClient,
 		prefixed_top_key: &StorageKey,
 		child_prefix: StorageKey,
 		at: B::Hash,
@@ -678,107 +793,47 @@ where
 			return Ok(Default::default())
 		}
 
-		// div-ceil simulation.
-		let threads = Self::threads().get();
-		let child_roots_per_thread = (child_roots.len() + threads - 1) / threads;
-
 		info!(
 			target: LOG_TARGET,
-			"👩‍👦 scraping child-tree data from {} top keys, split among {} threads, {} top keys per thread",
+			"👩‍👦 scraping child-tree data from {} top keys",
 			child_roots.len(),
-			threads,
-			child_roots_per_thread,
 		);
 
-		// NOTE: the threading done here is the simpler, yet slightly un-elegant because we are
-		// splitting child root among threads, and it is very common for these root to have vastly
-		// different child tries underneath them, causing some threads to finish way faster than
-		// others. Certainly still better than single thread though.
-		let mut handles = vec![];
-		let client = self.as_online().rpc_client_cloned();
 		let at = self.as_online().at_expected();
-		enum Message {
-			Terminated,
-			Batch((ChildInfo, Vec<(Vec<u8>, Vec<u8>)>)),
-		}
-		let (tx, mut rx) = mpsc::unbounded::<Message>();
-
-		for thread_child_roots in child_roots
-			.chunks(child_roots_per_thread)
-			.map(|x| x.into())
-			.collect::<Vec<Vec<_>>>()
-		{
-			let thread_client = client.clone();
-			let thread_sender = tx.clone();
-			let handle = thread::spawn(move || {
-				let rt = tokio::runtime::Runtime::new().unwrap();
-				let mut thread_child_kv = vec![];
-				for prefixed_top_key in thread_child_roots {
-					let child_keys = rt.block_on(Self::rpc_child_get_keys(
-						&thread_client,
-						&prefixed_top_key,
-						StorageKey(vec![]),
-						at,
-					))?;
-					let child_kv_inner = rt.block_on(Self::rpc_child_get_storage_paged(
-						&thread_client,
-						&prefixed_top_key,
-						child_keys,
-						at,
-					))?;
-
-					let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
-					let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
-						Some((ChildType::ParentKeyId, storage_key)) => storage_key,
-						None => {
-							log::error!(target: LOG_TARGET, "invalid key: {:?}", prefixed_top_key);
-							return Err("Invalid child key")
-						},
-					};
-
-					thread_sender
-						.unbounded_send(Message::Batch((
-							ChildInfo::new_default(un_prefixed),
-							child_kv_inner
-								.iter()
-								.cloned()
-								.map(|(k, v)| (k.0, v.0))
-								.collect::<Vec<_>>(),
-						)))
-						.unwrap();
-					thread_child_kv.push((ChildInfo::new_default(un_prefixed), child_kv_inner));
-				}
-
-				thread_sender.unbounded_send(Message::Terminated).unwrap();
-				Ok(thread_child_kv)
-			});
-			handles.push(handle);
-		}
-
-		// first, wait until all threads send a `Terminated` message, in the meantime populate
-		// `pending_ext`.
-		let mut terminated = 0usize;
-		loop {
-			match rx.next().await.unwrap() {
-				Message::Batch((info, kvs)) =>
-					for (k, v) in kvs {
-						pending_ext.insert_child(info.clone(), k, v);
-					},
-				Message::Terminated => {
-					terminated += 1;
-					if terminated == handles.len() {
-						break
-					}
+		let arc_client = self.as_online().rpc_client_cloned();
+		let mut child_kv = vec![];
+		for prefixed_top_key in child_roots {
+			let child_keys = Self::rpc_child_get_keys(
+				arc_client.as_ref(),
+				&prefixed_top_key,
+				StorageKey(vec![]),
+				at,
+			)
+			.await?;
+
+			let child_kv_inner =
+				Self::rpc_child_get_storage_paged(&arc_client, &prefixed_top_key, child_keys, at)
+					.await?;
+
+			let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
+			let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
+				Some((ChildType::ParentKeyId, storage_key)) => storage_key,
+				None => {
+					log::error!(target: LOG_TARGET, "invalid key: {:?}", prefixed_top_key);
+					return Err("Invalid child key")
 				},
+			};
+
+			let info = ChildInfo::new_default(un_prefixed);
+			let key_values =
+				child_kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect::<Vec<_>>();
+			child_kv.push((info.clone(), child_kv_inner));
+			for (k, v) in key_values {
+				pending_ext.insert_child(info.clone(), k, v);
 			}
 		}
 
-		let child_kv = handles
-			.into_iter()
-			.flat_map(|h| h.join().unwrap())
-			.flatten()
-			.collect::<Vec<_>>();
 		Ok(child_kv)
 	}
 
@@ -841,8 +896,8 @@ where
 	///
 	/// initializes the remote client in `transport`, and sets the `at` field, if not specified.
 	async fn init_remote_client(&mut self) -> Result<(), &'static str> {
-		// First, initialize the ws client.
-		self.as_online_mut().transport.map_uri().await?;
+		// First, initialize the http client.
+		self.as_online_mut().transport.init().await?;
 
 		// Then, if `at` is not set, set it.
 		if self.as_online().at.is_none() {
diff --git a/utils/frame/try-runtime/cli/src/lib.rs b/utils/frame/try-runtime/cli/src/lib.rs
index cd6c48f52f45e..9268ef2edba8b 100644
--- a/utils/frame/try-runtime/cli/src/lib.rs
+++ b/utils/frame/try-runtime/cli/src/lib.rs
@@ -97,18 +97,6 @@
 //! > If anything, in most cases, we expect spec-versions to NOT match, because try-runtime is all
 //! > about testing unreleased runtimes.
 //!
-//! ## Note on nodes that respond to `try-runtime` requests.
-//!
-//! There are a number of flags that need to be preferably set on a running node in order to work
-//! well with try-runtime's expensive RPC queries:
-//!
-//! - set `--rpc-max-response-size 1000` and
-//! - `--rpc-max-request-size 1000` to ensure connections are not dropped in case the state is
-//!   large.
-//! - set `--rpc-cors all` to ensure ws connections can come through.
-//!
-//! Note that *none* of the try-runtime operations need unsafe RPCs.
-//!
 //! ## Note on signature and state-root checks
 //!
 //! All of the commands calling into `TryRuntime_execute_block` ([`Command::ExecuteBlock`] and
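
For readers skimming the patch, the core of the change is the halve-on-failure / grow-on-success policy in `get_storage_data_dynamic_batch_size`. Below is a minimal, self-contained sketch of that policy with the JSON-RPC layer stubbed out by a closure so it can run standalone; `process_all`, `send_batch`, and the 700-request server limit are illustrative assumptions, not part of the patch.

```rust
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;

/// Process `payloads` in batches, halving the batch size on failure and
/// growing it by 10% after each success, mirroring the policy of
/// `get_storage_data_dynamic_batch_size` without the JSON-RPC plumbing.
fn process_all<T: Clone>(
    payloads: &[T],
    mut send_batch: impl FnMut(&[T]) -> Result<Vec<T>, String>,
    mut batch_size: usize,
) -> Result<Vec<T>, String> {
    let mut out = Vec::with_capacity(payloads.len());
    let mut cursor = 0;
    while cursor < payloads.len() {
        let end = (cursor + batch_size).min(payloads.len());
        match send_batch(&payloads[cursor..end]) {
            Ok(mut data) => {
                out.append(&mut data);
                cursor = end;
                // Grow by at least one so a batch size of 1 cannot get stuck.
                batch_size = (batch_size + 1)
                    .max((batch_size as f32 * BATCH_SIZE_INCREASE_FACTOR) as usize);
            },
            // Mirrors the patch: give up once the batch cannot shrink further.
            Err(e) if batch_size < 2 => return Err(e),
            Err(_) => {
                // Too big for the node: halve and retry the same slice.
                batch_size =
                    ((batch_size as f32 * BATCH_SIZE_DECREASE_FACTOR) as usize).max(1);
            },
        }
    }
    Ok(out)
}

fn main() {
    // Pretend the node rejects any batch of more than 700 requests.
    let server = |batch: &[u32]| {
        if batch.len() > 700 {
            Err("request too big".to_string())
        } else {
            Ok(batch.to_vec())
        }
    };
    let payloads: Vec<u32> = (0..10_000).collect();
    let fetched = process_all(&payloads, server, 5000).unwrap();
    assert_eq!(fetched.len(), payloads.len());
}
```

The sketch is iterative where the patch recurses, but the effect is the same: a batch that trips the node's size limits is halved and retried, and each success grows the next batch, so the size converges on roughly the largest batch the node will accept instead of requiring a hand-tuned constant like the removed `DEFAULT_VALUE_DOWNLOAD_BATCH`.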