Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Merge branch 'master' into light_sync_deadlock_fix
  • Loading branch information
svyatonik committed Jul 16, 2018
commit 4ab810c502b1767dd6e07cfcaabc24c491956fc0
13 changes: 8 additions & 5 deletions substrate/network/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ impl<B: BlockT> ImportQueue<B> for AsyncImportQueue<B> {
}

fn import_blocks(&self, _sync: &mut ChainSync<B>, _protocol: &mut Context<B>, blocks: (BlockOrigin, Vec<BlockData<B>>)) {
trace!(target:"sync", "Scheduling {} blocks for import", blocks.1.len());

let mut queue = self.data.queue.lock();
let mut queue_blocks = self.data.queue_blocks.write();
let mut best_importing_number = self.data.best_importing_number.write();
Expand All @@ -145,6 +147,8 @@ impl<B: BlockT> Drop for AsyncImportQueue<B> {

/// Blocks import thread.
fn import_thread<B: BlockT, E: ExecuteInContext<B>>(sync: Weak<RwLock<ChainSync<B>>>, service: Weak<E>, chain: Weak<Client<B>>, qdata: Arc<AsyncImportQueueData<B>>) {
trace!(target: "sync", "Starting import thread");

loop {
let new_blocks = {
let mut queue_lock = qdata.queue.lock();
Expand All @@ -157,7 +161,6 @@ fn import_thread<B: BlockT, E: ExecuteInContext<B>>(sync: Weak<RwLock<ChainSync<
None => break,
}
};

if qdata.is_stopping.load(Ordering::SeqCst) {
break;
}
Expand Down Expand Up @@ -238,6 +241,7 @@ fn import_many_blocks<'a, B: BlockT>(
let (blocks_origin, blocks) = blocks;
let count = blocks.len();
let mut imported = 0;

// Blocks in the response/drain should be in ascending order.
for block in blocks {
let import_result = import_single_block(link.chain(), blocks_origin.clone(), block);
Expand Down Expand Up @@ -286,8 +290,8 @@ fn import_single_block<B: BlockT>(
let result = chain.import(
block_origin,
header,
justification.to_justification(),
block.body.map(|b| b.to_extrinsics()),
justification,
block.body,
);
match result {
Ok(ImportResult::AlreadyInChain) => {
Expand Down Expand Up @@ -487,8 +491,7 @@ pub mod tests {
body: None,
receipt: None,
message_queue: None,
justification: Some(::message::generic::BlockJustification::V2(
client.justification(&BlockId::Number(1)).unwrap().unwrap())),
justification: client.justification(&BlockId::Number(1)).unwrap(),
};

(client, hash, number, BlockData { block, origin: 0 })
Expand Down
2 changes: 1 addition & 1 deletion substrate/network/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ pub mod generic {
3 => Some(Message::BlockAnnounce(Decode::decode(input)?)),
4 => Some(Message::Transactions(Decode::decode(input)?)),
5 => Some(Message::BftMessage(Decode::decode(input)?)),
6 => Some(Message::RemoteCallResponse(Decode::decode(input)?)),
6 => Some(Message::RemoteCallRequest(Decode::decode(input)?)),
7 => Some(Message::RemoteCallResponse(Decode::decode(input)?)),
255 => Some(Message::ChainSpecific(Decode::decode(input)?)),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion substrate/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use message::{self, Message};
use message::generic::Message as GenericMessage;
use specialization::Specialization;
use sync::{ChainSync, Status as SyncStatus, SyncState};
use service::{Role, TransactionPool};
use service::{Roles, TransactionPool};
use import_queue::ImportQueue;
use config::ProtocolConfig;
use chain::Client;
Expand Down
17 changes: 7 additions & 10 deletions substrate/network/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use blocks::{self, BlockCollection};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor};
use runtime_primitives::generic::BlockId;
use message::{self, generic::Message as GenericMessage};
use service::Role;
use service::Roles;
use import_queue::ImportQueue;

// Maximum blocks to request in a single packet.
Expand Down Expand Up @@ -55,7 +55,7 @@ pub struct ChainSync<B: BlockT> {
blocks: BlockCollection<B>,
best_queued_number: NumberFor<B>,
best_queued_hash: B::Hash,
required_block_attributes: Vec<message::BlockAttribute>,
required_block_attributes: message::BlockAttributes,
import_queue: Arc<ImportQueue<B>>,
}

Expand All @@ -79,13 +79,10 @@ pub struct Status<B: BlockT> {

impl<B: BlockT> ChainSync<B> {
/// Create a new instance.
pub(crate) fn new(role: Role, info: &ClientInfo<B>, import_queue: Arc<ImportQueue<B>>) -> Self {
let mut required_block_attributes = vec![
message::BlockAttribute::Header,
message::BlockAttribute::Justification
];
if role.intersects(Role::FULL) {
required_block_attributes.push(message::BlockAttribute::Body);
pub(crate) fn new(role: Roles, info: &ClientInfo<B>, import_queue: Arc<ImportQueue<B>>) -> Self {
let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION;
if role.intersects(Roles::FULL) {
required_block_attributes |= message::BlockAttributes::BODY;
}

ChainSync {
Expand All @@ -94,7 +91,7 @@ impl<B: BlockT> ChainSync<B> {
blocks: BlockCollection::new(),
best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash),
best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number),
required_block_attributes: required_block_attributes,
required_block_attributes,
import_queue,
}
}
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.