Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Expose concrete iterator.
  • Loading branch information
tomusdrw committed Sep 14, 2021
commit 42f43ade582b50e94caedb3beb0d015b83cf7f61
26 changes: 18 additions & 8 deletions bin/node/bench/src/construct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ use std::{borrow::Cow, collections::HashMap, pin::Pin, sync::Arc};

use node_primitives::Block;
use node_testing::bench::{BenchDb, BlockType, DatabaseType, KeyTypes, Profile};
use sc_transaction_pool_api::{
ImportNotificationStream, PoolFuture, PoolStatus, TransactionFor, TransactionSource,
TransactionStatusStreamFor, TxHash,
};
use sc_transaction_pool_api::{ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, TransactionSource, TransactionStatusStreamFor, TxHash};
use sp_consensus::{Environment, Proposer};
use sp_inherents::InherentDataProvider;
use sp_runtime::{generic::BlockId, traits::NumberFor, OpaqueExtrinsic};
Expand Down Expand Up @@ -216,6 +213,19 @@ impl sc_transaction_pool_api::InPoolTransaction for PoolTransaction {

#[derive(Clone, Debug)]
pub struct Transactions(Vec<Arc<PoolTransaction>>);
pub struct TransactionsIterator(std::vec::IntoIter<Arc<PoolTransaction>>);

impl Iterator for TransactionsIterator {
type Item = Arc<PoolTransaction>;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl ReadyTransactions for TransactionsIterator {
fn report_invalid(&mut self) {}
}

impl sc_transaction_pool_api::TransactionPool for Transactions {
type Block = Block;
Expand Down Expand Up @@ -257,16 +267,16 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
_at: NumberFor<Self::Block>,
) -> Pin<
Box<
dyn Future<Output = Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>>
dyn Future<Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>>
+ Send,
>,
> {
let iter: Box<dyn Iterator<Item = Arc<PoolTransaction>> + Send> =
Box::new(self.0.clone().into_iter());
let iter: Box<dyn ReadyTransactions<Item = Arc<PoolTransaction>> + Send> =
Box::new(TransactionsIterator(self.0.clone().into_iter()));
Box::pin(futures::future::ready(iter))
}

fn ready(&self) -> Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send> {
fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
unimplemented!()
}

Expand Down
8 changes: 6 additions & 2 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ where
let mut t2 =
futures_timer::Delay::new(deadline.saturating_duration_since((self.now)()) / 8).fuse();

let pending_iterator = select! {
let mut pending_iterator = select! {
res = t1 => res,
_ = t2 => {
log::warn!(
Expand All @@ -363,7 +363,7 @@ where
let mut transaction_pushed = false;
let mut hit_block_size_limit = false;

for pending_tx in pending_iterator {
while let Some(pending_tx) = pending_iterator.next() {
if (self.now)() > deadline {
debug!(
"Consensus deadline reached when pushing block transactions, \
Expand All @@ -385,6 +385,7 @@ where
but will try {} more transactions before quitting.",
MAX_SKIPPED_TRANSACTIONS - skipped,
);
pending_iterator.report_invalid();
continue
} else {
debug!("Reached block size limit, proceeding with proposing.");
Expand All @@ -400,6 +401,7 @@ where
debug!("[{:?}] Pushed to the block.", pending_tx_hash);
},
Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => {
pending_iterator.report_invalid();
if skipped < MAX_SKIPPED_TRANSACTIONS {
skipped += 1;
debug!(
Expand All @@ -412,13 +414,15 @@ where
}
},
Err(e) if skipped > 0 => {
pending_iterator.report_invalid();
trace!(
"[{:?}] Ignoring invalid transaction when skipping: {}",
pending_tx_hash,
e
);
},
Err(e) => {
pending_iterator.report_invalid();
debug!("[{:?}] Invalid transaction: {}", pending_tx_hash, e);
unqueue_invalid.push(pending_tx_hash);
},
Expand Down
25 changes: 23 additions & 2 deletions client/transaction-pool/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ pub trait TransactionPool: Send + Sync {
at: NumberFor<Self::Block>,
) -> Pin<
Box<
dyn Future<Output = Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>>
dyn Future<Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>>
+ Send,
>,
>;

/// Get an iterator for ready transactions ordered by priority.
fn ready(&self) -> Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>;
fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;

// *** Block production
/// Remove transactions identified by given hashes (and dependent transactions) from the pool.
Expand All @@ -254,6 +254,27 @@ pub trait TransactionPool: Send + Sync {
fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
}

/// An iterator of ready transactions.
///
/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting
/// last-returned element as invalid.
///
/// The implementation is then allowed, for performance reasons, to change the elements
/// returned next, by e.g. skipping elements that are known to depend on the reported
/// transaction, which yields them invalid as well.
pub trait ReadyTransactions: Iterator {
/// Report last-returned transaction as invalid.
///
/// This might affect subsequent elements returned by the iterator, so dependent transactions
/// are skipped for performance reasons.
fn report_invalid(&mut self);
}

/// A no-op implementation for an empty iterator.
impl<T> ReadyTransactions for std::iter::Empty<T> {
fn report_invalid(&mut self) {}
}

/// Events that the transaction pool listens for.
pub enum ChainEvent<B: BlockT> {
/// New best block have been added to the chain
Expand Down
7 changes: 2 additions & 5 deletions client/transaction-pool/src/graph/base_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ use sp_runtime::{
},
};

use super::{
future::{FutureTransactions, WaitingTransaction},
ready::ReadyTransactions,
};
use super::{future::{FutureTransactions, WaitingTransaction}, ready::{BestIterator, ReadyTransactions}};

/// Successful import result.
#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -356,7 +353,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
}

/// Returns an iterator over ready transactions in the pool.
pub fn ready(&self) -> impl Iterator<Item = Arc<Transaction<Hash, Ex>>> {
pub fn ready(&self) -> BestIterator<Hash, Ex> {
self.ready.get()
}

Expand Down
6 changes: 6 additions & 0 deletions client/transaction-pool/src/graph/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,12 @@ impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
}
}

impl<Hash: hash::Hash + Member, Ex> sc_transaction_pool_api::ReadyTransactions for BestIterator<Hash, Ex> {
fn report_invalid(&mut self) {
BestIterator::report_invalid(self)
}
}

impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
/// Report last returned value as invalid.
///
Expand Down
4 changes: 2 additions & 2 deletions client/transaction-pool/src/graph/validated_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{
use futures::channel::mpsc::{channel, Sender};
use parking_lot::{Mutex, RwLock};
use retain_mut::RetainMut;
use sc_transaction_pool_api::{error, PoolStatus};
use sc_transaction_pool_api::{PoolStatus, ReadyTransactions, error};
use serde::Serialize;
use sp_runtime::{
generic::BlockId,
Expand Down Expand Up @@ -631,7 +631,7 @@ impl<B: ChainApi> ValidatedPool<B> {
}

/// Get an iterator for ready transactions ordered by priority
pub fn ready(&self) -> impl Iterator<Item = TransactionFor<B>> + Send {
pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
self.pool.read().ready()
}

Expand Down
3 changes: 2 additions & 1 deletion client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use graph::{ExtrinsicHash, IsValidator};
use sc_transaction_pool_api::{
ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus,
TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
ReadyTransactions
};
use sp_core::traits::SpawnEssentialNamed;
use sp_runtime::{
Expand All @@ -69,7 +70,7 @@ use crate::metrics::MetricsLink as PrometheusMetrics;
use prometheus_endpoint::Registry as PrometheusRegistry;

type BoxedReadyIterator<Hash, Data> =
Box<dyn Iterator<Item = Arc<graph::base_pool::Transaction<Hash, Data>>> + Send>;
Box<dyn ReadyTransactions<Item = Arc<graph::base_pool::Transaction<Hash, Data>>> + Send>;

type ReadyIteratorFor<PoolApi> =
BoxedReadyIterator<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;
Expand Down