Skip to content
This repository was archived by the owner on May 21, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
start_generic_node_impl accepts a closure and RPC types are extracted@
  • Loading branch information
hbulgarini committed May 26, 2023
commit 539fe6f48ad7f0f94350c8d10e66cd52a9399d15
63 changes: 51 additions & 12 deletions node/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,25 +83,64 @@ where
Ok(module)
}

/// Instantiate all RPC extensions.
pub fn create_stout_full<C, P, B>(
deps: FullDeps<C, P>,
backend: Arc<B>,
) -> Result<RpcExtension, Box<dyn std::error::Error + Send + Sync>>
pub trait RuntimeApiCollection:
frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>
+ pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance>
+ BlockBuilder<Block>
+ 'static
{
}

pub trait ClientRequiredTraits:
ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ AuxStore
+ HeaderMetadata<Block, Error = BlockChainError>
+ Send
+ Sync
+ 'static
where
C: ProvideRuntimeApi<Block>
<Self as ProvideRuntimeApi<Block>>::Api: RuntimeApiCollection,
{
}

pub trait PoolRequiredTraits: TransactionPool + Sync + Send + 'static {}

pub trait BackendRequiredTraits: sc_client_api::Backend<Block> + Send + Sync + 'static
where
<Self as sc_client_api::Backend<Block>>::State:
sc_client_api::backend::StateBackend<sp_runtime::traits::HashFor<Block>>,
{
}

impl<T> ClientRequiredTraits for T
where
T: ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ AuxStore
+ HeaderMetadata<Block, Error = BlockChainError>
+ Send
+ Sync
+ 'static,
C::Api: frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>,
C::Api: pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance>,
C::Api: BlockBuilder<Block>,
P: TransactionPool + Sync + Send + 'static,
B: sc_client_api::Backend<Block> + Send + Sync + 'static,
B::State: sc_client_api::backend::StateBackend<sp_runtime::traits::HashFor<Block>>,
T::Api: RuntimeApiCollection,
{
}

impl<T> PoolRequiredTraits for T where T: TransactionPool + Sync + Send + 'static {}

impl<T> BackendRequiredTraits for T
where
T: sc_client_api::Backend<Block> + Send + Sync + 'static,
T::State: sc_client_api::backend::StateBackend<sp_runtime::traits::HashFor<Block>>,
{
}

pub fn create_stout_full<C: ClientRequiredTraits, P: PoolRequiredTraits, B: BackendRequiredTraits>(
deps: FullDeps<C, P>,
backend: Arc<B>,
) -> Result<RpcExtension, Box<dyn std::error::Error + Send + Sync>>
where
C::Api: RuntimeApiCollection,
{
use frame_rpc_system::{System, SystemApiServer};
use pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer};
Expand Down
257 changes: 237 additions & 20 deletions node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,9 @@ pub trait RuntimeApiExt:
{
}

type ImportQueueBuilder<RuntimeApi> =
dyn FnOnce(
Arc<ParachainClient<RuntimeApi>>,
ParachainBlockImport<RuntimeApi>,
&Configuration,
Option<TelemetryHandle>,
&TaskManager,
) -> Result<
sc_consensus::DefaultImportQueue<Block, ParachainClient<RuntimeApi>>,
sc_service::Error,
>;


pub fn new_generic_partial<RuntimeApi>(
pub fn new_generic_partial<RuntimeApi, BIQ>(
config: &Configuration,
build_import_queue: Box<ImportQueueBuilder<RuntimeApi>>,
build_import_queue: BIQ,
) -> Result<
PartialComponents<
ParachainClient<RuntimeApi>,
Expand All @@ -151,6 +138,16 @@ where
RuntimeApi: ConstructRuntimeApi<Block, ParachainClient<RuntimeApi>> + Send + Sync + 'static,
RuntimeApi::RuntimeApi: RuntimeApiExt,
sc_client_api::StateBackendFor<ParachainBackend, Block>: sp_api::StateBackend<BlakeTwo256>,
BIQ: FnOnce(
Arc<ParachainClient<RuntimeApi>>,
ParachainBlockImport<RuntimeApi>,
&Configuration,
Option<TelemetryHandle>,
&TaskManager,
) -> Result<
sc_consensus::DefaultImportQueue<Block, ParachainClient<RuntimeApi>>,
sc_service::Error,
>
{
let telemetry = config
.telemetry_endpoints
Expand Down Expand Up @@ -218,9 +215,9 @@ where
Ok(params)
}

pub fn new_trappist_partial<RuntimeApi>(
pub fn new_trappist_partial<RuntimeApi, BIQ>(
config: &Configuration,
build_import_queue: Box<ImportQueueBuilder<RuntimeApi>>,
build_import_queue: BIQ,
) -> Result<
PartialComponents<
ParachainClient<RuntimeApi>,
Expand All @@ -235,9 +232,230 @@ pub fn new_trappist_partial<RuntimeApi>(
where
RuntimeApi: ConstructRuntimeApi<Block, ParachainClient<RuntimeApi>> + Send + Sync + 'static,
RuntimeApi::RuntimeApi: RuntimeApiExt + pallet_dex_rpc::DexRuntimeApi<Block, AssetId, Balance, Balance> ,
sc_client_api::StateBackendFor<ParachainBackend, Block>: sp_api::StateBackend<BlakeTwo256>
sc_client_api::StateBackendFor<ParachainBackend, Block>: sp_api::StateBackend<BlakeTwo256>,
BIQ: FnOnce(
Arc<ParachainClient<RuntimeApi>>,
ParachainBlockImport<RuntimeApi>,
&Configuration,
Option<TelemetryHandle>,
&TaskManager,
) -> Result<
sc_consensus::DefaultImportQueue<Block, ParachainClient<RuntimeApi>>,
sc_service::Error,
>,
{
new_generic_partial::<RuntimeApi, BIQ>(config, build_import_queue)
}



fn create_stout_full_rpc<C, P, B>(
deps: rpc::FullDeps<C, P>,
backend: Arc<B>,
) -> Result<jsonrpsee::RpcModule<()>, sc_service::Error>
where
C: rpc::ClientRequiredTraits,
P: rpc::PoolRequiredTraits,
B: rpc::BackendRequiredTraits,
C::Api: rpc::RuntimeApiCollection,
{
rpc::create_stout_full(deps, backend.clone()).map_err(Into::into)
}

/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is abstract over the executor and the runtime api.
#[sc_tracing::logging::prefix_logs_with("Parachain")]
async fn start_generic_node_impl<RuntimeApi, BIQ, RB, BIC>(
parachain_config: Configuration,
polkadot_config: Configuration,
collator_options: CollatorOptions,
para_id: ParaId,
rpc_ext_builder: RB,
build_import_queue: BIQ,
build_consensus: BIC,
hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient<RuntimeApi>>)>
where
RuntimeApi: ConstructRuntimeApi<Block, ParachainClient<RuntimeApi>> + Send + Sync + 'static,
RuntimeApi::RuntimeApi: RuntimeApiExt
+ cumulus_primitives_core::CollectCollationInfo<Block>
+ pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance>
+ frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>,
sc_client_api::StateBackendFor<ParachainBackend, Block>: sp_api::StateBackend<BlakeTwo256>,
RB: Fn(
rpc::FullDeps<
ParachainClient<RuntimeApi>,
sc_transaction_pool::FullPool<Block, ParachainClient<RuntimeApi>>
>,
Arc<ParachainBackend>
) -> Result<jsonrpsee::RpcModule<()>, sc_service::Error> + 'static,
BIQ: FnOnce(
Arc<ParachainClient<RuntimeApi>>,
ParachainBlockImport<RuntimeApi>,
&Configuration,
Option<TelemetryHandle>,
&TaskManager,
) -> Result<
sc_consensus::DefaultImportQueue<Block, ParachainClient<RuntimeApi>>,
sc_service::Error,
>,
BIC: FnOnce(
Arc<ParachainClient<RuntimeApi>>,
ParachainBlockImport<RuntimeApi>,
Option<&Registry>,
Option<TelemetryHandle>,
&TaskManager,
Arc<dyn RelayChainInterface>,
Arc<sc_transaction_pool::FullPool<Block, ParachainClient<RuntimeApi>>>,
Arc<NetworkService<Block, Hash>>,
SyncCryptoStorePtr,
bool,
) -> Result<Box<dyn ParachainConsensus<Block>>, sc_service::Error>,
{
new_generic_partial::<RuntimeApi>(config, build_import_queue)
let parachain_config = prepare_node_config(parachain_config);

let params = new_generic_partial::<RuntimeApi, BIQ>(&parachain_config, build_import_queue)?;
let (block_import, mut telemetry, telemetry_worker_handle) = params.other;

let client = params.client.clone();
let backend = params.backend.clone();

let mut task_manager = params.task_manager;
let (relay_chain_interface, collator_key) = build_relay_chain_interface(
polkadot_config,
&parachain_config,
telemetry_worker_handle,
&mut task_manager,
collator_options.clone(),
hwbench.clone(),
)
.await
.map_err(|e| match e {
RelayChainError::ServiceError(polkadot_service::Error::Sub(x)) => x,
s => s.to_string().into(),
})?;

let block_announce_validator =
BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id);

let force_authoring = parachain_config.force_authoring;
let validator = parachain_config.role.is_authority();
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let import_queue_service = params.import_queue.service();

let (network, system_rpc_tx, tx_handler_controller, start_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &parachain_config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue: params.import_queue,
block_announce_validator_builder: Some(Box::new(|_| {
Box::new(block_announce_validator)
})),
warp_sync: None,
})?;

let rpc_builder = {
let client = client.clone();
let transaction_pool = transaction_pool.clone();

let backend_for_rpc = backend.clone();
Box::new(move |deny_unsafe, _| {
let deps = rpc::FullDeps {
client: client.clone(),
pool: transaction_pool.clone(),
deny_unsafe,
};

rpc_ext_builder(deps, backend_for_rpc.clone())
})
};

sc_service::spawn_tasks(sc_service::SpawnTasksParams {
rpc_builder,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
task_manager: &mut task_manager,
config: parachain_config,
keystore: params.keystore_container.sync_keystore(),
backend: backend.clone(),
network: network.clone(),
system_rpc_tx,
tx_handler_controller,
telemetry: telemetry.as_mut(),
})?;

if let Some(hwbench) = hwbench {
sc_sysinfo::print_hwbench(&hwbench);

if let Some(ref mut telemetry) = telemetry {
let telemetry_handle = telemetry.handle();
task_manager.spawn_handle().spawn(
"telemetry_hwbench",
None,
sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
);
}
}

let announce_block = {
let network = network.clone();
Arc::new(move |hash, data| network.announce_block(hash, data))
};

let relay_chain_slot_duration = Duration::from_secs(6);

if validator {
let parachain_consensus = build_consensus(
client.clone(),
block_import,
prometheus_registry.as_ref(),
telemetry.as_ref().map(|t| t.handle()),
&task_manager,
relay_chain_interface.clone(),
transaction_pool,
network,
params.keystore_container.sync_keystore(),
force_authoring,
)?;

let spawner = task_manager.spawn_handle();

let params = StartCollatorParams {
para_id,
block_status: client.clone(),
announce_block,
client: client.clone(),
task_manager: &mut task_manager,
relay_chain_interface: relay_chain_interface.clone(),
spawner,
parachain_consensus,
import_queue: import_queue_service,
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
relay_chain_slot_duration,
};

start_collator(params).await?;
} else {
let params = StartFullNodeParams {
client: client.clone(),
announce_block,
task_manager: &mut task_manager,
para_id,
relay_chain_interface,
relay_chain_slot_duration,
import_queue: import_queue_service,
};

start_full_node(params)?;
}

start_network.start_network();

Ok((task_manager, client))
}


Expand Down Expand Up @@ -269,7 +487,6 @@ where
Block,
StateBackend = sc_client_api::StateBackendFor<ParachainBackend, Block>,
> + sp_offchain::OffchainWorkerApi<Block>
+ pallet_dex_rpc::DexRuntimeApi<Block, AssetId, Balance, Balance>
+ sp_block_builder::BlockBuilder<Block>,
sc_client_api::StateBackendFor<ParachainBackend, Block>: sp_api::StateBackend<BlakeTwo256>,
BIQ: FnOnce(
Expand Down