Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Change the MuxedTransport trait
  • Loading branch information
tomaka committed Jan 3, 2018
commit 47a3a24fcdb9ca9cd9d567b554a95df857cbc181
30 changes: 11 additions & 19 deletions example/examples/echo-dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extern crate tokio_io;
use bytes::BytesMut;
use futures::{Future, Sink, Stream};
use std::env;
use swarm::{UpgradeExt, SimpleProtocol, Transport};
use swarm::{UpgradeExt, SimpleProtocol, Transport, MuxedTransport};
use tcp::TcpConfig;
use tokio_core::reactor::Core;
use tokio_io::codec::length_delimited;
Expand Down Expand Up @@ -68,8 +68,10 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.into_connection_reuse()
.into_connection_reuse();

let transport_with_echo = transport
.clone()
// On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol
// just for this example.
// For this purpose, we create a `SimpleProtocol` struct.
Expand All @@ -86,34 +88,23 @@ fn main() {
// of any opened stream.

// We use it to dial the address.
let dialer = transport
.dial_and_listen(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr"))
let dialer = transport_with_echo
.dial(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr"))
// If the multiaddr protocol exists but is not supported, then we get an error containing
// the transport and the original multiaddress. Therefore we cannot directly use `unwrap()`
// or `expect()`, but have to add a `map_err()` beforehand.
.map_err(|(_, addr)| addr).expect("unsupported multiaddr")

.and_then(|(incoming, echo)| {
.and_then(|echo| {
// `echo` is what the closure used when initializing "echo" returns.
// Consequently, please note that the `send` method is available only because the type
// `length_delimited::Framed` has a `send` method.
println!("Sending \"hello world\" to listener");
echo.and_then(|echo| echo.send("hello world".into()).map(Option::Some))
.select(
incoming
.for_each(|_| {
println!("opened");
Ok(())
})
.map(|()| None),
)
.map(|(n, _)| n)
.map_err(|(e, _)| e)
echo.send("hello world".into())
})
.and_then(|echo| {
// The message has been successfully sent. Now wait for an answer.
echo.unwrap()
.into_future()
echo.into_future()
.map(|(msg, rest)| {
println!("Received message from listener: {:?}", msg);
rest
Expand All @@ -124,5 +115,6 @@ fn main() {
// `dialer` is a future that contains all the behaviour that we want, but nothing has actually
// started yet. Because we created the `TcpConfig` with tokio, we need to run the future
// through the tokio core.
core.run(dialer).unwrap();
core.run(dialer.map(|_| ()).select(transport.incoming().for_each(|_| Ok(()))))
.unwrap_or_else(|_| panic!());
}
1 change: 1 addition & 0 deletions libp2p-swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ bytes = "0.4"
multiaddr = "0.2.0"
multistream-select = { path = "../multistream-select" }
futures = { version = "0.1", features = ["use_std"] }
parking_lot = "0.5.3"
smallvec = "0.5"
tokio-io = "0.1"

Expand Down
130 changes: 98 additions & 32 deletions libp2p-swarm/src/connection_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,23 @@
//!
//! When called on a `ConnectionReuse`, the `dial` method will try to use a connection that has
//! already been opened earlier, and open an outgoing substream on it. If none is available, it
//! will dial the given multiaddress.
//! will dial the given multiaddress. Dialed node can also spontaneously open new substreams with
//! us. In order to handle these new substreams you should use the `next_incoming` method of the
//! `MuxedTransport` trait.
//! TODO: this raises several questions ^
//!
//! TODO: this whole code is a dummy and should be rewritten after the design has been properly
//! figured out.

use futures::{Async, Future, Poll, Stream};
use futures::future::{IntoFuture, FutureResult};
use futures::future::{self, IntoFuture, FutureResult};
use futures::{stream, Async, Future, Poll, Stream, task};
use futures::stream::Fuse as StreamFuse;
use futures::stream;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use parking_lot::Mutex;
use smallvec::SmallVec;
use std::io::Error as IoError;
use std::sync::Arc;
use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode};

/// Allows reusing the same muxed connection multiple times.
Expand All @@ -66,6 +69,14 @@ where
{
// Underlying transport and connection upgrade for when we need to dial or listen.
inner: UpgradedNode<T, C>,
shared: Arc<Mutex<Shared<C::Output>>>,
}

struct Shared<O> {
// List of futures to dialed connections.
incoming: Vec<Box<Stream<Item = (O, Multiaddr), Error = future::SharedError<Mutex<Option<IoError>>>>>>,
// Tasks to signal when an element is added to `incoming`. Only used when `incoming` is empty.
to_signal: Vec<task::Task>,
}

impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
Expand All @@ -75,7 +86,13 @@ where
{
#[inline]
fn from(node: UpgradedNode<T, C>) -> ConnectionReuse<T, C> {
ConnectionReuse { inner: node }
ConnectionReuse {
inner: node,
shared: Arc::new(Mutex::new(Shared {
incoming: Vec::new(),
to_signal: Vec::new(),
})),
}
}
}

Expand All @@ -96,7 +113,7 @@ where
let (listener, new_addr) = match self.inner.listen_on(addr.clone()) {
Ok((l, a)) => (l, a),
Err((inner, addr)) => {
return Err((ConnectionReuse { inner: inner }, addr));
return Err((ConnectionReuse { inner: inner, shared: self.shared }, addr));
}
};

Expand All @@ -110,14 +127,30 @@ where
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
let dial = match self.inner.dial(addr) {
let dial = match self.inner.dial(addr.clone()) {
Ok(l) => l,
Err((inner, addr)) => {
return Err((ConnectionReuse { inner: inner }, addr));
return Err((ConnectionReuse { inner: inner, shared: self.shared }, addr));
}
};

let future = dial.and_then(|dial| dial.outbound());
let dial = dial
.map_err::<fn(IoError) -> Mutex<Option<IoError>>, _>(|err| Mutex::new(Some(err)))
.shared();

let ingoing = dial.clone()
.map(|muxer| stream::repeat(muxer))
.flatten_stream()
.map(move |muxer| ((&*muxer).clone(), addr.clone()));

let mut lock = self.shared.lock();
lock.incoming.push(Box::new(ingoing) as Box<_>);
for task in lock.to_signal.drain(..) { task.notify(); }
drop(lock);

let future = dial
.map_err(|err| err.lock().take().expect("error can only be extracted once"))
.and_then(|dial| (&*dial).clone().outbound());
Ok(Box::new(future) as Box<_>)
}
}
Expand All @@ -130,29 +163,15 @@ where
C::Output: StreamMuxer + Clone,
C::NamesIter: Clone, // TODO: not elegant
{
type Incoming = stream::AndThen<
stream::Repeat<C::Output, IoError>,
fn(C::Output)
-> <<C as ConnectionUpgrade<T::RawConn>>::Output as StreamMuxer>::InboundSubstream,
<<C as ConnectionUpgrade<T::RawConn>>::Output as StreamMuxer>::InboundSubstream,
>;
type Outgoing =
<<C as ConnectionUpgrade<T::RawConn>>::Output as StreamMuxer>::OutboundSubstream;
type DialAndListen = Box<Future<Item = (Self::Incoming, Self::Outgoing), Error = IoError>>;

fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
self.inner
.dial(addr)
.map_err(|(inner, addr)| (ConnectionReuse { inner: inner }, addr))
.map(|fut| {
fut.map(|muxer| {
(
stream::repeat(muxer.clone()).and_then(StreamMuxer::inbound as fn(_) -> _),
muxer.outbound(),
)
})
})
.map(|fut| Box::new(fut) as _)
type Incoming = Box<Future<Item = (<C::Output as StreamMuxer>::Substream, Multiaddr), Error = IoError>>;

#[inline]
fn next_incoming(self) -> Self::Incoming {
let future = ConnectionReuseIncoming { shared: self.shared.clone() }
.and_then(|(out, addr)| {
out.inbound().map(|o| (o, addr))
});
Box::new(future) as Box<_>
}
}

Expand Down Expand Up @@ -253,3 +272,50 @@ where S: Stream<Item = (F, Multiaddr), Error = IoError>,
Ok(Async::NotReady)
}
}

/// Implementation of `Future<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
/// `ConnectionReuse` struct.
pub struct ConnectionReuseIncoming<O> {
shared: Arc<Mutex<Shared<O>>>,
}

impl<O> Future for ConnectionReuseIncoming<O>
where O: Clone
{
type Item = (O, Multiaddr);
type Error = IoError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut lock = self.shared.lock();

let mut to_remove = SmallVec::<[_; 8]>::new();
let mut ret_value = None;

for (offset, future) in lock.incoming.iter_mut().enumerate() {
match future.poll() {
Ok(Async::Ready(Some((value, addr)))) => {
ret_value = Some((value.clone(), addr));
break;
},
Ok(Async::Ready(None)) => {
to_remove.push(offset);
},
Ok(Async::NotReady) => {},
Err(_) => {
to_remove.push(offset);
},
}
}

for offset in to_remove.into_iter().rev() {
lock.incoming.swap_remove(offset);
}

if let Some(ret_value) = ret_value {
Ok(Async::Ready(ret_value))
} else {
lock.to_signal.push(task::current());
Ok(Async::NotReady)
}
}
}
1 change: 1 addition & 0 deletions libp2p-swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ extern crate bytes;
#[macro_use]
extern crate futures;
extern crate multistream_select;
extern crate parking_lot;
extern crate smallvec;
extern crate tokio_io;

Expand Down
Loading