This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
29 commits
c6be9e5
expunge legacy code from polkadot-network
rphmeier Feb 20, 2020
953e292
mostly rip out old legacy protocol from service
rphmeier Feb 24, 2020
7a19620
ensure validation work is spawned by incoming messages
rphmeier Feb 24, 2020
134f3e5
decouple availabliity store from network logic; clean up data flow
rphmeier Feb 25, 2020
eaef68f
av_store: test helpers and use futures-abort
rphmeier Feb 26, 2020
4e1feb8
update polkadot-validation to pass n_validators when submitting chunks
rphmeier Feb 26, 2020
bf4443c
fallible erasure-chunk fetching
rphmeier Feb 26, 2020
697c136
implement `ErasureNetworking` for new network prot
rphmeier Feb 26, 2020
3416863
API for registering availability store in network
rphmeier Feb 26, 2020
457d406
fully integrate new network service into service
rphmeier Feb 26, 2020
8c65487
fix validation tests
rphmeier Feb 27, 2020
81ecaef
scaffolding for porting collator over to new network
rphmeier Feb 29, 2020
f86c52e
track connected validators' peer IDs and distribute collators' collat…
rphmeier Mar 2, 2020
bbef84c
helper in network for fetching all checked statements
rphmeier Mar 2, 2020
8bf595f
Merge branch 'master' into rh-remove-legacy-network
rphmeier Mar 2, 2020
16b79f2
fix adder-collator
rphmeier Mar 2, 2020
22c97dd
actually register notifications protocol
rphmeier Mar 3, 2020
0f53937
Update service/src/lib.rs
rphmeier Mar 5, 2020
9e76603
Make needed changes to service
expenses Mar 5, 2020
71a2d56
Merge two companion PRs.
gavofyork Mar 5, 2020
a4696ca
Some effort towards compilation
gavofyork Mar 5, 2020
20da96b
Fix
gavofyork Mar 5, 2020
85accd3
Merge remote-tracking branch 'origin/rh-remove-legacy-network' into g…
gavofyork Mar 5, 2020
99c63ea
Merge branch 'master' into gav-upsub
rphmeier Mar 5, 2020
da6dab4
remove `NetworkSpecialization` references from network
rphmeier Mar 5, 2020
de2dde0
fix compilation errors in service and collator
rphmeier Mar 5, 2020
3bf1967
ensure protocol name is valid
rphmeier Mar 5, 2020
80adf68
Fixes
gavofyork Mar 5, 2020
3b7d923
Fix
gavofyork Mar 5, 2020
71 changes: 28 additions & 43 deletions availability-store/src/lib.rs
@@ -55,7 +55,7 @@ pub use worker::AvailabilityBlockImport;
pub use store::AwaitedFrontierEntry;

use worker::{
Worker, WorkerHandle, Chunks, IncludedParachainBlocks, WorkerMsg, MakeAvailable,
Worker, WorkerHandle, IncludedParachainBlocks, WorkerMsg, MakeAvailable, Chunks
};

use store::{Store as InnerStore};
@@ -70,23 +70,7 @@ pub struct Config {
pub path: PathBuf,
}

/// Compute gossip topic for the erasure chunk messages given the relay parent,
/// root and the chunk index.
///
/// Since at this point we are not able to use [`network`] directly, but both
/// of them need to compute these topics, this lives here and not there.
///
/// [`network`]: ../polkadot_network/index.html
pub fn erasure_coding_topic(relay_parent: Hash, erasure_root: Hash, index: u32) -> Hash {
let mut v = relay_parent.as_ref().to_vec();
v.extend(erasure_root.as_ref());
v.extend(&index.to_le_bytes()[..]);
v.extend(b"erasure_chunks");

BlakeTwo256::hash(&v[..])
}

/// A trait that provides a shim for the [`NetworkService`] trait.
/// An abstraction around networking for the availability-store.
///
/// Currently it is not possible to use the networking code in the availability store
/// core directly due to the circular dependency it would create:
@@ -95,26 +79,25 @@ pub fn erasure_coding_topic(relay_parent: Hash, erasure_root: Hash, index: u32)
///
/// `availability-store` -> `network` -> `validation` -> `availability-store`
///
/// So we provide this shim trait that gets implemented for a wrapper newtype in
/// the [`network`] module.
/// So we provide this trait that gets implemented for a type in
/// the [`network`] module or a mock in tests.
///
/// [`NetworkService`]: ../polkadot_network/trait.NetworkService.html
/// [`network`]: ../polkadot_network/index.html
pub trait ProvideGossipMessages {
/// Get a stream of gossip erasure chunk messages for a given topic.
///
/// Each item is a tuple (relay_parent, candidate_hash, erasure_chunk)
fn gossip_messages_for(
pub trait ErasureNetworking {
/// Errors that can occur when fetching erasure chunks.
type Error: std::fmt::Debug + 'static;

/// Fetch an erasure chunk from the networking service.
fn fetch_erasure_chunk(
&self,
topic: Hash,
) -> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>;
candidate_hash: &Hash,
index: u32,
) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>>;

/// Gossip an erasure chunk message.
fn gossip_erasure_chunk(
/// Distributes an erasure chunk to the correct validator node.
fn distribute_erasure_chunk(
&self,
relay_parent: Hash,
candidate_hash: Hash,
erasure_root: Hash,
chunk: ErasureChunk,
);
}
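As a rough illustration of the new trait, here is a minimal sketch of how a test might satisfy `ErasureNetworking` with an in-memory mock. The `MockNetworking` type and its internals are hypothetical and not part of this PR; only the trait items above are taken from the diff.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use futures::future;

#[derive(Clone, Default)]
struct MockNetworking {
	// Chunks "distributed" so far, keyed by (candidate hash, chunk index).
	chunks: Arc<Mutex<HashMap<(Hash, u32), ErasureChunk>>>,
}

impl ErasureNetworking for MockNetworking {
	type Error = String;

	fn fetch_erasure_chunk(
		&self,
		candidate_hash: &Hash,
		index: u32,
	) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>> {
		// Resolve immediately from the local map instead of asking peers.
		let res = self.chunks.lock().unwrap()
			.get(&(*candidate_hash, index))
			.cloned()
			.ok_or_else(|| "chunk not available".to_string());
		Box::pin(future::ready(res))
	}

	fn distribute_erasure_chunk(
		&self,
		_relay_parent: Hash,
		candidate_hash: Hash,
		_erasure_root: Hash,
		chunk: ErasureChunk,
	) {
		// Record the chunk locally; a real implementation would route it to
		// the validator responsible for `chunk.index`.
		self.chunks.lock().unwrap().insert((candidate_hash, chunk.index), chunk);
	}
}
```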
@@ -148,11 +131,11 @@ impl Store {
/// Creating a store among other things starts a background worker thread which
/// handles most of the write operations to the storage.
#[cfg(not(target_os = "unknown"))]
pub fn new<PGM>(config: Config, gossip: PGM) -> io::Result<Self>
where PGM: ProvideGossipMessages + Send + Sync + Clone + 'static
pub fn new<EN>(config: Config, network: EN) -> io::Result<Self>
where EN: ErasureNetworking + Send + Sync + Clone + 'static
{
let inner = InnerStore::new(config)?;
let worker = Arc::new(Worker::start(inner.clone(), gossip));
let worker = Arc::new(Worker::start(inner.clone(), network));
let to_worker = worker.to_worker().clone();

Ok(Self {
@@ -166,11 +149,11 @@ impl Store {
///
/// Creating a store among other things starts a background worker thread
/// which handles most of the write operations to the storage.
pub fn new_in_memory<PGM>(gossip: PGM) -> Self
where PGM: ProvideGossipMessages + Send + Sync + Clone + 'static
pub fn new_in_memory<EN>(network: EN) -> Self
where EN: ErasureNetworking + Send + Sync + Clone + 'static
{
let inner = InnerStore::new_in_memory();
let worker = Arc::new(Worker::start(inner.clone(), gossip));
let worker = Arc::new(Worker::start(inner.clone(), network));
let to_worker = worker.to_worker().clone();

Self {
Expand Down Expand Up @@ -204,7 +187,6 @@ impl Store {
let to_worker = self.to_worker.clone();

let import = AvailabilityBlockImport::new(
self.inner.clone(),
client,
wrapped_block_import,
spawner,
Expand Down Expand Up @@ -261,35 +243,38 @@ impl Store {
pub async fn add_erasure_chunk(
&self,
candidate: AbridgedCandidateReceipt,
n_validators: u32,
chunk: ErasureChunk,
) -> io::Result<()> {
self.add_erasure_chunks(candidate, vec![chunk]).await
self.add_erasure_chunks(candidate, n_validators, std::iter::once(chunk)).await
}

/// Adds a set of erasure chunks to storage.
///
/// The chunks should be checked for validity against the root of encoding
/// and it's proof prior to calling this.
/// and its proof prior to calling this.
///
/// This method will send the chunks to the background worker, allowing the caller to
/// asynchronously wait for the result.
pub async fn add_erasure_chunks<I>(
&self,
candidate: AbridgedCandidateReceipt,
n_validators: u32,
chunks: I,
) -> io::Result<()>
where I: IntoIterator<Item = ErasureChunk>
{
let candidate_hash = candidate.hash();
let relay_parent = candidate.relay_parent;

self.add_candidate(candidate).await?;

let (s, r) = oneshot::channel();
let chunks = chunks.into_iter().collect();

let msg = WorkerMsg::Chunks(Chunks {
relay_parent,
candidate_hash,
chunks,
n_validators,
result: s,
});

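To show how the reworked constructors and chunk methods fit together, here is a hedged usage sketch. `store_one_chunk`, `mock_network`, `candidate`, `chunk`, and the `n_validators = 10` value are placeholders, and the mock type is the illustrative one sketched earlier; only the `Store` API names come from the diff above.

```rust
// A sketch only: wires a test `ErasureNetworking` impl into the store and adds
// a single chunk, passing `n_validators` explicitly as the new API requires.
async fn store_one_chunk(
	mock_network: MockNetworking,
	candidate: AbridgedCandidateReceipt,
	chunk: ErasureChunk,
) -> std::io::Result<()> {
	// The store is now generic over `ErasureNetworking` rather than the old
	// `ProvideGossipMessages` gossip shim.
	let store = Store::new_in_memory(mock_network);

	// The number of validators must be supplied so the background worker knows
	// how many chunks make up the full erasure encoding.
	let n_validators = 10;
	store.add_erasure_chunk(candidate, n_validators, chunk).await
}
```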
69 changes: 33 additions & 36 deletions availability-store/src/store.rs
@@ -60,33 +60,31 @@ fn candidate_key(candidate_hash: &Hash) -> Vec<u8> {
(candidate_hash, 2i8).encode()
}

fn available_chunks_key(relay_parent: &Hash, erasure_root: &Hash) -> Vec<u8> {
(relay_parent, erasure_root, 3i8).encode()
}

fn candidates_with_relay_parent_key(relay_block: &Hash) -> Vec<u8> {
(relay_block, 4i8).encode()
}

// meta keys
fn awaited_chunks_key() -> [u8; 14] {
*b"awaited_chunks"
}
const AWAITED_CHUNKS_KEY: [u8; 14] = *b"awaited_chunks";

fn validator_index_and_n_validators_key(relay_parent: &Hash) -> Vec<u8> {
(relay_parent, 1i8).encode()
}

fn available_chunks_key(candidate_hash: &Hash) -> Vec<u8> {
(candidate_hash, 2i8).encode()
}
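Since several of these helpers share the same shape, a minimal sketch of the keying convention may help, assuming parity-scale-codec's `Encode` (imported here directly for the sketch): each key is the SCALE encoding of a `(hash, i8)` tuple, so the trailing tag byte distinguishes entry kinds within a column. The `demo_keys` helper is illustrative only.

```rust
use parity_scale_codec::Encode;

// Illustrative only: shows the byte layout produced by the key helpers above.
fn demo_keys(candidate_hash: Hash, relay_parent: Hash) {
	// Same shapes as `candidate_key` and `validator_index_and_n_validators_key`.
	let candidate_key_bytes: Vec<u8> = (candidate_hash, 2i8).encode();
	let validators_key_bytes: Vec<u8> = (relay_parent, 1i8).encode();

	// SCALE concatenates tuple fields, so each key is the 32-byte hash
	// followed by a single tag byte.
	assert_eq!(candidate_key_bytes.len(), 33);
	assert_eq!(validators_key_bytes.len(), 33);
}
```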

/// An entry in the awaited frontier of chunks we are interested in.
#[derive(Encode, Decode, Debug, Hash, PartialEq, Eq, Clone)]
pub struct AwaitedFrontierEntry {
/// The relay-chain parent block hash.
/// The hash of the candidate for which we want to fetch a chunk.
/// There will be duplicate entries in the case of multiple candidates with
/// the same erasure-root, but this is unlikely.
pub candidate_hash: Hash,
/// Although the relay-parent is implicitly referenced by the candidate hash,
/// we include it here as well for convenience in pruning the set.
pub relay_parent: Hash,
/// The erasure-chunk trie root we are comparing against.
///
/// We index by erasure-root because there may be multiple candidates
/// with the same erasure root.
pub erasure_root: Hash,
/// The index of the validator we represent.
pub validator_index: u32,
}
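The entry now identifies the chunk to fetch directly by candidate hash, so it maps straight onto the new `ErasureNetworking` trait. A hedged sketch of that mapping follows; the `fetch_awaited` helper is illustrative and not part of this PR.

```rust
// For each awaited entry, ask the network for the chunk at our own validator
// index; `network` may be any `ErasureNetworking` implementation.
async fn fetch_awaited<EN: ErasureNetworking>(
	network: &EN,
	entry: &AwaitedFrontierEntry,
) -> Result<ErasureChunk, EN::Error> {
	network
		.fetch_erasure_chunk(&entry.candidate_hash, entry.validator_index)
		.await
}
```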
Expand Down Expand Up @@ -153,7 +151,7 @@ impl Store {

/// Get a set of all chunks we are waiting for.
pub fn awaited_chunks(&self) -> Option<HashSet<AwaitedFrontierEntry>> {
self.query_inner(columns::META, &awaited_chunks_key()).map(|vec: Vec<AwaitedFrontierEntry>| {
self.query_inner(columns::META, &AWAITED_CHUNKS_KEY).map(|vec: Vec<AwaitedFrontierEntry>| {
HashSet::from_iter(vec.into_iter())
})
}
Expand Down Expand Up @@ -183,21 +181,21 @@ impl Store {
if let Some((validator_index, _)) = self.get_validator_index_and_n_validators(relay_parent) {
let candidates = candidates.clone();
let awaited_frontier: Vec<AwaitedFrontierEntry> = self
.query_inner(columns::META, &awaited_chunks_key())
.query_inner(columns::META, &AWAITED_CHUNKS_KEY)
.unwrap_or_else(|| Vec::new());

let mut awaited_frontier: HashSet<AwaitedFrontierEntry> =
HashSet::from_iter(awaited_frontier.into_iter());

awaited_frontier.extend(candidates.iter().filter_map(|candidate| {
self.get_candidate(&candidate).map(|receipt| AwaitedFrontierEntry {
awaited_frontier.extend(candidates.iter().cloned().map(|candidate_hash| {
AwaitedFrontierEntry {
relay_parent: relay_parent.clone(),
erasure_root: receipt.commitments.erasure_root,
candidate_hash,
validator_index,
})
}
}));
let awaited_frontier = Vec::from_iter(awaited_frontier.into_iter());
tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode());
tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
}

let mut descendent_candidates = self.get_candidates_with_relay_parent(relay_parent);
Expand Down Expand Up @@ -246,15 +244,12 @@ impl Store {

let mut v = self.query_inner(columns::DATA, &dbkey).unwrap_or(Vec::new());

let av_chunks_key = available_chunks_key(
&receipt.relay_parent,
&receipt.commitments.erasure_root,
);
let av_chunks_key = available_chunks_key(candidate_hash);
let mut have_chunks = self.query_inner(columns::META, &av_chunks_key).unwrap_or(Vec::new());

let awaited_frontier: Option<Vec<AwaitedFrontierEntry>> = self.query_inner(
columns::META,
&awaited_chunks_key(),
&AWAITED_CHUNKS_KEY,
);

for chunk in chunks.into_iter() {
@@ -268,19 +263,21 @@ impl Store {
awaited_frontier.retain(|entry| {
!(
entry.relay_parent == receipt.relay_parent &&
entry.erasure_root == receipt.commitments.erasure_root &&
&entry.candidate_hash == candidate_hash &&
have_chunks.contains(&entry.validator_index)
)
});
tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode());
tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
}

// If therea are no block data in the store at this point,
// If there are no block data in the store at this point,
// check that they can be reconstructed now and add them to store if they can.
if self.execution_data(&candidate_hash).is_none() {
if let Ok(available_data) = erasure::reconstruct(
n_validators as usize,
v.iter().map(|chunk| (chunk.chunk.as_ref(), chunk.index as usize))) {
v.iter().map(|chunk| (chunk.chunk.as_ref(), chunk.index as usize)),
)
{
self.make_available(*candidate_hash, available_data)?;
}
}
@@ -339,11 +336,11 @@ impl Store {
let mut tx = DBTransaction::new();

let awaited_frontier: Option<Vec<AwaitedFrontierEntry>> = self
.query_inner(columns::META, &awaited_chunks_key());
.query_inner(columns::META, &AWAITED_CHUNKS_KEY);

if let Some(mut awaited_frontier) = awaited_frontier {
awaited_frontier.retain(|entry| entry.relay_parent != relay_parent);
tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode());
tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
}

let candidates = self.get_candidates_with_relay_parent(&relay_parent);
@@ -354,6 +351,8 @@ impl Store {
tx.delete(columns::DATA, execution_data_key(&candidate).as_slice());
tx.delete(columns::DATA, &erasure_chunks_key(&candidate));
tx.delete(columns::DATA, &candidate_key(&candidate));

tx.delete(columns::META, &available_chunks_key(&candidate));
}

self.inner.write(tx)
@@ -576,7 +575,6 @@ mod tests {
proof: Vec::new(),
};
let candidates = vec![receipt_1_hash, receipt_2_hash];
let erasure_roots = vec![erasure_root_1, erasure_root_2];

let store = Store::new_in_memory();

@@ -596,10 +594,9 @@ mod tests {
let expected: HashSet<_> = candidates
.clone()
.into_iter()
.zip(erasure_roots.iter())
.map(|(_c, &e)| AwaitedFrontierEntry {
.map(|c| AwaitedFrontierEntry {
relay_parent,
erasure_root: e,
candidate_hash: c,
validator_index,
})
.collect();
@@ -612,7 +609,7 @@ mod tests {
// Now we wait for the other chunk that we haven't received yet.
let expected: HashSet<_> = vec![AwaitedFrontierEntry {
relay_parent,
erasure_root: erasure_roots[1],
candidate_hash: receipt_2_hash,
validator_index,
}].into_iter().collect();
