Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Separate between listener, dial_to_handler and dial_custom_handler
  • Loading branch information
tomaka committed Jan 3, 2018
commit 13ba95e282bd58c96691e795de87696e0e77edb2
29 changes: 15 additions & 14 deletions example/examples/echo-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,28 +68,29 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.into_connection_reuse()

// On top of both mutiplex and plaintext/secio, we use the "echo" protocol, which is a
// custom protocol just for this example.
// For this purpose, we create a `SimpleProtocol` struct.
.with_upgrade(SimpleProtocol::new("/echo/1.0.0", |socket| {
// This closure is called whenever a stream using the "echo" protocol has been
// successfully negotiated. The parameter is the raw socket (implements the AsyncRead
// and AsyncWrite traits), and the closure must return an implementation of
// `IntoFuture` that can yield any type of object.
Ok(length_delimited::Framed::<_, bytes::BytesMut>::new(socket))
}));
.into_connection_reuse();

// We now have a `transport` variable that can be used either to dial nodes or listen to
// incoming connections, and that will automatically apply all the selected protocols on top
// incoming connections, and that will automatically apply secio and multiplex on top
// of any opened stream.

// We now prepare the protocol that we are going to negotiate with nodes that open a connection
// or substream to our server.
let proto = SimpleProtocol::new("/echo/1.0.0", |socket| {
// This closure is called whenever a stream using the "echo" protocol has been
// successfully negotiated. The parameter is the raw socket (implements the AsyncRead
// and AsyncWrite traits), and the closure must return an implementation of
// `IntoFuture` that can yield any type of object.
Ok(length_delimited::Framed::<_, bytes::BytesMut>::new(socket))
});

// Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and
// outgoing connections for us.
let (swarm_controller, swarm_future) = swarm::swarm(transport, |socket, client_addr| {
let (swarm_controller, swarm_future) = swarm::swarm(transport, proto, |socket, client_addr| {
println!("Successfully negotiated protocol with {}", client_addr);

// The type of `socket` is exactly what the closure of `SimpleProtocol` returns.

// We loop forever in order to handle all the messages sent by the client.
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
Expand Down
5 changes: 2 additions & 3 deletions libp2p-swarm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,9 @@ use libp2p_swarm::Transport;
let mut core = tokio_core::reactor::Core::new().unwrap();

let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
.with_dummy_muxing()
.with_upgrade(Ping);
.with_dummy_muxing();

let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, |(mut pinger, service), client_addr| {
let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, Ping, |(mut pinger, service), client_addr| {
pinger.ping().map_err(|_| panic!())
.select(service).map_err(|_| panic!())
.map(|_| ())
Expand Down
5 changes: 2 additions & 3 deletions libp2p-swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,9 @@
//! let mut core = tokio_core::reactor::Core::new().unwrap();
//!
//! let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
//! .with_dummy_muxing()
//! .with_upgrade(Ping);
//! .with_dummy_muxing();
//!
//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, |(mut pinger, service), client_addr| {
//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, Ping, |(mut pinger, service), client_addr| {
//! pinger.ping().map_err(|_| panic!())
//! .select(service).map_err(|_| panic!())
//! .map(|_| ())
Expand Down
74 changes: 62 additions & 12 deletions libp2p-swarm/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use std::io::Error as IoError;
use futures::{IntoFuture, Future, Stream, Async, Poll};
use futures::{IntoFuture, Future, Stream, Async, Poll, future};
use futures::sync::mpsc;
use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode};

Expand All @@ -31,7 +31,7 @@ use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode};
/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to
/// control, and the `Future` must be driven to completion in order for things to work.
///
pub fn swarm<T, C, H, F>(upgraded: UpgradedNode<T, C>, handler: H)
pub fn swarm<T, C, H, F>(transport: T, upgrade: C, handler: H)
-> (SwarmController<T, C>, SwarmFuture<T, C, H, F::Future>)
where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
C: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
Expand All @@ -40,6 +40,9 @@ pub fn swarm<T, C, H, F>(upgraded: UpgradedNode<T, C>, handler: H)
{
let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded();
let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded();
let (new_toprocess_tx, new_toprocess_rx) = mpsc::unbounded();

let upgraded = transport.clone().with_upgrade(upgrade);

let future = SwarmFuture {
upgraded: upgraded.clone(),
Expand All @@ -51,12 +54,15 @@ pub fn swarm<T, C, H, F>(upgraded: UpgradedNode<T, C>, handler: H)
dialers: Vec::new(),
new_dialers: new_dialers_rx,
to_process: Vec::new(),
new_toprocess: new_toprocess_rx,
};

let controller = SwarmController {
transport: transport,
upgraded: upgraded,
new_listeners: new_listeners_tx,
new_dialers: new_dialers_tx,
new_toprocess: new_toprocess_tx,
};

(controller, future)
Expand All @@ -67,23 +73,29 @@ pub struct SwarmController<T, C>
where T: MuxedTransport + 'static, // TODO: 'static :-/
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
{
transport: T,
upgraded: UpgradedNode<T, C>,
new_listeners: mpsc::UnboundedSender<Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError>>, Multiaddr), Error = IoError>>>,
new_dialers: mpsc::UnboundedSender<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
new_toprocess: mpsc::UnboundedSender<Box<Future<Item = (), Error = IoError>>>,
}

impl<T, C> SwarmController<T, C>
where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
C: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
C::NamesIter: Clone, // TODO: not elegant
{
/// Asks the swarm to dial the node with the given multiaddress.
///
/// Once the connection has been open and upgraded, it will be given to the handler.
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
/// upgraded using the `upgrade`, and the output is sent to the handler that was passed when
/// calling `swarm`.
// TODO: consider returning a future so that errors can be processed?
pub fn dial(&self, multiaddr: Multiaddr) -> Result<(), Multiaddr> {
match self.upgraded.clone().dial(multiaddr.clone()) {
pub fn dial_to_handler<Du>(&self, multiaddr: Multiaddr, upgrade: Du) -> Result<(), Multiaddr>
where Du: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
Du::Output: Into<C::Output>,
{
match self.transport.clone().with_upgrade(upgrade).dial(multiaddr.clone()) {
Ok(dial) => {
let dial = Box::new(dial.map(Into::into)) as Box<Future<Item = _, Error = _>>;
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_dialers.unbounded_send((dial, multiaddr));
Expand All @@ -95,7 +107,34 @@ impl<T, C> SwarmController<T, C>
}
}

/// Adds a multiaddr to listen on.
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
/// upgraded using the `upgrade`, and the output is then passed to `and_then`.
///
/// Contrary to `dial_to_handler`, the output of the upgrade is not given to the handler that
/// was passed at initialization.
// TODO: consider returning a future so that errors can be processed?
pub fn dial_custom_handler<Du, Df, Dfu>(&self, multiaddr: Multiaddr, upgrade: Du, and_then: Df)
-> Result<(), Multiaddr>
where Du: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
Df: FnOnce(Du::Output) -> Dfu + 'static, // TODO: 'static :-/
Dfu: IntoFuture<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
{
match self.transport.clone().with_upgrade(upgrade).dial(multiaddr) {
Ok(dial) => {
let dial = Box::new(dial.and_then(and_then)) as Box<_>;
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_toprocess.unbounded_send(dial);
Ok(())
},
Err((_, multiaddr)) => {
Err(multiaddr)
},
}
}

/// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that
/// was passed to `swarm`.
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
match self.upgraded.clone().listen_on(multiaddr) {
Ok((listener, new_addr)) => {
Expand Down Expand Up @@ -124,7 +163,8 @@ pub struct SwarmFuture<T, C, H, F>
listeners_upgrade: Vec<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
dialers: Vec<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
new_dialers: mpsc::UnboundedReceiver<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
to_process: Vec<F>,
to_process: Vec<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
}

impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
Expand All @@ -143,7 +183,7 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
match self.next_incoming.poll() {
Ok(Async::Ready((connec, client_addr))) => {
self.next_incoming = self.upgraded.clone().next_incoming();
self.to_process.push(handler(connec, client_addr).into_future());
self.to_process.push(future::Either::A(handler(connec, client_addr).into_future()));
},
Ok(Async::NotReady) => {},
Err(err) => return Err(err),
Expand All @@ -169,6 +209,16 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
Ok(Async::NotReady) => {},
};

match self.new_toprocess.poll() {
Ok(Async::Ready(Some(new_toprocess))) => {
self.to_process.push(future::Either::B(new_toprocess));
},
Ok(Async::Ready(None)) | Err(_) => {
// New to-process sender has been closed.
},
Ok(Async::NotReady) => {},
};

for n in (0 .. self.listeners.len()).rev() {
let mut listener = self.listeners.swap_remove(n);
match listener.poll() {
Expand All @@ -188,7 +238,7 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
let (mut upgrade, addr) = self.listeners_upgrade.swap_remove(n);
match upgrade.poll() {
Ok(Async::Ready(output)) => {
self.to_process.push(handler(output, addr).into_future());
self.to_process.push(future::Either::A(handler(output, addr).into_future()));
},
Ok(Async::NotReady) => {
self.listeners_upgrade.push((upgrade, addr));
Expand All @@ -201,7 +251,7 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
let (mut dialer, addr) = self.dialers.swap_remove(n);
match dialer.poll() {
Ok(Async::Ready(output)) => {
self.to_process.push(handler(output, addr).into_future());
self.to_process.push(future::Either::A(handler(output, addr).into_future()));
},
Ok(Async::NotReady) => {
self.dialers.push((dialer, addr));
Expand Down