Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move to a separate module and add some tests.
  • Loading branch information
tomusdrw committed Aug 23, 2018
commit c8faf8739e5514ea2cf9617f7ef721cd96fde859
109 changes: 100 additions & 9 deletions polkadot/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use std::{
collections::HashMap,
ops::Deref,
sync::Arc,
time::{Duration, Instant},
};

use codec::{Decode, Encode};
Expand All @@ -51,6 +52,7 @@ use extrinsic_pool::{
Pool,
Listener,
};
use parking_lot::RwLock;
use polkadot_api::PolkadotApi;
use primitives::{AccountId, BlockId, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic};
use runtime::{Address, UncheckedExtrinsic};
Expand All @@ -59,6 +61,11 @@ use substrate_runtime_primitives::traits::{Bounded, Checkable, Hash as HashT, Bl
pub use extrinsic_pool::txpool::{Options, Status, LightStatus, VerifiedTransaction as VerifiedTransactionOps};
pub use error::{Error, ErrorKind, Result};

/// Maximum time the transaction will be kept in the pool.
///
/// Transactions that don't get included within the limit are removed from the pool.
const POOL_TIME: Duration = Duration::from_secs(60 * 5);

/// Type alias for convenience.
pub type CheckedExtrinsic = <UncheckedExtrinsic as Checkable<fn(Address) -> std::result::Result<AccountId, &'static str>>>::Checked;

Expand All @@ -70,6 +77,7 @@ pub struct VerifiedTransaction {
sender: Option<AccountId>,
hash: Hash,
encoded_size: usize,
valid_till: Instant,
}

impl VerifiedTransaction {
Expand Down Expand Up @@ -187,17 +195,21 @@ impl txpool::Scoring<VerifiedTransaction> for Scoring {
pub struct Ready<'a, A: 'a + PolkadotApi> {
at_block: BlockId,
api: &'a A,
rotator: &'a PoolRotator,
known_nonces: HashMap<AccountId, ::primitives::Index>,
now: Instant,
}

impl<'a, A: 'a + PolkadotApi> Ready<'a, A> {
/// Create a new readiness evaluator at the given block. Requires that
/// the ID has already been checked for local corresponding and available state.
fn create(at: BlockId, api: &'a A) -> Self {
fn create(at: BlockId, api: &'a A, rotator: &'a PoolRotator) -> Self {
Ready {
at_block: at,
api,
known_nonces: HashMap::new(),
rotator,
known_nonces: Default::default(),
now: Instant::now(),
}
}
}
Expand All @@ -207,7 +219,9 @@ impl<'a, T: 'a + PolkadotApi> Clone for Ready<'a, T> {
Ready {
at_block: self.at_block.clone(),
api: self.api,
rotator: self.rotator,
known_nonces: self.known_nonces.clone(),
now: self.now.clone(),
}
}
}
Expand All @@ -230,6 +244,11 @@ impl<'a, A: 'a + PolkadotApi> txpool::Ready<VerifiedTransaction> for Ready<'a, A

trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_index, xt.original.extrinsic.index);

if self.rotator.ban_if_stale(&self.now, xt) {
debug!(target: "transaction-pool", "[{}] Banning as stale.", xt.hash);
return Readiness::Stale;
}

let result = match xt.original.extrinsic.index.cmp(&next_index) {
// TODO: this won't work perfectly since accounts can now be killed, returning the nonce
// to zero.
Expand All @@ -251,11 +270,12 @@ impl<'a, A: 'a + PolkadotApi> txpool::Ready<VerifiedTransaction> for Ready<'a, A

pub struct Verifier<'a, A: 'a> {
api: &'a A,
rotator: &'a PoolRotator,
at_block: BlockId,
}

impl<'a, A> Verifier<'a, A> where
A: 'a + PolkadotApi,
A: 'a + PolkadotApi,
{
const NO_ACCOUNT: &'static str = "Account not found.";

Expand Down Expand Up @@ -289,6 +309,11 @@ impl<'a, A> txpool::Verifier<UncheckedExtrinsic> for Verifier<'a, A> where

debug!(target: "transaction-pool", "Transaction submitted: {}", ::substrate_primitives::hexdisplay::HexDisplay::from(&encoded));

if self.rotator.is_banned(&hash) {
debug!(target: "transaction-pool", "[{}] Transaction is temporarily banned", hash);
bail!(ErrorKind::TemporarilyBanned);
}

let inner = match uxt.clone().check_with(|a| self.lookup(a)) {
Ok(xt) => Some(xt),
// keep the transaction around in the future pool and attempt to promote it later.
Expand All @@ -298,37 +323,97 @@ impl<'a, A> txpool::Verifier<UncheckedExtrinsic> for Verifier<'a, A> where
let sender = inner.as_ref().map(|x| x.signed.clone());

if encoded_size < 1024 {
debug!(target: "transaction-pool", "Transaction verified: {} => {:?}", hash, uxt);
debug!(target: "transaction-pool", "[{}] Transaction verified: {:?}", hash, uxt);
} else {
debug!(target: "transaction-pool", "Transaction verified: {} ({} bytes is too large to display)", hash, encoded_size);
debug!(target: "transaction-pool", "[{}] Transaction verified: ({} bytes is too large to display)", hash, encoded_size);
}

Ok(VerifiedTransaction {
original: uxt,
inner,
sender,
hash,
encoded_size
encoded_size,
valid_till: Instant::now() + POOL_TIME,
})
}
}

/// Pool rotator is responsible to only keep fresh transactions in the pool.
///
/// Transactions that occupy the pool for too long are culled and temporarily banned from entering
/// the pool again.
struct PoolRotator {
/// How long the transaction is banned for.
ban_time: Duration,
/// Currently banned transactions.
banned_until: RwLock<HashMap<Hash, Instant>>,
}

impl Default for PoolRotator {
fn default() -> Self {
PoolRotator {
ban_time: Duration::from_secs(60 * 30),
banned_until: Default::default(),
}
}
}

impl PoolRotator {
/// Returns `true` if transaction hash is currently banned.
pub fn is_banned(&self, hash: &Hash) -> bool {
self.banned_until.read().contains_key(hash)
}

/// Bans transaction if it's stale.
///
/// Returns `true` if transaction is stale and got banned.
pub fn ban_if_stale(&self, now: &Instant, tx: &VerifiedTransaction) -> bool {
if &tx.valid_till > now {
return false;
}

self.banned_until.write().insert(*tx.hash(), *now + self.ban_time);
true
}

/// Removes timed bans.
pub fn clear_timeouts(&self, now: &Instant) {
let to_remove = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this in {}s?

self.banned_until.read()
.iter()
.filter_map(|(k, v)| if v < now {
Some(*k)
} else {
None
}).collect::<Vec<_>>()
};

let mut banned = self.banned_until.write();
for k in to_remove {
banned.remove(&k);
}
}
}

/// The polkadot transaction pool.
///
/// Wraps a `extrinsic_pool::Pool`.
pub struct TransactionPool<A> {
inner: Pool<Hash, VerifiedTransaction, Scoring, Error>,
api: Arc<A>,
rotator: PoolRotator,
}

impl<A> TransactionPool<A> where
A: PolkadotApi,
A: PolkadotApi,
{
/// Create a new transaction pool.
pub fn new(options: Options, api: Arc<A>) -> Self {
TransactionPool {
inner: Pool::new(options, Scoring),
api,
rotator: Default::default(),
}
}

Expand All @@ -337,6 +422,7 @@ impl<A> TransactionPool<A> where
let verifier = Verifier {
api: &*self.api,
at_block: block,
rotator: &self.rotator,
};
self.inner.submit(verifier, vec![uxt]).map(|mut v| v.swap_remove(0))
}
Expand All @@ -347,6 +433,7 @@ impl<A> TransactionPool<A> where
let verifier = Verifier {
api: &*self.api,
at_block: block,
rotator: &self.rotator,
};

self.inner.submit(verifier, to_reverify.into_iter().map(|tx| tx.original.clone()))?;
Expand All @@ -372,15 +459,18 @@ impl<A> TransactionPool<A> where

/// Cull old transactions from the queue.
pub fn cull(&self, block: BlockId) -> Result<usize> {
let ready = Ready::create(block, &*self.api);
self.rotator.clear_timeouts(&Instant::now());

let ready = Ready::create(block, &*self.api, &self.rotator);
Ok(self.inner.cull(None, ready))
}

/// Cull transactions from the queue and then compute the pending set.
pub fn cull_and_get_pending<F, T>(&self, block: BlockId, f: F) -> Result<T> where
F: FnOnce(txpool::PendingIterator<VerifiedTransaction, Ready<A>, Scoring, Listener<Hash>>) -> T,
{
let ready = Ready::create(block, &*self.api);
self.rotator.clear_timeouts(&Instant::now());
let ready = Ready::create(block, &*self.api, &self.rotator);
self.inner.cull(None, ready.clone());
Ok(self.inner.pending(ready, f))
}
Expand Down Expand Up @@ -424,6 +514,7 @@ impl<A> ExtrinsicPool<FutureProofUncheckedExtrinsic, BlockId, Hash> for Transact

let verifier = Verifier {
api: &*self.api,
rotator: &self.rotator,
at_block: block,
};

Expand Down
Loading