1 change: 1 addition & 0 deletions bin/node-template/node/src/service.rs
@@ -47,6 +47,7 @@ macro_rules! new_full_start {
 			.with_transaction_pool(|builder| {
 				let pool_api = sc_transaction_pool::FullChainApi::new(
 					builder.client().clone(),
+					None,
 				);
 				Ok(sc_transaction_pool::BasicPool::new(
 					builder.config().transaction_pool.clone(),
1 change: 1 addition & 0 deletions bin/node/cli/src/service.rs
@@ -61,6 +61,7 @@ macro_rules! new_full_start {
 			.with_transaction_pool(|builder| {
 				let pool_api = sc_transaction_pool::FullChainApi::new(
 					builder.client().clone(),
+					builder.prometheus_registry(),
 				);
 				let config = builder.config();
 
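Both service files now pass a second argument to `sc_transaction_pool::FullChainApi::new`: the node template opts out with `None`, while the full node forwards the service builder's Prometheus registry. The sketch below shows the general shape of such an optional-registry constructor; it is illustrative only (`ChainApi`, `PoolMetrics`, and the metric name are invented for this example, not Substrate's types) and assumes the `prometheus` crate.

```rust
use prometheus::{IntCounter, Registry};

struct PoolMetrics {
    validations: IntCounter,
}

impl PoolMetrics {
    fn register(registry: &Registry) -> prometheus::Result<Self> {
        let validations =
            IntCounter::new("pool_validations_total", "Total transaction validations")?;
        registry.register(Box::new(validations.clone()))?;
        Ok(Self { validations })
    }
}

struct ChainApi {
    // Stays `None` when no registry is supplied (the node-template case above).
    metrics: Option<PoolMetrics>,
}

impl ChainApi {
    fn new(registry: Option<&Registry>) -> Self {
        // A failed registration disables metrics instead of failing construction.
        let metrics = registry.and_then(|r| PoolMetrics::register(r).ok());
        ChainApi { metrics }
    }

    fn on_validation(&self) {
        if let Some(metrics) = &self.metrics {
            metrics.validations.inc();
        }
    }
}
```

Keeping the registry optional means tests and minimal setups can construct the API with `None`, which is exactly what the updated test code in the following files does.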
8 changes: 4 additions & 4 deletions client/basic-authorship/src/basic_authorship.rs
@@ -361,7 +361,7 @@ mod tests {
 		let txpool = Arc::new(
 			BasicPool::new(
 				Default::default(),
-				Arc::new(FullChainApi::new(client.clone())),
+				Arc::new(FullChainApi::new(client.clone(), None)),
 				None,
 			).0
 		);
@@ -414,7 +414,7 @@ mod tests {
 		let txpool = Arc::new(
 			BasicPool::new(
 				Default::default(),
-				Arc::new(FullChainApi::new(client.clone())),
+				Arc::new(FullChainApi::new(client.clone(), None)),
 				None,
 			).0
 		);
@@ -449,7 +449,7 @@ mod tests {
 		let txpool = Arc::new(
 			BasicPool::new(
 				Default::default(),
-				Arc::new(FullChainApi::new(client.clone())),
+				Arc::new(FullChainApi::new(client.clone(), None)),
 				None,
 			).0
 		);
@@ -511,7 +511,7 @@ mod tests {
 		let txpool = Arc::new(
 			BasicPool::new(
 				Default::default(),
-				Arc::new(FullChainApi::new(client.clone())),
+				Arc::new(FullChainApi::new(client.clone(), None)),
 				None,
 			).0
 		);
6 changes: 5 additions & 1 deletion client/basic-authorship/src/lib.rs
@@ -31,7 +31,11 @@
 //! # };
 //! # use sc_transaction_pool::{BasicPool, FullChainApi};
 //! # let client = Arc::new(substrate_test_runtime_client::new());
-//! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone())), None).0);
+//! # let txpool = Arc::new(BasicPool::new(
+//! #     Default::default(),
+//! #     Arc::new(FullChainApi::new(client.clone(), None)),
+//! #     None).0,
+//! # );
 //! // The first step is to create a `ProposerFactory`.
 //! let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone(), None);
 //!
69 changes: 46 additions & 23 deletions client/network/src/protocol.rs
@@ -51,7 +51,7 @@ use message::generic::{Message as GenericMessage, ConsensusMessage, Roles};
 use prometheus_endpoint::{Registry, Gauge, Counter, GaugeVec, HistogramVec, PrometheusError, Opts, register, U64};
 use sync::{ChainSync, SyncState};
 use std::borrow::Cow;
-use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
+use std::collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map::Entry};
 use std::sync::Arc;
 use std::fmt::Write;
 use std::{cmp, io, num::NonZeroUsize, pin::Pin, task::Poll, time};
@@ -199,18 +199,21 @@ impl Metrics {
 	}
 }
 
-struct PendingTransaction {
+#[pin_project::pin_project]
+struct PendingTransaction<H> {
+	#[pin]
 	validation: TransactionImportFuture,
-	peer_id: PeerId,
+	tx_hash: H,
 }
 
-impl Future for PendingTransaction {
-	type Output = (PeerId, TransactionImport);
+impl<H: ExHashT> Future for PendingTransaction<H> {
+	type Output = (H, TransactionImport);
 
 	fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
-		let this = Pin::into_inner(self);
-		if let Poll::Ready(import_result) = this.validation.poll_unpin(cx) {
-			return Poll::Ready((this.peer_id.clone(), import_result));
+		let mut this = self.project();
+
+		if let Poll::Ready(import_result) = Pin::new(&mut this.validation).poll_unpin(cx) {
+			return Poll::Ready((this.tx_hash.clone(), import_result));
 		}
 
 		Poll::Pending
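`PendingTransaction` is now generic over the transaction hash and uses `pin_project`, so `poll` can reach the pinned inner validation future without `unsafe`. For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of a wrapper future that drives an inner future and tags its output; the names (`Tagged`, `inner`, `tag`) are hypothetical, and the only assumption is the `pin-project` crate.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Requires `pin-project` in Cargo.toml.
#[pin_project::pin_project]
struct Tagged<F, H> {
    #[pin]
    inner: F, // the future being driven (e.g. a validation future)
    tag: H,   // attached to the result (e.g. a transaction hash)
}

impl<F: Future, H: Clone> Future for Tagged<F, H> {
    type Output = (H, F::Output);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `project()` hands out `Pin<&mut F>` for the `#[pin]` field safely.
        let this = self.project();
        match this.inner.poll(cx) {
            Poll::Ready(out) => Poll::Ready((this.tag.clone(), out)),
            Poll::Pending => Poll::Pending,
        }
    }
}
```

Returning the hash instead of a `PeerId` is what allows the result to be fanned out to every peer recorded for that hash, as the following hunks show.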
@@ -226,7 +229,12 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
 	/// Pending list of messages to return from `poll` as a priority.
 	pending_messages: VecDeque<CustomMessageOutcome<B>>,
 	/// Pending transactions verification tasks.
-	pending_transactions: FuturesUnordered<PendingTransaction>,
+	pending_transactions: FuturesUnordered<PendingTransaction<H>>,
+	/// As multiple peers can send us the same transaction, we group
+	/// these peers using the transaction hash while the transaction is
+	/// imported. This prevents importing the same transaction multiple
+	/// times concurrently.
+	pending_transactions_peers: HashMap<H, Vec<PeerId>>,
 	config: ProtocolConfig,
 	genesis_hash: B::Hash,
 	sync: ChainSync<B>,
@@ -435,6 +443,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 			propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
 			pending_messages: VecDeque::new(),
 			pending_transactions: FuturesUnordered::new(),
+			pending_transactions_peers: HashMap::new(),
 			config,
 			context_data: ContextData {
 				peers: HashMap::new(),
@@ -1141,7 +1150,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 	fn on_transactions(
 		&mut self,
 		who: PeerId,
-		transactions: message::Transactions<B::Extrinsic>
+		transactions: message::Transactions<B::Extrinsic>,
 	) {
 		// sending transaction to light node is considered a bad behavior
 		if !self.config.roles.is_full() {
@@ -1170,14 +1179,22 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 				}
 
 				let hash = self.transaction_pool.hash_of(&t);
-				peer.known_transactions.insert(hash);
+				peer.known_transactions.insert(hash.clone());
 
 				self.peerset_handle.report_peer(who.clone(), rep::ANY_TRANSACTION);
 
-				self.pending_transactions.push(PendingTransaction {
-					peer_id: who.clone(),
-					validation: self.transaction_pool.import(t),
-				});
+				match self.pending_transactions_peers.entry(hash.clone()) {
+					Entry::Vacant(entry) => {
+						self.pending_transactions.push(PendingTransaction {
+							validation: self.transaction_pool.import(t),
+							tx_hash: hash,
+						});
+						entry.insert(vec![who.clone()]);
+					},
+					Entry::Occupied(mut entry) => {
+						entry.get_mut().push(who.clone());
+					}
+				}
 			}
 		}
 	}
@@ -1211,7 +1228,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 		&mut self,
 		transactions: &[(H, B::Extrinsic)],
 	) -> HashMap<H, Vec<String>> {
-		let mut propagated_to = HashMap::new();
+		let mut propagated_to = HashMap::<_, Vec<_>>::new();
+		let mut propagated_transactions = 0;
+
 		for (who, peer) in self.context_data.peers.iter_mut() {
 			// never send transactions to the light node
 			if !peer.info.roles.is_full() {
@@ -1224,11 +1243,13 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 				.cloned()
 				.unzip();
 
+			propagated_transactions += hashes.len();
+
 			if !to_send.is_empty() {
 				for hash in hashes {
 					propagated_to
 						.entry(hash)
-						.or_insert_with(Vec::new)
+						.or_default()
 						.push(who.to_base58());
 				}
 				trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
@@ -1243,10 +1264,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 			}
 		}
 
-		if propagated_to.len() > 0 {
-			if let Some(ref metrics) = self.metrics {
-				metrics.propagated_transactions.inc();
-			}
+		if let Some(ref metrics) = self.metrics {
+			metrics.propagated_transactions.inc_by(propagated_transactions as _)
 		}
 
 		propagated_to
@@ -2011,8 +2030,12 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
 			};
 			self.pending_messages.push_back(event);
 		}
-		if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) {
-			self.on_handle_transaction_import(peer_id, result);
+		if let Poll::Ready(Some((tx_hash, result))) = self.pending_transactions.poll_next_unpin(cx) {
+			if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) {
+				peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result));
+			} else {
+				warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!");
+			}
 		}
 		if let Some(message) = self.pending_messages.pop_front() {
 			return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
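Taken together, the protocol changes replace the per-peer pending future with per-hash bookkeeping: the first peer to send a transaction starts its validation, later senders of the same hash are only appended to `pending_transactions_peers`, and when the single validation future resolves, every recorded peer is reported via `on_handle_transaction_import`. The standalone sketch below illustrates that bookkeeping with plain types; `PendingImports`, `on_announce`, and `on_result` are invented names, and the real code drives a `FuturesUnordered` inside `poll` rather than taking a callback.

```rust
use std::collections::{hash_map::Entry, HashMap};

type Hash = u64;
type PeerId = String;

#[derive(Clone, Copy, Debug)]
enum Import { Accepted, Rejected }

/// Hypothetical bookkeeping mirroring `pending_transactions_peers`:
/// only the first peer to announce a hash triggers validation; later
/// peers are queued and all of them are reported once the result is known.
#[derive(Default)]
struct PendingImports {
    peers: HashMap<Hash, Vec<PeerId>>,
}

impl PendingImports {
    /// Returns `true` if the caller should start a validation for `hash`.
    fn on_announce(&mut self, hash: Hash, from: PeerId) -> bool {
        match self.peers.entry(hash) {
            Entry::Vacant(e) => {
                e.insert(vec![from]);
                true // first announcement: start the (single) validation
            }
            Entry::Occupied(mut e) => {
                e.get_mut().push(from);
                false // validation already in flight: just remember the peer
            }
        }
    }

    /// Called when the validation for `hash` finishes; reports the result
    /// to every peer that announced it in the meantime.
    fn on_result(&mut self, hash: Hash, result: Import, mut report: impl FnMut(&PeerId, Import)) {
        if let Some(peers) = self.peers.remove(&hash) {
            for peer in &peers {
                report(peer, result);
            }
        }
    }
}

fn main() {
    let mut pending = PendingImports::default();
    assert!(pending.on_announce(42, "alice".into()));  // starts validation
    assert!(!pending.on_announce(42, "bob".into()));   // queued only
    pending.on_result(42, Import::Accepted, |peer, res| {
        println!("report {:?} to {}", res, peer);
    });
}
```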
2 changes: 1 addition & 1 deletion client/offchain/src/lib.rs
@@ -250,7 +250,7 @@ mod tests {
 		let client = Arc::new(substrate_test_runtime_client::new());
 		let pool = Arc::new(TestPool(BasicPool::new(
 			Default::default(),
-			Arc::new(FullChainApi::new(client.clone())),
+			Arc::new(FullChainApi::new(client.clone(), None)),
 			None,
 		).0));
 		client.execution_extensions()
2 changes: 1 addition & 1 deletion client/rpc/src/author/tests.rs
@@ -63,7 +63,7 @@ impl Default for TestSetup {
 
 		let pool = Arc::new(BasicPool::new(
 			Default::default(),
-			Arc::new(FullChainApi::new(client.clone())),
+			Arc::new(FullChainApi::new(client.clone(), None)),
 			None,
 		).0);
 		TestSetup {
2 changes: 1 addition & 1 deletion client/service/src/lib.rs
@@ -579,7 +579,7 @@ mod tests {
 		let client = Arc::new(client);
 		let pool = Arc::new(BasicPool::new(
 			Default::default(),
-			Arc::new(FullChainApi::new(client.clone())),
+			Arc::new(FullChainApi::new(client.clone(), None)),
 			None,
 		).0);
 		let source = sp_runtime::transaction_validity::TransactionSource::External;
7 changes: 6 additions & 1 deletion client/transaction-pool/graph/src/base_pool.rs
@@ -261,6 +261,11 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
 		return_value
 	}
 
+	/// Returns whether the transaction for the given hash is already imported.
+	pub fn is_imported(&self, tx_hash: &Hash) -> bool {
+		self.future.contains(tx_hash) || self.ready.contains(tx_hash)
+	}
+
 	/// Imports transaction to the pool.
 	///
 	/// The pool consists of two parts: Future and Ready.
@@ -272,7 +277,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
 		&mut self,
 		tx: Transaction<Hash, Ex>,
 	) -> error::Result<Imported<Hash, Ex>> {
-		if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) {
+		if self.is_imported(&tx.hash) {
 			return Err(error::Error::AlreadyImported(Box::new(tx.hash.clone())))
 		}
 
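The new `is_imported` helper exposes a cheap duplicate check over both sub-pools, and `import` now reuses it for its early-exit path. Below is a minimal sketch of the same idea under the assumption that only hashes are tracked; `MiniPool` is hypothetical, whereas the real `BasePool` keeps full transactions in its `ready` and `future` parts.

```rust
use std::collections::HashSet;

/// Hypothetical, stripped-down pool that only stores hashes, to illustrate
/// the `is_imported` check shared by external callers and `import` itself.
#[derive(Default)]
struct MiniPool {
    ready: HashSet<u64>,
    future: HashSet<u64>,
}

impl MiniPool {
    fn is_imported(&self, tx_hash: &u64) -> bool {
        self.future.contains(tx_hash) || self.ready.contains(tx_hash)
    }

    fn import(&mut self, tx_hash: u64, has_all_requirements: bool) -> Result<(), String> {
        if self.is_imported(&tx_hash) {
            return Err(format!("{} already imported", tx_hash));
        }
        // Ready transactions can be included immediately; future ones wait
        // for their missing requirements (e.g. an earlier nonce).
        if has_all_requirements {
            self.ready.insert(tx_hash);
        } else {
            self.future.insert(tx_hash);
        }
        Ok(())
    }
}

fn main() {
    let mut pool = MiniPool::default();
    pool.import(1, true).unwrap();
    assert!(pool.is_imported(&1));
    assert!(pool.import(1, true).is_err()); // duplicate rejected up front
}
```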