Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
transaction-pool: expose blocking api for tx submission
  • Loading branch information
andresilva committed Jun 10, 2020
commit cc00f14a45a8f094ddc09ab07a125936e2e430af
88 changes: 65 additions & 23 deletions client/transaction-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,29 +87,15 @@ where
let client = self.client.clone();
let at = at.clone();

self.pool.spawn_ok(futures_diagnose::diagnose("validate-transaction", async move {
sp_tracing::enter_span!("validate_transaction");
let runtime_api = client.runtime_api();
let has_v2 = sp_tracing::tracing_span! { "check_version";
runtime_api
.has_api_with::<dyn TaggedTransactionQueue<Self::Block, Error=()>, _>(
&at, |v| v >= 2,
)
.unwrap_or_default()
};

sp_tracing::enter_span!("runtime::validate_transaction");
let res = if has_v2 {
runtime_api.validate_transaction(&at, source, uxt)
} else {
#[allow(deprecated)] // old validate_transaction
runtime_api.validate_transaction_before_version_2(&at, uxt)
};
let res = res.map_err(|e| Error::RuntimeApi(e.to_string()));
if let Err(e) = tx.send(res) {
log::warn!("Unable to send a validate transaction result: {:?}", e);
}
}));
self.pool.spawn_ok(futures_diagnose::diagnose(
"validate-transaction",
async move {
let res = validate_transaction_blocking(&*client, &at, source, uxt);
if let Err(e) = tx.send(res) {
log::warn!("Unable to send a validate transaction result: {:?}", e);
}
},
));

Box::pin(async move {
match rx.await {
Expand Down Expand Up @@ -143,6 +129,62 @@ where
}
}

/// Helper function to validate a transaction using a full chain API.
/// This method will call into the runtime to perform the validation.
fn validate_transaction_blocking<Client, Block>(
client: &Client,
at: &BlockId<Block>,
source: TransactionSource,
uxt: sc_transaction_graph::ExtrinsicFor<FullChainApi<Client, Block>>,
) -> error::Result<TransactionValidity>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
sp_api::ApiErrorFor<Client, Block>: Send + std::fmt::Display,
{
sp_tracing::enter_span!("validate_transaction");
let runtime_api = client.runtime_api();
let has_v2 = sp_tracing::tracing_span! { "check_version";
runtime_api
.has_api_with::<dyn TaggedTransactionQueue<Block, Error=()>, _>(&at, |v| v >= 2)
.unwrap_or_default()
};

sp_tracing::enter_span!("runtime::validate_transaction");
let res = if has_v2 {
runtime_api.validate_transaction(&at, source, uxt)
} else {
#[allow(deprecated)] // old validate_transaction
runtime_api.validate_transaction_before_version_2(&at, uxt)
};

res.map_err(|e| Error::RuntimeApi(e.to_string()))
}

impl<Client, Block> FullChainApi<Client, Block>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
sp_api::ApiErrorFor<Client, Block>: Send + std::fmt::Display,
{
/// Validates a transaction by calling into the runtime, same as
/// `validate_transaction` but blocks the current thread when performing
/// validation. Only implemented for `FullChainApi` since we can call into
/// the runtime locally.
pub fn validate_transaction_blocking(
&self,
at: &BlockId<Block>,
source: TransactionSource,
uxt: sc_transaction_graph::ExtrinsicFor<Self>,
) -> error::Result<TransactionValidity> {
validate_transaction_blocking(&*self.client, at, source, uxt)
}
}

/// The transaction pool logic for light client.
pub struct LightChainApi<Client, F, Block> {
client: Arc<Client>,
Expand Down
53 changes: 53 additions & 0 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,59 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
}
}

impl<Block, Client> sp_transaction_pool::LocalTransactionPool
for BasicPool<FullChainApi<Client, Block>, Block>
where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
sp_api::ApiErrorFor<Client, Block>: Send + std::fmt::Display,
{
type Block = Block;
type Hash = sc_transaction_graph::ExtrinsicHash<FullChainApi<Client, Block>>;
type Error = <FullChainApi<Client, Block> as ChainApi>::Error;

fn submit_local(
&self,
at: &BlockId<Self::Block>,
xt: sp_transaction_pool::LocalTransactionFor<Self>,
) -> Result<Self::Hash, Self::Error> {
use sc_transaction_graph::ValidatedTransaction;
use sp_runtime::traits::SaturatedConversion;
use sp_runtime::transaction_validity::TransactionValidityError;

let validity = self
.api
.validate_transaction_blocking(at, TransactionSource::Local, xt.clone())?
.map_err(|e| {
Self::Error::Pool(match e {
TransactionValidityError::Invalid(i) => i.into(),
TransactionValidityError::Unknown(u) => u.into(),
})
})?;

let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
let block_number = self
.api
.block_id_to_number(at)?
.ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;

let validated = ValidatedTransaction::valid_at(
block_number.saturated_into::<u64>(),
hash.clone(),
TransactionSource::Local,
xt,
bytes,
validity,
);

self.pool.validated_pool().submit(vec![validated]).remove(0)
}
}

#[cfg_attr(test, derive(Debug))]
enum RevalidationStatus<N> {
/// The revalidation has never been completed.
Expand Down
34 changes: 27 additions & 7 deletions primitives/transaction-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ pub type BlockHash<P> = <<P as TransactionPool>::Block as BlockT>::Hash;
pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsic;
/// Type of transactions event stream for a pool.
pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
/// Transaction type for a local pool.
pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;

/// Typical future type used in transaction pool api.
pub type PoolFuture<T, E> = std::pin::Pin<Box<dyn Future<Output=Result<T, E>> + Send>>;
Expand Down Expand Up @@ -273,6 +275,25 @@ pub trait MaintainedTransactionPool: TransactionPool {
fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output=()> + Send>>;
}

/// Transaction pool interface for submitting local transactions that exposes a
/// blocking interface for submission.
pub trait LocalTransactionPool: Send + Sync {
/// Block type.
type Block: BlockT;
/// Transaction hash type.
type Hash: Hash + Eq + Member + Serialize;
/// Error type.
type Error: From<crate::error::Error> + crate::error::IntoPoolError;

/// Submits the given local unverified transaction to the pool blocking the
/// current thread for any necessary pre-verification.
fn submit_local(
&self,
at: &BlockId<Self::Block>,
xt: LocalTransactionFor<Self>,
) -> Result<Self::Hash, Self::Error>;
}

/// An abstraction for transaction pool.
///
/// This trait is used by offchain calls to be able to submit transactions.
Expand All @@ -291,7 +312,7 @@ pub trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
) -> Result<(), ()>;
}

impl<TPool: TransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
fn submit_at(
&self,
at: &BlockId<TPool::Block>,
Expand All @@ -303,15 +324,14 @@ impl<TPool: TransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
extrinsic
);

let result = futures::executor::block_on(self.submit_one(
&at, TransactionSource::Local, extrinsic,
));
let result = self.submit_local(&at, extrinsic);

result.map(|_| ())
.map_err(|e| log::warn!(
result.map(|_| ()).map_err(|e| {
log::warn!(
target: "txpool",
"(offchain call) Error submitting a transaction to the pool: {:?}",
e
))
)
})
}
}