Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ assert_matches = "1.4.0"
cpu-time = "1.0.0"
futures = "0.3.21"
futures-timer = "3.0.2"
slotmap = "1.0"
gum = { package = "tracing-gum", path = "../../gum" }
pin-project = "1.0.9"
rand = "0.8.5"
rayon = "1.5.1"
slotmap = "1.0"
tempfile = "3.3.0"
tikv-jemalloc-ctl = "0.5.0"
tokio = { version = "1.24.2", features = ["fs", "process"] }
rayon = "1.5.1"

parity-scale-codec = { version = "3.3.0", default-features = false, features = ["derive"] }

Expand All @@ -38,8 +39,11 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch =
sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }

[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2.139"

[dev-dependencies]
adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" }
halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }
hex-literal = "0.3.4"
tempfile = "3.2.0"
tempfile = "3.3.0"
2 changes: 1 addition & 1 deletion node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ mod tests {
let pulse = pulse_every(Duration::from_millis(100));
futures::pin_mut!(pulse);

for _ in 0usize..5usize {
for _ in 0..5 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the unrelated change, but this really triggered me.

let start = std::time::Instant::now();
let _ = pulse.next().await.unwrap();

Expand Down
60 changes: 60 additions & 0 deletions node/core/pvf/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,27 @@ impl Metrics {
pub(crate) fn time_execution(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.execution_time.start_timer())
}

/// Observe max_rss for preparation.
pub(crate) fn observe_preparation_max_rss(&self, max_rss: f64) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_rss.observe(max_rss);
}
}

/// Observe max resident memory for preparation.
pub(crate) fn observe_preparation_max_resident(&self, max_resident_kb: f64) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_resident.observe(max_resident_kb);
}
}

/// Observe max allocated memory for preparation.
pub(crate) fn observe_preparation_max_allocated(&self, max_allocated_kb: f64) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_allocated.observe(max_allocated_kb);
}
}
}

#[derive(Clone)]
Expand All @@ -85,6 +106,9 @@ struct MetricsInner {
execute_finished: prometheus::Counter<prometheus::U64>,
preparation_time: prometheus::Histogram,
execution_time: prometheus::Histogram,
preparation_max_rss: prometheus::Histogram,
preparation_max_allocated: prometheus::Histogram,
preparation_max_resident: prometheus::Histogram,
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -202,6 +226,42 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
preparation_max_rss: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_preparation_max_rss",
"ru_maxrss (maximum resident set size) observed for preparation (in kilobytes)",
).buckets(
prometheus::exponential_buckets(8192.0, 2.0, 10)
.expect("arguments are always valid; qed"),
),
)?,
registry,
)?,
preparation_max_resident: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_preparation_max_resident",
"max resident memory observed for preparation (in kilobytes)",
).buckets(
prometheus::exponential_buckets(8192.0, 2.0, 10)
.expect("arguments are always valid; qed"),
),
)?,
registry,
)?,
preparation_max_allocated: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_preparation_max_allocated",
"max allocated memory observed for preparation (in kilobytes)",
).buckets(
prometheus::exponential_buckets(8192.0, 2.0, 10)
.expect("arguments are always valid; qed"),
),
)?,
registry,
)?,
};
Ok(Metrics(Some(inner)))
}
Expand Down
243 changes: 243 additions & 0 deletions node/core/pvf/src/prepare/memory_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
// Copyright 2023 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Memory stats for preparation.
//!
//! Right now we gather three measurements:
//!
//! - `ru_maxrss` (resident set size) from `getrusage`.
//! - `resident` memory stat provided by `tikv-malloc-ctl`.
//! - `allocated` memory stat also from `tikv-malloc-ctl`.
//!
//! Currently we are only logging these for the purposes of gathering data. In the future, we may
//! use these stats to reject PVFs during pre-checking. See
//! <https://github.com/paritytech/polkadot/issues/6472#issuecomment-1381941762> for more
//! background.

use crate::{metrics::Metrics, LOG_TARGET};
use parity_scale_codec::{Decode, Encode};
use std::{
io,
sync::mpsc::{Receiver, RecvTimeoutError, Sender},
time::Duration,
};
use tikv_jemalloc_ctl::{epoch, stats, Error};
use tokio::task::JoinHandle;

#[cfg(target_os = "linux")]
use libc::{getrusage, rusage, timeval, RUSAGE_THREAD};

/// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if
/// supported by the OS, `ru_maxrss`.
#[derive(Encode, Decode)]
pub struct MemoryStats {
/// Memory stats from `tikv_jemalloc_ctl`.
pub memory_tracker_stats: Option<MemoryAllocationStats>,
/// `ru_maxrss` from `getrusage`. A string error since `io::Error` is not `Encode`able.
pub max_rss: Option<Result<i64, String>>,
}

/// Statistics of collected memory metrics.
#[non_exhaustive]
#[derive(Clone, Debug, Default, Encode, Decode)]
pub struct MemoryAllocationStats {
/// Total resident memory, in bytes.
pub resident: u64,
/// Total allocated memory, in bytes.
pub allocated: u64,
}

#[derive(Clone)]
struct MemoryAllocationTracker {
epoch: tikv_jemalloc_ctl::epoch_mib,
allocated: stats::allocated_mib,
resident: stats::resident_mib,
}

impl MemoryAllocationTracker {
pub fn new() -> Result<Self, Error> {
Ok(Self {
epoch: epoch::mib()?,
allocated: stats::allocated::mib()?,
resident: stats::resident::mib()?,
})
}

pub fn snapshot(&self) -> Result<MemoryAllocationStats, Error> {
// update stats by advancing the allocation epoch
self.epoch.advance()?;

// Convert to `u64`, as `usize` is not `Encode`able.
let allocated = self.allocated.read()? as u64;
let resident = self.resident.read()? as u64;
Ok(MemoryAllocationStats { allocated, resident })
}
}

/// Get the rusage stats for the current thread.
#[cfg(target_os = "linux")]
fn getrusage_thread() -> io::Result<rusage> {
let mut result = rusage {
ru_utime: timeval { tv_sec: 0, tv_usec: 0 },
ru_stime: timeval { tv_sec: 0, tv_usec: 0 },
ru_maxrss: 0,
ru_ixrss: 0,
ru_idrss: 0,
ru_isrss: 0,
ru_minflt: 0,
ru_majflt: 0,
ru_nswap: 0,
ru_inblock: 0,
ru_oublock: 0,
ru_msgsnd: 0,
ru_msgrcv: 0,
ru_nsignals: 0,
ru_nvcsw: 0,
ru_nivcsw: 0,
};
if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 {
return Err(io::Error::last_os_error())
}
Ok(result)
}

/// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just
/// returns `None`.
pub fn get_max_rss_thread() -> Option<io::Result<i64>> {
// `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works.
#[cfg(target_os = "linux")]
let max_rss = Some(getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss)));
#[cfg(not(target_os = "linux"))]
let max_rss = None;
max_rss
}

/// Runs a thread in the background that observes memory statistics. The goal is to try to get
/// accurate stats during preparation.
///
/// # Algorithm
///
/// 1. Create the memory tracker.
///
/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the
/// allocation epoch.
///
/// 3. When we receive a signal that preparation has completed, take one last snapshot and return
/// the maximum observed values.
///
/// # Errors
///
/// For simplicity, any errors are returned as a string. As this is not a critical component, errors
/// are used for informational purposes (logging) only.
pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result<MemoryAllocationStats, String> {
// This doesn't need to be too fine-grained since preparation currently takes 3-10s or more.
// Apart from that, there is not really a science to this number.
const POLL_INTERVAL: Duration = Duration::from_millis(100);

let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?;
let mut max_stats = MemoryAllocationStats::default();

let mut update_stats = || -> Result<(), String> {
let current_stats = tracker.snapshot().map_err(|err| err.to_string())?;
if current_stats.resident > max_stats.resident {
max_stats.resident = current_stats.resident;
}
if current_stats.allocated > max_stats.allocated {
max_stats.allocated = current_stats.allocated;
}
Ok(())
};

loop {
// Take a snapshot and update the max stats.
update_stats()?;

// Sleep.
match finished_rx.recv_timeout(POLL_INTERVAL) {
// Received finish signal.
Ok(()) => {
update_stats()?;
return Ok(max_stats)
},
// Timed out, restart loop.
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) =>
return Err("memory_tracker_loop: finished_rx disconnected".into()),
}
}
}

/// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this
/// error handling.
pub async fn get_memory_tracker_loop_stats(
fut: JoinHandle<Result<MemoryAllocationStats, String>>,
tx: Sender<()>,
) -> Option<MemoryAllocationStats> {
// Signal to the memory tracker thread to terminate.
if let Err(err) = tx.send(()) {
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: error sending signal to memory tracker_thread: {}", err
);
None
} else {
// Join on the thread handle.
match fut.await {
Ok(Ok(stats)) => Some(stats),
Ok(Err(err)) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: error occurred in the memory tracker thread: {}", err
);
None
},
Err(err) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: error joining on memory tracker thread: {}", err
);
None
},
}
}
}

/// Helper function to send the memory metrics, if available, to prometheus.
pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) {
if let Some(max_rss) = memory_stats.max_rss {
match max_rss {
Ok(max_rss) => metrics.observe_preparation_max_rss(max_rss as f64),
Err(err) => gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"error getting `ru_maxrss` in preparation thread: {}",
err
),
}
}

if let Some(tracker_stats) = memory_stats.memory_tracker_stats {
// We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`.
let resident_kb = (tracker_stats.resident / 1024) as f64;
let allocated_kb = (tracker_stats.allocated / 1024) as f64;

metrics.observe_preparation_max_resident(resident_kb);
metrics.observe_preparation_max_allocated(allocated_kb);
}
}
3 changes: 2 additions & 1 deletion node/core/pvf/src/prepare/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// Copyright 2021-2023 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
Expand All @@ -22,6 +22,7 @@
//! The pool will spawn workers in new processes and those should execute pass control to
//! [`worker_entrypoint`].

mod memory_stats;
mod pool;
mod queue;
mod worker;
Expand Down
Loading