Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 40 additions & 53 deletions example/examples/echo-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,62 +95,49 @@ fn main() {
println!("Now listening on {:?}", address);

let future = listener
.filter_map(|(socket, client_addr)| {
let client_addr = client_addr.to_string();

// This closure is called whenever a new connection has been received. The `socket`
// is a `Result<..., IoError>` which contains an error if for example protocol
// negotiation or the secio handshake failed. We handle this situation by printing a
// message on stderr and ignoring the connection.
match socket {
Ok(s) => Some((s, client_addr)),
Err(err) => {
eprintln!("Failed connection attempt from {}\n => Error: {:?}",
client_addr, err);
None
},
}
})

.for_each(|(socket, client_addr)| {
// This closure is called whenever a new connection has been received and successfully
// upgraded to use secio/plaintext and echo.
println!("Successfully negotiated protocol with {}", client_addr);

// We loop forever in order to handle all the messages sent by the client.
let client_finished = {
let client_addr = client_addr.clone();
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket.into_future()
.map_err(|(err, _)| err)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
// This closure is called whenever a new connection has been received.
// `socket` is a future that will be triggered once the upgrade to secio, multiplex
// and echo is complete.
let client_addr = client_addr.to_string();
println!("Incoming connection from {}", client_addr);

socket
.and_then(move |socket| {
println!("Successfully negotiated protocol with {}", client_addr);

// We loop forever in order to handle all the messages sent by the client.
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket.into_future()
.map_err(|(err, _)| err)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just for debugging now or should we bring in a logging library like the parity client, allowing users to flip some flags for extra debug data? Just a general question that I thought of when I saw this, doesn't have to be addressed in this PR obviously.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example is here to quickly demonstrate how to use libp2p, so I don't think a logging library is necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No definitely not, I was just talking generally, might be nice to be able to flip on something like trace messages, we can move that conversation to Riot :P

identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
})
})
};

// We absorb errors from the `client_finished` future so that an error while processing
// a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
// entire server.
client_finished.then(move |res| {
if let Err(err) = res {
println!("Error while processing client {}: {:?}", client_addr, err);
}
Ok(())
})
// We absorb errors from the future so that an error while processing a client
// (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
// entire server.
.then(move |res| {
if let Err(err) = res {
println!("Error while processing client: {:?}", err);
}
Ok(())
})
});

// `future` is a future that contains all the behaviour that we want, but nothing has actually
Expand Down
76 changes: 48 additions & 28 deletions libp2p-swarm/src/connection_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
//! figured out.

use futures::{Async, Future, Poll, Stream};
use futures::future::{IntoFuture, FutureResult};
use futures::stream::Fuse as StreamFuse;
use futures::stream;
use multiaddr::Multiaddr;
Expand Down Expand Up @@ -87,15 +88,8 @@ where
C::NamesIter: Clone, // TODO: not elegant
{
type RawConn = <C::Output as StreamMuxer>::Substream;
type Listener = ConnectionReuseListener<
Box<
Stream<
Item = (Result<C::Output, IoError>, Multiaddr),
Error = IoError,
>,
>,
C::Output,
>;
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>;
type ListenerUpgrade = FutureResult<Self::RawConn, IoError>;
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;

fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Expand All @@ -108,10 +102,11 @@ where

let listener = ConnectionReuseListener {
listener: listener.fuse(),
current_upgrades: Vec::new(),
connections: Vec::new(),
};

Ok((listener, new_addr))
Ok((Box::new(listener) as Box<_>, new_addr))
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
Expand Down Expand Up @@ -163,34 +158,29 @@ where

/// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
/// `ConnectionReuse` struct.
pub struct ConnectionReuseListener<S, M>
pub struct ConnectionReuseListener<S, F, M>
where
S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>,
M: StreamMuxer
S: Stream<Item = (F, Multiaddr), Error = IoError>,
F: Future<Item = M, Error = IoError>,
M: StreamMuxer,
{
listener: StreamFuse<S>,
current_upgrades: Vec<(F, Multiaddr)>,
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
}

impl<S, M> Stream for ConnectionReuseListener<S, M>
where S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>,
impl<S, F, M> Stream for ConnectionReuseListener<S, F, M>
where S: Stream<Item = (F, Multiaddr), Error = IoError>,
F: Future<Item = M, Error = IoError>,
M: StreamMuxer + Clone + 'static // TODO: 'static :(
{
type Item = (Result<M::Substream, IoError>, Multiaddr);
type Item = (FutureResult<M::Substream, IoError>, Multiaddr);
type Error = IoError;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.listener.poll() {
Ok(Async::Ready(Some((upgrade, client_addr)))) => {
match upgrade {
Ok(upgrade) => {
let next_incoming = upgrade.clone().inbound();
self.connections.push((upgrade, next_incoming, client_addr));
},
Err(err) => {
return Ok(Async::Ready(Some((Err(err), client_addr))));
},
}
self.current_upgrades.push((upgrade, client_addr));
}
Ok(Async::NotReady) => (),
Ok(Async::Ready(None)) => {
Expand All @@ -204,11 +194,41 @@ where S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>,
}
}
};

// Most of the time, this array will contain 0 or 1 elements, but sometimes it may contain
// more and we don't want to panic if that happens. With 8 elements, we can be pretty
// confident that this is never going to spill into a `Vec`.
let mut connections_to_drop: SmallVec<[_; 8]> = SmallVec::new();
let mut upgrades_to_drop: SmallVec<[_; 8]> = SmallVec::new();
let mut early_ret = None;

for (index, &mut (ref mut current_upgrade, ref mut client_addr)) in
self.current_upgrades.iter_mut().enumerate()
{
match current_upgrade.poll() {
Ok(Async::Ready(muxer)) => {
let next_incoming = muxer.clone().inbound();
self.connections.push((muxer, next_incoming, client_addr.clone()));
upgrades_to_drop.push(index);
},
Ok(Async::NotReady) => {},
Err(err) => {
upgrades_to_drop.push(index);
early_ret = Some(Async::Ready(Some((Err(err).into_future(), client_addr.clone()))));
},
}
}

for &index in upgrades_to_drop.iter().rev() {
self.current_upgrades.swap_remove(index);
}

if let Some(early_ret) = early_ret {
return Ok(early_ret);
}

// We reuse `upgrades_to_drop`.
upgrades_to_drop.clear();
let mut connections_to_drop = upgrades_to_drop;

for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in
self.connections.iter_mut().enumerate()
Expand All @@ -217,7 +237,7 @@ where S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>,
Ok(Async::Ready(incoming)) => {
let mut new_next = muxer.clone().inbound();
*next_incoming = new_next;
return Ok(Async::Ready(Some((Ok(incoming), client_addr.clone()))));
return Ok(Async::Ready(Some((Ok(incoming).into_future(), client_addr.clone()))));
}
Ok(Async::NotReady) => {}
Err(_) => {
Expand Down
74 changes: 39 additions & 35 deletions libp2p-swarm/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
//! encryption middleware to the connection).
//!
//! Thanks to the `Transport::or_transport`, `Transport::with_upgrade` and
//! `UpgradeNode::or_upgrade` methods, you can combine multiple transports and/or upgrades together
//! in a complex chain of protocols negotiation.
//! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades
//! together in a complex chain of protocols negotiation.

use bytes::Bytes;
use connection_reuse::ConnectionReuse;
Expand Down Expand Up @@ -56,7 +56,16 @@ pub trait Transport {
type RawConn: AsyncRead + AsyncWrite;

/// The listener produces incoming connections.
type Listener: Stream<Item = (Result<Self::RawConn, IoError>, Multiaddr), Error = IoError>;
///
/// An item should be produced whenever a connection is received at the lowest level of the
/// transport stack. The item is a `Future` that is signalled once some pre-processing has
/// taken place, and that connection has been upgraded to the wanted protocols.
type Listener: Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>;

/// After a connection has been received, we may need to do some asynchronous pre-processing
/// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we
/// want to be able to continue polling on the listener.
type ListenerUpgrade: Future<Item = Self::RawConn, Error = IoError>;

/// A future which indicates that we are currently dialing to a peer.
type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
Expand Down Expand Up @@ -138,7 +147,8 @@ pub struct DeniedTransport;
impl Transport for DeniedTransport {
// TODO: could use `!` for associated types once stable
type RawConn = Cursor<Vec<u8>>;
type Listener = Box<Stream<Item = (Result<Self::RawConn, IoError>, Multiaddr), Error = IoError>>;
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = Self::RawConn, Error = IoError>>;
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;

#[inline]
Expand Down Expand Up @@ -175,6 +185,7 @@ where
{
type RawConn = EitherSocket<A::RawConn, B::RawConn>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherTransportFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial =
EitherTransportFuture<<A::Dial as IntoFuture>::Future, <B::Dial as IntoFuture>::Future>;

Expand Down Expand Up @@ -308,19 +319,19 @@ pub enum EitherListenStream<A, B> {

impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
where
AStream: Stream<Item = (Result<AInner, IoError>, Multiaddr), Error = IoError>,
BStream: Stream<Item = (Result<BInner, IoError>, Multiaddr), Error = IoError>,
AStream: Stream<Item = (AInner, Multiaddr), Error = IoError>,
BStream: Stream<Item = (BInner, Multiaddr), Error = IoError>,
{
type Item = (Result<EitherSocket<AInner, BInner>, IoError>, Multiaddr);
type Item = (EitherTransportFuture<AInner, BInner>, Multiaddr);
type Error = IoError;

#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
&mut EitherListenStream::First(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::First), a)))),
.map(|i| i.map(|v| v.map(|(s, a)| (EitherTransportFuture::First(s), a)))),
&mut EitherListenStream::Second(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::Second), a)))),
.map(|i| i.map(|v| v.map(|(s, a)| (EitherTransportFuture::Second(s), a)))),
}
}
}
Expand Down Expand Up @@ -853,7 +864,7 @@ where
self,
addr: Multiaddr,
) -> Result<
(Box<Stream<Item = (Result<C::Output, IoError>, Multiaddr), Error = IoError> + 'a>, Multiaddr),
(Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError> + 'a>, Multiaddr), Error = IoError> + 'a>, Multiaddr),
(Self, Multiaddr),
>
where
Expand All @@ -879,30 +890,22 @@ where
// Instead the `stream` will produce `Ok(Err(...))`.
// `stream` can only produce an `Err` if `listening_stream` produces an `Err`.
let stream = listening_stream
// Try to negotiate the protocol
.and_then(move |(connection, client_addr)| {
// Turn the `Result<impl AsyncRead + AsyncWrite, IoError>` into
// a `Result<impl Future<Item = impl AsyncRead + AsyncWrite, Error = IoError>, IoError>`
let connection = connection.map(|connection| {
let upgrade = upgrade.clone();
let iter = upgrade.protocol_names()
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
multistream_select::listener_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err))
.and_then(|(upgrade_id, connection)| {
upgrade.upgrade(connection, upgrade_id, Endpoint::Listener)
})
});

connection
.into_future()
.flatten()
.then(move |nego_res| {
match nego_res {
Ok(upgraded) => Ok((Ok(upgraded), client_addr)),
Err(err) => Ok((Err(err), client_addr)),
}
})
.map(move |(connection, client_addr)| {
let upgrade = upgrade.clone();
let connection = connection
// Try to negotiate the protocol
.and_then(move |connection| {
let iter = upgrade.protocol_names()
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
multistream_select::listener_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err))
.and_then(|(upgrade_id, connection)| {
upgrade.upgrade(connection, upgrade_id, Endpoint::Listener)
})
.into_future()
});

(Box::new(connection) as Box<_>, client_addr)
});

Ok((Box::new(stream), new_addr))
Expand All @@ -918,7 +921,8 @@ where
C: Clone,
{
type RawConn = C::Output;
type Listener = Box<Stream<Item = (Result<C::Output, IoError>, Multiaddr), Error = IoError>>;
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = C::Output, Error = IoError>>;
type Dial = Box<Future<Item = C::Output, Error = IoError>>;

#[inline]
Expand Down
Loading