Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Try async-trait.
  • Loading branch information
romanb committed Jun 27, 2020
commit 9c6e6b4e88de7f7a7369b3f3834ab5f5233bc398
1 change: 1 addition & 0 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
async-trait = "0.1"
futures = "0.3.1"
libp2p-core = { version = "0.19.2", path = "../../core" }
libp2p-swarm = { version = "0.19.1", path = "../../swarm" }
Expand Down
20 changes: 11 additions & 9 deletions protocols/request-response/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

pub use libp2p_core::ProtocolName;

use futures::{prelude::*, future::BoxFuture};
use async_trait::async_trait;
use futures::prelude::*;
use std::io;

/// A `RequestResponseCodec` defines the request and response types
/// for a [`RequestResponse`](crate::RequestResponse) protocol or
/// protocol family and how they are encoded / decoded on an I/O stream.
#[async_trait]
pub trait RequestResponseCodec {
/// The type of protocol(s) or protocol versions being negotiated.
type Protocol: ProtocolName + Send + Clone;
Expand All @@ -36,29 +38,29 @@ pub trait RequestResponseCodec {

/// Reads a request from the given I/O stream according to the
/// negotiated protocol.
fn read_request<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T)
-> BoxFuture<'a, Result<Self::Request, io::Error>>
async fn read_request<T>(&mut self, protocol: &Self::Protocol, io: &mut T)
-> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send;

/// Reads a response from the given I/O stream according to the
/// negotiated protocol.
fn read_response<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T)
-> BoxFuture<'a, Result<Self::Response, io::Error>>
async fn read_response<T>(&mut self, protocol: &Self::Protocol, io: &mut T)
-> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send;

/// Writes a request to the given I/O stream according to the
/// negotiated protocol.
fn write_request<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T, req: Self::Request)
-> BoxFuture<'a, Result<(), io::Error>>
async fn write_request<T>(&mut self, protocol: &Self::Protocol, io: &mut T, req: Self::Request)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send;

/// Writes a response to the given I/O stream according to the
/// negotiated protocol.
fn write_response<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T, res: Self::Response)
-> BoxFuture<'a, Result<(), io::Error>>
async fn write_response<T>(&mut self, protocol: &Self::Protocol, io: &mut T, res: Self::Response)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send;
}
32 changes: 17 additions & 15 deletions protocols/request-response/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

//! Integration tests for the `RequestResponse` network behaviour.

use async_trait::async_trait;
use libp2p_core::{
Multiaddr,
PeerId,
Expand All @@ -32,7 +33,7 @@ use libp2p_noise::{NoiseConfig, X25519Spec, Keypair};
use libp2p_request_response::*;
use libp2p_swarm::Swarm;
use libp2p_tcp::TcpConfig;
use futures::{prelude::*, channel::mpsc, future::BoxFuture};
use futures::{prelude::*, channel::mpsc};
use rand::{self, Rng};
use std::{io, iter};

Expand Down Expand Up @@ -147,47 +148,48 @@ impl ProtocolName for PingProtocol {
}
}

#[async_trait]
impl RequestResponseCodec for PingCodec {
type Protocol = PingProtocol;
type Request = Ping;
type Response = Pong;

fn read_request<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T)
-> BoxFuture<'a, Result<Self::Request, io::Error>>
async fn read_request<T>(&mut self, _: &PingProtocol, io: &mut T)
-> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send
{
read_one(io, 1024)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
.and_then(|data| future::ready(Ok(Ping(data))))
.boxed()
.map_ok(Ping)
.await
}

fn read_response<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T)
-> BoxFuture<'a, Result<Self::Response, io::Error>>
async fn read_response<T>(&mut self, _: &PingProtocol, io: &mut T)
-> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send
{
read_one(io, 1024)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
.and_then(|data| future::ready(Ok(Pong(data))))
.boxed()
.map_ok(Pong)
.await
}

fn write_request<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T, Ping(data): Ping)
-> BoxFuture<'a, Result<(), io::Error>>
async fn write_request<T>(&mut self, _: &PingProtocol, io: &mut T, Ping(data): Ping)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send
{
write_one(io, data).boxed()
write_one(io, data).await
}

fn write_response<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T, Pong(data): Pong)
-> BoxFuture<'a, Result<(), io::Error>>
async fn write_response<T>(&mut self, _: &PingProtocol, io: &mut T, Pong(data): Pong)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send
{
write_one(io, data).boxed()
write_one(io, data).await
}
}