Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Improve transaction submission
Before this pr the transaction pool validated each transaction, even if
the transaction was already known to the pool. This pr changes the
behavior to first check if we are already aware of a transaction and
thus, to only validate them if we don't know them yet. However, there is
still the possibility that a given transaction is validated multiple
times. This can happen if the transaction is added the first time, but
is not yet validated and added to the validated pool.

Besides that, this pr fixes the wrong metrics of gossiped transactions
in the network. It also moves some metrics to the transaction pool api,
to better track when a transaction actually is scheduled for validation.
  • Loading branch information
bkchr committed Jul 7, 2020
commit b25c2f2de346b2503a295534437c666065a3e03a
1 change: 1 addition & 0 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ macro_rules! new_full_start {
.with_transaction_pool(|builder| {
let pool_api = sc_transaction_pool::FullChainApi::new(
builder.client().clone(),
None,
);
Ok(sc_transaction_pool::BasicPool::new(
builder.config().transaction_pool.clone(),
Expand Down
1 change: 1 addition & 0 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ macro_rules! new_full_start {
.with_transaction_pool(|builder| {
let pool_api = sc_transaction_pool::FullChainApi::new(
builder.client().clone(),
builder.prometheus_registry(),
);
let config = builder.config();

Expand Down
8 changes: 4 additions & 4 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ mod tests {
let txpool = Arc::new(
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
).0
);
Expand Down Expand Up @@ -414,7 +414,7 @@ mod tests {
let txpool = Arc::new(
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
).0
);
Expand Down Expand Up @@ -449,7 +449,7 @@ mod tests {
let txpool = Arc::new(
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
).0
);
Expand Down Expand Up @@ -511,7 +511,7 @@ mod tests {
let txpool = Arc::new(
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
).0
);
Expand Down
6 changes: 5 additions & 1 deletion client/basic-authorship/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
//! # };
//! # use sc_transaction_pool::{BasicPool, FullChainApi};
//! # let client = Arc::new(substrate_test_runtime_client::new());
//! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone())), None).0);
//! # let txpool = Arc::new(BasicPool::new(
//! # Default::default(),
//! # Arc::new(FullChainApi::new(client.clone(), None)),
//! # None).0,
//! # );
//! // The first step is to create a `ProposerFactory`.
//! let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone(), None);
//!
Expand Down
16 changes: 9 additions & 7 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
fn on_transactions(
&mut self,
who: PeerId,
transactions: message::Transactions<B::Extrinsic>
transactions: message::Transactions<B::Extrinsic>,
) {
// sending transaction to light node is considered a bad behavior
if !self.config.roles.is_full() {
Expand Down Expand Up @@ -1211,7 +1211,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
&mut self,
transactions: &[(H, B::Extrinsic)],
) -> HashMap<H, Vec<String>> {
let mut propagated_to = HashMap::new();
let mut propagated_to = HashMap::<_, Vec<_>>::new();
let mut propagated_transactions = 0;

for (who, peer) in self.context_data.peers.iter_mut() {
// never send transactions to the light node
if !peer.info.roles.is_full() {
Expand All @@ -1224,11 +1226,13 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
.cloned()
.unzip();

propagated_transactions += hashes.len();

if !to_send.is_empty() {
for hash in hashes {
propagated_to
.entry(hash)
.or_insert_with(Vec::new)
.or_default()
.push(who.to_base58());
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
Expand All @@ -1243,10 +1247,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

if propagated_to.len() > 0 {
if let Some(ref metrics) = self.metrics {
metrics.propagated_transactions.inc();
}
if let Some(ref metrics) = self.metrics {
metrics.propagated_transactions.inc_by(propagated_transactions as _)
}

propagated_to
Expand Down
2 changes: 1 addition & 1 deletion client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ mod tests {
let client = Arc::new(substrate_test_runtime_client::new());
let pool = Arc::new(TestPool(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
).0));
client.execution_extensions()
Expand Down
2 changes: 1 addition & 1 deletion client/rpc/src/author/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Default for TestSetup {

let pool = Arc::new(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
).0);
TestSetup {
Expand Down
2 changes: 1 addition & 1 deletion client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ mod tests {
let client = Arc::new(client);
let pool = Arc::new(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
).0);
let source = sp_runtime::transaction_validity::TransactionSource::External;
Expand Down
7 changes: 6 additions & 1 deletion client/transaction-pool/graph/src/base_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
return_value
}

/// Returns if the transaction for the given hash is already imported.
pub fn is_imported(&self, tx_hash: &Hash) -> bool {
self.future.contains(tx_hash) || self.ready.contains(tx_hash)
}

/// Imports transaction to the pool.
///
/// The pool consists of two parts: Future and Ready.
Expand All @@ -272,7 +277,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
&mut self,
tx: Transaction<Hash, Ex>,
) -> error::Result<Imported<Hash, Ex>> {
if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) {
if self.is_imported(&tx.hash) {
return Err(error::Error::AlreadyImported(Box::new(tx.hash.clone())))
}

Expand Down
78 changes: 35 additions & 43 deletions client/transaction-pool/graph/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{

use crate::{base_pool as base, watcher::Watcher};

use futures::{Future, FutureExt};
use futures::Future;
use sp_runtime::{
generic::BlockId,
traits::{self, SaturatedConversion, Block as BlockT},
Expand Down Expand Up @@ -149,23 +149,27 @@ impl<B: ChainApi> Pool<B> {
}

/// Imports a bunch of unverified extrinsics to the pool
pub async fn submit_at<T>(
pub async fn submit_at(
&self,
at: &BlockId<B::Block>,
source: TransactionSource,
xts: T,
force: bool,
) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> where
T: IntoIterator<Item=ExtrinsicFor<B>>,
{
let validated_pool = self.validated_pool.clone();
xts: impl IntoIterator<Item=ExtrinsicFor<B>>,
) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
let xts = xts.into_iter().map(|xt| (source, xt));
self.verify(at, xts, force)
.map(move |validated_transactions| validated_transactions
.map(|validated_transactions| validated_pool.submit(validated_transactions
.into_iter()
.map(|(_, tx)| tx))))
.await
let validated_transactions = self.verify(at, xts, true).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change the bool flag to an enum, I just realised you flipped the meaning and was confused with all the false->true changes. Since it's a super important flag I think it's worth making it more descriptive.

Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx)))
}

/// Resubmit the given extrinsics to the pool.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment that it ignores the fact the transaction might be banned? I.e. this method is suitable to re-import recently pruned transactions (for instance coming from retracted blocks).

pub async fn resubmit_at(
&self,
at: &BlockId<B::Block>,
source: TransactionSource,
xts: impl IntoIterator<Item=ExtrinsicFor<B>>,
) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
let xts = xts.into_iter().map(|xt| (source, xt));
let validated_transactions = self.verify(at, xts, false).await?;
Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx)))
}

/// Imports one unverified extrinsic to the pool
Expand All @@ -175,12 +179,8 @@ impl<B: ChainApi> Pool<B> {
source: TransactionSource,
xt: ExtrinsicFor<B>,
) -> Result<ExtrinsicHash<B>, B::Error> {
self.submit_at(at, source, std::iter::once(xt), false)
.map(|import_result| import_result.and_then(|mut import_result| import_result
.pop()
.expect("One extrinsic passed; one result returned; qed")
))
.await
let res = self.submit_at(at, source, std::iter::once(xt)).await?.pop();
res.expect("One extrinsic passed; one result returned; qed")
}

/// Import a single extrinsic and starts to watch their progress in the pool.
Expand All @@ -191,9 +191,7 @@ impl<B: ChainApi> Pool<B> {
xt: ExtrinsicFor<B>,
) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
let block_number = self.resolve_block_number(at)?;
let (_, tx) = self.verify_one(
at, block_number, source, xt, false
).await;
let (_, tx) = self.verify_one(at, block_number, source, xt, true).await;
self.validated_pool.submit_and_watch(tx)
}

Expand Down Expand Up @@ -328,7 +326,7 @@ impl<B: ChainApi> Pool<B> {
.into_iter()
.map(|tx| (tx.source, tx.data.clone()));

let reverified_transactions = self.verify(at, pruned_transactions, false).await?;
let reverified_transactions = self.verify(at, pruned_transactions, true).await?;

log::trace!(target: "txpool", "Pruning at {:?}. Resubmitting transactions.", at);
// And finally - submit reverified transactions back to the pool
Expand Down Expand Up @@ -358,23 +356,17 @@ impl<B: ChainApi> Pool<B> {
&self,
at: &BlockId<B::Block>,
xts: impl IntoIterator<Item=(TransactionSource, ExtrinsicFor<B>)>,
force: bool,
check_is_known: bool,
) -> Result<HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, B::Error> {
// we need a block number to compute tx validity
let block_number = self.resolve_block_number(at)?;
let mut result = HashMap::new();

for (hash, validated_tx) in
futures::future::join_all(
xts.into_iter()
.map(|(source, xt)| self.verify_one(at, block_number, source, xt, force))
)
.await
{
result.insert(hash, validated_tx);
}

Ok(result)
let res = futures::future::join_all(
xts.into_iter()
.map(|(source, xt)| self.verify_one(at, block_number, source, xt, check_is_known))
).await.into_iter().collect::<HashMap<_, _>>();

Ok(res)
}

/// Returns future that validates single transaction at given block.
Expand All @@ -384,14 +376,14 @@ impl<B: ChainApi> Pool<B> {
block_number: NumberFor<B>,
source: TransactionSource,
xt: ExtrinsicFor<B>,
force: bool,
check_is_known: bool,
) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
if !force && self.validated_pool.is_banned(&hash) {
return (
hash.clone(),
ValidatedTransaction::Invalid(hash, error::Error::TemporarilyBanned.into()),
)

if check_is_known {
if let Err(err) = self.validated_pool.check_is_known(&hash) {
return (hash.clone(), ValidatedTransaction::Invalid(hash, err.into()))
}
}

let validation_result = self.validated_pool.api().validate_transaction(
Expand Down
21 changes: 18 additions & 3 deletions client/transaction-pool/graph/src/validated_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,24 @@ impl<B: ChainApi> ValidatedPool<B> {
self.rotator.is_banned(hash)
}

/// A fast check before doing any further processing of a transaction, like validation.
///
/// It checks if the transaction is already imported or banned. If so, it returns an error.
pub fn check_is_known(&self, tx_hash: &ExtrinsicHash<B>) -> Result<(), B::Error> {
if self.is_banned(tx_hash) {
Err(error::Error::TemporarilyBanned.into())
} else if self.pool.read().is_imported(tx_hash) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do this check unconditionally? The transaction will fail to be imported anyhow, wouldn't it?

Err(error::Error::AlreadyImported(Box::new(tx_hash.clone())).into())
} else {
Ok(())
}
}

/// Imports a bunch of pre-validated transactions to the pool.
pub fn submit<T>(&self, txs: T) -> Vec<Result<ExtrinsicHash<B>, B::Error>> where
T: IntoIterator<Item=ValidatedTransactionFor<B>>
{
pub fn submit(
&self,
txs: impl IntoIterator<Item=ValidatedTransactionFor<B>>,
) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
let results = txs.into_iter()
.map(|validated_tx| self.submit_one(validated_tx))
.collect::<Vec<_>>();
Expand Down Expand Up @@ -175,6 +189,7 @@ impl<B: ChainApi> ValidatedPool<B> {
},
ValidatedTransaction::Invalid(hash, err) => {
self.rotator.ban(&Instant::now(), std::iter::once(hash));
self.listener.write().invalid(&hash, false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this one, but it looks like this was missed before?

Err(err.into())
},
ValidatedTransaction::Unknown(hash, err) => {
Expand Down
Loading