Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions core/src/connection_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ enum PeerState<D, M> where M: StreamMuxer {
// TODO: stronger Future type
Pending {
/// Future that produces the muxer.
future: Box<Future<Item = ((D, M), Multiaddr), Error = IoError>>,
future: Box<Future<Item = ((D, M), Multiaddr), Error = IoError> + Send>,
/// All the tasks to notify when `future` resolves.
notify: FnvHashMap<usize, task::Task>,
},
Expand Down Expand Up @@ -164,15 +164,19 @@ where

impl<T, D, M> Transport for ConnectionReuse<T, D, M>
where
T: Transport + 'static, // TODO: 'static :(
T: Transport + Send + 'static, // TODO: 'static :(
T::Dial: Send,
T::MultiaddrFuture: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T: Transport<Output = (D, M)> + Clone + 'static, // TODO: 'static :(
M: StreamMuxer + 'static,
D: Clone + 'static,
M: Send + Sync + StreamMuxer + 'static,
D: Send + Clone + 'static,
T: Clone,
{
type Output = (D, ConnectionReuseSubstream<T, D, M>);
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
type Dial = ConnectionReuseDial<T, D, M>;

Expand Down Expand Up @@ -245,10 +249,14 @@ where

impl<T, D, M> MuxedTransport for ConnectionReuse<T, D, M>
where
T: Transport + 'static, // TODO: 'static :(
T: Transport + Send + 'static, // TODO: 'static :(
T::Dial: Send,
T::MultiaddrFuture: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T: Transport<Output = (D, M)> + Clone + 'static, // TODO: 'static :(
M: StreamMuxer + 'static,
D: Clone + 'static,
M: Send + Sync + StreamMuxer + 'static,
D: Send + Clone + 'static,
T: Clone,
{
type Incoming = ConnectionReuseIncoming<T, D, M>;
Expand Down Expand Up @@ -305,10 +313,10 @@ where
impl<T, D, M> Future for ConnectionReuseDial<T, D, M>
where
T: Transport<Output = (D, M)> + Clone,
M: StreamMuxer + 'static,
D: Clone + 'static,
<T as Transport>::Dial: 'static,
<T as Transport>::MultiaddrFuture: 'static,
M: Send + StreamMuxer + 'static,
D: Send + Clone + 'static,
<T as Transport>::Dial: Send + 'static,
<T as Transport>::MultiaddrFuture: Send + 'static,
{
type Item = ((D, ConnectionReuseSubstream<T, D, M>), FutureResult<Multiaddr, IoError>);
type Error = IoError;
Expand Down Expand Up @@ -471,7 +479,7 @@ where
/// Identifier for this listener. Used to determine which connections were opened by it.
listener_id: u64,
/// Opened connections that need to be upgraded.
current_upgrades: FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
current_upgrades: FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError> + Send>>,

/// Shared between the whole connection reuse mechanism.
shared: Arc<Mutex<Shared<T, D, M>>>,
Expand All @@ -484,7 +492,7 @@ where
M: StreamMuxer,
D: Clone,
L: Stream<Item = Lu, Error = IoError>,
Lu: Future<Item = (T::Output, Multiaddr), Error = IoError> + 'static,
Lu: Future<Item = (T::Output, Multiaddr), Error = IoError> + Send + 'static,
{
type Item = FutureResult<((D, ConnectionReuseSubstream<T, D, M>), FutureResult<Multiaddr, IoError>), IoError>;
type Error = IoError;
Expand Down
37 changes: 24 additions & 13 deletions core/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub fn swarm<T, H, F>(
) -> (SwarmController<T, F::Future>, SwarmEvents<T, F::Future, H>)
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> F,
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>) -> F,
F: IntoFuture<Item = (), Error = IoError>,
{
let shared = Arc::new(Mutex::new(Shared {
Expand Down Expand Up @@ -105,6 +105,11 @@ where
impl<T, F> SwarmController<T, F>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
T::Dial: Send,
T::MultiaddrFuture: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::Output: Send,
F: 'static,
{
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
Expand All @@ -119,6 +124,8 @@ where
-> Result<impl Future<Item = (), Error = IoError>, Multiaddr>
where
Du: Transport + 'static, // TODO: 'static :-/
Du::Dial: Send,
Du::MultiaddrFuture: Send,
Du::Output: Into<T::Output>,
{
self.dial_then(multiaddr, transport, |v| v)
Expand All @@ -132,8 +139,10 @@ where
-> Result<impl Future<Item = (), Error = IoError>, Multiaddr>
where
Du: Transport + 'static, // TODO: 'static :-/
Du::Dial: Send,
Du::MultiaddrFuture: Send,
Du::Output: Into<T::Output>,
TThen: FnOnce(Result<(), IoError>) -> Result<(), IoError> + 'static,
TThen: FnOnce(Result<(), IoError>) -> Result<(), IoError> + Send + 'static,
{
trace!("Swarm dialing {}", multiaddr);

Expand All @@ -148,12 +157,12 @@ where
let mut then = Box::new(move |val: Result<(), IoError>| {
let then = then.take().expect("The Boxed FnMut should only be called once");
then(val);
}) as Box<FnMut(_)>;
}) as Box<FnMut(_) + Send>;

let dial = dial.then(|result| {
match result {
Ok((output, client_addr)) => {
let client_addr = Box::new(client_addr) as Box<Future<Item = _, Error = _>>;
let client_addr = Box::new(client_addr) as Box<Future<Item = _, Error = _> + Send>;
Ok((output.into(), then, client_addr))
}
Err(err) => {
Expand Down Expand Up @@ -206,12 +215,12 @@ where
let listener = Box::new(
listener.map(|f| {
let f = f.map(|(out, maf)| {
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError> + Send>)
});

Box::new(f) as Box<Future<Item = _, Error = _>>
Box::new(f) as Box<Future<Item = _, Error = _> + Send>
}),
) as Box<Stream<Item = _, Error = _>>;
) as Box<Stream<Item = _, Error = _> + Send>;
shared.listeners.push((new_addr.clone(), listener.into_future()));
if let Some(task) = shared.task_to_notify.take() {
task.notify();
Expand Down Expand Up @@ -242,7 +251,9 @@ where
impl<T, H, If, F> Stream for SwarmEvents<T, F, H>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> If,
T::MultiaddrFuture: Send,
T::IncomingUpgrade: Send,
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>) -> If,
If: IntoFuture<Future = F, Item = (), Error = IoError>,
F: Future<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
{
Expand All @@ -259,7 +270,7 @@ where
debug!("Swarm received new multiplexed incoming connection");
shared.next_incoming = self.transport.clone().next_incoming();
let connec = connec.map(|(out, maf)| {
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError> + Send>)
});
shared.listeners_upgrade.push(Box::new(connec) as Box<_>);
}
Expand Down Expand Up @@ -393,21 +404,21 @@ struct Shared<T, F> where T: MuxedTransport + 'static {
StreamFuture<
Box<
Stream<
Item = Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>,
Item = Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>), Error = IoError> + Send>,
Error = IoError,
>,
> + Send,
>,
>,
)>,

/// Futures that upgrade an incoming listening connection to a full connection.
listeners_upgrade:
Vec<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
Vec<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>), Error = IoError> + Send>>,

/// Futures that dial a remote address.
///
/// Contains the address we dial, so that we can cancel it if necessary.
dialers: Vec<(Multiaddr, Box<Future<Item = (T::Output, Box<FnMut(Result<(), IoError>)>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>)>,
dialers: Vec<(Multiaddr, Box<Future<Item = (T::Output, Box<FnMut(Result<(), IoError>) + Send>, Box<Future<Item = Multiaddr, Error = IoError> + Send>), Error = IoError> + Send>)>,

/// List of futures produced by the swarm closure. Must be processed to the end.
to_process: Vec<F>,
Expand Down
28 changes: 18 additions & 10 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,18 @@ pub struct AndThen<T, C> {
impl<T, C, F, O, Maf> Transport for AndThen<T, C>
where
T: Transport + 'static,
C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + 'static,
F: Future<Item = (O, Maf), Error = IoError> + 'static,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + Send + 'static,
F: Future<Item = (O, Maf), Error = IoError> + Send + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + 'static,
{
type Output = O;
type MultiaddrFuture = Maf;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
type ListenerUpgrade = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError> + Send>;
type Dial = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError> + Send>;

#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Expand Down Expand Up @@ -116,12 +119,17 @@ where
impl<T, C, F, O, Maf> MuxedTransport for AndThen<T, C>
where
T: MuxedTransport + 'static,
C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + 'static,
F: Future<Item = (O, Maf), Error = IoError> + 'static,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::Incoming: Send,
T::IncomingUpgrade: Send,
C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + Send + 'static,
F: Future<Item = (O, Maf), Error = IoError> + Send + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + 'static,
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError> + Send>;
type IncomingUpgrade = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError> + Send>;

#[inline]
fn next_incoming(self) -> Self::Incoming {
Expand All @@ -134,7 +142,7 @@ where
upgrade(connection, Endpoint::Listener, client_addr)
});

Box::new(future) as Box<Future<Item = _, Error = _>>
Box::new(future) as Box<Future<Item = _, Error = _> + Send>
});

Box::new(future) as Box<_>
Expand Down
16 changes: 8 additions & 8 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,26 @@ impl<A, B> MuxedTransport for OrTransport<A, B>
where
A: MuxedTransport,
B: MuxedTransport,
A::Incoming: 'static, // TODO: meh :-/
B::Incoming: 'static, // TODO: meh :-/
A::IncomingUpgrade: 'static, // TODO: meh :-/
B::IncomingUpgrade: 'static, // TODO: meh :-/
A::Incoming: Send + 'static, // TODO: meh :-/
B::Incoming: Send + 'static, // TODO: meh :-/
A::IncomingUpgrade: Send + 'static, // TODO: meh :-/
B::IncomingUpgrade: Send + 'static, // TODO: meh :-/
A::Output: 'static, // TODO: meh :-/
B::Output: 'static, // TODO: meh :-/
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError> + Send>;
type IncomingUpgrade =
Box<Future<Item = (EitherOutput<A::Output, B::Output>, Self::MultiaddrFuture), Error = IoError>>;
Box<Future<Item = (EitherOutput<A::Output, B::Output>, Self::MultiaddrFuture), Error = IoError> + Send>;

#[inline]
fn next_incoming(self) -> Self::Incoming {
let first = self.0.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherOutput::First(v), future::Either::A(addr)));
Box::new(fut) as Box<Future<Item = _, Error = _>>
Box::new(fut) as Box<Future<Item = _, Error = _> + Send>
});
let second = self.1.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherOutput::Second(v), future::Either::B(addr)));
Box::new(fut) as Box<Future<Item = _, Error = _>>
Box::new(fut) as Box<Future<Item = _, Error = _> + Send>
});
let future = first.select(second).map(|(i, _)| i).map_err(|(e, _)| e);
Box::new(future) as Box<_>
Expand Down
22 changes: 15 additions & 7 deletions core/src/transport/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ impl<T, F> Map<T, F> {
impl<T, F, D> Transport for Map<T, F>
where
T: Transport + 'static, // TODO: 'static :-/
F: FnOnce(T::Output, Endpoint) -> D + Clone + 'static, // TODO: 'static :-/
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
F: FnOnce(T::Output, Endpoint) -> D + Clone + Send + 'static, // TODO: 'static :-/
{
type Output = D;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;

fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let map = self.map;
Expand Down Expand Up @@ -91,10 +94,15 @@ where
impl<T, F, D> MuxedTransport for Map<T, F>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
F: FnOnce(T::Output, Endpoint) -> D + Clone + 'static, // TODO: 'static :-/
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::Incoming: Send,
T::IncomingUpgrade: Send,
F: FnOnce(T::Output, Endpoint) -> D + Clone + Send + 'static, // TODO: 'static :-/
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError> + Send>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;

fn next_incoming(self) -> Self::Incoming {
let map = self.map;
Expand Down
8 changes: 5 additions & 3 deletions core/src/transport/map_err_dial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ impl<T, F> MapErrDial<T, F> {
impl<T, F> Transport for MapErrDial<T, F>
where
T: Transport + 'static, // TODO: 'static :-/
F: FnOnce(IoError, Multiaddr) -> IoError + Clone + 'static, // TODO: 'static :-/
T::Dial: Send,
F: FnOnce(IoError, Multiaddr) -> IoError + Clone + Send + 'static, // TODO: 'static :-/
{
type Output = T::Output;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;

fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
match self.transport.listen_on(addr) {
Expand Down Expand Up @@ -77,7 +78,8 @@ where
impl<T, F> MuxedTransport for MapErrDial<T, F>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
F: FnOnce(IoError, Multiaddr) -> IoError + Clone + 'static, // TODO: 'static :-/
T::Dial: Send,
F: FnOnce(IoError, Multiaddr) -> IoError + Clone + Send + 'static, // TODO: 'static :-/
{
type Incoming = T::Incoming;
type IncomingUpgrade = T::IncomingUpgrade;
Expand Down
Loading