Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Make sure we don't submit the same transaction twice from the network…
… concurrently
  • Loading branch information
bkchr committed Jul 7, 2020
commit 16c65bf7e59a1706132bf0e64e3a4c6afa6320d5
53 changes: 37 additions & 16 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use message::generic::{Message as GenericMessage, ConsensusMessage, Roles};
use prometheus_endpoint::{Registry, Gauge, Counter, GaugeVec, HistogramVec, PrometheusError, Opts, register, U64};
use sync::{ChainSync, SyncState};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{cmp, io, num::NonZeroUsize, pin::Pin, task::Poll, time};
Expand Down Expand Up @@ -199,18 +199,21 @@ impl Metrics {
}
}

struct PendingTransaction {
#[pin_project::pin_project]
struct PendingTransaction<H> {
#[pin]
validation: TransactionImportFuture,
peer_id: PeerId,
tx_hash: H,
}

impl Future for PendingTransaction {
type Output = (PeerId, TransactionImport);
impl<H: ExHashT> Future for PendingTransaction<H> {
type Output = (H, TransactionImport);

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
if let Poll::Ready(import_result) = this.validation.poll_unpin(cx) {
return Poll::Ready((this.peer_id.clone(), import_result));
let mut this = self.project();

if let Poll::Ready(import_result) = Pin::new(&mut this.validation).poll_unpin(cx) {
return Poll::Ready((this.tx_hash.clone(), import_result));
}

Poll::Pending
Expand All @@ -226,7 +229,12 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
/// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome<B>>,
/// Pending transactions verification tasks.
pending_transactions: FuturesUnordered<PendingTransaction>,
pending_transactions: FuturesUnordered<PendingTransaction<H>>,
/// As multiple peers can send us the same transaction, we group
/// these peers using the transaction hash while the transaction is
/// imported. This prevents that we import the same transaction
/// multiple times concurrently.
pending_transactions_peers: HashMap<H, Vec<PeerId>>,
config: ProtocolConfig,
genesis_hash: B::Hash,
sync: ChainSync<B>,
Expand Down Expand Up @@ -435,6 +443,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
pending_messages: VecDeque::new(),
pending_transactions: FuturesUnordered::new(),
pending_transactions_peers: HashMap::new(),
config,
context_data: ContextData {
peers: HashMap::new(),
Expand Down Expand Up @@ -1170,14 +1179,22 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}

let hash = self.transaction_pool.hash_of(&t);
peer.known_transactions.insert(hash);
peer.known_transactions.insert(hash.clone());

self.peerset_handle.report_peer(who.clone(), rep::ANY_TRANSACTION);

self.pending_transactions.push(PendingTransaction {
peer_id: who.clone(),
validation: self.transaction_pool.import(t),
});
match self.pending_transactions_peers.entry(hash.clone()) {
Entry::Vacant(entry) => {
self.pending_transactions.push(PendingTransaction {
validation: self.transaction_pool.import(t),
tx_hash: hash,
});
entry.insert(vec![who.clone()]);
},
Entry::Occupied(mut entry) => {
entry.get_mut().push(who.clone());
}
}
}
}
}
Expand Down Expand Up @@ -2013,8 +2030,12 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
};
self.pending_messages.push_back(event);
}
if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) {
self.on_handle_transaction_import(peer_id, result);
if let Poll::Ready(Some((tx_hash, result))) = self.pending_transactions.poll_next_unpin(cx) {
if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) {
peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result));
} else {
warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!");
}
}
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
Expand Down