Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions bin/node/cli/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use nix::{
unistd::Pid,
};
use node_primitives::Block;
use remote_externalities::rpc_api;
use remote_externalities::rpc_api::RpcService;
use std::{
io::{BufRead, BufReader, Read},
ops::{Deref, DerefMut},
Expand Down Expand Up @@ -71,9 +71,10 @@ pub async fn wait_n_finalized_blocks(
pub async fn wait_n_finalized_blocks_from(n: usize, url: &str) {
let mut built_blocks = std::collections::HashSet::new();
let mut interval = tokio::time::interval(Duration::from_secs(2));
let mut rpc_service = RpcService::new(url, false);

loop {
if let Ok(block) = rpc_api::get_finalized_head::<Block, _>(url.to_string()).await {
if let Ok(block) = rpc_service.get_finalized_head::<Block>().await {
built_blocks.insert(block);
if built_blocks.len() > n {
break
Expand Down
1 change: 1 addition & 0 deletions utils/frame/remote-externalities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
codec = { package = "parity-scale-codec", version = "3.0.0" }
env_logger = "0.9"
futures = "0.3.21"
jsonrpsee = { version = "0.15.1", features = ["ws-client", "macros"] }
log = "0.4.17"
serde = "1.0.136"
Expand Down
182 changes: 118 additions & 64 deletions utils/frame/remote-externalities/src/rpc_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,83 +18,137 @@
//! WS RPC API for one off RPC calls to a substrate node.
// TODO: Consolidate one off RPC calls https://github.com/paritytech/substrate/issues/8988

use futures::lock::Mutex;
use jsonrpsee::{
core::client::ClientT,
core::client::{Client, ClientT},
rpc_params,
types::ParamsSer,
ws_client::{WsClient, WsClientBuilder},
};
use sp_runtime::{
generic::SignedBlock,
traits::{Block as BlockT, Header as HeaderT},
};

/// Get the header of the block identified by `at`
pub async fn get_header<Block, S>(from: S, at: Block::Hash) -> Result<Block::Header, String>
where
Block: BlockT,
Block::Header: serde::de::DeserializeOwned,
S: AsRef<str>,
{
let client = build_client(from).await?;
use serde::de::DeserializeOwned;
use sp_runtime::{generic::SignedBlock, traits::Block as BlockT};
use std::sync::Arc;

client
.request::<Block::Header>("chain_getHeader", rpc_params!(at))
.await
.map_err(|e| format!("chain_getHeader request failed: {:?}", e))
enum RpcCall {
GetHeader,
GetFinalizedHead,
GetBlock,
GetRuntimeVersion,
}

/// Get the finalized head
pub async fn get_finalized_head<Block, S>(from: S) -> Result<Block::Hash, String>
where
Block: BlockT,
S: AsRef<str>,
{
let client = build_client(from).await?;
impl RpcCall {
fn as_str(&self) -> &'static str {
match self {
RpcCall::GetHeader => "chain_getHeader",
RpcCall::GetFinalizedHead => "chain_getFinalizedHead",
RpcCall::GetBlock => "chain_getBlock",
RpcCall::GetRuntimeVersion => "state_getRuntimeVersion",
}
}
}

/// General purpose method for making RPC calls.
async fn make_request<'a, T: DeserializeOwned>(
client: &Client,
call: RpcCall,
params: Option<ParamsSer<'a>>,
) -> Result<T, String> {
client
.request::<Block::Hash>("chain_getFinalizedHead", None)
.request::<T>(call.as_str(), params)
.await
.map_err(|e| format!("chain_getFinalizedHead request failed: {:?}", e))
.map_err(|e| format!("{} request failed: {:?}", call.as_str(), e))
}

/// Get the signed block identified by `at`.
pub async fn get_block<Block, S>(from: S, at: Block::Hash) -> Result<Block, String>
where
S: AsRef<str>,
Block: BlockT + serde::de::DeserializeOwned,
Block::Header: HeaderT,
{
let client = build_client(from).await?;
let signed_block = client
.request::<SignedBlock<Block>>("chain_getBlock", rpc_params!(at))
.await
.map_err(|e| format!("chain_getBlock request failed: {:?}", e))?;

Ok(signed_block.block)
/// Simple RPC service that is capable of keeping the connection.
///
/// Service will connect to `uri` for the first time during the first request. Instantiation
/// does not trigger connecting.
///
/// Be careful with reusing the connection in a multithreaded environment.
pub struct RpcService {
uri: String,
client: Arc<Mutex<Option<Client>>>,
keep_connection: bool,
}

/// Build a websocket client that connects to `from`.
async fn build_client<S: AsRef<str>>(from: S) -> Result<WsClient, String> {
WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.build(from.as_ref())
.await
.map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
}
impl RpcService {
/// Creates a new RPC service.
///
/// Does not connect yet.
pub fn new<S: AsRef<str>>(uri: S, keep_connection: bool) -> Self {
Self { uri: uri.as_ref().to_string(), client: Arc::new(Mutex::new(None)), keep_connection }
}

/// Get the runtime version of a given chain.
pub async fn get_runtime_version<Block, S>(
from: S,
at: Option<Block::Hash>,
) -> Result<sp_version::RuntimeVersion, String>
where
S: AsRef<str>,
Block: BlockT + serde::de::DeserializeOwned,
Block::Header: HeaderT,
{
let client = build_client(from).await?;
client
.request::<sp_version::RuntimeVersion>("state_getRuntimeVersion", rpc_params!(at))
.await
.map_err(|e| format!("state_getRuntimeVersion request failed: {:?}", e))
/// Returns the address at which requests are sent.
pub fn uri(&self) -> String {
self.uri.clone()
}

/// Whether to keep and reuse a single connection.
pub fn keep_connection(&self) -> bool {
self.keep_connection
}

/// Build a websocket client that connects to `self.uri`.
async fn build_client(&self) -> Result<WsClient, String> {
WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.build(&self.uri)
.await
.map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
}

/// Generic method for making RPC requests.
async fn make_request<'a, T: DeserializeOwned>(
&self,
call: RpcCall,
params: Option<ParamsSer<'a>>,
) -> Result<T, String> {
let mut maybe_client = self.client.lock().await;
match *maybe_client {
// `self.keep_connection` must be `true.
Some(ref client) => make_request(client, call, params).await,
None => {
let client = self.build_client().await?;
let result = make_request(&client, call, params).await;
if self.keep_connection {
*maybe_client = Some(client)
};
result
},
}
}

/// Get the header of the block identified by `at`.
pub async fn get_header<Block>(&self, at: Block::Hash) -> Result<Block::Header, String>
where
Block: BlockT,
Block::Header: DeserializeOwned,
{
self.make_request(RpcCall::GetHeader, rpc_params!(at)).await
}

/// Get the finalized head.
pub async fn get_finalized_head<Block: BlockT>(&self) -> Result<Block::Hash, String> {
self.make_request(RpcCall::GetFinalizedHead, None).await
}

/// Get the signed block identified by `at`.
pub async fn get_block<Block: BlockT + DeserializeOwned>(
&self,
at: Block::Hash,
) -> Result<Block, String> {
Ok(self
.make_request::<SignedBlock<Block>>(RpcCall::GetBlock, rpc_params!(at))
.await?
.block)
}

/// Get the runtime version of a given chain.
pub async fn get_runtime_version<Block: BlockT + DeserializeOwned>(
&self,
at: Option<Block::Hash>,
) -> Result<sp_version::RuntimeVersion, String> {
self.make_request(RpcCall::GetRuntimeVersion, rpc_params!(at)).await
}
}
9 changes: 5 additions & 4 deletions utils/frame/try-runtime/cli/src/commands/execute_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ impl ExecuteBlockCmd {
Block::Hash: FromStr,
<Block::Hash as FromStr>::Err: Debug,
{
let rpc_service = rpc_api::RpcService::new(ws_uri, false);

match (&self.block_at, &self.state) {
(Some(block_at), State::Snap { .. }) => hash_of::<Block>(block_at),
(Some(block_at), State::Live { .. }) => {
Expand All @@ -100,9 +102,7 @@ impl ExecuteBlockCmd {
target: LOG_TARGET,
"No --block-at or --at provided, using the latest finalized block instead"
);
remote_externalities::rpc_api::get_finalized_head::<Block, _>(ws_uri)
.await
.map_err(Into::into)
rpc_service.get_finalized_head::<Block>().await.map_err(Into::into)
},
(None, State::Live { at: Some(at), .. }) => hash_of::<Block>(at),
_ => {
Expand Down Expand Up @@ -148,7 +148,8 @@ where

let block_ws_uri = command.block_ws_uri::<Block>();
let block_at = command.block_at::<Block>(block_ws_uri.clone()).await?;
let block: Block = rpc_api::get_block::<Block, _>(block_ws_uri.clone(), block_at).await?;
let rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false);
let block: Block = rpc_service.get_block::<Block>(block_at).await?;
let parent_hash = block.header().parent_hash();
log::info!(
target: LOG_TARGET,
Expand Down
Loading