Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Structs for mock data
  • Loading branch information
maciejzelaszczyk committed Dec 19, 2022
commit 9f99e3bd970d17e24248e05acedc20aa59b257d3
4 changes: 2 additions & 2 deletions e2e-tests/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::accounts::{get_sudo_key, get_validators_keys, get_validators_seeds, N

static GLOBAL_CONFIG: Lazy<Config> = Lazy::new(|| {
let node = get_env("NODE_URL").unwrap_or_else(|| "ws://127.0.0.1:9943".to_string());
let validator_count = get_env("VALIDATOR_COUNT").unwrap_or_else(|| 5);
let validator_count = get_env("VALIDATOR_COUNT").unwrap_or(5);
let validators_seeds = env::var("VALIDATORS_SEEDS")
.ok()
.map(|s| s.split(',').map(|s| s.to_string()).collect());
Expand Down Expand Up @@ -41,7 +41,7 @@ where
{
env::var(name).ok().map(|v| {
v.parse()
.expect(&format!("Failed to parse env var {}", name))
.unwrap_or_else(|_| panic!("Failed to parse env var {}", name))
})
}

Expand Down
5 changes: 3 additions & 2 deletions finality-aleph/src/validator_network/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use std::{
};

use codec::{Decode, Encode};
use futures::{Future, channel::mpsc::UnboundedReceiver};
use futures::{channel::mpsc::UnboundedReceiver, Future};
use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf};

use crate::{
network::PeerId,
validator_network::{
protocols::{ProtocolError, ResultForService},
ConnectionInfo, PeerAddressInfo, PublicKey, SecretKey, Splittable
ConnectionInfo, PeerAddressInfo, PublicKey, SecretKey, Splittable,
},
};

Expand Down Expand Up @@ -166,6 +166,7 @@ pub struct MockPrelims<D> {
pub incoming_handle: Pin<Box<dyn Future<Output = Result<(), ProtocolError<MockPublicKey>>>>>,
pub outgoing_handle: Pin<Box<dyn Future<Output = Result<(), ProtocolError<MockPublicKey>>>>>,
pub data_from_incoming: UnboundedReceiver<D>,
pub data_from_outgoing: Option<UnboundedReceiver<D>>,
pub result_from_incoming: UnboundedReceiver<ResultForService<MockPublicKey, D>>,
pub result_from_outgoing: UnboundedReceiver<ResultForService<MockPublicKey, D>>,
}
121 changes: 64 additions & 57 deletions finality-aleph/src/validator_network/protocols/v0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(

#[cfg(test)]
mod tests {
use futures::{
channel::mpsc,
pin_mut, FutureExt, StreamExt,
};
use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};

use super::{incoming, outgoing, ProtocolError};
use crate::validator_network::{
Expand Down Expand Up @@ -153,6 +150,7 @@ mod tests {
incoming_handle,
outgoing_handle,
data_from_incoming,
data_from_outgoing: None,
result_from_incoming,
result_from_outgoing,
}
Expand All @@ -170,6 +168,7 @@ mod tests {
mut data_from_incoming,
result_from_incoming: _result_from_incoming,
mut result_from_outgoing,
..
} = prepare::<Vec<i32>>();
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
Expand All @@ -179,7 +178,7 @@ mod tests {
_ = &mut incoming_handle => panic!("incoming process unexpectedly finished"),
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
result = result_from_outgoing.next() => {
let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped");
let (_, maybe_data_for_outgoing, connection_type) = result.expect("the channel shouldn't be dropped");
assert_eq!(connection_type, ConnectionType::LegacyOutgoing);
let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
data_for_outgoing
Expand Down Expand Up @@ -219,8 +218,9 @@ mod tests {
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
let incoming_handle = incoming_handle.fuse();
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(incoming_handle);
pin_mut!(outgoing_handle);
Expand All @@ -229,7 +229,7 @@ mod tests {
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
received = result_from_incoming.next() => {
// we drop the exit oneshot channel, thus finishing incoming_handle
let (received_id, _, connection_type) = received.expect("the chennel shouldn't be dropped");
let (received_id, _, connection_type) = received.expect("the channel shouldn't be dropped");
assert_eq!(connection_type, ConnectionType::LegacyIncoming);
assert_eq!(received_id, id_outgoing);
},
Expand All @@ -251,6 +251,7 @@ mod tests {
data_from_incoming: _data_from_incoming,
result_from_incoming,
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
std::mem::drop(result_from_incoming);
let incoming_handle = incoming_handle.fuse();
Expand All @@ -269,17 +270,18 @@ mod tests {

#[tokio::test]
async fn parent_user_dead() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
id_incoming: _id_incoming,
pen_incoming: _pen_incoming,
id_outgoing: _id_outgoing,
pen_outgoing: _pen_outgoing,
incoming_handle,
outgoing_handle,
data_from_incoming,
_result_from_incoming,
result_from_incoming: _result_from_incoming,
mut result_from_outgoing,
) = prepare::<Vec<i32>>();
..
} = prepare::<Vec<i32>>();
std::mem::drop(data_from_incoming);
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
Expand All @@ -289,7 +291,7 @@ mod tests {
_ = &mut incoming_handle => panic!("incoming process unexpectedly finished"),
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
result = result_from_outgoing.next() => {
let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped");
let (_, maybe_data_for_outgoing, connection_type) = result.expect("the channel shouldn't be dropped");
assert_eq!(connection_type, ConnectionType::LegacyOutgoing);
let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
data_for_outgoing
Expand All @@ -310,17 +312,18 @@ mod tests {

#[tokio::test]
async fn sender_dead_before_handshake() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
id_incoming: _id_incoming,
pen_incoming: _pen_incoming,
id_outgoing: _id_outgoing,
pen_outgoing: _pen_outgoing,
incoming_handle,
outgoing_handle,
_data_from_incoming,
_result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
data_from_incoming: _data_from_incoming,
result_from_incoming: _result_from_incoming,
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
std::mem::drop(outgoing_handle);
match incoming_handle.await {
Err(ProtocolError::HandshakeError(_)) => (),
Expand All @@ -331,17 +334,18 @@ mod tests {

#[tokio::test]
async fn sender_dead_after_handshake() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
id_incoming: _id_incoming,
pen_incoming: _pen_incoming,
id_outgoing: _id_outgoing,
pen_outgoing: _pen_outgoing,
incoming_handle,
outgoing_handle,
_data_from_incoming,
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
let incoming_handle = incoming_handle.fuse();
pin_mut!(incoming_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand All @@ -360,17 +364,18 @@ mod tests {

#[tokio::test]
async fn receiver_dead_before_handshake() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
id_incoming: _id_incoming,
pen_incoming: _pen_incoming,
id_outgoing: _id_outgoing,
pen_outgoing: _pen_outgoing,
incoming_handle,
outgoing_handle,
_data_from_incoming,
_result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
data_from_incoming: _data_from_incoming,
result_from_incoming: _result_from_incoming,
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
std::mem::drop(incoming_handle);
match outgoing_handle.await {
Err(ProtocolError::HandshakeError(_)) => (),
Expand All @@ -381,17 +386,18 @@ mod tests {

#[tokio::test]
async fn receiver_dead_after_handshake() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
id_incoming: _id_incoming,
pen_incoming: _pen_incoming,
id_outgoing: _id_outgoing,
pen_outgoing: _pen_outgoing,
incoming_handle,
outgoing_handle,
_data_from_incoming,
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(outgoing_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand All @@ -412,17 +418,18 @@ mod tests {

#[tokio::test]
async fn receiver_dead_after_handshake_try_send_error() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
id_incoming: _id_incoming,
pen_incoming: _pen_incoming,
id_outgoing: _id_outgoing,
pen_outgoing: _pen_outgoing,
incoming_handle,
outgoing_handle,
_data_from_incoming,
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(outgoing_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand Down
Loading