Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions bin/node/test-runner-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,20 @@ mod tests {
#[test]
fn test_runner() {
let tokio_runtime = build_runtime().unwrap();
let (task_manager, client, pool, command_sink, backend) =
let (rpc, task_manager, client, pool, command_sink, backend) =
client_parts::<NodeTemplateChainInfo>(ConfigOrChainSpec::ChainSpec(
Box::new(development_config()),
tokio_runtime.handle().clone(),
))
.unwrap();
let node =
Node::<NodeTemplateChainInfo>::new(task_manager, client, pool, command_sink, backend);
let node = Node::<NodeTemplateChainInfo>::new(
rpc,
task_manager,
client,
pool,
command_sink,
backend,
);

tokio_runtime.block_on(async {
// seals blocks
Expand Down
14 changes: 12 additions & 2 deletions client/rpc/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,21 @@ impl Spawn for TaskExecutor {
}
}
impl SpawnNamed for TaskExecutor {
fn spawn_blocking(&self, _name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn_blocking(
&self,
_name: &'static str,
_group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
EXECUTOR.spawn_ok(future);
}

fn spawn(&self, _name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn(
&self,
_name: &'static str,
_group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
EXECUTOR.spawn_ok(future);
}
}
Expand Down
23 changes: 11 additions & 12 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
config::{Configuration, KeystoreConfig, PrometheusConfig, TransactionStorageMode},
error::Error,
metrics::MetricsService,
start_rpc_servers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
start_rpc_servers, RpcHandlers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
};
use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
Expand Down Expand Up @@ -323,7 +323,7 @@ where
}

/// Parameters to pass into `build`.
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, Backend> {
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
/// The service configuration.
pub config: Configuration,
/// A shared client returned by `new_full_parts`/`new_light_parts`.
Expand All @@ -340,7 +340,7 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, Backend> {
pub transaction_pool: Arc<TExPool>,
/// Builds additional [`RpcModule`]s that should be added to the server
pub rpc_builder:
Box<dyn FnOnce(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<()>, Error>>,
Box<dyn Fn(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to change this to match the behavior of the RpcExtensionBuilder trait to be called more than once ^^

/// An optional, shared remote blockchain instance. Used for light clients.
pub remote_blockchain: Option<Arc<dyn RemoteBlockchain<TBl>>>,
/// A shared network instance.
Expand Down Expand Up @@ -384,9 +384,9 @@ where
}

/// Spawn the tasks that are required to run a node.
pub fn spawn_tasks<TBl, TBackend, TExPool, TCl>(
params: SpawnTasksParams<TBl, TCl, TExPool, TBackend>,
) -> Result<(), Error>
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<RpcHandlers, Error>
where
TCl: ProvideRuntimeApi<TBl>
+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
Expand Down Expand Up @@ -494,11 +494,12 @@ where
system_rpc_tx.clone(),
&config,
backend.offchain_storage(),
rpc_builder,
&*rpc_builder,
)
};

let rpc = start_rpc_servers(&config, gen_rpc_module)?;
let rpc_handlers = RpcHandlers(Arc::new(gen_rpc_module(sc_rpc::DenyUnsafe::No)?.into()));

// Spawn informant task
spawn_handle.spawn(
Expand All @@ -514,7 +515,7 @@ where

task_manager.keep_alive((config.base_path, rpc));

Ok(())
Ok(rpc_handlers)
}

async fn transaction_notifications<TBl, TExPool>(
Expand Down Expand Up @@ -571,7 +572,7 @@ fn init_telemetry<TBl: BlockT, TCl: BlockBackend<TBl>>(
Ok(telemetry.handle())
}

fn gen_rpc_module<TBl, TBackend, TCl, TExPool>(
fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
deny_unsafe: DenyUnsafe,
spawn_handle: SpawnTaskHandle,
client: Arc<TCl>,
Expand All @@ -580,9 +581,7 @@ fn gen_rpc_module<TBl, TBackend, TCl, TExPool>(
system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
config: &Configuration,
offchain_storage: Option<<TBackend as sc_client_api::backend::Backend<TBl>>::OffchainStorage>,
rpc_builder: Box<
dyn FnOnce(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<()>, Error>,
>,
rpc_builder: &(dyn Fn(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
) -> Result<RpcModule<()>, Error>
where
TBl: BlockT,
Expand Down
41 changes: 31 additions & 10 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use log::{debug, error, warn};
use sc_client_api::{blockchain::HeaderBackend, BlockchainEvents};
use sc_network::PeerId;
use sc_utils::mpsc::TracingUnboundedReceiver;
use serde::Serialize;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT},
Expand Down Expand Up @@ -79,9 +80,28 @@ pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME};

const DEFAULT_PROTOCOL_ID: &str = "sup";

/// Dummy RPC handler type.
// TODO(niklasad1): replace this to do perform in-memory rpc request.
pub type RpcHandlers = ();
/// RPC handlers that can perform RPC queries.
#[derive(Clone)]
pub struct RpcHandlers(Arc<RpcModule<()>>);

impl RpcHandlers {
/// Starts an RPC query.
///
/// The query is passed as a string and must be a JSON text similar to what an HTTP client
/// would for example send.
///
/// Returns a `Future` that contains the optional response.
//
// TODO(niklasad1): support subscriptions?!.
pub async fn rpc_query<T: Serialize>(&self, method: &str, params: Vec<T>) -> Option<String> {
self.0.call_with(method, params).await
}

/// Provides access to the underlying `RpcModule`
pub fn handle(&self) -> Arc<RpcModule<()>> {
self.0.clone()
}
}

/// An incomplete set of chain components, but enough to run the chain ops subcommands.
pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, TransactionPool, Other> {
Expand Down Expand Up @@ -380,16 +400,16 @@ where
fn import(&self, transaction: B::Extrinsic) -> TransactionImportFuture {
if !self.imports_external_transactions {
debug!("Transaction rejected");
return Box::pin(futures::future::ready(TransactionImport::None))
return Box::pin(futures::future::ready(TransactionImport::None));
}

let encoded = transaction.encode();
let uxt = match Decode::decode(&mut &encoded[..]) {
Ok(uxt) => uxt,
Err(e) => {
debug!("Transaction invalid: {:?}", e);
return Box::pin(futures::future::ready(TransactionImport::Bad))
},
return Box::pin(futures::future::ready(TransactionImport::Bad));
}
};

let best_block_id = BlockId::hash(self.client.info().best_hash);
Expand All @@ -403,18 +423,19 @@ where
match import_future.await {
Ok(_) => TransactionImport::NewGood,
Err(e) => match e.into_pool_error() {
Ok(sc_transaction_pool_api::error::Error::AlreadyImported(_)) =>
TransactionImport::KnownGood,
Ok(sc_transaction_pool_api::error::Error::AlreadyImported(_)) => {
TransactionImport::KnownGood
}
Ok(e) => {
debug!("Error adding transaction to the pool: {:?}", e);
TransactionImport::Bad
},
}
Err(e) => {
debug!("Error converting pool error: {:?}", e);
// it is not bad at least, just some internal node logic error, so peer is
// innocent.
TransactionImport::KnownGood
},
}
},
}
})
Expand Down
2 changes: 0 additions & 2 deletions frame/bags-list/remote-tests/src/sanity_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ use frame_support::{
};
use remote_externalities::{Builder, Mode, OnlineConfig};
use sp_runtime::{traits::Block as BlockT, DeserializeOwned};
use sp_std::prelude::*;

/// Execute the sanity check of the bags-list.
pub async fn execute<Runtime: crate::RuntimeT, Block: BlockT + DeserializeOwned>(
currency_unit: u64,
Expand Down
12 changes: 6 additions & 6 deletions test-utils/test-runner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
use std::{str::FromStr, sync::Arc};

type ClientParts<T> = (
Arc<RpcModule<()>>,
TaskManager,
Arc<
TFullClient<
Expand Down Expand Up @@ -187,13 +188,11 @@ where
let rpc_sink = command_sink.clone();

let rpc_builder = Box::new(move |_, _| {
let seal = ManualSeal::new(rpc_sink).into_rpc();
let mut module = RpcModule::new(());
module.merge(seal).expect("only one module; qed");
Ok(module)
let seal = ManualSeal::new(rpc_sink.clone()).into_rpc();
Ok(seal)
});

let _rpc_handlers = {
let rpc_handlers = {
let params = SpawnTasksParams {
config,
client: client.clone(),
Expand Down Expand Up @@ -241,6 +240,7 @@ where
.spawn("manual-seal", None, authorship_future);

network_starter.start_network();
let rpc_handler = rpc_handlers.handle();

Ok((task_manager, client, transaction_pool, command_sink, backend))
Ok((rpc_handler, task_manager, client, transaction_pool, command_sink, backend))
}
15 changes: 15 additions & 0 deletions test-utils/test-runner/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use futures::{
channel::{mpsc, oneshot},
FutureExt, SinkExt,
};
use jsonrpsee::RpcModule;
use manual_seal::EngineCommand;
use sc_client_api::{
backend::{self, Backend},
Expand All @@ -46,6 +47,8 @@ use sp_state_machine::Ext;
/// the node process is dropped when this struct is dropped
/// also holds logs from the process.
pub struct Node<T: ChainInfo> {
/// rpc handler for communicating with the node over rpc.
rpc_handler: Arc<RpcModule<()>>,
/// handle to the running node.
task_manager: Option<TaskManager>,
/// client instance
Expand Down Expand Up @@ -82,6 +85,7 @@ where
{
/// Creates a new node.
pub fn new(
rpc_handler: Arc<RpcModule<()>>,
task_manager: TaskManager,
client: Arc<
TFullClient<T::Block, T::RuntimeApi, NativeElseWasmExecutor<T::ExecutorDispatch>>,
Expand All @@ -101,6 +105,7 @@ where
backend: Arc<TFullBackend<T::Block>>,
) -> Self {
Self {
rpc_handler,
task_manager: Some(task_manager),
client: client.clone(),
pool,
Expand All @@ -110,6 +115,16 @@ where
}
}

/// Returns a reference to the rpc handlers, use this to send rpc requests.
/// eg
/// ```ignore
/// let response = node.rpc_handler()
/// .call_with(""engine_createBlock", vec![true, true]);
/// ```
pub fn rpc_handler(&self) -> Arc<RpcModule<()>> {
self.rpc_handler.clone()
}

/// Return a reference to the Client
pub fn client(
&self,
Expand Down