Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions one_collect/src/perf_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,18 @@ impl Drop for PerfSession {
impl PerfSession {
pub fn new(
source: Box<dyn PerfDataSource>) -> Self {
/* Increase rlimit for open files */
unsafe {
let mut limit = libc::rlimit {
rlim_cur: 0,
rlim_max: 0,
};

if libc::getrlimit(libc::RLIMIT_NOFILE, &mut limit) == 0 {
limit.rlim_cur = limit.rlim_max;
libc::setrlimit(libc::RLIMIT_NOFILE, &limit);
}
}

let session = Self {
source,
Expand Down
3 changes: 2 additions & 1 deletion one_collect/src/perf_event/rb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ impl RingBufBuilder {
FLAG_SAMPLE_ID_ALL |
FLAG_DISABLED |
FLAG_EXCLUDE_HV |
FLAG_EXCLUDE_IDLE,
FLAG_EXCLUDE_IDLE |
FLAG_INHERIT,
clockid: CLOCK_MONOTONIC_RAW,
read_format: abi::PERF_FORMAT_ID,
sample_type: abi::PERF_SAMPLE_IDENTIFIER |
Expand Down
29 changes: 27 additions & 2 deletions one_collect/src/perf_event/rb/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,24 @@ impl RingBufDataSource {
Ok(())
}

fn tasks_for_pids(pids: &mut Vec<i32>) {
let mut tasks = HashSet::new();

/* Find all unique tasks IDs */
for pid in pids.drain(..) {
tasks.insert(pid);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we insert pid here, then we're going to end up with duplicate pids when this function returns, no?

Copy link
Collaborator Author

@beaubelgrave beaubelgrave Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tasks is a HashSet, so we will only keep unique task IDs. I wanted to ensure we did not have duplicates. For example, if /proc/<pid>/task is given a task, it will include parent tasks. The hashset is preventing it, and pids is drained fully, so we only end up with a unique set after this function (pids drain into a hashset that then repopulates the drained pids vec).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right - I missed that it calls drain to empty them out.


procfs::iter_proc_tasks(
pid as u32,
|task| { tasks.insert(task as i32); });
}

/* Update PIDs with unique tasks */
for task in tasks.drain() {
pids.push(task);
}
}

fn build(&mut self) -> IOResult<()> {
/* Always required */
let common = self.kernel_builder
Expand All @@ -471,8 +489,15 @@ impl RingBufDataSource {

let empty_pids = Vec::new();

let pids = match &self.target_pids {
Some(pids) => { pids },
let target_pids = &mut self.target_pids.as_mut();

let pids = match target_pids {
Some(pids) => {
/* Populate current tasks for PIDs */
Self::tasks_for_pids(pids);

pids
},
None => { &empty_pids },
};

Expand Down
46 changes: 46 additions & 0 deletions one_collect/src/procfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,52 @@ pub fn ns_pid(
None
}

/// Iterates over the current tasks within a process.
///
/// # Parameters
///
/// * `pid`: The process ID for which to iterate over.
/// * `callback`: A mutable closure that takes a `u32` reference as its argument and returns nothing.
/// This closure is called for each task.
///
pub fn iter_proc_tasks(
pid: u32,
mut callback: impl FnMut(u32)) {
let mut path_buf = PathBuf::new();
path_buf.push("/proc");
path_buf.push_u32(pid);
path_buf.push("task");

let dirs = fs::read_dir(path_buf);

if let Ok(dirs) = dirs {
for entry in dirs {
if let Ok(entry) = entry {
let path = entry.path();

if path.components().count() == 5 {
let mut iter = path.iter();

iter.next(); // "/"
iter.next(); // "proc"
iter.next(); // "<pid>"
iter.next(); // "task"

if let Some(task_str) = iter.next() { // "<task>"
let s = task_str.to_str().unwrap();

if let Ok(task) = s.parse::<u32>() {
if task != pid {
(callback)(task);
}
}
}
}
}
}
}
}

/// Iterates over the memory modules of a process and applies a callback function to each module.
///
/// The function reads the `/proc/{pid}/maps` file to get the list of memory modules.
Expand Down