Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Switch to the master branch of libp2p
  • Loading branch information
tomaka committed Jul 26, 2018
commit b18f0701c211802fca184d613d92b86cf0fb446a
295 changes: 135 additions & 160 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions substrate/network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
futures = "0.1"
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "c537102bb39f3ec6590befbfe9094cf411387f77", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
libp2p = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ethkey = { git = "https://github.com/paritytech/parity.git" }
ethereum-types = "0.3"
Expand All @@ -20,10 +20,10 @@ parking_lot = "0.5"
libc = "0.2"
log = "0.3"
rand = "0.5.0"
tokio-core = "0.1"
tokio = "0.1"
tokio-io = "0.1"
tokio-timer = "0.2"
varint = { git = "https://github.com/libp2p/rust-libp2p" }
varint = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2" }

[dev-dependencies]
assert_matches = "1.2"
Expand Down
3 changes: 1 addition & 2 deletions substrate/network-libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@

extern crate parking_lot;
extern crate fnv;
#[macro_use]
extern crate futures;
extern crate tokio_core;
extern crate tokio;
extern crate tokio_io;
extern crate tokio_timer;
extern crate ethkey;
Expand Down
28 changes: 13 additions & 15 deletions substrate/network-libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::thread;
use std::time::{Duration, Instant};
use futures::{future, Future, Stream, IntoFuture};
use futures::sync::{mpsc, oneshot};
use tokio_core::reactor::{Core, Handle};
use tokio::runtime::current_thread;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Interval, Deadline};

Expand Down Expand Up @@ -118,7 +118,7 @@ impl NetworkService {
local_peer_id: local_peer_id.clone(),
kbuckets_timeout: Duration::from_secs(600),
request_timeout: Duration::from_secs(10),
known_initial_peers: network_state.known_peers().collect(),
known_initial_peers: network_state.known_peers(),
});

let shared = Arc::new(Shared {
Expand Down Expand Up @@ -191,16 +191,16 @@ impl NetworkService {

let shared = self.shared.clone();
let join_handle = thread::spawn(move || {
// Tokio core that is going to run everything in this thread.
let mut core = match Core::new() {
// Tokio runtime that is going to run everything in this thread.
let mut runtime = match current_thread::Runtime::new() {
Ok(c) => c,
Err(err) => {
let _ = init_tx.send(Err(err.into()));
return
}
};

let fut = match init_thread(core.handle(), shared,
let fut = match init_thread(shared,
timeouts_register_rx, close_rx) {
Ok(future) => {
debug!(target: "sub-libp2p", "Successfully started networking service");
Expand All @@ -213,7 +213,7 @@ impl NetworkService {
}
};

match core.run(fut) {
match runtime.block_on(fut) {
Ok(()) => debug!(target: "sub-libp2p", "libp2p future finished"),
Err(err) => error!(target: "sub-libp2p", "error while running libp2p: {:?}", err),
}
Expand Down Expand Up @@ -395,7 +395,6 @@ impl NetworkContext for NetworkContextImpl {
/// - `timeouts_register_rx` should receive newly-registered timeouts.
/// - `close_rx` should be triggered when we want to close the network.
fn init_thread(
core: Handle,
shared: Arc<Shared>,
timeouts_register_rx: mpsc::UnboundedReceiver<
(Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))
Expand All @@ -405,7 +404,6 @@ fn init_thread(
// Build the transport layer.
let transport = {
let base = transport::build_transport(
core.clone(),
transport::UnencryptedAllowed::Denied,
shared.network_state.local_private_key().clone()
);
Expand Down Expand Up @@ -535,7 +533,7 @@ fn init_thread(

// Build the timeouts system for the `register_timeout` function.
// (note: this has nothing to do with socket timeouts)
let timeouts = timeouts::build_timeouts_stream(core.clone(), timeouts_register_rx)
let timeouts = timeouts::build_timeouts_stream(timeouts_register_rx)
.for_each({
let shared = shared.clone();
move |(handler, protocol_id, timer_token)| {
Expand Down Expand Up @@ -630,7 +628,7 @@ fn listener_handle<'a, C>(
match shared.network_state.ping_connection(node_id.clone()) {
Ok((_, ping_connec)) => {
trace!(target: "sub-libp2p", "Successfully opened ping substream with {:?}", node_id);
let fut = ping_connec.set_until(pinger, future);
let fut = ping_connec.tie_or_passthrough(pinger, future);
Box::new(fut) as Box<_>
},
Err(err) => Box::new(future::err(err)) as Box<_>
Expand Down Expand Up @@ -687,7 +685,7 @@ fn handle_kademlia_connection(
val
});

Ok(kad_connec.set_until(controller, future))
Ok(kad_connec.tie_or_passthrough(controller, future))
}

/// When a remote performs a `FIND_NODE` Kademlia request for `searched`,
Expand Down Expand Up @@ -823,7 +821,7 @@ fn handle_custom_connection(
});

let val = (custom_proto_out.outgoing, custom_proto_out.protocol_version);
let final_fut = unique_connec.set_until(val, fut)
let final_fut = unique_connec.tie_or_stop(val, fut)
.then(move |val| {
// Makes sure that `dc_guard` is kept alive until here.
drop(dc_guard);
Expand Down Expand Up @@ -1163,7 +1161,7 @@ fn open_peer_custom_proto<T, To, St, C>(
}

// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
let _ = unique_connec.dial(&swarm_controller, &addr, with_err);
},
Err(err) => {
trace!(target: "sub-libp2p",
Expand Down Expand Up @@ -1202,7 +1200,7 @@ fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>,
shared.network_state
.kad_connection(who.clone())
.into_future()
.map(move |(_, k)| k.get_or_dial(&swarm_controller, &addr, transport))
.map(move |(_, k)| k.dial(&swarm_controller, &addr, transport))
.flatten()
}

Expand Down Expand Up @@ -1304,7 +1302,7 @@ fn ping_all<T, St, C>(

let addr = Multiaddr::from(AddrComponent::P2P(who.clone().into_bytes()));
let fut = pinger
.get_or_dial(&swarm_controller, &addr, transport.clone())
.dial(&swarm_controller, &addr, transport.clone())
.and_then(move |mut p| {
trace!(target: "sub-libp2p", "Pinging peer #{} aka. {:?}", peer, who);
p.ping()
Expand Down
26 changes: 13 additions & 13 deletions substrate/network-libp2p/src/timeouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?

use futures::{Async, future, Future, Poll, stream, Stream, sync::mpsc};
use std::io::Error as IoError;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::marker::PhantomData;
use std::time::{Duration, Instant};
use tokio_core::reactor::{Handle, Timeout};
use tokio_timer::{self, Delay};

/// Builds the timeouts system.
///
Expand All @@ -28,7 +28,6 @@ use tokio_core::reactor::{Handle, Timeout};
/// to the output. Timeouts continue to fire forever, as there is no way to
/// unregister them.
pub fn build_timeouts_stream<T>(
core: Handle,
timeouts_rx: mpsc::UnboundedReceiver<(Duration, T)>
) -> impl Stream<Item = T, Error = IoError>
where T: Clone {
Expand All @@ -40,8 +39,6 @@ pub fn build_timeouts_stream<T>(
stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| {
// `timeouts` is a `Vec` of futures that produce an `Out`.

let core = core.clone();

// `select_ok` panics if `timeouts` is empty anyway.
if timeouts.is_empty() {
return None
Expand All @@ -53,8 +50,7 @@ pub fn build_timeouts_stream<T>(
Out::NewTimeout((Some((duration, item)), next_timeouts)) => {
// Received a new timeout request on the channel.
let next_timeout = next_in_timeouts_stream(next_timeouts);
let at = Instant::now() + duration;
let timeout = Timeout::new_at(at, &core)?;
let timeout = Delay::new(Instant::now() + duration);
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
timeouts.push(future::Either::B(timeout));
timeouts.push(future::Either::A(next_timeout));
Expand All @@ -66,8 +62,7 @@ pub fn build_timeouts_stream<T>(
Out::Timeout(duration, item) => {
// A timeout has happened.
let returned = item.clone();
let at = Instant::now() + duration;
let timeout = Timeout::new_at(at, &core)?;
let timeout = Delay::new(Instant::now() + duration);
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
timeouts.push(future::Either::B(timeout));
Ok((Some(returned), timeouts))
Expand Down Expand Up @@ -97,15 +92,20 @@ fn next_in_timeouts_stream<T, B>(
.map_err(|_| unreachable!("an UnboundedReceiver can never error"))
}

/// Does the equivalent to `future.map(move |()| (duration, item))`.
/// Does the equivalent to `future.map(move |()| (duration, item)).map_err(|err| to_io_err(err))`.
struct TimeoutWrapper<A, F, T>(F, Duration, Option<T>, PhantomData<A>);
impl<A, F, T> Future for TimeoutWrapper<A, F, T>
where F: Future<Item = ()> {
where F: Future<Item = (), Error = tokio_timer::Error> {
type Item = Out<A, T>;
type Error = F::Error;
type Error = IoError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let _ready: () = try_ready!(self.0.poll());
match self.0.poll() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(IoError::new(IoErrorKind::Other, err.to_string())),
}

let out = Out::Timeout(self.1, self.2.take().expect("poll() called again after success"));
Ok(Async::Ready(out))
}
Expand Down
4 changes: 1 addition & 3 deletions substrate/network-libp2p/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ use libp2p::{self, Transport, mplex, secio, yamux};
use libp2p::core::{MuxedTransport, either, upgrade};
use libp2p::transport_timeout::TransportTimeout;
use std::time::Duration;
use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};

/// Builds the transport that serves as a common ground for all connections.
pub fn build_transport(
core: Handle,
unencrypted_allowed: UnencryptedAllowed,
local_private_key: secio::SecioKeyPair
) -> impl MuxedTransport<Output = impl AsyncRead + AsyncWrite> + Clone {
let base = libp2p::CommonTransport::new(core)
let base = libp2p::CommonTransport::new()
.with_upgrade({
let secio = secio::SecioConfig {
key: local_private_key,
Expand Down