Immediately print job output for fresh jobs
This prevents a deadlock where the message queue is filled with output
messages but not emptied as the job producing the messages runs on the
same thread as the message processing.
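
The deadlock described in this commit message can be reproduced in miniature. The sketch below is not Cargo's code: it assumes `std::sync::mpsc::sync_channel` as a stand-in for Cargo's bounded message queue, and `fill_without_draining` is a hypothetical helper. It uses `try_send` so that it terminates; a blocking send at the same point would wait forever, because the only thread that could drain the queue is the one doing the sending.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Pushes up to `lines` messages into a bounded queue of capacity `cap`
/// from the same thread that would later have to drain it. Returns how
/// many messages were accepted before the queue reported it was full.
///
/// Assumption: `sync_channel` stands in for Cargo's bounded queue.
fn fill_without_draining(cap: usize, lines: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected, but nothing
    // reads from it while the producer is running.
    let (tx, _rx) = sync_channel::<String>(cap);
    let mut accepted = 0;
    for i in 0..lines {
        match tx.try_send(format!("output line {}", i)) {
            Ok(()) => accepted += 1,
            // The queue is full and nobody is draining it. A blocking
            // push here would never return: this is the deadlock.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

A job that produces fewer lines than the queue capacity never notices the problem, which is why this kind of deadlock only shows up for jobs with a lot of output.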
bjorn3 committed Nov 10, 2020
commit 0583081d2a1c5a1d1e321ba834bb1ca317e1a5f1
13 changes: 7 additions & 6 deletions src/cargo/core/compiler/custom_build.rs
@@ -128,7 +128,7 @@ fn emit_build_output(
     output: &BuildOutput,
     out_dir: &Path,
     package_id: PackageId,
-) {
+) -> CargoResult<()> {
     let library_paths = output
         .library_paths
         .iter()
@@ -144,7 +144,8 @@ fn emit_build_output(
         out_dir,
     }
     .to_json_string();
-    state.stdout(msg);
+    state.stdout(msg)?;
+    Ok(())
 }

 fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
@@ -353,13 +354,13 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
                         warnings_in_case_of_panic.push(warning.to_owned());
                     }
                     if extra_verbose {
-                        state.stdout(format!("{}{}", prefix, stdout));
+                        state.stdout(format!("{}{}", prefix, stdout))?;
                     }
                     Ok(())
                 },
                 &mut |stderr| {
                     if extra_verbose {
-                        state.stderr(format!("{}{}", prefix, stderr));
+                        state.stderr(format!("{}{}", prefix, stderr))?;
                     }
                     Ok(())
                 },
@@ -396,7 +397,7 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
                 BuildOutput::parse(&output.stdout, &pkg_name, &script_out_dir, &script_out_dir)?;

             if json_messages {
-                emit_build_output(state, &parsed_output, script_out_dir.as_path(), id);
+                emit_build_output(state, &parsed_output, script_out_dir.as_path(), id)?;
             }
             build_script_outputs
                 .lock()
@@ -421,7 +422,7 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
         };

         if json_messages {
-            emit_build_output(state, &output, script_out_dir.as_path(), id);
+            emit_build_output(state, &output, script_out_dir.as_path(), id)?;
         }

         build_script_outputs
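
Every change in `custom_build.rs` follows one pattern: a call that used to return `()` now returns a result, so each caller adds `?` and the enclosing function gains a `CargoResult<()>` return type. A minimal sketch of that pattern follows, assuming `std::io::Result` in place of `CargoResult`. The names `emit_line`, `emit_all` and `BrokenPipe` are hypothetical and do not appear in Cargo.

```rust
use std::io::{self, Write};

/// Hypothetical sink that always fails, standing in for something like
/// a closed pipe on the receiving end of stdout.
struct BrokenPipe;

impl Write for BrokenPipe {
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        Err(io::Error::new(io::ErrorKind::BrokenPipe, "pipe closed"))
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes one line. Because the write can now fail, the function
/// returns a `Result` instead of `()`.
fn emit_line<W: Write>(out: &mut W, msg: &str) -> io::Result<()> {
    writeln!(out, "{}", msg)?;
    Ok(())
}

/// A caller one level up: each call site needs `?` so that a failed
/// write stops the work instead of being silently dropped.
fn emit_all<W: Write>(out: &mut W, msgs: &[&str]) -> io::Result<()> {
    for msg in msgs {
        emit_line(out, msg)?;
    }
    Ok(())
}
```

Without the `?`, the compiler would only warn about an unused `Result`, and a write error would go unnoticed.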
63 changes: 38 additions & 25 deletions src/cargo/core/compiler/job_queue.rs
@@ -166,10 +166,11 @@ pub struct JobState<'a> {
     /// Channel back to the main thread to coordinate messages and such.
     messages: Arc<Queue<Message>>,

-    /// Normally messages are handled in a bounded way. When the job is fresh
-    /// however we need to immediately return to prevent a deadlock as the messages
-    /// are processed on the same thread as they are sent from.
-    messages_bounded: bool,
+    /// Normally output is sent to the job queue with backpressure. When the job is fresh
+    /// however we need to immediately display the output to prevent a deadlock as the
+    /// output messages are processed on the same thread as they are sent from. `output`
+    /// defines where to output in this case.
+    output: Option<&'a Config>,
Contributor:

Can you include some notes that it is crucial that this is only set on the main thread? There are several concerns:

  • The RefMut from shell can cause panics if used between threads.
  • The event loop is carefully crafted to avoid flickering of the progress bar. By splitting the output into different places, this makes it harder to ensure batches of messages are grouped together. I think the change is OK because it is all on the main thread, but it is pretty subtle.
  • Keeping one thread responsible for output helps prevent interleaving of messages. In particular, some messages are not printed atomically (like the "status" messages).

Member Author:

`Config` is not `Sync`, so it isn't possible to assign it when not running on the main thread. Also, because `Shell` is wrapped in a `RefCell`, it isn't possible to borrow it twice at the same time to interleave output.

Contributor:

I see. Still, please add a comment discussing the concerns about the coordination of ownership of the output. My concern about interleaving is about the future, and things to watch out for if this is ever changed (like wrapping output in a mutex). It might also help to explain how JobState works with respect to how it is constructed and passed into the job threads.

Member Author:

Interleaving would still not be possible when the RefCell is replaced with a Mutex as the mutex would be locked for the duration of the printing of a single message. It would be hard to accidentally unlock the mutex in the middle of printing.

I can add some more docs to JobState.

Member Author:

Done
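
The `RefCell` argument from this thread can be checked with a small model. This is only a sketch: the `Shell` and `Config` types below are hypothetical stand-ins that share nothing with Cargo's real types except the names, and `second_borrow_succeeds` is an illustrative helper. It shows that while one mutable borrow of the shell is alive, a second one is refused, so two writers cannot interleave within a single message.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for Cargo's `Shell`: it only records lines.
struct Shell {
    lines: Vec<String>,
}

/// Hypothetical stand-in for `Config`. Holding a `RefCell` makes this
/// type `!Sync`, so a `&Config` cannot be handed to another thread.
struct Config {
    shell: RefCell<Shell>,
}

impl Config {
    fn new() -> Self {
        Config {
            shell: RefCell::new(Shell { lines: Vec::new() }),
        }
    }

    /// Prints one message, holding the mutable borrow for the whole write.
    fn print(&self, msg: &str) {
        let mut shell = self.shell.borrow_mut();
        shell.lines.push(msg.to_owned());
    }
}

/// Tries to take a second mutable borrow while the first is still alive
/// and reports whether that was allowed.
fn second_borrow_succeeds(config: &Config) -> bool {
    let _first = config.shell.borrow_mut();
    let second_ok = config.shell.try_borrow_mut().is_ok();
    second_ok
}
```

With `borrow_mut` instead of `try_borrow_mut`, the second borrow would panic at runtime, which is the failure mode the first review comment warns about.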


     /// The job id that this state is associated with, used when sending
     /// messages back to the main thread.
@@ -236,20 +237,24 @@ impl<'a> JobState<'a> {
             .push(Message::BuildPlanMsg(module_name, cmd, filenames));
     }

-    pub fn stdout(&self, stdout: String) {
-        if self.messages_bounded {
-            self.messages.push_bounded(Message::Stdout(stdout));
+    pub fn stdout(&self, stdout: String) -> CargoResult<()> {
+        if let Some(config) = self.output {
+            writeln!(config.shell().out(), "{}", stdout)?;
         } else {
-            self.messages.push(Message::Stdout(stdout));
+            self.messages.push_bounded(Message::Stdout(stdout));
         }
+        Ok(())
     }

-    pub fn stderr(&self, stderr: String) {
-        if self.messages_bounded {
-            self.messages.push_bounded(Message::Stderr(stderr));
+    pub fn stderr(&self, stderr: String) -> CargoResult<()> {
+        if let Some(config) = self.output {
+            let mut shell = config.shell();
+            shell.print_ansi(stderr.as_bytes())?;
+            shell.err().write_all(b"\n")?;
         } else {
-            self.messages.push(Message::Stderr(stderr));
+            self.messages.push_bounded(Message::Stderr(stderr));
         }
+        Ok(())
     }

     /// A method used to signal to the coordinator thread that the rmeta file
@@ -839,17 +844,9 @@ impl<'cfg> DrainState<'cfg> {
             self.note_working_on(cx.bcx.config, unit, fresh)?;
         }

-        let doit = move || {
-            let state = JobState {
-                id,
-                messages: messages.clone(),
-                messages_bounded: job.freshness() == Freshness::Dirty,
-                rmeta_required: Cell::new(rmeta_required),
-                _marker: marker::PhantomData,
-            };
-
+        let doit = move |state: JobState<'_>| {
             let mut sender = FinishOnDrop {
-                messages: &messages,
+                messages: &state.messages,
                 id,
                 result: None,
             };
@@ -868,7 +865,9 @@ impl<'cfg> DrainState<'cfg> {
             // we need to make sure that the metadata is flagged as produced so
             // send a synthetic message here.
             if state.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
-                messages.push(Message::Finish(id, Artifact::Metadata, Ok(())));
+                state
+                    .messages
+                    .push(Message::Finish(state.id, Artifact::Metadata, Ok(())));
             }

             // Use a helper struct with a `Drop` implementation to guarantee
@@ -898,11 +897,25 @@ impl<'cfg> DrainState<'cfg> {
                 self.timings.add_fresh();
                 // Running a fresh job on the same thread is often much faster than spawning a new
                 // thread to run the job.
-                doit();
+                doit(JobState {
+                    id,
+                    messages: messages.clone(),
+                    output: Some(cx.bcx.config),
+                    rmeta_required: Cell::new(rmeta_required),
+                    _marker: marker::PhantomData,
+                });
             }
             Freshness::Dirty => {
                 self.timings.add_dirty();
-                scope.spawn(move |_| doit());
+                scope.spawn(move |_| {
+                    doit(JobState {
+                        id,
+                        messages: messages.clone(),
+                        output: None,
+                        rmeta_required: Cell::new(rmeta_required),
+                        _marker: marker::PhantomData,
+                    })
+                });
             }
         }

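
Taken together, the `job_queue.rs` changes make the output destination a property of how the `JobState` is constructed: fresh jobs get `Some(config)` and write directly, dirty jobs get `None` and go through the bounded queue. The following is a simplified, self-contained model of that dispatch and not Cargo's implementation. `MiniJobState`, `run_fresh` and `run_dirty` are hypothetical names, a `RefCell<Vec<u8>>` stands in for the shell, and `sync_channel` stands in for the message queue.

```rust
use std::cell::RefCell;
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, SyncSender};

/// Hypothetical, simplified model of `JobState`.
struct MiniJobState<'a> {
    /// `Some` only when the job runs on the thread that owns the sink.
    output: Option<&'a RefCell<Vec<u8>>>,
    /// Bounded channel back to the coordinating thread.
    messages: SyncSender<String>,
}

impl<'a> MiniJobState<'a> {
    fn stdout(&self, line: String) -> io::Result<()> {
        if let Some(out) = self.output {
            // Fresh job: write immediately, bypassing the queue.
            writeln!(out.borrow_mut(), "{}", line)?;
        } else {
            // Dirty job: block until the coordinator has room (backpressure).
            self.messages
                .send(line)
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
        }
        Ok(())
    }
}

/// Runs a "fresh" job on the current thread. The queue has capacity 1 and
/// is never drained, yet any number of lines can be printed.
fn run_fresh(lines: &[&str]) -> io::Result<String> {
    let sink = RefCell::new(Vec::new());
    {
        let (tx, _rx) = sync_channel(1);
        let state = MiniJobState { output: Some(&sink), messages: tx };
        for line in lines {
            state.stdout(line.to_string())?;
        }
    }
    Ok(String::from_utf8_lossy(&sink.into_inner()).into_owned())
}

/// Runs a "dirty" job on a worker thread while this thread drains the queue.
fn run_dirty(lines: Vec<String>) -> Vec<String> {
    let (tx, rx) = sync_channel(1);
    let worker = std::thread::spawn(move || {
        let state = MiniJobState { output: None, messages: tx };
        for line in lines {
            state.stdout(line).unwrap();
        }
    });
    // The iterator ends once the worker drops its sender.
    let received: Vec<String> = rx.iter().collect();
    worker.join().unwrap();
    received
}
```

Because the sink is reached through a shared reference to a `RefCell`, the state built for the fresh path cannot be sent to another thread, which mirrors the point made in the review thread about `Config` not being `Sync`.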
12 changes: 6 additions & 6 deletions src/cargo/core/compiler/mod.rs
@@ -448,7 +448,7 @@ fn link_targets(cx: &mut Context<'_, '_>, unit: &Unit, fresh: bool) -> CargoResu
                 fresh,
             }
             .to_json_string();
-            state.stdout(msg);
+            state.stdout(msg)?;
         }
         Ok(())
     }))
@@ -1139,7 +1139,7 @@ fn on_stdout_line(
     _package_id: PackageId,
     _target: &Target,
 ) -> CargoResult<()> {
-    state.stdout(line.to_string());
+    state.stdout(line.to_string())?;
     Ok(())
 }

@@ -1177,7 +1177,7 @@ fn on_stderr_line_inner(
     // something like that), so skip over everything that doesn't look like a
     // JSON message.
     if !line.starts_with('{') {
-        state.stderr(line.to_string());
+        state.stderr(line.to_string())?;
         return Ok(true);
     }

@@ -1189,7 +1189,7 @@ fn on_stderr_line_inner(
         // to stderr.
         Err(e) => {
             debug!("failed to parse json: {:?}", e);
-            state.stderr(line.to_string());
+            state.stderr(line.to_string())?;
             return Ok(true);
         }
     };
@@ -1225,7 +1225,7 @@ fn on_stderr_line_inner(
                     .map(|v| String::from_utf8(v).expect("utf8"))
                     .expect("strip should never fail")
             };
-            state.stderr(rendered);
+            state.stderr(rendered)?;
             return Ok(true);
         }
     }
@@ -1316,7 +1316,7 @@ fn on_stderr_line_inner(
     // Switch json lines from rustc/rustdoc that appear on stderr to stdout
     // instead. We want the stdout of Cargo to always be machine parseable as
     // stderr has our colorized human-readable messages.
-    state.stdout(msg);
+    state.stdout(msg)?;
     Ok(true)
 }
