Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion finality-aleph/src/tcp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,36 @@ use tokio::net::{

use crate::{
network::{Multiaddress, NetworkIdentity, PeerId},
validator_network::{Dialer, Listener, Splittable},
validator_network::{ConnectionInfo, Dialer, Listener, Splittable},
};

impl ConnectionInfo for TcpStream {
fn peer_address_info(&self) -> String {
match self.peer_addr() {
Ok(addr) => addr.to_string(),
Err(e) => e.to_string(),
}
}
}

impl ConnectionInfo for OwnedWriteHalf {
fn peer_address_info(&self) -> String {
match self.peer_addr() {
Ok(addr) => addr.to_string(),
Err(e) => e.to_string(),
}
}
}

impl ConnectionInfo for OwnedReadHalf {
fn peer_address_info(&self) -> String {
match self.peer_addr() {
Ok(addr) => addr.to_string(),
Err(e) => e.to_string(),
}
}
}

impl Splittable for TcpStream {
type Sender = OwnedWriteHalf;
type Receiver = OwnedReadHalf;
Expand Down
31 changes: 28 additions & 3 deletions finality-aleph/src/testing/mocks/validator_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use crate::{
crypto::AuthorityPen,
network::{mock::Channel, Data, Multiaddress, NetworkIdentity},
validator_network::{
mock::random_keys, Dialer as DialerT, Listener as ListenerT, Network, Service, Splittable,
mock::random_keys, ConnectionInfo, Dialer as DialerT, Listener as ListenerT, Network,
PeerAddressInfo, Service, Splittable,
},
};

Expand Down Expand Up @@ -119,6 +120,7 @@ impl<D: Data> MockNetwork<D> {
pub struct UnreliableDuplexStream {
stream: DuplexStream,
counter: Option<usize>,
peer_address: Address,
}

impl AsyncWrite for UnreliableDuplexStream {
Expand Down Expand Up @@ -160,33 +162,44 @@ impl AsyncRead for UnreliableDuplexStream {
pub struct UnreliableSplittable {
incoming_data: UnreliableDuplexStream,
outgoing_data: UnreliableDuplexStream,
peer_address: Address,
}

impl UnreliableSplittable {
/// Create a pair of mock splittables connected to each other.
pub fn new(max_buf_size: usize, ends_after: Option<usize>) -> (Self, Self) {
pub fn new(
max_buf_size: usize,
ends_after: Option<usize>,
peer_address: Address,
) -> (Self, Self) {
let (in_a, out_b) = duplex(max_buf_size);
let (in_b, out_a) = duplex(max_buf_size);
(
UnreliableSplittable {
incoming_data: UnreliableDuplexStream {
stream: in_a,
counter: ends_after,
peer_address,
},
outgoing_data: UnreliableDuplexStream {
stream: out_a,
counter: ends_after,
peer_address,
},
peer_address,
},
UnreliableSplittable {
incoming_data: UnreliableDuplexStream {
stream: in_b,
counter: ends_after,
peer_address,
},
outgoing_data: UnreliableDuplexStream {
stream: out_b,
counter: ends_after,
peer_address,
},
peer_address,
},
)
}
Expand Down Expand Up @@ -216,6 +229,18 @@ impl AsyncWrite for UnreliableSplittable {
}
}

impl ConnectionInfo for UnreliableSplittable {
fn peer_address_info(&self) -> PeerAddressInfo {
self.peer_address.to_string()
}
}

impl ConnectionInfo for UnreliableDuplexStream {
fn peer_address_info(&self) -> PeerAddressInfo {
self.peer_address.to_string()
}
}

impl Splittable for UnreliableSplittable {
type Sender = UnreliableDuplexStream;
type Receiver = UnreliableDuplexStream;
Expand Down Expand Up @@ -308,7 +333,7 @@ impl UnreliableConnectionMaker {
info!(target: "validator-network", "UnreliableConnectionMaker: waiting for new request...");
let (addr, c) = self.dialers.next().await.expect("should receive");
info!(target: "validator-network", "UnreliableConnectionMaker: received request");
let (l_stream, r_stream) = Connection::new(4096, connections_end_after);
let (l_stream, r_stream) = Connection::new(4096, connections_end_after, addr);
info!(target: "validator-network", "UnreliableConnectionMaker: sending stream");
c.send(l_stream).expect("should send");
self.listeners[addr as usize]
Expand Down
3 changes: 2 additions & 1 deletion finality-aleph/src/validator_network/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ pub async fn incoming<D: Data, S: Splittable>(
result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>,
data_for_user: mpsc::UnboundedSender<D>,
) {
let addr = stream.peer_address_info();
if let Err(e) = manage_incoming(authority_pen, stream, result_for_parent, data_for_user).await {
info!(target: "validator-network", "Incoming connection failed: {}", e);
info!(target: "validator-network", "Incoming connection failed: {}. Peer address info: {}", e, addr);
}
}
17 changes: 16 additions & 1 deletion finality-aleph/src/validator_network/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use aleph_primitives::{AuthorityId, KEY_TYPE};
use sp_keystore::{testing::KeyStore, CryptoStore};
use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf};

use crate::{crypto::AuthorityPen, validator_network::Splittable};
use crate::{
crypto::AuthorityPen,
validator_network::{ConnectionInfo, PeerAddressInfo, Splittable},
};

/// Create a random authority id and pen pair.
pub async fn key() -> (AuthorityId, AuthorityPen) {
Expand Down Expand Up @@ -86,6 +89,18 @@ impl AsyncWrite for MockSplittable {
}
}

impl ConnectionInfo for MockSplittable {
fn peer_address_info(&self) -> PeerAddressInfo {
String::from("MOCK_ADDRESS")
}
}

impl ConnectionInfo for DuplexStream {
fn peer_address_info(&self) -> PeerAddressInfo {
String::from("MOCK_ADDRESS")
}
}

impl Splittable for MockSplittable {
type Sender = DuplexStream;
type Receiver = DuplexStream;
Expand Down
14 changes: 11 additions & 3 deletions finality-aleph/src/validator_network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,18 @@ pub trait Network<A: Data, D: Data>: Send + 'static {
async fn next(&mut self) -> Option<D>;
}

pub type PeerAddressInfo = String;

/// Reports address of the peer that we are connected to.
pub trait ConnectionInfo {
/// Return the address of the peer that we are connected to.
fn peer_address_info(&self) -> PeerAddressInfo;
}

/// A stream that can be split into a sending and receiving part.
pub trait Splittable: AsyncWrite + AsyncRead + Unpin + Send {
type Sender: AsyncWrite + Unpin + Send;
type Receiver: AsyncRead + Unpin + Send;
pub trait Splittable: AsyncWrite + AsyncRead + ConnectionInfo + Unpin + Send {
type Sender: AsyncWrite + ConnectionInfo + Unpin + Send;
type Receiver: AsyncRead + ConnectionInfo + Unpin + Send;

/// Split into the sending and receiving part.
fn split(self) -> (Self::Sender, Self::Receiver);
Expand Down
36 changes: 16 additions & 20 deletions finality-aleph/src/validator_network/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,31 @@ use crate::{
validator_network::{
protocol_negotiation::{protocol, ProtocolNegotiationError},
protocols::ProtocolError,
Data, Dialer,
ConnectionInfo, Data, Dialer, PeerAddressInfo,
},
};

enum OutgoingError<A: Data, ND: Dialer<A>> {
Dial(ND::Error),
ProtocolNegotiation(ProtocolNegotiationError),
Protocol(ProtocolError),
ProtocolNegotiation(PeerAddressInfo, ProtocolNegotiationError),
Protocol(PeerAddressInfo, ProtocolError),
}

impl<A: Data, ND: Dialer<A>> Display for OutgoingError<A, ND> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
use OutgoingError::*;
match self {
Dial(e) => write!(f, "dial error: {}", e),
ProtocolNegotiation(e) => write!(f, "protocol negotiation error: {}", e),
Protocol(e) => write!(f, "protocol error: {}", e),
ProtocolNegotiation(addr, e) => write!(
f,
"protocol negotiation error: {}, peer address info: {}",
e, addr
),
Protocol(addr, e) => write!(f, "protocol error: {}, peer address info: {}", e, addr),
}
}
}

impl<A: Data, ND: Dialer<A>> From<ProtocolNegotiationError> for OutgoingError<A, ND> {
fn from(e: ProtocolNegotiationError) -> Self {
OutgoingError::ProtocolNegotiation(e)
}
}

impl<A: Data, ND: Dialer<A>> From<ProtocolError> for OutgoingError<A, ND> {
fn from(e: ProtocolError) -> Self {
OutgoingError::Protocol(e)
}
}

async fn manage_outgoing<D: Data, A: Data, ND: Dialer<A>>(
authority_pen: AuthorityPen,
peer_id: AuthorityId,
Expand All @@ -55,12 +47,16 @@ async fn manage_outgoing<D: Data, A: Data, ND: Dialer<A>>(
.connect(addresses)
.await
.map_err(OutgoingError::Dial)?;
let peer_address_info = stream.peer_address_info();
debug!(target: "validator-network", "Performing outgoing protocol negotiation.");
let (stream, protocol) = protocol(stream).await?;
let (stream, protocol) = protocol(stream)
.await
.map_err(|e| OutgoingError::ProtocolNegotiation(peer_address_info.clone(), e))?;
debug!(target: "validator-network", "Negotiated protocol, running.");
Ok(protocol
protocol
.manage_outgoing(stream, authority_pen, peer_id, result_for_parent)
.await?)
.await
.map_err(|e| OutgoingError::Protocol(peer_address_info.clone(), e))
}

const RETRY_DELAY: Duration = Duration::from_secs(10);
Expand Down