diff --git a/core/src/transport/boxed.rs b/core/src/transport/boxed.rs new file mode 100644 index 00000000000..dff8298f91a --- /dev/null +++ b/core/src/transport/boxed.rs @@ -0,0 +1,232 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use multiaddr::Multiaddr; +use std::fmt; +use std::io::Error as IoError; +use std::sync::Arc; +use transport::{MuxedTransport, Transport}; + +/// See the `Transport::boxed` method. +#[inline] +pub fn boxed(transport: T) -> Boxed +where + T: Transport + Clone + Send + Sync + 'static, + T::Dial: Send + 'static, + T::Listener: Send + 'static, + T::ListenerUpgrade: Send + 'static, + T::MultiaddrFuture: Send + 'static, +{ + Boxed { + inner: Arc::new(transport) as Arc<_>, + } +} +/// See the `Transport::boxed_muxed` method. +#[inline] +pub fn boxed_muxed(transport: T) -> BoxedMuxed +where + T: MuxedTransport + Clone + Send + Sync + 'static, + T::Dial: Send + 'static, + T::Listener: Send + 'static, + T::ListenerUpgrade: Send + 'static, + T::MultiaddrFuture: Send + 'static, + T::Incoming: Send + 'static, + T::IncomingUpgrade: Send + 'static, +{ + BoxedMuxed { + inner: Arc::new(transport) as Arc<_>, + } +} + +pub type MultiaddrFuture = Box + Send>; +pub type Dial = Box + Send>; +pub type Listener = Box, Error = IoError> + Send>; +pub type ListenerUpgrade = Box + Send>; +pub type Incoming = Box, Error = IoError> + Send>; +pub type IncomingUpgrade = Box + Send>; + +trait Abstract { + fn listen_on(&self, addr: Multiaddr) -> Result<(Listener, Multiaddr), Multiaddr>; + fn dial(&self, addr: Multiaddr) -> Result, Multiaddr>; + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; +} + +impl Abstract for T +where + T: Transport + Clone + 'static, + T::Dial: Send + 'static, + T::Listener: Send + 'static, + T::ListenerUpgrade: Send + 'static, + T::MultiaddrFuture: Send + 'static, +{ + fn listen_on(&self, addr: Multiaddr) -> Result<(Listener, Multiaddr), Multiaddr> { + let (listener, new_addr) = + Transport::listen_on(self.clone(), addr).map_err(|(_, addr)| addr)?; + let fut = listener.map(|upgrade| { + let fut = upgrade.map(|(out, addr)| (out, Box::new(addr) as MultiaddrFuture)); + Box::new(fut) as ListenerUpgrade + }); + Ok((Box::new(fut) as Box<_>, new_addr)) + } + + fn dial(&self, addr: Multiaddr) -> Result, Multiaddr> { + let fut = Transport::dial(self.clone(), addr) + .map_err(|(_, addr)| addr)? + .map(|(out, addr)| (out, Box::new(addr) as MultiaddrFuture)); + Ok(Box::new(fut) as Box<_>) + } + + #[inline] + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + Transport::nat_traversal(self, server, observed) + } +} + +trait AbstractMuxed: Abstract { + fn next_incoming(&self) -> Incoming; +} + +impl AbstractMuxed for T +where + T: MuxedTransport + Clone + 'static, + T::Dial: Send + 'static, + T::Listener: Send + 'static, + T::ListenerUpgrade: Send + 'static, + T::MultiaddrFuture: Send + 'static, + T::Incoming: Send + 'static, + T::IncomingUpgrade: Send + 'static, +{ + fn next_incoming(&self) -> Incoming { + let fut = MuxedTransport::next_incoming(self.clone()).map(|upgrade| { + let fut = upgrade.map(|(out, addr)| (out, Box::new(addr) as MultiaddrFuture)); + Box::new(fut) as IncomingUpgrade + }); + Box::new(fut) as Box<_> + } +} + +/// See the `Transport::boxed` method. +pub struct Boxed { + inner: Arc + Send + Sync>, +} + +impl fmt::Debug for Boxed { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "BoxedTransport") + } +} + +impl Clone for Boxed { + #[inline] + fn clone(&self) -> Self { + Boxed { + inner: self.inner.clone(), + } + } +} + +impl Transport for Boxed { + type Output = O; + type MultiaddrFuture = MultiaddrFuture; + type Listener = Listener; + type ListenerUpgrade = ListenerUpgrade; + type Dial = Dial; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + match self.inner.listen_on(addr) { + Ok(listen) => Ok(listen), + Err(addr) => Err((self, addr)), + } + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result { + match self.inner.dial(addr) { + Ok(dial) => Ok(dial), + Err(addr) => Err((self, addr)), + } + } + + #[inline] + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.nat_traversal(server, observed) + } +} + +/// See the `Transport::boxed_muxed` method. +pub struct BoxedMuxed { + inner: Arc + Send + Sync>, +} + +impl fmt::Debug for BoxedMuxed { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "BoxedMuxedTransport") + } +} + +impl Clone for BoxedMuxed { + #[inline] + fn clone(&self) -> Self { + BoxedMuxed { + inner: self.inner.clone(), + } + } +} + +impl Transport for BoxedMuxed { + type Output = O; + type MultiaddrFuture = MultiaddrFuture; + type Listener = Listener; + type ListenerUpgrade = ListenerUpgrade; + type Dial = Dial; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + match self.inner.listen_on(addr) { + Ok(listen) => Ok(listen), + Err(addr) => Err((self, addr)), + } + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result { + match self.inner.dial(addr) { + Ok(dial) => Ok(dial), + Err(addr) => Err((self, addr)), + } + } + + #[inline] + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.nat_traversal(server, observed) + } +} + +impl MuxedTransport for BoxedMuxed { + type Incoming = Incoming; + type IncomingUpgrade = IncomingUpgrade; + + #[inline] + fn next_incoming(self) -> Self::Incoming { + self.inner.next_incoming() + } +} diff --git a/core/src/transport/mod.rs b/core/src/transport/mod.rs index 690dbf4f0e5..479a33f28de 100644 --- a/core/src/transport/mod.rs +++ b/core/src/transport/mod.rs @@ -38,6 +38,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; use upgrade::{ConnectionUpgrade, Endpoint}; pub mod and_then; +pub mod boxed; pub mod choice; pub mod denied; pub mod dummy; @@ -49,6 +50,7 @@ pub mod memory; pub mod muxed; pub mod upgrade; +pub use self::boxed::BoxedMuxed; pub use self::choice::OrTransport; pub use self::denied::DeniedTransport; pub use self::dummy::DummyMuxing; @@ -121,6 +123,34 @@ pub trait Transport { /// doesn't recognize the protocols, or if `server` and `observed` are related. fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; + /// Turns this `Transport` into an abstract boxed transport. + #[inline] + fn boxed(self) -> boxed::Boxed + where Self: Sized + MuxedTransport + Clone + Send + Sync + 'static, + Self::Dial: Send + 'static, + Self::Listener: Send + 'static, + Self::ListenerUpgrade: Send + 'static, + Self::MultiaddrFuture: Send + 'static, + { + boxed::boxed(self) + } + + /// Turns this `Transport` into an abstract boxed transport. + /// + /// This is the version if the transport supports muxing. + #[inline] + fn boxed_muxed(self) -> boxed::BoxedMuxed + where Self: Sized + MuxedTransport + Clone + Send + Sync + 'static, + Self::Dial: Send + 'static, + Self::Listener: Send + 'static, + Self::ListenerUpgrade: Send + 'static, + Self::MultiaddrFuture: Send + 'static, + Self::Incoming: Send + 'static, + Self::IncomingUpgrade: Send + 'static, + { + boxed::boxed_muxed(self) + } + /// Applies a function on the output of the `Transport`. #[inline] fn map(self, map: F) -> map::Map