Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions example/examples/echo-dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extern crate tokio_io;
use bytes::BytesMut;
use futures::{Future, Sink, Stream};
use std::env;
use swarm::{UpgradeExt, SimpleProtocol, Transport};
use swarm::{UpgradeExt, SimpleProtocol, Transport, MuxedTransport};
use tcp::TcpConfig;
use tokio_core::reactor::Core;
use tokio_io::codec::length_delimited;
Expand Down Expand Up @@ -68,8 +68,10 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.into_connection_reuse()
.into_connection_reuse();

let transport_with_echo = transport
.clone()
// On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol
// just for this example.
// For this purpose, we create a `SimpleProtocol` struct.
Expand All @@ -86,34 +88,23 @@ fn main() {
// of any opened stream.

// We use it to dial the address.
let dialer = transport
.dial_and_listen(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr"))
let dialer = transport_with_echo
.dial(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr"))
// If the multiaddr protocol exists but is not supported, then we get an error containing
// the transport and the original multiaddress. Therefore we cannot directly use `unwrap()`
// or `expect()`, but have to add a `map_err()` beforehand.
.map_err(|(_, addr)| addr).expect("unsupported multiaddr")

.and_then(|(incoming, echo)| {
.and_then(|echo| {
// `echo` is what the closure used when initializing "echo" returns.
// Consequently, please note that the `send` method is available only because the type
// `length_delimited::Framed` has a `send` method.
println!("Sending \"hello world\" to listener");
echo.and_then(|echo| echo.send("hello world".into()).map(Option::Some))
.select(
incoming
.for_each(|_| {
println!("opened");
Ok(())
})
.map(|()| None),
)
.map(|(n, _)| n)
.map_err(|(e, _)| e)
echo.send("hello world".into())
})
.and_then(|echo| {
// The message has been successfully sent. Now wait for an answer.
echo.unwrap()
.into_future()
echo.into_future()
.map(|(msg, rest)| {
println!("Received message from listener: {:?}", msg);
rest
Expand All @@ -124,5 +115,6 @@ fn main() {
// `dialer` is a future that contains all the behaviour that we want, but nothing has actually
// started yet. Because we created the `TcpConfig` with tokio, we need to run the future
// through the tokio core.
core.run(dialer).unwrap();
core.run(dialer.map(|_| ()).select(transport.incoming().for_each(|_| Ok(()))))
.unwrap_or_else(|_| panic!());
}
93 changes: 40 additions & 53 deletions example/examples/echo-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,62 +95,49 @@ fn main() {
println!("Now listening on {:?}", address);

let future = listener
.filter_map(|(socket, client_addr)| {
let client_addr = client_addr.to_string();

// This closure is called whenever a new connection has been received. The `socket`
// is a `Result<..., IoError>` which contains an error if for example protocol
// negotiation or the secio handshake failed. We handle this situation by printing a
// message on stderr and ignoring the connection.
match socket {
Ok(s) => Some((s, client_addr)),
Err(err) => {
eprintln!("Failed connection attempt from {}\n => Error: {:?}",
client_addr, err);
None
},
}
})

.for_each(|(socket, client_addr)| {
// This closure is called whenever a new connection has been received and successfully
// upgraded to use secio/plaintext and echo.
println!("Successfully negotiated protocol with {}", client_addr);

// We loop forever in order to handle all the messages sent by the client.
let client_finished = {
let client_addr = client_addr.clone();
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket.into_future()
.map_err(|(err, _)| err)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
// This closure is called whenever a new connection has been received.
// `socket` is a future that will be triggered once the upgrade to secio, multiplex
// and echo is complete.
let client_addr = client_addr.to_string();
println!("Incoming connection from {}", client_addr);

socket
.and_then(move |socket| {
println!("Successfully negotiated protocol with {}", client_addr);

// We loop forever in order to handle all the messages sent by the client.
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket.into_future()
.map_err(|(err, _)| err)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
})
})
};

// We absorb errors from the `client_finished` future so that an error while processing
// a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
// entire server.
client_finished.then(move |res| {
if let Err(err) = res {
println!("Error while processing client {}: {:?}", client_addr, err);
}
Ok(())
})
// We absorb errors from the future so that an error while processing a client
// (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
// entire server.
.then(move |res| {
if let Err(err) = res {
println!("Error while processing client: {:?}", err);
}
Ok(())
})
});

// `future` is a future that contains all the behaviour that we want, but nothing has actually
Expand Down
1 change: 1 addition & 0 deletions libp2p-swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ bytes = "0.4"
multiaddr = "0.2.0"
multistream-select = { path = "../multistream-select" }
futures = { version = "0.1", features = ["use_std"] }
parking_lot = "0.5.3"
smallvec = "0.5"
tokio-io = "0.1"

Expand Down
Loading