Skip to content
This repository was archived by the owner on Jul 4, 2022. It is now read-only.
Next Next commit
Revalidate transactions only on latest best block (#6824)
* Revalidate transactions only on latest best block

We should revalidate transactions only on the latest best block and not
on any arbitrary block. The revalidation before failed when there were
multiple blocks on the height given to the revalidation function, but no
block was imported as best block.

* Update test-utils/runtime/transaction-pool/src/lib.rs

Co-authored-by: Jaco Greeff <[email protected]>

* Fix tests

* Only process best blocks in the transaction pool

Co-authored-by: Jaco Greeff <[email protected]>
  • Loading branch information
2 people authored and jordy25519 committed Sep 13, 2020
commit 98fda86fcae7d35f06fe297823d6bfdb9b571fed
4 changes: 1 addition & 3 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,11 +480,9 @@ mod tests {

futures::executor::block_on(
service.transaction_pool().maintain(
ChainEvent::NewBlock {
is_new_best: true,
ChainEvent::NewBestBlock {
hash: parent_header.hash(),
tree_route: None,
header: parent_header.clone(),
},
)
);
Expand Down
20 changes: 12 additions & 8 deletions client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! A set of APIs supported by the client along with their primitives.

use std::{fmt, collections::HashSet, sync::Arc};
use std::{fmt, collections::HashSet, sync::Arc, convert::TryFrom};
use sp_core::storage::StorageKey;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Expand Down Expand Up @@ -245,13 +245,17 @@ pub struct FinalityNotification<Block: BlockT> {
pub header: Block::Header,
}

impl<B: BlockT> From<BlockImportNotification<B>> for sp_transaction_pool::ChainEvent<B> {
fn from(n: BlockImportNotification<B>) -> Self {
Self::NewBlock {
is_new_best: n.is_new_best,
hash: n.hash,
header: n.header,
tree_route: n.tree_route,
impl<B: BlockT> TryFrom<BlockImportNotification<B>> for sp_transaction_pool::ChainEvent<B> {
type Error = ();

fn try_from(n: BlockImportNotification<B>) -> Result<Self, ()> {
if n.is_new_best {
Ok(Self::NewBestBlock {
hash: n.hash,
tree_route: n.tree_route,
})
} else {
Err(())
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,9 @@ mod tests {
fn chain_event<B: BlockT>(header: B::Header) -> ChainEvent<B>
where NumberFor<B>: From<u64>
{
ChainEvent::NewBlock {
ChainEvent::NewBestBlock {
hash: header.hash(),
tree_route: None,
is_new_best: true,
header,
}
}

Expand Down
14 changes: 7 additions & 7 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ mod tests {
use sc_client::LongestChain;
use sp_inherents::InherentDataProviders;
use sc_basic_authorship::ProposerFactory;
use sc_client_api::BlockBackend;

fn api() -> Arc<TestApi> {
Arc::new(TestApi::empty())
Expand Down Expand Up @@ -443,15 +444,13 @@ mod tests {
}
}
);
// assert that there's a new block in the db.
assert!(client.header(&BlockId::Number(0)).unwrap().is_some());
let block = client.block(&BlockId::Number(1)).unwrap().unwrap().block;
pool_api.add_block(block, true);
assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 1)).await.is_ok());

let header = client.header(&BlockId::Number(1)).expect("db error").expect("imported above");
pool.maintain(sp_transaction_pool::ChainEvent::NewBlock {
pool.maintain(sp_transaction_pool::ChainEvent::NewBestBlock {
hash: header.hash(),
header,
is_new_best: true,
tree_route: None,
}).await;

Expand All @@ -466,10 +465,11 @@ mod tests {
rx1.await.expect("should be no error receiving"),
Ok(_)
);
assert!(client.header(&BlockId::Number(1)).unwrap().is_some());
let block = client.block(&BlockId::Number(2)).unwrap().unwrap().block;
pool_api.add_block(block, true);
pool_api.increment_nonce(Alice.into());

assert!(pool.submit_one(&BlockId::Number(2), SOURCE, uxt(Alice, 2)).await.is_ok());
assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 2)).await.is_ok());
let (tx2, rx2) = futures::channel::oneshot::channel();
assert!(sink.send(EngineCommand::SealNewBlock {
parent_hash: Some(created_block.hash),
Expand Down
17 changes: 8 additions & 9 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub mod testing;
pub use sc_transaction_graph as txpool;
pub use crate::api::{FullChainApi, LightChainApi};

use std::{collections::{HashMap, HashSet}, sync::Arc, pin::Pin};
use std::{collections::{HashMap, HashSet}, sync::Arc, pin::Pin, convert::TryInto};
use futures::{prelude::*, future::{self, ready}, channel::oneshot};
use parking_lot::Mutex;

Expand Down Expand Up @@ -468,7 +468,7 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
{
fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output=()> + Send>> {
match event {
ChainEvent::NewBlock { hash, tree_route, is_new_best, .. } => {
ChainEvent::NewBestBlock { hash, tree_route } => {
let pool = self.pool.clone();
let api = self.api.clone();

Expand Down Expand Up @@ -527,10 +527,7 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
})
}

// If this is a new best block, we need to prune its transactions from the pool.
if is_new_best {
pruned_log.extend(prune_known_txs_for_block(id.clone(), &*api, &*pool).await);
}
pruned_log.extend(prune_known_txs_for_block(id.clone(), &*api, &*pool).await);

metrics.report(
|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64)
Expand Down Expand Up @@ -610,9 +607,9 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
.map(|tx| tx.hash.clone())
.collect();
revalidation_queue.revalidate_later(block_number, hashes).await;
}

revalidation_strategy.lock().clear();
revalidation_strategy.lock().clear();
}
}.boxed()
}
ChainEvent::Finalized { hash } => {
Expand Down Expand Up @@ -641,7 +638,9 @@ pub async fn notification_future<Client, Pool, Block>(
Client: sc_client_api::BlockchainEvents<Block>,
Pool: MaintainedTransactionPool<Block=Block>,
{
let import_stream = client.import_notification_stream().map(Into::into).fuse();
let import_stream = client.import_notification_stream()
.filter_map(|n| ready(n.try_into().ok()))
.fuse();
let finality_stream = client.finality_notification_stream()
.map(Into::into)
.fuse();
Expand Down
21 changes: 12 additions & 9 deletions client/transaction-pool/src/revalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
mut self,
from_queue: TracingUnboundedReceiver<WorkerPayload<Api>>,
interval: R,
) where R: Send, R::Guard: Send
{
) where R: Send, R::Guard: Send {
let interval = interval.into_stream().fuse();
let from_queue = from_queue.fuse();
futures::pin_mut!(interval, from_queue);
Expand Down Expand Up @@ -253,7 +252,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
if this.members.len() > 0 {
log::debug!(
target: "txpool",
"Updated revalidation queue at {}. Transactions: {:?}",
"Updated revalidation queue at {:?}. Transactions: {:?}",
this.best_block,
this.members,
);
Expand Down Expand Up @@ -298,9 +297,7 @@ where
api: Arc<Api>,
pool: Arc<Pool<Api>>,
interval: R,
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>)
where R: Send + 'static, R::Guard: Send
{
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>) where R: Send + 'static, R::Guard: Send {
let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue");

let worker = RevalidationWorker::new(api.clone(), pool.clone());
Expand Down Expand Up @@ -338,16 +335,22 @@ where
/// If queue configured with background worker, this will return immediately.
/// If queue configured without background worker, this will resolve after
/// revalidation is actually done.
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExtrinsicHash<Api>>) {
pub async fn revalidate_later(
&self,
at: NumberFor<Api>,
transactions: Vec<ExtrinsicHash<Api>>,
) {
if transactions.len() > 0 {
log::debug!(target: "txpool", "Sent {} transactions to revalidation queue", transactions.len());
log::debug!(
target: "txpool", "Sent {} transactions to revalidation queue",
transactions.len(),
);
}

if let Some(ref to_worker) = self.background {
if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
log::warn!(target: "txpool", "Failed to update background worker: {:?}", e);
}
return;
} else {
let pool = self.pool.clone();
let api = self.api.clone();
Expand Down
Loading