Merged
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 4 additions & 2 deletions core/basic-authorship/src/basic_authorship.rs
@@ -247,10 +247,12 @@ mod tests {
fn should_cease_building_block_when_deadline_is_reached() {
// given
let client = Arc::new(test_client::new());
let chain_api = transaction_pool::ChainApi::new(client.clone());
let chain_api = transaction_pool::FullChainApi::new(client.clone());
let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));

txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap();
futures::executor::block_on(
txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false)
).unwrap();

let mut proposer_factory = ProposerFactory {
client: client.clone(),
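Since the pool's `submit_at` is now asynchronous, the test drives it to completion with `futures::executor::block_on`. A minimal, self-contained sketch of that pattern, assuming the current `futures` 0.3 crate; the async function is a stand-in, not the real pool API:

```rust
// Stand-in async call; imagine this is `txpool.submit_at(...)`.
async fn submit(value: u32) -> Result<u32, ()> {
    Ok(value)
}

fn main() {
    // Block the current thread until the future resolves, as the updated test does.
    let result = futures::executor::block_on(submit(7));
    assert_eq!(result, Ok(7));
}
```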
2 changes: 1 addition & 1 deletion core/basic-authorship/src/lib.rs
@@ -26,7 +26,7 @@
//! # use test_client::{self, runtime::{Extrinsic, Transfer}, AccountKeyring};
//! # use transaction_pool::txpool::{self, Pool as TransactionPool};
//! # let client = Arc::new(test_client::new());
//! # let chain_api = transaction_pool::ChainApi::new(client.clone());
//! # let chain_api = transaction_pool::FullChainApi::new(client.clone());
//! # let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));
//! // The first step is to create a `ProposerFactory`.
//! let mut proposer_factory = ProposerFactory {
28 changes: 22 additions & 6 deletions core/network/src/protocol.rs
@@ -963,6 +963,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
who: PeerId,
extrinsics: message::Transactions<B::Extrinsic>
) {
// Sending an extrinsic to a light node is considered bad behavior
if !self.config.roles.is_full() {
(Review comment from the PR author on this line:) Ahhh, forgot about this change - could create a separate PR for it, if required. Extrinsics from other nodes are ignored by the light tx pool anyway, so this change only decreases traffic on the light client.

trace!(target: "sync", "Peer {} is trying to send extrinsic to the light node", who);
self.behaviour.disconnect_peer(&who);
self.peerset_handle.report_peer(who, i32::min_value());
return;
}

// Accept extrinsics only when fully synced
if self.sync.status().state != SyncState::Idle {
trace!(target: "sync", "{} Ignoring extrinsics while syncing", who);
@@ -971,12 +979,15 @@
trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who);
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
for t in extrinsics {
if let Some(hash) = self.transaction_pool.import(&t) {
self.peerset_handle.report_peer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE);
peer.known_extrinsics.insert(hash);
} else {
trace!(target: "sync", "Extrinsic rejected");
}
let hash = self.transaction_pool.hash_of(&t);
peer.known_extrinsics.insert(hash);

self.transaction_pool.import(
self.peerset_handle.clone().into(),
who.clone(),
NEW_EXTRINSIC_REPUTATION_CHANGE,
t,
);
}
}
}
@@ -995,6 +1006,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let extrinsics = self.transaction_pool.transactions();
let mut propagated_to = HashMap::new();
for (who, peer) in self.context_data.peers.iter_mut() {
// Never send extrinsics to light nodes
if !peer.info.roles.is_full() {
continue;
}

let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
.iter()
.filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))
18 changes: 17 additions & 1 deletion core/network/src/service.rs
@@ -65,8 +65,18 @@ impl<T> ExHashT for T where
pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
/// Get transactions from the pool that are ready to be propagated.
fn transactions(&self) -> Vec<(H, B::Extrinsic)>;
/// Get hash of transaction.
fn hash_of(&self, transaction: &B::Extrinsic) -> H;
/// Import a transaction into the pool.
fn import(&self, transaction: &B::Extrinsic) -> Option<H>;
///
/// Peer reputation is changed by `reputation_change` if the transaction is accepted by the pool.
fn import(
&self,
report_handle: ReportHandle,
who: PeerId,
reputation_change: i32,
transaction: B::Extrinsic,
);
/// Notify the pool about transactions broadcast.
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>);
}
@@ -77,6 +87,12 @@ pub struct ReportHandle {
inner: PeersetHandle, // wraps it so we don't have to worry about breaking API.
}

impl From<PeersetHandle> for ReportHandle {
fn from(peerset_handle: PeersetHandle) -> Self {
ReportHandle { inner: peerset_handle }
}
}

impl ReportHandle {
/// Report a given peer as either beneficial (+) or costly (-) according to the
/// given scalar.
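The reshaped `import` no longer returns an `Option<H>`: the adapter gets a `ReportHandle`, the sending peer, and the reputation delta, and is expected to report the peer itself once the (possibly asynchronous) import has completed. A simplified, self-contained sketch of that control flow; `Pool`, its `submit_one`, the string peer ids, and the numeric values are stand-ins, not the real Substrate types:

```rust
use std::sync::mpsc::{channel, Sender};

// Toy stand-in for the network's ReportHandle: just records (peer, change) pairs.
#[derive(Clone)]
struct ReportHandle {
    tx: Sender<(String, i32)>,
}

impl ReportHandle {
    fn report_peer(&self, who: String, change: i32) {
        let _ = self.tx.send((who, change));
    }
}

// Toy stand-in for a transaction pool: accepts even extrinsics, rejects odd ones.
struct Pool;

impl Pool {
    fn submit_one(&self, xt: u64) -> Result<u64, ()> {
        if xt % 2 == 0 { Ok(xt) } else { Err(()) }
    }
}

// Mirrors the shape of the new `TransactionPool::import`: the pool side, not the
// network protocol, decides whether the peer earns the reputation change.
fn import(pool: &Pool, report: ReportHandle, who: String, reputation_change: i32, xt: u64) {
    match pool.submit_one(xt) {
        Ok(_hash) => report.report_peer(who, reputation_change),
        Err(_) => { /* rejected: no reputation bump for the sender */ }
    }
}

fn main() {
    let (tx, rx) = channel();
    let handle = ReportHandle { tx };
    let pool = Pool;

    import(&pool, handle.clone(), "peer-a".to_string(), 128, 2); // accepted -> reported
    import(&pool, handle, "peer-b".to_string(), 128, 3);         // rejected -> not reported

    // Only the peer whose extrinsic was accepted gets a reputation change.
    let reports: Vec<(String, i32)> = rx.try_iter().collect();
    assert_eq!(reports, vec![("peer-a".to_string(), 128)]);
}
```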
8 changes: 5 additions & 3 deletions core/network/src/test/mod.rs
@@ -47,7 +47,7 @@ use consensus::Error as ConsensusError;
use consensus::{BlockOrigin, ForkChoiceStrategy, BlockImportParams, BlockCheckParams, JustificationImport};
use futures::prelude::*;
use futures03::{StreamExt as _, TryStreamExt as _};
use crate::{NetworkWorker, NetworkService, config::ProtocolId};
use crate::{NetworkWorker, NetworkService, ReportHandle, config::ProtocolId};
use crate::config::{NetworkConfiguration, TransportConfig, BoxFinalityProofRequestBuilder};
use libp2p::PeerId;
use parking_lot::Mutex;
@@ -400,10 +400,12 @@ impl TransactionPool<Hash, Block> for EmptyTransactionPool {
Vec::new()
}

fn import(&self, _transaction: &Extrinsic) -> Option<Hash> {
None
fn hash_of(&self, _transaction: &Extrinsic) -> Hash {
Hash::default()
}

fn import(&self, _report_handle: ReportHandle, _who: PeerId, _rep_change: i32, _transaction: Extrinsic) {}

fn on_broadcasted(&self, _: HashMap<Hash, Vec<String>>) {}
}

19 changes: 9 additions & 10 deletions core/offchain/src/api.rs
@@ -302,29 +302,28 @@ impl<A: ChainApi> AsyncApi<A> {
match msg {
ExtMessage::SubmitExtrinsic(ext) => self.submit_extrinsic(ext),
}
future::ready(())
});

future::join(extrinsics, http)
.map(|((), ())| ())
}

fn submit_extrinsic(&mut self, ext: Vec<u8>) {
fn submit_extrinsic(&mut self, ext: Vec<u8>) -> impl Future<Output = ()> {
let xt = match <A::Block as traits::Block>::Extrinsic::decode(&mut &*ext) {
Ok(xt) => xt,
Err(e) => {
warn!("Unable to decode extrinsic: {:?}: {}", ext, e.what());
return
return future::Either::Left(future::ready(()))
},
};

info!("Submitting to the pool: {:?} (isSigned: {:?})", xt, xt.is_signed());
match self.transaction_pool.submit_one(&self.at, xt.clone()) {
Ok(hash) => debug!("[{:?}] Offchain transaction added to the pool.", hash),
Err(e) => {
debug!("Couldn't submit transaction: {:?}", e);
},
}
future::Either::Right(self.transaction_pool
.submit_one(&self.at, xt.clone())
.map(|result| match result {
Ok(hash) => { debug!("[{:?}] Offchain transaction added to the pool.", hash); },
Err(e) => { debug!("Couldn't submit transaction: {:?}", e); },
}))
}
}

@@ -354,7 +353,7 @@ mod tests {
let db = LocalStorage::new_test();
let client = Arc::new(test_client::new());
let pool = Arc::new(
Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone()))
Pool::new(Default::default(), transaction_pool::FullChainApi::new(client.clone()))
);

let mock = Arc::new(MockNetworkStateInfo());
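`submit_extrinsic` now returns a single `impl Future<Output = ()>` even though the decode-failure branch has no asynchronous work to do, which is why both branches are wrapped in `future::Either`. A minimal, self-contained sketch of that pattern, assuming the `futures` 0.3 crate and toy types rather than the offchain worker's real ones:

```rust
use futures::future::{self, Either};

// Pretend "decode then submit": if parsing fails we resolve immediately,
// otherwise we return the (asynchronous) submission work.
fn submit(raw: &str) -> impl std::future::Future<Output = ()> {
    match raw.parse::<u64>() {
        // Early exit: no async work, just an already-ready future.
        Err(_) => Either::Left(future::ready(())),
        // Happy path: real asynchronous work would live here.
        Ok(value) => Either::Right(async move {
            println!("submitting extrinsic {}", value);
        }),
    }
}

fn main() {
    futures::executor::block_on(submit("42"));
    futures::executor::block_on(submit("not-a-number"));
}
```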
2 changes: 1 addition & 1 deletion core/offchain/src/lib.rs
@@ -171,7 +171,7 @@ mod tests {
// given
let _ = env_logger::try_init();
let client = Arc::new(test_client::new());
let pool = Arc::new(Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone())));
let pool = Arc::new(Pool::new(Default::default(), transaction_pool::FullChainApi::new(client.clone())));
let db = client_db::offchain::LocalStorage::new_test();
let network_state = Arc::new(MockNetworkStateInfo());

3 changes: 3 additions & 0 deletions core/rpc/api/src/author/error.rs
@@ -22,6 +22,9 @@ use jsonrpc_core as rpc;
/// Author RPC Result type.
pub type Result<T> = std::result::Result<T, Error>;

/// Author RPC future Result type.
pub type FutureResult<T> = Box<dyn rpc::futures::Future<Item = T, Error = Error> + Send>;

/// Author RPC errors.
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
4 changes: 2 additions & 2 deletions core/rpc/api/src/author/mod.rs
@@ -24,7 +24,7 @@ use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use primitives::{
Bytes
};
use self::error::Result;
use self::error::{FutureResult, Result};
use txpool::watcher::Status;

pub use self::gen_client::Client as AuthorClient;
@@ -37,7 +37,7 @@ pub trait AuthorApi<Hash, BlockHash> {

/// Submit hex-encoded extrinsic for inclusion in block.
#[rpc(name = "author_submitExtrinsic")]
fn submit_extrinsic(&self, extrinsic: Bytes) -> Result<Hash>;
fn submit_extrinsic(&self, extrinsic: Bytes) -> FutureResult<Hash>;

/// Insert a key into the keystore.
#[rpc(name = "author_insertKey")]
2 changes: 1 addition & 1 deletion core/rpc/api/src/lib.rs
@@ -25,7 +25,7 @@ mod helpers;
mod subscriptions;

pub use jsonrpc_core::IoHandlerExtension as RpcExtension;
pub use subscriptions::Subscriptions;
pub use subscriptions::{Subscriptions, TaskExecutor};
pub use helpers::Receiver;

pub mod author;
7 changes: 5 additions & 2 deletions core/rpc/api/src/subscriptions.rs
@@ -25,6 +25,9 @@ use jsonrpc_core::futures::{Future, future};

type Id = u64;

/// Alias for an implementation of `futures::future::Executor`.
pub type TaskExecutor = Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;

/// Generate unique ids for subscriptions.
#[derive(Clone, Debug)]
pub struct IdProvider {
@@ -53,12 +56,12 @@ impl IdProvider {
pub struct Subscriptions {
next_id: IdProvider,
active_subscriptions: Arc<Mutex<HashMap<Id, oneshot::Sender<()>>>>,
executor: Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>,
executor: TaskExecutor,
}

impl Subscriptions {
/// Creates new `Subscriptions` object.
pub fn new(executor: Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>) -> Self {
pub fn new(executor: TaskExecutor) -> Self {
Subscriptions {
next_id: Default::default(),
active_subscriptions: Default::default(),
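With `TaskExecutor` exported, code constructing a `Subscriptions` no longer has to spell out the boxed-future executor type. A hedged sketch of one possible construction site, assuming a tokio 0.1 runtime whose executor implements `futures::future::Executor` for boxed unit futures; the helper function is illustrative and not part of this PR:

```rust
use std::sync::Arc;
// `api` is the rpc-api crate as it is referenced elsewhere in core/rpc.
use api::{Subscriptions, TaskExecutor};

// Hypothetical helper: wrap a tokio 0.1 executor in the new alias and hand it
// to `Subscriptions::new`.
fn subscriptions_for(runtime: &tokio::runtime::Runtime) -> Subscriptions {
    let executor: TaskExecutor = Arc::new(runtime.executor());
    Subscriptions::new(executor)
}
```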
55 changes: 39 additions & 16 deletions core/rpc/src/author/mod.rs
@@ -20,13 +20,18 @@
mod tests;

use std::{sync::Arc, convert::TryInto};
use futures03::future::{FutureExt, TryFutureExt};
use log::warn;

use client::{self, Client};
use rpc::futures::{Sink, Future};
use rpc::futures::{
Sink, Future,
stream::Stream as _,
future::result,
};
use futures03::{StreamExt as _, compat::Compat};
use api::Subscriptions;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use log::warn;
use codec::{Encode, Decode};
use primitives::{Bytes, Blake2Hasher, H256, traits::BareCryptoStorePtr};
use sr_primitives::{generic, traits::{self, ProvideRuntimeApi}};
@@ -44,7 +49,7 @@ use session::SessionKeys;

/// Re-export the API for backward compatibility.
pub use api::author::*;
use self::error::{Error, Result};
use self::error::{Error, FutureResult, Result};

/// Authoring API
pub struct Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static {
@@ -108,15 +113,19 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> where
).map(Into::into).map_err(|e| Error::Client(Box::new(e)))
}

fn submit_extrinsic(&self, ext: Bytes) -> Result<ExHash<P>> {
let xt = Decode::decode(&mut &ext[..])?;
fn submit_extrinsic(&self, ext: Bytes) -> FutureResult<ExHash<P>> {
let xt = match Decode::decode(&mut &ext[..]) {
Ok(xt) => xt,
Err(err) => return Box::new(result(Err(err.into()))),
};
let best_block_hash = self.client.info().chain.best_hash;
self.pool
Box::new(self.pool
.submit_one(&generic::BlockId::hash(best_block_hash), xt)
.compat()
.map_err(|e| e.into_pool_error()
.map(Into::into)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()))
)
}

fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
@@ -151,30 +160,44 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> where
) {
let submit = || -> Result<_> {
let best_block_hash = self.client.info().chain.best_hash;
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])?;
self.pool
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])
.map_err(error::Error::from)?;
Ok(self.pool
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
.boxed()
.compat()
.map_err(|e| e.into_pool_error()
.map(Into::into)
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
)
))
};

let watcher = match submit() {
Ok(watcher) => watcher,
let future_watcher = match submit() {
Ok(future_watcher) => future_watcher,
Err(err) => {
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
let _ = subscriber.reject(err.into());
return;
},
};

// Turn the 'future' watcher into a future whose output is a stream of watcher events.
let future_watcher = future_watcher
.map_err(|err| { warn!("Failed to submit extrinsic: {}", err); })
.map(|watcher| Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))));

// Convert the 'future' watcher into a stream whose single element is the stream of watcher events.
let watcher_stream = future_watcher.into_stream();

// Flatten 'watcher_stream' so that we end up with a plain stream of watcher events.
let watcher_stream = watcher_stream.flatten();

self.subscriptions.add(subscriber, move |sink| {
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))))
.send_all(watcher_stream)
.map(|_| ())
})
});
}

fn unwatch_extrinsic(&self, _metadata: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
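The `watch_extrinsic` change above turns a future of a stream into a flat stream of watcher events before handing it to the subscription sink. The same combinator pattern in isolation, as a self-contained toy example using futures 0.1, with integers standing in for watcher status events:

```rust
use futures::{future, stream, Future, Stream};

fn main() {
    // A future that, once it resolves, yields a stream of status events.
    let future_of_stream = future::ok::<_, ()>(stream::iter_ok::<_, ()>(vec![1, 2, 3]));

    // `into_stream()` makes a single-element stream (the inner stream),
    // and `flatten()` splices the inner items through, as in `watch_extrinsic`.
    let events = future_of_stream.into_stream().flatten();

    assert_eq!(events.collect().wait(), Ok(vec![1, 2, 3]));
}
```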