Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
01af630
skeleton
sandreim Oct 25, 2023
7c22abe
wip
sandreim Nov 6, 2023
c3adc77
measure tput and fixes
sandreim Nov 6, 2023
31b0351
add network emulation
sandreim Nov 7, 2023
e4bb037
cleanup
sandreim Nov 7, 2023
a694924
Add latency emulation
sandreim Nov 7, 2023
7ca4dba
support multiple pov sizes
sandreim Nov 8, 2023
0430b5b
new metric in recovery and more testing
sandreim Nov 8, 2023
027bcd8
CLI update and fixes
sandreim Nov 9, 2023
5a05da0
peer stats
sandreim Nov 9, 2023
895e8d6
Switch stats to atomics
sandreim Nov 10, 2023
a2fb0c9
add more network metrics, new load generator
sandreim Nov 12, 2023
d1b9fa3
refactor
sandreim Nov 14, 2023
c5937ab
pretty cli + minor refactor + remove unused
sandreim Nov 15, 2023
d6c259d
update
sandreim Nov 15, 2023
050529b
remove comment
sandreim Nov 15, 2023
cb38be5
separate cli options for availability
sandreim Nov 17, 2023
24a736a
implement unified and extensible configuration
sandreim Nov 17, 2023
2843865
Prepare to swtich to overseer
sandreim Nov 24, 2023
fd4620e
Merge branch 'master' of github.com:paritytech/polkadot-sdk into sand…
sandreim Nov 24, 2023
b17a147
add mocked subsystems
sandreim Nov 27, 2023
4724d8c
full overseer based implementation complete
sandreim Nov 27, 2023
7aed30f
make clean
sandreim Nov 27, 2023
b51485b
more cleaning
sandreim Nov 27, 2023
7e46444
more cleaning
sandreim Nov 27, 2023
d3df927
proper overseer control
sandreim Nov 27, 2023
7557768
refactor CLI display of env stats
sandreim Nov 27, 2023
787dc00
Add grafana dashboards for DA read
sandreim Nov 28, 2023
cd18f8d
network stats fixes
sandreim Nov 28, 2023
e8506b3
move examples and grafana
sandreim Nov 28, 2023
cbb6772
Add readme
sandreim Nov 28, 2023
1a80870
fmt + readme updates
sandreim Nov 28, 2023
eb49ea0
update dashboard and sample
sandreim Nov 28, 2023
b249056
remove unused
sandreim Nov 28, 2023
7fbcdfc
Merge branch 'master' of github.com:paritytech/polkadot-sdk into sand…
sandreim Nov 28, 2023
fb34181
revert unneeded changes
sandreim Nov 28, 2023
3a716a5
add missing comments and minor fixes
sandreim Nov 29, 2023
a092b76
clippy
sandreim Nov 29, 2023
ca27370
zepter format features --fix
sandreim Nov 29, 2023
be814e5
fix markdown
sandreim Nov 29, 2023
11ce8f5
remove sleep till end of block
sandreim Nov 29, 2023
8d93abc
review
sandreim Nov 29, 2023
af141ee
Emulated network improvements
sandreim Dec 1, 2023
29d80fa
fix comment
sandreim Dec 1, 2023
70ac38e
Add cpu profiling
AndreiEres Dec 4, 2023
a06f2a5
Merge branch 'sandreim/subsystem-bench' into AndreiEres/cpu-profiling
AndreiEres Dec 5, 2023
b9f4dd9
Update polkadot/node/subsystem-bench/README.md
AndreiEres Dec 5, 2023
8736689
Update
AndreiEres Dec 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor
Signed-off-by: Andrei Sandu <[email protected]>
  • Loading branch information
sandreim committed Nov 14, 2023
commit d1b9fa39aaa98cf7e20b2108399a887780255d3b
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions polkadot/node/subsystem-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ color-eyre = { version = "0.6.1", default-features = false }
assert_matches = "1.5"
async-trait = "0.1.57"
sp-keystore = { path = "../../../substrate/primitives/keystore" }
sc-keystore = { path = "../../../substrate/client/keystore" }
sp-core = { path = "../../../substrate/primitives/core" }
clap = { version = "4.4.6", features = ["derive"] }
futures = "0.3.21"
Expand Down
105 changes: 59 additions & 46 deletions polkadot/node/subsystem-bench/src/availability/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ use std::{
time::{Duration, Instant},
};

use sc_keystore::LocalKeystore;
use sp_application_crypto::AppCrypto;
use sp_keystore::{Keystore, KeystorePtr};

use futures::{
channel::{mpsc, oneshot},
stream::FuturesUnordered,
Expand Down Expand Up @@ -53,7 +57,7 @@ use polkadot_node_subsystem::{
};
use std::net::{Ipv4Addr, SocketAddr};

mod test_env;
use super::core::{keyring::Keyring, network::*, test_env::TestEnvironmentMetrics};

const LOG_TARGET: &str = "subsystem-bench::availability";

Expand All @@ -71,9 +75,8 @@ use polkadot_primitives_test_helpers::{dummy_candidate_receipt, dummy_hash};
use sc_service::{SpawnTaskHandle, TaskManager};

mod configuration;
mod network;

pub use configuration::TestConfiguration;
pub use configuration::{PeerLatency, TestConfiguration};

// Deterministic genesis hash for protocol names
const GENESIS_HASH: Hash = Hash::repeat_byte(0xff);
Expand Down Expand Up @@ -140,10 +143,12 @@ impl TestEnvironment {
task_manager.spawn_handle(),
state.config().use_fast_path,
);

let metrics =
TestEnvironmentMetrics::new(&registry).expect("Metrics need to be registered");
let mut network = NetworkEmulator::new(
state.config().n_validators,
state.validator_authority_id.clone(),
state.config().peer_bandwidth,
task_manager.spawn_handle(),
&registry,
Expand Down Expand Up @@ -243,7 +248,7 @@ impl TestEnvironment {
) -> NetworkAction {
match request {
Requests::ChunkFetchingV1(outgoing_request) => {
let validator_index = outgoing_request.payload.index.0 as usize;
let validator_index: usize = outgoing_request.payload.index.0 as usize;
let candidate_hash = outgoing_request.payload.candidate_hash;

let candidate_index = state
Expand All @@ -266,6 +271,12 @@ impl TestEnvironment {
Ok(req_res::v1::ChunkFetchingResponse::from(Some(chunk)).encode())
};

let authority_discovery_id = match outgoing_request.peer {
req_res::Recipient::Authority(authority_discovery_id) => authority_discovery_id,
_ => panic!("Peer recipient not supported yet"),
};
let authority_discovery_id_clone = authority_discovery_id.clone();

let future = async move {
let _ = outgoing_request.pending_response.send(response);
}
Expand All @@ -274,26 +285,21 @@ impl TestEnvironment {
let future_wrapper = async move {
// Forward the response to the ingress channel of our node.
// On receive side we apply our node receiving rate limit.
let action = NetworkAction::new(validator_index, future, size, None);
let action =
NetworkAction::new(authority_discovery_id_clone, future, size, None);
ingress_tx.send(action).unwrap();
}
.boxed();

NetworkAction::new(
validator_index,
authority_discovery_id,
future_wrapper,
size,
// Generate a random latency based on configuration.
Self::random_latency(state.config().latency.as_ref()),
)
},
Requests::AvailableDataFetchingV1(outgoing_request) => {
println!("{:?}", outgoing_request);
// TODO: do better, by implementing diff authority ids and mapping network actions
// to authority id,
let validator_index =
Uniform::from(0..state.config().n_validators).sample(&mut thread_rng());

let candidate_hash = outgoing_request.payload.candidate_hash;
let candidate_index = state
.candidate_hashes
Expand All @@ -318,16 +324,23 @@ impl TestEnvironment {
}
.boxed();

let authority_discovery_id = match outgoing_request.peer {
req_res::Recipient::Authority(authority_discovery_id) => authority_discovery_id,
_ => panic!("Peer recipient not supported yet"),
};
let authority_discovery_id_clone = authority_discovery_id.clone();

let future_wrapper = async move {
// Forward the response to the ingress channel of our node.
// On receive side we apply our node receiving rate limit.
let action = NetworkAction::new(validator_index, future, size, None);
let action =
NetworkAction::new(authority_discovery_id_clone, future, size, None);
ingress_tx.send(action).unwrap();
}
.boxed();

NetworkAction::new(
validator_index,
authority_discovery_id,
future_wrapper,
size,
// Generate a random latency based on configuration.
Expand Down Expand Up @@ -362,7 +375,7 @@ impl TestEnvironment {
network.inc_sent(Self::request_size(&request));
let action = Self::respond_to_send_request(&mut state, request, ingress_tx.clone());
// Account for our node sending the request over the emulated network.
network.submit_peer_action(action.index(), action);
network.submit_peer_action(action.peer(), action);
}
},
AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryAvailableData(_candidate_hash, tx)) => {
Expand Down Expand Up @@ -470,25 +483,24 @@ impl AvailabilityRecoverySubsystemInstance {
}
}

pub fn random_pov_size(min_pov_size: usize, max_pov_size: usize) -> usize {
random_uniform_sample(min_pov_size, max_pov_size)
}

fn random_uniform_sample<T: Into<usize> + From<usize>>(min_value: T, max_value: T) -> T {
Uniform::from(min_value.into()..=max_value.into())
.sample(&mut thread_rng())
.into()
}

// We use this to bail out sending messages to the subsystem if it is overloaded such that
// the time of flight is breaches 5s.
// This should eventually be a test parameter.
const MAX_TIME_OF_FLIGHT: Duration = Duration::from_millis(5000);

use sp_keyring::Sr25519Keyring;

use crate::availability::network::{ActionFuture, NetworkAction};

use self::{
configuration::PeerLatency,
network::{NetworkEmulator, RateLimit},
test_env::TestEnvironmentMetrics,
};

#[derive(Clone)]
pub struct TestState {
validators: Vec<Sr25519Keyring>,
validator_public: IndexedVec<ValidatorIndex, ValidatorId>,
validator_public: Vec<ValidatorId>,
validator_authority_id: Vec<AuthorityDiscoveryId>,
// The test node validator index.
validator_index: ValidatorIndex,
Expand Down Expand Up @@ -531,7 +543,7 @@ impl TestState {
let validator_groups = my_vec.chunks(5).map(|x| Vec::from(x)).collect::<Vec<_>>();

SessionInfo {
validators: self.validator_public.clone(),
validators: self.validator_public.clone().into(),
discovery_keys: self.validator_authority_id.clone(),
validator_groups: IndexedVec::<GroupIndex, Vec<ValidatorIndex>>::from(validator_groups),
assignment_keys: vec![],
Expand Down Expand Up @@ -608,13 +620,24 @@ impl TestState {
}

pub fn new(config: TestConfiguration) -> Self {
let validators = (0..config.n_validators as u64)
.into_iter()
.map(|_v| Sr25519Keyring::Alice)
let keystore: KeystorePtr = Arc::new(LocalKeystore::in_memory());

let keyrings = (0..config.n_validators)
.map(|peer_index| Keyring::new(format!("Node{}", peer_index).into()))
.collect::<Vec<_>>();

let validator_public = validator_pubkeys(&validators);
let validator_authority_id = validator_authority_id(&validators);
// Generate `AuthorityDiscoveryId`` for each peer
let validator_public: Vec<ValidatorId> = keyrings
.iter()
.map(|keyring: &Keyring| keyring.clone().public().into())
.collect::<Vec<_>>();

let validator_authority_id: Vec<AuthorityDiscoveryId> = keyrings
.iter()
.map({ |keyring| keyring.clone().public().into() })
.collect::<Vec<_>>()
.into();

let validator_index = ValidatorIndex(0);
let mut chunks = Vec::new();
let mut available_data = Vec::new();
Expand Down Expand Up @@ -643,7 +666,7 @@ impl TestState {
};

let (new_chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root(
validators.len(),
config.n_validators,
&new_available_data,
|_, _| {},
);
Expand All @@ -658,7 +681,6 @@ impl TestState {

let pov_sizes = config.pov_sizes.clone().into_iter().cycle();
let mut state = Self {
validators,
validator_public,
validator_authority_id,
validator_index,
Expand All @@ -681,14 +703,6 @@ impl TestState {
}
}

fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> IndexedVec<ValidatorIndex, ValidatorId> {
val_ids.iter().map(|v| v.public().into()).collect()
}

fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryId> {
val_ids.iter().map(|v| v.public().into()).collect()
}

fn derive_erasure_chunks_with_proofs_and_root(
n_validators: usize,
available_data: &AvailableData,
Expand Down Expand Up @@ -731,8 +745,6 @@ pub async fn bench_chunk_recovery(env: &mut TestEnvironment) {

env.metrics().set_n_validators(config.n_validators);
env.metrics().set_n_cores(config.n_cores);
env.metrics().set_pov_size(config.pov_sizes[0]);
let mut completed_count = 0;

for loop_num in 0..env.config().num_loops {
gum::info!(target: LOG_TARGET, loop_num, "Starting loop");
Expand All @@ -754,9 +766,10 @@ pub async fn bench_chunk_recovery(env: &mut TestEnvironment) {
.await;
}

gum::info!("{} requests pending, {} completed", batch.len(), completed_count);
gum::info!("{} requests pending", batch.len());
while let Some(completed) = batch.next().await {
let available_data = completed.unwrap().unwrap();
env.metrics().on_pov_size(available_data.encoded_size());
availability_bytes += available_data.encoded_size() as u128;
}

Expand Down
63 changes: 0 additions & 63 deletions polkadot/node/subsystem-bench/src/availability/test_env.rs

This file was deleted.

15 changes: 15 additions & 0 deletions polkadot/node/subsystem-bench/src/core/display.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
Loading