Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
port manual seal rpc
  • Loading branch information
niklasad1 committed Sep 3, 2021
commit 549f881e4751c406164205be187cb79da00a15c4
132 changes: 62 additions & 70 deletions client/consensus/manual-seal/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,14 @@ use futures::{
channel::{mpsc, oneshot},
FutureExt, SinkExt,
};
use jsonrpsee::types::{Error as JsonRpseeError, RpcModule};
use jsonrpsee::{
proc_macros::rpc,
types::{async_trait, Error as JsonRpseeError, JsonRpcResult},
};
use sc_consensus::ImportedAux;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sp_runtime::EncodedJustification;

/// Helper macro to bail early in async context when you want to
/// return `Box::pin(future::err(e))` once an error occurs.
/// Because `Try` is not implemented for it.
macro_rules! unwrap_or_fut_err {
( $e:expr ) => {
match $e {
Ok(x) => x,
Err(e) => return Box::pin(futures::future::err(e.into())),
}
};
}

/// Sender passed to the authorship task to report errors or successes.
pub type Sender<T> = Option<oneshot::Sender<std::result::Result<T, Error>>>;

Expand Down Expand Up @@ -73,6 +64,27 @@ pub enum EngineCommand<Hash> {
},
}

/// RPC trait that provides methods for interacting with the manual-seal authorship task over rpc.
#[rpc(client, server, namespace = "engine")]
pub trait ManualSealApi<Hash> {
/// Instructs the manual-seal authorship task to create a new block
#[method(name = "createBlock")]
async fn create_block(
&self,
create_empty: bool,
finalize: bool,
parent_hash: Option<Hash>,
) -> JsonRpcResult<CreatedBlock<Hash>>;

/// Instructs the manual-seal authorship task to finalize a block
#[method(name = "finalizeBlock")]
async fn finalize_block(
&self,
hash: Hash,
justification: Option<EncodedJustification>,
) -> JsonRpcResult<bool>;
}

/// A struct that implements the [`ManualSealApi`].
pub struct ManualSeal<Hash> {
import_block_channel: mpsc::Sender<EngineCommand<Hash>>,
Expand All @@ -94,63 +106,43 @@ impl<Hash> ManualSeal<Hash> {
}
}

// TODO(niklasad1): this should be replaced with a proc macro impl.
impl<Hash: Send + Sync + DeserializeOwned + Serialize + 'static> ManualSeal<Hash> {
/// Convert a [`ManualSealApi`] to an [`RpcModule`]. Registers all the RPC methods available
/// with the RPC server.
pub fn into_rpc_module(self) -> std::result::Result<RpcModule<Self>, JsonRpseeError> {
let mut module = RpcModule::new(self);

module.register_async_method::<CreatedBlock<Hash>, _>(
"engine_createBlock",
|params, engine| {
let mut seq = params.sequence();

let create_empty = unwrap_or_fut_err!(seq.next());
let finalize = unwrap_or_fut_err!(seq.next());
let parent_hash = unwrap_or_fut_err!(seq.optional_next());
let mut sink = engine.import_block_channel.clone();

async move {
let (sender, receiver) = oneshot::channel();
// NOTE: this sends a Result over the channel.
let command = EngineCommand::SealNewBlock {
create_empty,
finalize,
parent_hash,
sender: Some(sender),
};

sink.send(command).await?;

match receiver.await {
Ok(Ok(rx)) => Ok(rx),
Ok(Err(e)) => Err(e.into()),
Err(e) => Err(JsonRpseeError::to_call_error(e)),
}
}
.boxed()
},
)?;

module.register_async_method("engine_finalizeBlock", |params, engine| {
let mut seq = params.sequence();

let hash = unwrap_or_fut_err!(seq.next());
let justification = unwrap_or_fut_err!(seq.optional_next());
let mut sink = engine.import_block_channel.clone();

async move {
let (sender, receiver) = oneshot::channel();
let command =
EngineCommand::FinalizeBlock { hash, sender: Some(sender), justification };
sink.send(command).await?;
receiver.await.map(|_| true).map_err(|e| JsonRpseeError::to_call_error(e))
}
.boxed()
})?;

Ok(module)
#[async_trait]
impl<Hash: Send + 'static> ManualSealApiServer<Hash> for ManualSeal<Hash> {
async fn create_block(
&self,
create_empty: bool,
finalize: bool,
parent_hash: Option<Hash>,
) -> JsonRpcResult<CreatedBlock<Hash>> {
let mut sink = self.import_block_channel.clone();
let (sender, receiver) = oneshot::channel();
// NOTE: this sends a Result over the channel.
let command = EngineCommand::SealNewBlock {
create_empty,
finalize,
parent_hash,
sender: Some(sender),
};

sink.send(command).await?;

match receiver.await {
Ok(Ok(rx)) => Ok(rx),
Ok(Err(e)) => Err(e.into()),
Err(e) => Err(JsonRpseeError::to_call_error(e)),
}
}

async fn finalize_block(
&self,
hash: Hash,
justification: Option<EncodedJustification>,
) -> JsonRpcResult<bool> {
let mut sink = self.import_block_channel.clone();
let (sender, receiver) = oneshot::channel();
let command = EngineCommand::FinalizeBlock { hash, sender: Some(sender), justification };
sink.send(command).await?;
receiver.await.map(|_| true).map_err(|e| JsonRpseeError::to_call_error(e))
}
}

Expand Down
11 changes: 6 additions & 5 deletions test-utils/test-runner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use jsonrpsee::types::RpcModule;
use manual_seal::{
consensus::babe::{BabeConsensusDataProvider, SlotTimestampProvider},
import_queue,
rpc::ManualSeal,
rpc::{ManualSeal, ManualSealApiServer},
run_manual_seal, EngineCommand, ManualSealParams,
};
use sc_client_api::backend::Backend;
Expand Down Expand Up @@ -102,8 +102,9 @@ where
use sp_consensus_babe::AuthorityId;
let config = match config_or_chain_spec {
ConfigOrChainSpec::Config(config) => config,
ConfigOrChainSpec::ChainSpec(chain_spec, task_executor) =>
default_config(task_executor, chain_spec),
ConfigOrChainSpec::ChainSpec(chain_spec, task_executor) => {
default_config(task_executor, chain_spec)
}
};

let executor = NativeElseWasmExecutor::<T::ExecutorDispatch>::new(
Expand Down Expand Up @@ -187,9 +188,9 @@ where
let rpc_sink = command_sink.clone();

let rpc_builder = Box::new(move |_, _| -> RpcModule<()> {
let seal = ManualSeal::new(rpc_sink).into_rpc_module().expect("TODO; error handling");
let seal = ManualSeal::new(rpc_sink).into_rpc();
let mut module = RpcModule::new(());
module.merge(seal).expect("TODO: error handling");
module.merge(seal).expect("only one module; qed");
module
});

Expand Down