Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions example/examples/echo-dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extern crate tokio_io;
use bytes::BytesMut;
use futures::{Future, Sink, Stream};
use std::env;
use swarm::{UpgradeExt, SimpleProtocol, Transport};
use swarm::{UpgradeExt, SimpleProtocol, Transport, MuxedTransport};
use tcp::TcpConfig;
use tokio_core::reactor::Core;
use tokio_io::codec::length_delimited;
Expand Down Expand Up @@ -68,8 +68,10 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.into_connection_reuse()
.into_connection_reuse();

let transport_with_echo = transport
.clone()
// On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol
// just for this example.
// For this purpose, we create a `SimpleProtocol` struct.
Expand All @@ -86,34 +88,23 @@ fn main() {
// of any opened stream.

// We use it to dial the address.
let dialer = transport
.dial_and_listen(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr"))
let dialer = transport_with_echo
.dial(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr"))
// If the multiaddr protocol exists but is not supported, then we get an error containing
// the transport and the original multiaddress. Therefore we cannot directly use `unwrap()`
// or `expect()`, but have to add a `map_err()` beforehand.
.map_err(|(_, addr)| addr).expect("unsupported multiaddr")

.and_then(|(incoming, echo)| {
.and_then(|echo| {
// `echo` is what the closure used when initializing "echo" returns.
// Consequently, please note that the `send` method is available only because the type
// `length_delimited::Framed` has a `send` method.
println!("Sending \"hello world\" to listener");
echo.and_then(|echo| echo.send("hello world".into()).map(Option::Some))
.select(
incoming
.for_each(|_| {
println!("opened");
Ok(())
})
.map(|()| None),
)
.map(|(n, _)| n)
.map_err(|(e, _)| e)
echo.send("hello world".into())
})
.and_then(|echo| {
// The message has been successfully sent. Now wait for an answer.
echo.unwrap()
.into_future()
echo.into_future()
.map(|(msg, rest)| {
println!("Received message from listener: {:?}", msg);
rest
Expand All @@ -124,5 +115,6 @@ fn main() {
// `dialer` is a future that contains all the behaviour that we want, but nothing has actually
// started yet. Because we created the `TcpConfig` with tokio, we need to run the future
// through the tokio core.
core.run(dialer).unwrap();
core.run(dialer.map(|_| ()).select(transport.incoming().for_each(|_| Ok(()))))
.unwrap_or_else(|_| panic!());
}
135 changes: 54 additions & 81 deletions example/examples/echo-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,93 +68,66 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.into_connection_reuse()

// On top of both mutiplex and plaintext/secio, we use the "echo" protocol, which is a
// custom protocol just for this example.
// For this purpose, we create a `SimpleProtocol` struct.
.with_upgrade(SimpleProtocol::new("/echo/1.0.0", |socket| {
// This closure is called whenever a stream using the "echo" protocol has been
// successfully negotiated. The parameter is the raw socket (implements the AsyncRead
// and AsyncWrite traits), and the closure must return an implementation of
// `IntoFuture` that can yield any type of object.
Ok(length_delimited::Framed::new(socket))
}));
.into_connection_reuse();

// We now have a `transport` variable that can be used either to dial nodes or listen to
// incoming connections, and that will automatically apply all the selected protocols on top
// incoming connections, and that will automatically apply secio and multiplex on top
// of any opened stream.

// We use it to listen on the address.
let (listener, address) = transport
// We now prepare the protocol that we are going to negotiate with nodes that open a connection
// or substream to our server.
let proto = SimpleProtocol::new("/echo/1.0.0", |socket| {
// This closure is called whenever a stream using the "echo" protocol has been
// successfully negotiated. The parameter is the raw socket (implements the AsyncRead
// and AsyncWrite traits), and the closure must return an implementation of
// `IntoFuture` that can yield any type of object.
Ok(length_delimited::Framed::<_, bytes::BytesMut>::new(socket))
});

// Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and
// outgoing connections for us.
let (swarm_controller, swarm_future) = swarm::swarm(transport, proto, |socket, client_addr| {
println!("Successfully negotiated protocol with {}", client_addr);

// The type of `socket` is exactly what the closure of `SimpleProtocol` returns.

// We loop forever in order to handle all the messages sent by the client.
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket
.into_future()
.map_err(|(e, _)| e)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
})
});

// We now use the controller to listen on the address.
let address = swarm_controller
.listen_on(swarm::Multiaddr::new(&listen_addr).expect("invalid multiaddr"))
// If the multiaddr protocol exists but is not supported, then we get an error containing
// the transport and the original multiaddress. Therefore we cannot directly use `unwrap()`
// or `expect()`, but have to add a `map_err()` beforehand.
.map_err(|(_, addr)| addr).expect("unsupported multiaddr");
// the original multiaddress.
.expect("unsupported multiaddr");
// The address we actually listen on can be different from the address that was passed to
// the `listen_on` function. For example if you pass `/ip4/0.0.0.0/tcp/0`, then the port `0`
// will be replaced with the actual port.
println!("Now listening on {:?}", address);

let future = listener
.filter_map(|(socket, client_addr)| {
let client_addr = client_addr.to_string();

// This closure is called whenever a new connection has been received. The `socket`
// is a `Result<..., IoError>` which contains an error if for example protocol
// negotiation or the secio handshake failed. We handle this situation by printing a
// message on stderr and ignoring the connection.
match socket {
Ok(s) => Some((s, client_addr)),
Err(err) => {
eprintln!("Failed connection attempt from {}\n => Error: {:?}",
client_addr, err);
None
},
}
})

.for_each(|(socket, client_addr)| {
// This closure is called whenever a new connection has been received and successfully
// upgraded to use secio/plaintext and echo.
println!("Successfully negotiated protocol with {}", client_addr);

// We loop forever in order to handle all the messages sent by the client.
let client_finished = {
let client_addr = client_addr.clone();
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket.into_future()
.map_err(|(err, _)| err)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
})
};

// We absorb errors from the `client_finished` future so that an error while processing
// a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
// entire server.
client_finished.then(move |res| {
if let Err(err) = res {
println!("Error while processing client {}: {:?}", client_addr, err);
}
Ok(())
})
});

// `future` is a future that contains all the behaviour that we want, but nothing has actually
// started yet. Because we created the `TcpConfig` with tokio, we need to run the future
// through the tokio core.
core.run(future).unwrap();
// `swarm_future` is a future that contains all the behaviour that we want, but nothing has
// actually started yet. Because we created the `TcpConfig` with tokio, we need to run the
// future through the tokio core.
core.run(swarm_future).unwrap();
}
1 change: 1 addition & 0 deletions libp2p-swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ bytes = "0.4"
multiaddr = "0.2.0"
multistream-select = { path = "../multistream-select" }
futures = { version = "0.1", features = ["use_std"] }
parking_lot = "0.5.3"
smallvec = "0.5"
tokio-io = "0.1"

Expand Down
56 changes: 46 additions & 10 deletions libp2p-swarm/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# libp2p-swarm

Transport and protocol upgrade system of *libp2p*.
Transport, protocol upgrade and swarm systems of *libp2p*.

This crate contains all the core traits and mechanisms of the transport system of *libp2p*.
This crate contains all the core traits and mechanisms of the transport and swarm systems
of *libp2p*.

# The `Transport` trait

Expand All @@ -27,11 +28,12 @@ multiple times in a row in order to chain as many implementations as you want.
The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on
transports that can receive incoming connections on streams that have been opened with `dial()`.

The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of
incoming connections.
The trait provides the `next_incoming()` method, which returns a future that will resolve to
the next substream that arrives from a dialed node.

> **Note**: This trait is mainly implemented for transports that provide stream muxing
> capabilities.
> capabilities, but it can also be implemented in a dummy way by returning an empty
> iterator.

# Connection upgrades

Expand All @@ -57,7 +59,7 @@ A middleware can be applied on a transport by using the `with_upgrade` method of
`Transport` trait. The return value of this method also implements the `Transport` trait, which
means that you can call `dial()` and `listen_on()` on it in order to directly obtain an
upgraded connection or a listener that will yield upgraded connections. Similarly, the
`dial_and_listen()` method will automatically apply the upgrade on both the dialer and the
`next_incoming()` method will automatically apply the upgrade on both the dialer and the
listener. An error is produced if the remote doesn't support the protocol corresponding to the
connection upgrade.

Expand Down Expand Up @@ -100,11 +102,11 @@ implement the `AsyncRead` and `AsyncWrite` traits. This means that that the retu
transport.

However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named
`dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`,
`dial`, `listen_on`, and `next_incoming`, which will yield you a `Future` or a `Stream`,
which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific
way to use the protocol.

```no_run
```rust
extern crate futures;
extern crate libp2p_ping;
extern crate libp2p_swarm;
Expand All @@ -115,7 +117,6 @@ use futures::Future;
use libp2p_ping::Ping;
use libp2p_swarm::Transport;

# fn main() {
let mut core = tokio_core::reactor::Core::new().unwrap();

let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
Expand All @@ -130,11 +131,46 @@ let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())

// Runs until the ping arrives.
core.run(ping_finished_future).unwrap();
# }
```

## Grouping protocols

You can use the `.or_upgrade()` method to group multiple upgrades together. The return value
also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the
ones supported.

# Swarm

Once you have created an object that implements the `Transport` trait, you can put it in a
*swarm*. This is done by calling the `swarm()` freestanding function with the transport
alongside with a function or a closure that will turn the output of the upgrade (usually an
actual protocol, as explained above) into a `Future` producing `()`.

```rust
extern crate futures;
extern crate libp2p_ping;
extern crate libp2p_swarm;
extern crate libp2p_tcp_transport;
extern crate tokio_core;

use futures::Future;
use libp2p_ping::Ping;
use libp2p_swarm::Transport;

let mut core = tokio_core::reactor::Core::new().unwrap();

let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
.with_dummy_muxing();

let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, Ping, |(mut pinger, service), client_addr| {
pinger.ping().map_err(|_| panic!())
.select(service).map_err(|_| panic!())
.map(|_| ())
});

// The `swarm_controller` can then be used to do some operations.
swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap());

// Runs until everything is finished.
core.run(swarm_future).unwrap();
```
Loading