diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index cf628c9d6fa..06dea677317 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -30,7 +30,7 @@ extern crate tokio_io; use bytes::BytesMut; use futures::{Future, Sink, Stream}; use std::env; -use swarm::{UpgradeExt, SimpleProtocol, Transport}; +use swarm::{UpgradeExt, SimpleProtocol, Transport, MuxedTransport}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::codec::length_delimited; @@ -68,8 +68,10 @@ fn main() { // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. - .into_connection_reuse() + .into_connection_reuse(); + let transport_with_echo = transport + .clone() // On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol // just for this example. // For this purpose, we create a `SimpleProtocol` struct. @@ -86,34 +88,23 @@ fn main() { // of any opened stream. // We use it to dial the address. - let dialer = transport - .dial_and_listen(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr")) + let dialer = transport_with_echo + .dial(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr")) // If the multiaddr protocol exists but is not supported, then we get an error containing // the transport and the original multiaddress. Therefore we cannot directly use `unwrap()` // or `expect()`, but have to add a `map_err()` beforehand. .map_err(|(_, addr)| addr).expect("unsupported multiaddr") - .and_then(|(incoming, echo)| { + .and_then(|echo| { // `echo` is what the closure used when initializing "echo" returns. // Consequently, please note that the `send` method is available only because the type // `length_delimited::Framed` has a `send` method. println!("Sending \"hello world\" to listener"); - echo.and_then(|echo| echo.send("hello world".into()).map(Option::Some)) - .select( - incoming - .for_each(|_| { - println!("opened"); - Ok(()) - }) - .map(|()| None), - ) - .map(|(n, _)| n) - .map_err(|(e, _)| e) + echo.send("hello world".into()) }) .and_then(|echo| { // The message has been successfully sent. Now wait for an answer. - echo.unwrap() - .into_future() + echo.into_future() .map(|(msg, rest)| { println!("Received message from listener: {:?}", msg); rest @@ -124,5 +115,6 @@ fn main() { // `dialer` is a future that contains all the behaviour that we want, but nothing has actually // started yet. Because we created the `TcpConfig` with tokio, we need to run the future // through the tokio core. - core.run(dialer).unwrap(); + core.run(dialer.map(|_| ()).select(transport.incoming().for_each(|_| Ok(())))) + .unwrap_or_else(|_| panic!()); } diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index 9f803e585e3..4334bef01d5 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -95,62 +95,49 @@ fn main() { println!("Now listening on {:?}", address); let future = listener - .filter_map(|(socket, client_addr)| { - let client_addr = client_addr.to_string(); - - // This closure is called whenever a new connection has been received. The `socket` - // is a `Result<..., IoError>` which contains an error if for example protocol - // negotiation or the secio handshake failed. We handle this situation by printing a - // message on stderr and ignoring the connection. - match socket { - Ok(s) => Some((s, client_addr)), - Err(err) => { - eprintln!("Failed connection attempt from {}\n => Error: {:?}", - client_addr, err); - None - }, - } - }) - .for_each(|(socket, client_addr)| { - // This closure is called whenever a new connection has been received and successfully - // upgraded to use secio/plaintext and echo. - println!("Successfully negotiated protocol with {}", client_addr); - - // We loop forever in order to handle all the messages sent by the client. - let client_finished = { - let client_addr = client_addr.clone(); - loop_fn(socket, move |socket| { - let client_addr = client_addr.clone(); - socket.into_future() - .map_err(|(err, _)| err) - .and_then(move |(msg, rest)| { - if let Some(msg) = msg { - // One message has been received. We send it back to the client. - println!("Received a message from {}: {:?}\n => Sending back \ - identical message to remote", client_addr, msg); - Box::new(rest.send(msg).map(|m| Loop::Continue(m))) - as Box> - } else { - // End of stream. Connection closed. Breaking the loop. - println!("Received EOF from {}\n => Dropping connection", - client_addr); - Box::new(Ok(Loop::Break(())).into_future()) - as Box> - } - }) + // This closure is called whenever a new connection has been received. + // `socket` is a future that will be triggered once the upgrade to secio, multiplex + // and echo is complete. + let client_addr = client_addr.to_string(); + println!("Incoming connection from {}", client_addr); + + socket + .and_then(move |socket| { + println!("Successfully negotiated protocol with {}", client_addr); + + // We loop forever in order to handle all the messages sent by the client. + loop_fn(socket, move |socket| { + let client_addr = client_addr.clone(); + socket.into_future() + .map_err(|(err, _)| err) + .and_then(move |(msg, rest)| { + if let Some(msg) = msg { + // One message has been received. We send it back to the client. + println!("Received a message from {}: {:?}\n => Sending back \ + identical message to remote", client_addr, msg); + Box::new(rest.send(msg).map(|m| Loop::Continue(m))) + as Box> + } else { + // End of stream. Connection closed. Breaking the loop. + println!("Received EOF from {}\n => Dropping connection", + client_addr); + Box::new(Ok(Loop::Break(())).into_future()) + as Box> + } + }) + }) }) - }; - // We absorb errors from the `client_finished` future so that an error while processing - // a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the - // entire server. - client_finished.then(move |res| { - if let Err(err) = res { - println!("Error while processing client {}: {:?}", client_addr, err); - } - Ok(()) - }) + // We absorb errors from the future so that an error while processing a client + // (eg. if the client unexpectedly disconnects) doesn't propagate and stop the + // entire server. + .then(move |res| { + if let Err(err) = res { + println!("Error while processing client: {:?}", err); + } + Ok(()) + }) }); // `future` is a future that contains all the behaviour that we want, but nothing has actually diff --git a/libp2p-swarm/Cargo.toml b/libp2p-swarm/Cargo.toml index cf03e056e62..1439f8392c1 100644 --- a/libp2p-swarm/Cargo.toml +++ b/libp2p-swarm/Cargo.toml @@ -8,6 +8,7 @@ bytes = "0.4" multiaddr = "0.2.0" multistream-select = { path = "../multistream-select" } futures = { version = "0.1", features = ["use_std"] } +parking_lot = "0.5.3" smallvec = "0.5" tokio-io = "0.1" diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index 60037db050c..906e1886269 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -37,19 +37,23 @@ //! //! When called on a `ConnectionReuse`, the `dial` method will try to use a connection that has //! already been opened earlier, and open an outgoing substream on it. If none is available, it -//! will dial the given multiaddress. +//! will dial the given multiaddress. Dialed node can also spontaneously open new substreams with +//! us. In order to handle these new substreams you should use the `next_incoming` method of the +//! `MuxedTransport` trait. //! TODO: this raises several questions ^ //! //! TODO: this whole code is a dummy and should be rewritten after the design has been properly //! figured out. -use futures::{Async, Future, Poll, Stream}; +use futures::future::{self, IntoFuture, FutureResult}; +use futures::{stream, Async, Future, Poll, Stream, task}; use futures::stream::Fuse as StreamFuse; -use futures::stream; use multiaddr::Multiaddr; use muxing::StreamMuxer; +use parking_lot::Mutex; use smallvec::SmallVec; use std::io::Error as IoError; +use std::sync::Arc; use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode}; /// Allows reusing the same muxed connection multiple times. @@ -65,6 +69,14 @@ where { // Underlying transport and connection upgrade for when we need to dial or listen. inner: UpgradedNode, + shared: Arc>>, +} + +struct Shared { + // List of futures to dialed connections. + incoming: Vec>>>>>, + // Tasks to signal when an element is added to `incoming`. Only used when `incoming` is empty. + to_signal: Vec, } impl From> for ConnectionReuse @@ -74,7 +86,13 @@ where { #[inline] fn from(node: UpgradedNode) -> ConnectionReuse { - ConnectionReuse { inner: node } + ConnectionReuse { + inner: node, + shared: Arc::new(Mutex::new(Shared { + incoming: Vec::new(), + to_signal: Vec::new(), + })), + } } } @@ -87,42 +105,52 @@ where C::NamesIter: Clone, // TODO: not elegant { type RawConn = ::Substream; - type Listener = ConnectionReuseListener< - Box< - Stream< - Item = (Result, Multiaddr), - Error = IoError, - >, - >, - C::Output, - >; + type Listener = Box>; + type ListenerUpgrade = FutureResult; type Dial = Box>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { let (listener, new_addr) = match self.inner.listen_on(addr.clone()) { Ok((l, a)) => (l, a), Err((inner, addr)) => { - return Err((ConnectionReuse { inner: inner }, addr)); + return Err((ConnectionReuse { inner: inner, shared: self.shared }, addr)); } }; let listener = ConnectionReuseListener { listener: listener.fuse(), + current_upgrades: Vec::new(), connections: Vec::new(), }; - Ok((listener, new_addr)) + Ok((Box::new(listener) as Box<_>, new_addr)) } fn dial(self, addr: Multiaddr) -> Result { - let dial = match self.inner.dial(addr) { + let dial = match self.inner.dial(addr.clone()) { Ok(l) => l, Err((inner, addr)) => { - return Err((ConnectionReuse { inner: inner }, addr)); + return Err((ConnectionReuse { inner: inner, shared: self.shared }, addr)); } }; - let future = dial.and_then(|dial| dial.outbound()); + let dial = dial + .map_err:: Mutex>, _>(|err| Mutex::new(Some(err))) + .shared(); + + let ingoing = dial.clone() + .map(|muxer| stream::repeat(muxer)) + .flatten_stream() + .map(move |muxer| ((&*muxer).clone(), addr.clone())); + + let mut lock = self.shared.lock(); + lock.incoming.push(Box::new(ingoing) as Box<_>); + for task in lock.to_signal.drain(..) { task.notify(); } + drop(lock); + + let future = dial + .map_err(|err| err.lock().take().expect("error can only be extracted once")) + .and_then(|dial| (&*dial).clone().outbound()); Ok(Box::new(future) as Box<_>) } } @@ -135,62 +163,43 @@ where C::Output: StreamMuxer + Clone, C::NamesIter: Clone, // TODO: not elegant { - type Incoming = stream::AndThen< - stream::Repeat, - fn(C::Output) - -> <>::Output as StreamMuxer>::InboundSubstream, - <>::Output as StreamMuxer>::InboundSubstream, - >; - type Outgoing = - <>::Output as StreamMuxer>::OutboundSubstream; - type DialAndListen = Box>; - - fn dial_and_listen(self, addr: Multiaddr) -> Result { - self.inner - .dial(addr) - .map_err(|(inner, addr)| (ConnectionReuse { inner: inner }, addr)) - .map(|fut| { - fut.map(|muxer| { - ( - stream::repeat(muxer.clone()).and_then(StreamMuxer::inbound as fn(_) -> _), - muxer.outbound(), - ) - }) - }) - .map(|fut| Box::new(fut) as _) + type Incoming = Box::Substream, Multiaddr), Error = IoError>>; + + #[inline] + fn next_incoming(self) -> Self::Incoming { + let future = ConnectionReuseIncoming { shared: self.shared.clone() } + .and_then(|(out, addr)| { + out.inbound().map(|o| (o, addr)) + }); + Box::new(future) as Box<_> } } /// Implementation of `Stream +pub struct ConnectionReuseListener where - S: Stream, Multiaddr), Error = IoError>, - M: StreamMuxer + S: Stream, + F: Future, + M: StreamMuxer, { listener: StreamFuse, + current_upgrades: Vec<(F, Multiaddr)>, connections: Vec<(M, ::InboundSubstream, Multiaddr)>, } -impl Stream for ConnectionReuseListener -where S: Stream, Multiaddr), Error = IoError>, +impl Stream for ConnectionReuseListener +where S: Stream, + F: Future, M: StreamMuxer + Clone + 'static // TODO: 'static :( { - type Item = (Result, Multiaddr); + type Item = (FutureResult, Multiaddr); type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { match self.listener.poll() { Ok(Async::Ready(Some((upgrade, client_addr)))) => { - match upgrade { - Ok(upgrade) => { - let next_incoming = upgrade.clone().inbound(); - self.connections.push((upgrade, next_incoming, client_addr)); - }, - Err(err) => { - return Ok(Async::Ready(Some((Err(err), client_addr)))); - }, - } + self.current_upgrades.push((upgrade, client_addr)); } Ok(Async::NotReady) => (), Ok(Async::Ready(None)) => { @@ -204,11 +213,41 @@ where S: Stream, Multiaddr), Error = IoError>, } } }; - + // Most of the time, this array will contain 0 or 1 elements, but sometimes it may contain // more and we don't want to panic if that happens. With 8 elements, we can be pretty // confident that this is never going to spill into a `Vec`. - let mut connections_to_drop: SmallVec<[_; 8]> = SmallVec::new(); + let mut upgrades_to_drop: SmallVec<[_; 8]> = SmallVec::new(); + let mut early_ret = None; + + for (index, &mut (ref mut current_upgrade, ref mut client_addr)) in + self.current_upgrades.iter_mut().enumerate() + { + match current_upgrade.poll() { + Ok(Async::Ready(muxer)) => { + let next_incoming = muxer.clone().inbound(); + self.connections.push((muxer, next_incoming, client_addr.clone())); + upgrades_to_drop.push(index); + }, + Ok(Async::NotReady) => {}, + Err(err) => { + upgrades_to_drop.push(index); + early_ret = Some(Async::Ready(Some((Err(err).into_future(), client_addr.clone())))); + }, + } + } + + for &index in upgrades_to_drop.iter().rev() { + self.current_upgrades.swap_remove(index); + } + + if let Some(early_ret) = early_ret { + return Ok(early_ret); + } + + // We reuse `upgrades_to_drop`. + upgrades_to_drop.clear(); + let mut connections_to_drop = upgrades_to_drop; for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in self.connections.iter_mut().enumerate() @@ -217,7 +256,7 @@ where S: Stream, Multiaddr), Error = IoError>, Ok(Async::Ready(incoming)) => { let mut new_next = muxer.clone().inbound(); *next_incoming = new_next; - return Ok(Async::Ready(Some((Ok(incoming), client_addr.clone())))); + return Ok(Async::Ready(Some((Ok(incoming).into_future(), client_addr.clone())))); } Ok(Async::NotReady) => {} Err(_) => { @@ -233,3 +272,50 @@ where S: Stream, Multiaddr), Error = IoError>, Ok(Async::NotReady) } } + +/// Implementation of `Future { + shared: Arc>>, +} + +impl Future for ConnectionReuseIncoming + where O: Clone +{ + type Item = (O, Multiaddr); + type Error = IoError; + + fn poll(&mut self) -> Poll { + let mut lock = self.shared.lock(); + + let mut to_remove = SmallVec::<[_; 8]>::new(); + let mut ret_value = None; + + for (offset, future) in lock.incoming.iter_mut().enumerate() { + match future.poll() { + Ok(Async::Ready(Some((value, addr)))) => { + ret_value = Some((value.clone(), addr)); + break; + }, + Ok(Async::Ready(None)) => { + to_remove.push(offset); + }, + Ok(Async::NotReady) => {}, + Err(_) => { + to_remove.push(offset); + }, + } + } + + for offset in to_remove.into_iter().rev() { + lock.incoming.swap_remove(offset); + } + + if let Some(ret_value) = ret_value { + Ok(Async::Ready(ret_value)) + } else { + lock.to_signal.push(task::current()); + Ok(Async::NotReady) + } + } +} diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index 17850a5ad7c..59e1fb6dca4 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -48,11 +48,12 @@ //! The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on //! transports that can receive incoming connections on streams that have been opened with `dial()`. //! -//! The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of -//! incoming connections. +//! The trait provides the `next_incoming()` method, which returns a future that will resolve to +//! the next substream that arrives from a dialed node. //! //! > **Note**: This trait is mainly implemented for transports that provide stream muxing -//! > capabilities. +//! > capabilities, but it can also be implemented in a dummy way by returning an empty +//! > iterator. //! //! # Connection upgrades //! @@ -78,7 +79,7 @@ //! `Transport` trait. The return value of this method also implements the `Transport` trait, which //! means that you can call `dial()` and `listen_on()` on it in order to directly obtain an //! upgraded connection or a listener that will yield upgraded connections. Similarly, the -//! `dial_and_listen()` method will automatically apply the upgrade on both the dialer and the +//! `next_incoming()` method will automatically apply the upgrade on both the dialer and the //! listener. An error is produced if the remote doesn't support the protocol corresponding to the //! connection upgrade. //! @@ -123,7 +124,7 @@ //! transport. //! //! However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named -//! `dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`, +//! `dial`, `listen_on`, and `next_incoming`, which will yield you a `Future` or a `Stream`, //! which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific //! way to use the protocol. //! @@ -167,6 +168,7 @@ extern crate bytes; #[macro_use] extern crate futures; extern crate multistream_select; +extern crate parking_lot; extern crate smallvec; extern crate tokio_io; diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 2c51360035c..2e959eb8a20 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -26,12 +26,12 @@ //! encryption middleware to the connection). //! //! Thanks to the `Transport::or_transport`, `Transport::with_upgrade` and -//! `UpgradeNode::or_upgrade` methods, you can combine multiple transports and/or upgrades together -//! in a complex chain of protocols negotiation. +//! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades +//! together in a complex chain of protocols negotiation. use bytes::Bytes; use connection_reuse::ConnectionReuse; -use futures::{Async, Poll, Stream}; +use futures::{Async, Poll, stream, Stream}; use futures::future::{self, FromErr, Future, FutureResult, IntoFuture}; use multiaddr::Multiaddr; use multistream_select; @@ -56,7 +56,16 @@ pub trait Transport { type RawConn: AsyncRead + AsyncWrite; /// The listener produces incoming connections. - type Listener: Stream, Multiaddr), Error = IoError>; + /// + /// An item should be produced whenever a connection is received at the lowest level of the + /// transport stack. The item is a `Future` that is signalled once some pre-processing has + /// taken place, and that connection has been upgraded to the wanted protocols. + type Listener: Stream; + + /// After a connection has been received, we may need to do some asynchronous pre-processing + /// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we + /// want to be able to continue polling on the listener. + type ListenerUpgrade: Future; /// A future which indicates that we are currently dialing to a peer. type Dial: IntoFuture; @@ -114,21 +123,24 @@ pub trait Transport { /// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which /// the dialed node can dial you back. pub trait MuxedTransport: Transport { - /// Produces substreams on the dialed connection. - type Incoming: Stream; - - /// Future resolving to an outgoing connection - type Outgoing: Future; - - /// Future resolving to a tuple of `(Incoming, Outgoing)` - type DialAndListen: Future; + /// Future resolving to an incoming connection. + type Incoming: Future; + + /// Returns the next incoming substream opened by a node that we dialed ourselves. + /// + /// > **Note**: Doesn't produce incoming substreams coming from addresses we are listening on. + /// > This only concerns nodes that we dialed with `dial()`. + fn next_incoming(self) -> Self::Incoming + where Self: Sized; - /// Dial to the given multi-addr, and listen to incoming substreams on the dialed connection. - /// - /// Returns either a future which may resolve to a connection, or gives back the multiaddress. - fn dial_and_listen(self, addr: Multiaddr) -> Result - where - Self: Sized; + /// Returns a stream of incoming connections. + #[inline] + fn incoming(self) -> stream::AndThen, fn(Self) -> Self::Incoming, + Self::Incoming> + where Self: Sized + Clone + { + stream::repeat(self).and_then(|me| me.next_incoming()) + } } /// Dummy implementation of `Transport` that just denies every single attempt. @@ -138,7 +150,8 @@ pub struct DeniedTransport; impl Transport for DeniedTransport { // TODO: could use `!` for associated types once stable type RawConn = Cursor>; - type Listener = Box, Multiaddr), Error = IoError>>; + type Listener = Box>; + type ListenerUpgrade = Box>; type Dial = Box>; #[inline] @@ -153,14 +166,11 @@ impl Transport for DeniedTransport { } impl MuxedTransport for DeniedTransport { - // TODO: could use `!` once stable - type Incoming = Box>; - type Outgoing = Box>; - type DialAndListen = Box>; + type Incoming = future::Empty<(Self::RawConn, Multiaddr), IoError>; #[inline] - fn dial_and_listen(self, addr: Multiaddr) -> Result { - Err((DeniedTransport, addr)) + fn next_incoming(self) -> Self::Incoming { + future::empty() } } @@ -175,6 +185,7 @@ where { type RawConn = EitherSocket; type Listener = EitherListenStream; + type ListenerUpgrade = EitherTransportFuture; type Dial = EitherTransportFuture<::Future, ::Future>; @@ -240,38 +251,19 @@ impl MuxedTransport for OrTransport where A: MuxedTransport, B: MuxedTransport, - A::DialAndListen: 'static, - B::DialAndListen: 'static, + A::Incoming: 'static, // TODO: meh :-/ + B::Incoming: 'static, // TODO: meh :-/ { - type Incoming = EitherIncomingStream; - type Outgoing = future::Either< - future::Map Self::RawConn>, - future::Map Self::RawConn>, - >; - type DialAndListen = Box>; - - fn dial_and_listen(self, addr: Multiaddr) -> Result { - let (first, addr) = match self.0.dial_and_listen(addr) { - Ok(connec) => { - return Ok(Box::new(connec.map(|(inc, out)| { - ( - EitherIncomingStream::First(inc), - future::Either::A(out.map(EitherSocket::First as fn(_) -> _)), - ) - }))); - } - Err(err) => err, - }; + type Incoming = Box, Multiaddr), Error = IoError>>; - match self.1.dial_and_listen(addr) { - Ok(connec) => Ok(Box::new(connec.map(|(inc, out)| { - ( - EitherIncomingStream::Second(inc), - future::Either::B(out.map(EitherSocket::Second as fn(_) -> _)), - ) - }))), - Err((second, addr)) => Err((OrTransport(first, second), addr)), - } + #[inline] + fn next_incoming(self) -> Self::Incoming { + let first = self.0.next_incoming().map(|(out, addr)| (EitherSocket::First(out), addr)); + let second = self.1.next_incoming().map(|(out, addr)| (EitherSocket::Second(out), addr)); + let future = first.select(second) + .map(|(i, _)| i) + .map_err(|(e, _)| e); + Box::new(future) as Box<_> } } @@ -308,19 +300,19 @@ pub enum EitherListenStream { impl Stream for EitherListenStream where - AStream: Stream, Multiaddr), Error = IoError>, - BStream: Stream, Multiaddr), Error = IoError>, + AStream: Stream, + BStream: Stream, { - type Item = (Result, IoError>, Multiaddr); + type Item = (EitherTransportFuture, Multiaddr); type Error = IoError; #[inline] fn poll(&mut self) -> Poll, Self::Error> { match self { &mut EitherListenStream::First(ref mut a) => a.poll() - .map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::First), a)))), + .map(|i| i.map(|v| v.map(|(s, a)| (EitherTransportFuture::First(s), a)))), &mut EitherListenStream::Second(ref mut a) => a.poll() - .map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::Second), a)))), + .map(|i| i.map(|v| v.map(|(s, a)| (EitherTransportFuture::Second(s), a)))), } } } @@ -754,93 +746,31 @@ where Ok(Box::new(future)) } - /// Tries to dial on the `Multiaddr` using the transport that was passed to `new`, then upgrade - /// the connection. Also listens to incoming substream requires on that dialed connection, and - /// automatically upgrades the incoming substreams. - /// - /// Note that this does the same as `MuxedTransport::dial_and_listen`, but with less - /// restrictions on the trait requirements. - pub fn dial_and_listen( - self, - addr: Multiaddr, - ) -> Result< - Box< - Future< - Item = ( - Box + 'a>, - Box + 'a>, - ), - Error = IoError, - > - + 'a, - >, - (Self, Multiaddr), - > - where - T: MuxedTransport, - C::NamesIter: Clone, // TODO: not elegant - C: Clone, + /// If the underlying transport is a `MuxedTransport`, then after calling `dial` we may receive + /// substreams opened by the dialed nodes. + /// + /// This function returns the next incoming substream. You are strongly encouraged to call it + /// if you have a muxed transport. + pub fn next_incoming(self) -> Box + 'a> + where T: MuxedTransport { let upgrade = self.upgrade; - let upgrade2 = upgrade.clone(); - self.transports - .dial_and_listen(addr) - .map_err(move |(trans, addr)| { - let builder = UpgradedNode { - transports: trans, - upgrade: upgrade, - }; + let future = self.transports.next_incoming() + // Try to negotiate the protocol. + .and_then(move |(connection, addr)| { + let iter = upgrade.protocol_names() + .map(|(name, id)| (name, ::eq, id)); + let negotiated = multistream_select::dialer_select_proto(connection, iter) + .map_err(|err| IoError::new(IoErrorKind::Other, err)); + negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) + }) + .and_then(|(upgrade_id, connection, upgrade, addr)| { + upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) + .map(|u| (u, addr)) + }); - (builder, addr) - }) - .map(move |dialed_fut| { - let dialed_fut = dialed_fut - // Try to negotiate the protocol. - .map(move |(in_stream, dialer)| { - let upgrade = upgrade2.clone(); - - let dialer = { - let iter = upgrade2.protocol_names() - .map(|(name, id)| (name, ::eq, id)); - let negotiated = dialer.and_then(|dialer| { - multistream_select::dialer_select_proto(dialer, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - }); - negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade2)) - } - .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) - }); - - let in_stream = in_stream - // Try to negotiate the protocol. - .and_then(move |connection| { - let upgrade = upgrade.clone(); - - let iter = upgrade.protocol_names() - .map((|(n, t)| { - (n, ::eq, t) - }) as fn(_) -> _); - let negotiated = multistream_select::listener_select_proto( - connection, - iter, - ); - negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade)) - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - }) - .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) - }); - - ( - Box::new(in_stream) as Box>, - Box::new(dialer) as Box>, - ) - }); - - Box::new(dialed_fut) as _ - }) + Box::new(future) as Box<_> } /// Start listening on the multiaddr using the transport that was passed to `new`. @@ -853,7 +783,7 @@ where self, addr: Multiaddr, ) -> Result< - (Box, Multiaddr), Error = IoError> + 'a>, Multiaddr), + (Box + 'a>, Multiaddr), Error = IoError> + 'a>, Multiaddr), (Self, Multiaddr), > where @@ -879,30 +809,22 @@ where // Instead the `stream` will produce `Ok(Err(...))`. // `stream` can only produce an `Err` if `listening_stream` produces an `Err`. let stream = listening_stream - // Try to negotiate the protocol - .and_then(move |(connection, client_addr)| { - // Turn the `Result` into - // a `Result, IoError>` - let connection = connection.map(|connection| { - let upgrade = upgrade.clone(); - let iter = upgrade.protocol_names() - .map::<_, fn(_) -> _>(|(n, t)| (n, ::eq, t)); - multistream_select::listener_select_proto(connection, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - .and_then(|(upgrade_id, connection)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) - }) - }); - - connection - .into_future() - .flatten() - .then(move |nego_res| { - match nego_res { - Ok(upgraded) => Ok((Ok(upgraded), client_addr)), - Err(err) => Ok((Err(err), client_addr)), - } - }) + .map(move |(connection, client_addr)| { + let upgrade = upgrade.clone(); + let connection = connection + // Try to negotiate the protocol + .and_then(move |connection| { + let iter = upgrade.protocol_names() + .map::<_, fn(_) -> _>(|(n, t)| (n, ::eq, t)); + multistream_select::listener_select_proto(connection, iter) + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .and_then(|(upgrade_id, connection)| { + upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) + }) + .into_future() + }); + + (Box::new(connection) as Box<_>, client_addr) }); Ok((Box::new(stream), new_addr)) @@ -918,7 +840,8 @@ where C: Clone, { type RawConn = C::Output; - type Listener = Box, Multiaddr), Error = IoError>>; + type Listener = Box>; + type ListenerUpgrade = Box>; type Dial = Box>; #[inline] @@ -940,13 +863,10 @@ where C::NamesIter: Clone, // TODO: not elegant C: Clone, { - type Incoming = Box>; - type Outgoing = Box>; - type DialAndListen = Box>; + type Incoming = Box>; #[inline] - fn dial_and_listen(self, addr: Multiaddr) -> Result { - // Calls an inherent function above - self.dial_and_listen(addr) + fn next_incoming(self) -> Self::Incoming { + self.next_incoming() } } diff --git a/libp2p-tcp-transport/src/lib.rs b/libp2p-tcp-transport/src/lib.rs index c20dbf7cc29..cc311769fc2 100644 --- a/libp2p-tcp-transport/src/lib.rs +++ b/libp2p-tcp-transport/src/lib.rs @@ -59,7 +59,7 @@ use std::io::Error as IoError; use std::net::SocketAddr; use tokio_core::reactor::Handle; use tokio_core::net::{TcpStream, TcpListener, TcpStreamNew}; -use futures::Future; +use futures::future::{self, Future, FutureResult, IntoFuture}; use futures::stream::Stream; use multiaddr::{Multiaddr, AddrComponent, ToMultiaddr}; use swarm::Transport; @@ -84,13 +84,9 @@ impl TcpConfig { } impl Transport for TcpConfig { - /// The raw connection. type RawConn = TcpStream; - - /// The listener produces incoming connections. - type Listener = Box, Multiaddr), Error = IoError>>; - - /// A future which indicates currently dialing to a peer. + type Listener = Box>; + type ListenerUpgrade = FutureResult; type Dial = TcpStreamNew; /// Listen on the given multi-addr. @@ -109,12 +105,13 @@ impl Transport for TcpConfig { } Err(_) => addr, }; - let future = futures::future::result(listener).map(|listener| { + + let future = future::result(listener).map(|listener| { // Pull out a stream of sockets for incoming connections listener.incoming().map(|(sock, addr)| { let addr = addr.to_multiaddr() .expect("generating a multiaddr from a socket addr never fails"); - (Ok(sock), addr) + (Ok(sock).into_future(), addr) }) }) .flatten_stream(); @@ -224,16 +221,18 @@ mod tests { let tcp = TcpConfig::new(core.handle()); let handle = core.handle(); let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| { - // Define what to do with the socket that just connected to us - // Which in this case is read 3 bytes - let handle_conn = tokio_io::io::read_exact(sock.unwrap(), [0; 3]) - .map(|(_, buf)| assert_eq!(buf, [1, 2, 3])) - .map_err(|err| panic!("IO error {:?}", err)); + sock.and_then(|sock| { + // Define what to do with the socket that just connected to us + // Which in this case is read 3 bytes + let handle_conn = tokio_io::io::read_exact(sock, [0; 3]) + .map(|(_, buf)| assert_eq!(buf, [1, 2, 3])) + .map_err(|err| panic!("IO error {:?}", err)); - // Spawn the future as a concurrent task - handle.spawn(handle_conn); + // Spawn the future as a concurrent task + handle.spawn(handle_conn); - Ok(()) + Ok(()) + }) }); core.run(listener).unwrap();