This repository was archived by the owner on Nov 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Speed up logging once again #9981
Merged
paritytech-processbot
merged 12 commits into
paritytech:master
from
koute:master_faster_logging
Oct 21, 2021
Merged
Changes from 1 commit
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
bda46c2
Update `tracing`-related dependencies
koute 853214f
Enable `parking_lot` feature in `tracing-subscriber`
koute 00e85ee
Add an asynchronous stderr logger
koute 3981339
Make clippy happy
koute d6947b6
Add an integration test for the logger
koute 5ae74c8
Refactor `test_logger_filters`'s subprocess machinery into a separate…
koute 0fd0f24
Use a child process instead of hooking into stderr for the test
koute 08103bd
Add a doc comment for `MakeStderrWriter`
koute bfa8f52
Move the initialization into the `MakeStderrWriter`'s constructor
koute be83a6c
Add an extra test case to trigger the logger's emergency flush mechanism
koute ce69d8c
Use the buffer's mutex for asynchronous flushes
koute 949d1e1
Remove vestigial `nix` dependency from one of the previous commits
koute File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add an integration test for the logger
- Loading branch information
commit d6947b6d7540f31118f14d276e2a3fc4196de42b
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,137 @@ | ||
| #![cfg(unix)] | ||
|
|
||
| use log::info; | ||
| use sc_tracing::logging::LoggerBuilder; | ||
| use std::{ | ||
| collections::BTreeMap, | ||
| fs::File, | ||
| io::{BufRead, BufReader}, | ||
| os::unix::io::{FromRawFd, RawFd}, | ||
| sync::{ | ||
| atomic::{AtomicBool, AtomicUsize, Ordering}, | ||
| mpsc::{channel, Receiver}, | ||
| Arc, | ||
| }, | ||
| time::Duration, | ||
| }; | ||
|
|
||
| // Here we use UNIX trickery to replace stderr of the process with our own handle. | ||
| // We use `println` + `abort` in case of errors since just `unwrap`ing is going to print to stderr. | ||
| fn replace_stderr_with(new_fd: RawFd) { | ||
koute marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if let Err(error) = nix::unistd::close(libc::STDERR_FILENO) { | ||
| println!("error: failed to close stderr: {}", error); | ||
| unsafe { libc::abort() } | ||
| } | ||
|
|
||
| match nix::fcntl::fcntl(new_fd, nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(2)) { | ||
| Ok(libc::STDERR_FILENO) => {}, | ||
| Ok(fd) => { | ||
| println!("error: failed to replace stderr: unexpected fd: {}", fd); | ||
| unsafe { libc::abort() } | ||
| }, | ||
| Err(error) => { | ||
| println!("error: failed to replace stderr: {}", error); | ||
| unsafe { libc::abort() } | ||
| }, | ||
| } | ||
| } | ||
|
|
||
| struct RestoreStderrOnDrop(RawFd); | ||
| impl Drop for RestoreStderrOnDrop { | ||
| fn drop(&mut self) { | ||
| replace_stderr_with(self.0); | ||
| } | ||
| } | ||
|
|
||
| fn hook_stderr() -> (RestoreStderrOnDrop, Receiver<String>) { | ||
| let (pipe_rx, pipe_tx) = nix::unistd::pipe().unwrap(); | ||
| let stderr_copy = nix::unistd::dup(libc::STDERR_FILENO).unwrap(); | ||
| replace_stderr_with(pipe_tx); | ||
| let restore_stderr_on_drop = RestoreStderrOnDrop(stderr_copy); | ||
|
|
||
| let (channel_tx, channel_rx) = channel(); | ||
| let pipe_rx = unsafe { File::from_raw_fd(pipe_rx) }; | ||
| std::thread::spawn(move || { | ||
| let pipe_rx = BufReader::new(pipe_rx); | ||
| for line in pipe_rx.lines() { | ||
| if let Ok(line) = line { | ||
| let _ = channel_tx.send(line); | ||
| } else { | ||
| break | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| (restore_stderr_on_drop, channel_rx) | ||
| } | ||
|
|
||
| // This creates a bunch of threads and makes sure they start executing | ||
| // a given callback almost exactly at the same time. | ||
| fn run_on_many_threads(thread_count: usize, callback: impl Fn(usize) + 'static + Send + Clone) { | ||
| let started_count = Arc::new(AtomicUsize::new(0)); | ||
| let barrier = Arc::new(AtomicBool::new(false)); | ||
| let threads: Vec<_> = (0..thread_count) | ||
| .map(|nth_thread| { | ||
| let started_count = started_count.clone(); | ||
| let barrier = barrier.clone(); | ||
| let callback = callback.clone(); | ||
|
|
||
| std::thread::spawn(move || { | ||
| started_count.fetch_add(1, Ordering::SeqCst); | ||
| while !barrier.load(Ordering::SeqCst) { | ||
| std::thread::yield_now(); | ||
| } | ||
|
|
||
| callback(nth_thread); | ||
| }) | ||
| }) | ||
| .collect(); | ||
|
|
||
| while started_count.load(Ordering::SeqCst) != thread_count { | ||
| std::thread::yield_now(); | ||
| } | ||
| barrier.store(true, Ordering::SeqCst); | ||
|
|
||
| for thread in threads { | ||
| if let Err(error) = thread.join() { | ||
| println!("error: failed to join thread: {:?}", error); | ||
| unsafe { libc::abort() } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_logger() { | ||
| const THREAD_COUNT: usize = 128; | ||
| const LOGS_PER_THREAD: usize = 1024; | ||
|
|
||
| let builder = LoggerBuilder::new(""); | ||
| builder.init().unwrap(); | ||
|
|
||
| let (restore_stderr_on_drop, stderr) = hook_stderr(); | ||
|
|
||
| run_on_many_threads(THREAD_COUNT, |nth_thread| { | ||
| for _ in 0..LOGS_PER_THREAD { | ||
| info!("Thread <<{}>>", nth_thread); | ||
| } | ||
| }); | ||
|
|
||
| // Wait until the logger flushes itself. | ||
| std::thread::sleep(Duration::from_millis(100)); | ||
koute marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| std::mem::drop(restore_stderr_on_drop); | ||
|
|
||
| let mut count_per_thread = BTreeMap::new(); | ||
| while let Ok(line) = stderr.try_recv() { | ||
| if let Some(index_s) = line.find("Thread <<") { | ||
| let index_s = index_s + "Thread <<".len(); | ||
| let index_e = line.find(">>").unwrap(); | ||
| let nth_thread: usize = line[index_s..index_e].parse().unwrap(); | ||
| *count_per_thread.entry(nth_thread).or_insert(0) += 1; | ||
| } | ||
| } | ||
|
|
||
| assert_eq!(count_per_thread.len(), THREAD_COUNT); | ||
| for (_, count) in count_per_thread { | ||
| assert_eq!(count, LOGS_PER_THREAD); | ||
| } | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.