Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/node/browser-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "Apache-2.0"

[dependencies]
futures-timer = "3.0.2"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
jsonrpc-core = "14.2.0"
serde = "1.0.106"
serde_json = "1.0.48"
Expand Down
2 changes: 1 addition & 1 deletion bin/utils/subkey/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ derive_more = { version = "0.99.2" }
sc-rpc = { version = "2.0.0-rc5", path = "../../../client/rpc" }
jsonrpc-core-client = { version = "14.2.0", features = ["http"] }
hyper = "0.12.35"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
serde_json = "1.0"

[features]
Expand Down
2 changes: 1 addition & 1 deletion client/authority-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ codec = { package = "parity-scale-codec", default-features = false, version = "1
derive_more = "0.99.2"
futures = "0.3.4"
futures-timer = "3.0.1"
libp2p = { version = "0.22.0", default-features = false, features = ["kad"] }
libp2p = { version = "0.23.0", default-features = false, features = ["kad"] }
log = "0.4.8"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-rc5"}
prost = "0.6.1"
Expand Down
2 changes: 1 addition & 1 deletion client/network-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
futures = "0.3.4"
futures-timer = "3.0.1"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
log = "0.4.8"
lru = "0.4.3"
sc-network = { version = "0.8.0-rc5", path = "../network" }
Expand Down
13 changes: 9 additions & 4 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ erased-serde = "0.3.9"
fnv = "1.0.6"
fork-tree = { version = "2.0.0-rc5", path = "../../utils/fork-tree" }
futures = "0.3.4"
futures-timer = "3.0.1"
futures-timer = "3.0.2"
futures_codec = "0.4.0"
hex = "0.4.0"
ip_network = "0.3.4"
Expand Down Expand Up @@ -60,16 +60,21 @@ void = "1.0.2"
wasm-timer = "0.2"
zeroize = "1.0.0"

[target.'cfg(target_arch="wasm32")'.dependencies]
async-std = { version = "1.6.2", features = ["unstable"] }

[target.'cfg(not(target_arch="wasm32"))'.dependencies]
async-std = { version = "1.6.2" }

[dependencies.libp2p]
version = "0.22.0"
version = "0.23.0"
default-features = false
features = ["identify", "kad", "mdns", "mplex", "noise", "ping", "tcp-async-std", "websocket", "yamux"]

[dev-dependencies]
async-std = "1.6.2"
assert_matches = "1.3"
env_logger = "0.7.0"
libp2p = { version = "0.22.0", default-features = false, features = ["secio"] }
libp2p = { version = "0.23.0", default-features = false, features = ["secio"] }
quickcheck = "0.9.0"
rand = "0.7.2"
sp-keyring = { version = "2.0.0-rc5", path = "../../primitives/keyring" }
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
/// Local copy of the `PeerId` of the local node.
local_peer_id: PeerId,
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
bandwidth: Arc<transport::BandwidthSinks>,
bandwidth: Arc<transport::BandwidthMonitor>,
/// Peerset manager (PSM); manages the reputation of nodes and indicates the network which
/// nodes it should be connected to or not.
peerset: PeersetHandle,
Expand Down
134 changes: 130 additions & 4 deletions client/network/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use async_std::task;
use futures::prelude::*;
use futures_timer::Delay;
use libp2p::{
InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport,
core::{
Expand All @@ -27,7 +29,14 @@ use libp2p::{
};
#[cfg(not(target_os = "unknown"))]
use libp2p::{tcp, dns, websocket};
use std::{io, sync::Arc, time::Duration, usize};
use std::{
iter,
io,
ops::Range,
sync::{Arc, atomic::{Ordering, AtomicU64}},
time::Duration,
};
use void::Void;

pub use self::bandwidth::BandwidthSinks;

Expand All @@ -43,7 +52,7 @@ pub fn build_transport(
memory_only: bool,
wasm_external_transport: Option<wasm_ext::ExtTransport>,
use_yamux_flow_control: bool
) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc<bandwidth::BandwidthSinks>) {
) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc<BandwidthMonitor>) {
// Build configuration objects for encryption mechanisms.
let noise_config = {
// For more information about these two panics, see in "On the Importance of
Expand Down Expand Up @@ -104,7 +113,8 @@ pub fn build_transport(
OptionalTransport::none()
});

let (transport, sinks) = bandwidth::BandwidthLogging::new(transport, Duration::from_secs(5));
let (transport, sinks) = bandwidth::BandwidthLogging::new(transport);
let bandwidth = BandwidthMonitor::new(sinks);

// Encryption
let transport = transport.and_then(move |stream, endpoint| {
Expand Down Expand Up @@ -145,5 +155,121 @@ pub fn build_transport(
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();

(transport, sinks)
(transport, bandwidth)
}

/// A monitor for network bandwidth usage.
pub struct BandwidthMonitor {
avg_inbound_per_sec: AtomicU64,
avg_outbound_per_sec: AtomicU64,
}

/// The number of data points, i.e. total bandwidth usage
/// measurements, tracked for moving average bandwidth
/// calculation.
const DATA_POINTS: usize = 5;
/// The number of consecutive pairs of data points,
/// i.e. windows of size 2, for moving average bandwidth
/// calculation.
const DATA_WINDOWS: usize = DATA_POINTS - 1;

/// The data of the background task that updates the `BandwidthMonitor`.
struct BandwidthTask {
timer: Delay,
data: [(u64, u64); DATA_POINTS],
index: iter::Cycle<Range<usize>>,
input: Arc<BandwidthSinks>,
output: Arc<BandwidthMonitor>,
}

impl BandwidthTask {
/// Runs the task that periodically updates the `BandwidthMonitor`
/// from the `BandwidthSinks`.
async fn run(mut self) -> Void {
loop {
future::poll_fn(|cx| self.timer.poll_unpin(cx)).await;

let inbound = self.input.total_inbound();
let outbound = self.input.total_outbound();

let ix = self.index.next().expect("Cycle never ends; qed");
self.data[ix] = (inbound, outbound);

let mut i = ix;
let (mut rx, mut tx): (u64, u64) = (0,0);
for _ in 0 .. DATA_WINDOWS {
let j = dec_mod(i);
let (rx_i, tx_i) = self.data[i];
let (rx_j, tx_j) = self.data[j];
rx = rx.saturating_add(rx_i - rx_j);
tx = tx.saturating_add(tx_i - tx_j);
i = j;
}
let rx_avg = rx / DATA_WINDOWS as u64;
let tx_avg = tx / DATA_WINDOWS as u64;

self.output.avg_inbound_per_sec.store(rx_avg, Ordering::Relaxed);
self.output.avg_outbound_per_sec.store(tx_avg, Ordering::Relaxed);

self.timer.reset(Duration::from_secs(1));
}
}
}

impl BandwidthMonitor {
/// The (moving) average number of bytes sent per second
/// within the last few seconds.
pub fn average_upload_per_sec(&self) -> u64 {
self.avg_outbound_per_sec.load(Ordering::Relaxed)
}

/// The (moving) average number of bytes received per second
/// within the last few seconds.
pub fn average_download_per_sec(&self) -> u64 {
self.avg_inbound_per_sec.load(Ordering::Relaxed)
}

fn new(input: Arc<BandwidthSinks>) -> Arc<Self> {
let monitor = Arc::new(BandwidthMonitor {
avg_inbound_per_sec: AtomicU64::new(0),
avg_outbound_per_sec: AtomicU64::new(0),
});

let task = BandwidthTask {
input,
data: [(0,0); DATA_POINTS],
index: (0..DATA_POINTS).into_iter().cycle(),
timer: Delay::new(Duration::from_secs(1)),
output: monitor.clone(),
};

Self::spawn(task);

monitor
}

#[cfg(not(target_arch = "wasm32"))]
fn spawn(task: BandwidthTask) {
task::Builder::new()
.name("network-bandwidth".to_string())
.spawn(task.run())
.expect("Failed to spawn network-bandwidth task.");
}

#[cfg(target_arch = "wasm32")]
fn spawn(task: BandwidthTask) {
task::Builder::new()
.name("network-bandwidth".to_string())
.local(task.run())
.expect("Failed to spawn network-bandwidth task.");
}
}

/// Decrements in `Z/{DATA_POINTS}Z`.
fn dec_mod(n: usize) -> usize {
if n == 0 {
DATA_POINTS - 1
} else {
n - 1
}
}
2 changes: 1 addition & 1 deletion client/network/test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ parking_lot = "0.10.0"
futures = "0.3.4"
futures-timer = "3.0.1"
rand = "0.7.2"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
sp-consensus = { version = "0.8.0-rc5", path = "../../../primitives/consensus/common" }
sc-consensus = { version = "0.8.0-rc5", path = "../../../client/consensus/common" }
sc-client-api = { version = "2.0.0-rc5", path = "../../api" }
Expand Down
2 changes: 1 addition & 1 deletion client/peerset/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
futures = "0.3.4"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
sp-utils = { version = "2.0.0-rc5", path = "../../primitives/utils"}
log = "0.4.8"
serde_json = "1.0.41"
Expand Down
2 changes: 1 addition & 1 deletion client/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ parking_lot = "0.10.0"
futures = "0.3.4"
futures-timer = "3.0.1"
wasm-timer = "0.2.0"
libp2p = { version = "0.22.0", default-features = false, features = ["dns", "tcp-async-std", "wasm-ext", "websocket"] }
libp2p = { version = "0.23.0", default-features = false, features = ["dns", "tcp-async-std", "wasm-ext", "websocket"] }
log = "0.4.8"
pin-project = "0.4.6"
rand = "0.7.2"
Expand Down
2 changes: 1 addition & 1 deletion primitives/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
derive_more = "0.99.2"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
log = "0.4.8"
sp-core = { path= "../../core", version = "2.0.0-rc5"}
sp-inherents = { version = "2.0.0-rc5", path = "../../inherents" }
Expand Down