Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Serialize result of PVF preparation uniquely
  • Loading branch information
slumber committed Sep 23, 2021
commit 8bdc7d3791aec014a3f1dd26b2291add569bc07a
65 changes: 34 additions & 31 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use async_std::{
use futures::FutureExt as _;
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{sync::Arc, time::Duration};

const NICENESS_BACKGROUND: i32 = 10;
Expand Down Expand Up @@ -114,37 +115,39 @@ pub async fn start_work(
let selected = futures::select! {
res = framed_recv(&mut stream).fuse() => {
match res {
Ok(x) if x == &[1u8] => {
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"promoting WIP artifact {} to {}",
tmp_file.display(),
artifact_path.display(),
);

async_std::fs::rename(&tmp_file, &artifact_path)
.await
.map(|_| Selected::Done(Ok(())))
.unwrap_or_else(|err| {
tracing::warn!(
Ok(response_bytes) => {
// By convention we expect encoded `Result<(), PrepareError>`.
if let Ok(result) =
<Result<(), PrepareError>>::decode(&mut response_bytes.clone().as_slice())
{
if result.is_ok() {
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to rename the artifact from {} to {}: {:?}",
"promoting WIP artifact {} to {}",
tmp_file.display(),
artifact_path.display(),
err,
);
Selected::IoErr
})
}
Ok(response_bytes) => {
use sp_core::hexdisplay::HexDisplay;
// By convention we expect encoded prepare error.
// If bytes cannot be deserialized, return io error.
if let Ok(error) = PrepareError::decode(&mut response_bytes.clone().as_slice()) {
Selected::Done(Err(error))

async_std::fs::rename(&tmp_file, &artifact_path)
.await
.map(|_| Selected::Done(result))
.unwrap_or_else(|err| {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to rename the artifact from {} to {}: {:?}",
tmp_file.display(),
artifact_path.display(),
err,
);
Selected::IoErr
})
} else {
Selected::Done(result)
}
} else {
// We received invalid bytes from the worker.
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
tracing::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -282,15 +285,15 @@ pub fn worker_entrypoint(socket_path: &str) {
"worker: preparing artifact",
);

let bytes = match prepare_artifact(&code) {
let result = match prepare_artifact(&code) {
Artifact::Error(err) => {
// Serialized error will be written into the socket.
err.encode()
Err(err)
},
Artifact::Compiled(compiled_artifact) => {
// Write the serialized artifact into a temp file.
// Since a compiled artifact can be heavy, we only send a single
// byte to indicate the success.
// Since a compiled artifact can be heavy, we send an empty
// `Ok` to indicate the success.
let artifact_bytes = compiled_artifact.encode();

tracing::debug!(
Expand All @@ -301,11 +304,11 @@ pub fn worker_entrypoint(socket_path: &str) {
);
async_std::fs::write(&dest, &artifact_bytes).await?;

vec![1u8]
Ok(())
},
};

framed_send(&mut stream, &bytes).await?;
framed_send(&mut stream, result.encode().as_slice()).await?;
}
});
}
Expand Down