Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'upstream/master' into HEAD
  • Loading branch information
tomaka committed Oct 24, 2019
commit a25be8e6e4511895e768b8d9feaad06d1d1ff6bd
9 changes: 8 additions & 1 deletion core/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
announce_sender: periodic::BlockAnnounceSender<B>,
}

impl<B: BlockT<Hash = H256>, N: Network<B>> NetworkBridge<B, N> {
impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
/// Create a new NetworkBridge to the given NetworkService. Returns the service
/// handle and a future that must be polled to completion to finish startup.
/// On creation it will register previous rounds' votes with the gossip
Expand Down Expand Up @@ -379,7 +379,9 @@ impl<B: BlockT<Hash = H256>, N: Network<B>> NetworkBridge<B, N> {
|to, neighbor| self.neighbor_sender.send(to, neighbor),
);
}
}

impl<B: BlockT<Hash = H256>, N: Network<B>> NetworkBridge<B, N> {
/// Get a stream of signature-checked round messages from the network as well as a sink for round messages to the
/// network all within the current set.
pub(crate) fn round_communication(
Expand Down Expand Up @@ -476,6 +478,9 @@ impl<B: BlockT<Hash = H256>, N: Network<B>> NetworkBridge<B, N> {
format!("Failed to receive on unbounded receiver for round {}", round.0)
));

// Combine incoming votes from external GRANDPA nodes with outgoing
// votes from our own GRANDPA voter to have a single
// vote-import-pipeline.
let incoming = stream::select(incoming, out_rx);

(incoming, outgoing)
Expand Down Expand Up @@ -522,7 +527,9 @@ impl<B: BlockT<Hash = H256>, N: Network<B>> NetworkBridge<B, N> {

(incoming, outgoing)
}
}

impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
Expand Down
5 changes: 3 additions & 2 deletions core/finality-grandpa/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,9 @@ where
// schedule incoming messages from the network to be held until
// corresponding blocks are imported.
let incoming = Box::pin(UntilVoteTargetImported::new(
self.inner.import_notification_stream(),
self.inner.clone(),
self.client.import_notification_stream(),
self.network.clone(),
self.client.clone(),
incoming,
"round",
).map_err(Into::into));
Expand Down
26 changes: 16 additions & 10 deletions core/finality-grandpa/src/until_imported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
}

/// Buffering imported messages until blocks with given hashes are imported.
pub(crate) struct UntilImported<Block: BlockT, Status, I, M: BlockUntilImported<Block>> {
pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> {
import_notifications: Fuse<ImportNotifications<Block>>,
status_check: Status,
block_sync_requester: BlockSyncRequester,
status_check: BlockStatus,
inner: Fuse<I>,
ready: VecDeque<M::Blocked>,
check_pending: Interval,
Expand Down Expand Up @@ -106,6 +107,7 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta

UntilImported {
import_notifications: import_notifications.fuse(),
block_sync_requester,
status_check,
inner: stream.fuse(),
ready: VecDeque::new(),
Expand All @@ -116,8 +118,10 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta
}
}

impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M> where
Status: BlockStatus<Block>,
impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStatus, BSyncRequester, I, M> where
Block: BlockT,
BStatus: BlockStatusT<Block>,
BSyncRequester: BlockSyncRequesterT<Block>,
I: Stream<Item = Result<M::Blocked, Error>> + Unpin,
M: BlockUntilImported<Block>,
{
Expand All @@ -140,7 +144,7 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
M::schedule_wait(
input,
&this.status_check,
|target_hash, wait| pending
|target_hash, target_number, wait| pending
.entry(target_hash)
.or_insert_with(|| (target_number, Instant::now(), Vec::new()))
.2
Expand All @@ -157,7 +161,7 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(notification)) => {
// new block imported. queue up all messages tied to that hash.
if let Some((_, messages)) = this.pending.remove(&notification.hash) {
if let Some((_, _, messages)) = this.pending.remove(&notification.hash) {
let canon_number = notification.header.number().clone();
let ready_messages = messages.into_iter()
.filter_map(|m| m.wait_completed(canon_number));
Expand All @@ -176,7 +180,7 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>

if update_interval {
let mut known_keys = Vec::new();
for (&block_hash, &mut (ref mut last_log, ref v)) in &mut this.pending {
for (&block_hash, &mut (block_number, ref mut last_log, ref v)) in &mut this.pending {
if let Some(number) = this.status_check.block_number(block_hash)? {
known_keys.push((block_hash, number));
} else {
Expand All @@ -195,7 +199,7 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
// NOTE: when sending an empty vec of peers the
// underlying should make a best effort to sync the
// block from any peers it knows about.
self.block_sync_requester.set_sync_fork_request(
this.block_sync_requester.set_sync_fork_request(
vec![],
block_hash,
block_number,
Expand All @@ -207,7 +211,7 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
}

for (known_hash, canon_number) in known_keys {
if let Some((_, pending_messages)) = this.pending.remove(&known_hash) {
if let Some((_, _, pending_messages)) = this.pending.remove(&known_hash) {
let ready_messages = pending_messages.into_iter()
.filter_map(|m| m.wait_completed(canon_number));

Expand All @@ -228,7 +232,9 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
}
}

impl<Block: BlockT, Status, I, M: BlockUntilImported<Block>> Unpin for UntilImported<Block, Status, I, M> {

impl<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> Unpin for
UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> {
}

fn warn_authority_wrong_target<H: ::std::fmt::Display>(hash: H, id: AuthorityId) {
Expand Down
2 changes: 1 addition & 1 deletion core/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,7 @@ ServiceBuilder<
.select(exit.clone())
.then(|_| Ok(()))));

let telemetry_connection_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default();
let telemetry_connection_sinks: Arc<Mutex<Vec<futures03::channel::mpsc::UnboundedSender<()>>>> = Default::default();

// Telemetry
let telemetry = config.telemetry_endpoints.clone().map(|endpoints| {
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.