Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
0bb1eb7
Add BlockStream Enum and utility fn
Apr 24, 2020
b291ff1
WIP: Modify import closure to work with BlockStream
Apr 24, 2020
49b0144
Fix trait bounds
Apr 24, 2020
cb6aded
Working prototype
Apr 24, 2020
155695d
Revamp block importing
Apr 28, 2020
e59e1ad
Add export_import_flow tests
Apr 28, 2020
7b729fe
Merge branch 'master' of github.com:paritytech/substrate into scott_d…
Apr 28, 2020
69a8bff
Add comments and clean code
Apr 28, 2020
ab173a5
Add more comments in the import fn
Apr 28, 2020
e22ed9b
Add link code to import function
Apr 28, 2020
00fbd6c
Add condition when returning Ready(Ok(()) to make sure we've imported…
Apr 28, 2020
e7a3c4d
Add check for imported blocks in JSON case
Apr 28, 2020
56d49fa
Use rest pattern
Apr 29, 2020
ff75523
Fix compilation error for undeclared variable
Apr 29, 2020
7c66af2
Add polling and waker before pending
Apr 29, 2020
419b0ab
Print read_block_count instead of count
Apr 29, 2020
be1d626
Simplify binary cli option with structopt
Apr 29, 2020
a67a75d
Update test to reflect changes in CLI api
Apr 29, 2020
e1ce1aa
Change Stream to take SignedBlock<B> instead of B
Apr 30, 2020
30f60de
Merge branch 'master' of github.com:paritytech/substrate into scott_d…
Apr 30, 2020
ecf8729
Add comments to BlockStream
Apr 30, 2020
7b8f34c
Move out logic to smaller functions for clearer code
Apr 30, 2020
1a9e910
Merge branch 'master' of github.com:paritytech/substrate into scott_d…
Apr 30, 2020
61260b5
Remove result over import_blocks return type
Apr 30, 2020
4ca8fe6
Check for error in command output rather than simply checking command…
May 1, 2020
c986b44
Revamp export/import/revert testing
May 1, 2020
8b3a17a
Fix minor typos and formatting errors
pscott May 6, 2020
c57415b
Remove unnecessary if condition in terminating condition
pscott May 6, 2020
32eb4af
Explicit error instead of returning it as a string
pscott May 6, 2020
412e942
Pass BlockStream to log_importing_status_updates and simplify matchin…
May 6, 2020
2266d67
Use .contains() instead of regex match
May 6, 2020
f62d5e0
Line break in match block; return future::ready instead of poll_fn
May 6, 2020
ad44445
Update Cargo.lock
May 6, 2020
17f4c74
Add check so that queue doesn't grow too big
May 6, 2020
994157c
Use Iterator instead of Stream
May 7, 2020
4d1234e
Remove allow dead_code
May 7, 2020
37ae67b
Merge branch 'master' of github.com:paritytech/substrate into scott_d…
May 7, 2020
757693d
Remove outdated comments
pscott May 7, 2020
9865fae
Return Errors instead of logging them
May 7, 2020
1150f12
Simplify match arms
pscott May 8, 2020
377823c
Remove check before terminating block import
May 8, 2020
576606c
Apply suggestions from code review
bkchr May 11, 2020
9b4658a
Check that queue is not full BEFORE calling
May 12, 2020
76a5954
Revert "Remove check before terminating block import"
May 12, 2020
f848a05
Improve unit tests to make sure we actually import blocks
May 13, 2020
d924935
Remove Unpin implementation for BlockIter
May 13, 2020
3d00933
Add prototype of enum for ImportStates
May 18, 2020
fb040fc
Add working prototype for StateMachine
May 18, 2020
4f9bcf8
Add comments for clearer code
May 18, 2020
9a134eb
Add sleep before calling Waker when waiting for import queue
May 19, 2020
fdd3683
Merge branch 'master' of github.com:paritytech/substrate into scott_d…
May 19, 2020
d98c92d
Add Speedometer
May 19, 2020
344286e
Merge branch 'master' of github.com:paritytech/substrate into scott_d…
May 20, 2020
5bdc4fd
add dbg!(&log) for test debugging
May 21, 2020
5d1f1e3
Fix lines with more than 100 cols
May 21, 2020
768c309
Merge branch 'master' of github.com:paritytech/substrate into scott_d…
May 21, 2020
a17cc1e
Fix regex capture for test
May 21, 2020
9d7d7a1
Update regexes to take to capture the whole number
May 21, 2020
03a5cc8
Merge branch 'master' of github.com:paritytech/substrate into scott_d…
May 21, 2020
ef02ec3
Rename Cmd to Command
pscott May 21, 2020
fb91c53
Actually rename Cmd to Command
May 21, 2020
4f067ef
Apply suggestions from code review
gnunicorn May 22, 2020
0c47a45
Merge branch 'master' of github.com:paritytech/substrate into scott_d…
May 22, 2020
196de60
Fix compilation errors for tests
May 22, 2020
731f1b7
Merge branch 'scott_default_json_import_blocks' of github.com:parityt…
May 22, 2020
f43f3de
Fix compilation errors from code review suggestion
May 22, 2020
cdc1290
Update bin/node/cli/tests/export_import_flow.rs
gavofyork May 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move out logic to smaller functions for clearer code
  • Loading branch information
Scott Piriou committed Apr 30, 2020
commit 7b8f34c5f9c56d7a2844d367985f765c4d04b45f
174 changes: 93 additions & 81 deletions client/service/src/chain_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,69 @@ impl<R, B> Stream for BlockStream<R, B>
}
}

/// Imports the SignedBlock to the queue.
fn import_block_to_queue<TBl, TImpQu>(signed_block: SignedBlock<TBl>, queue: &mut TImpQu, force: bool)
where
TBl: BlockT + MaybeSerializeDeserialize,
TImpQu: 'static + ImportQueue<TBl>,
{
let (header, extrinsics) = signed_block.block.deconstruct();
let hash = header.hash();
// import queue handles verification and importing it into the client.
queue.import_blocks(BlockOrigin::File, vec![
IncomingBlock::<TBl> {
hash,
header: Some(header),
body: Some(extrinsics),
justification: signed_block.justification,
origin: None,
allow_missing_state: false,
import_existing: force,
}
]);
}

/// Returns true if we have imported every block we were supposed to import, else returns false.
fn importing_is_done(count: Option<u64>, read_block_count: u64, imported_blocks: u64) -> bool {
if let Some(count) = count {
if imported_blocks >= count {
return true
}
} else {
if imported_blocks >= read_block_count {
return true
}
}
false
}

// Logs information regarding the current importing status.
fn log_importing_status_updates(count: Option<u64>, read_block_count: u64, blocks_before: u64, imported_blocks: u64) {
// Notify every 1000 blocks to let the user know everything is running smoothly.
if read_block_count % 1000 == 0 {
info!(
"#{} blocks were added to the queue",
read_block_count
);
}

// Notify every time we import an additional 1000 blocks.
if imported_blocks / 1000 != blocks_before / 1000 {
if let Some(count) = count {
info!(
"#{} blocks were imported (#{} left)",
imported_blocks,
count - imported_blocks
)
} else {
info!(
"{} blocks were imported, other are still being processed",
imported_blocks,
)
}
}
}

impl<
TBl, TRtApi, TBackend,
TExecDisp, TFchr, TSc, TImpQu, TFprb, TFpp,
Expand Down Expand Up @@ -228,100 +291,49 @@ impl<
match Pin::new(&mut block_stream).poll_next(cx) {
Poll::Ready(None) => {
let read_block_count = block_stream.read_block_count();
if let Some(count) = block_stream.count() {
if link.imported_blocks >= count {
info!(
"🎉 Imported {} blocks. Best: #{}",
read_block_count, client.chain_info().best_number
);
// We're done importing blocks, we can stop here.
return std::task::Poll::Ready(Ok(()))
} else {
queue.poll_actions(cx, &mut link);
cx.waker().wake_by_ref();
return std::task::Poll::Pending
}
} else {
if link.imported_blocks >= read_block_count {
info!(
"🎉 Imported {} blocks. Best: #{}",
read_block_count, client.chain_info().best_number
);
// We're done importing blocks, we can stop here.
return std::task::Poll::Ready(Ok(()))
} else {
queue.poll_actions(cx, &mut link);
cx.waker().wake_by_ref();
return std::task::Poll::Pending
}
let count = block_stream.count();

if importing_is_done(count, read_block_count, link.imported_blocks) {
info!(
"🎉 Imported {} blocks. Best: #{}",
read_block_count, client.chain_info().best_number
);
return std::task::Poll::Ready(Ok(()))
}
},
Poll::Ready(Some(block_result)) => {
let read_block_count = block_stream.read_block_count();
match block_result {
Ok(signed) => {
let (header, extrinsics) = signed.block.deconstruct();
let hash = header.hash();
// import queue handles verification and importing it into the client.
queue.import_blocks(BlockOrigin::File, vec![
IncomingBlock::<Self::Block> {
hash,
header: Some(header),
body: Some(extrinsics),
justification: signed.justification,
origin: None,
allow_missing_state: false,
import_existing: force,
}
]);

// Print a message every 1000 blocks to let the user everything's running smoothly.
if read_block_count % 1000 == 0 {
info!(
"#{} blocks were added to the queue",
read_block_count
);
}

queue.poll_actions(cx, &mut link);
if link.has_error {
info!(
"Stopping after #{} blocks because of an error",
link.imported_blocks,
);
// We've encountered an error, we can stop here.
return std::task::Poll::Ready(Ok(()));
}

let blocks_before = link.imported_blocks;
if link.imported_blocks / 1000 != blocks_before / 1000 {
if let Some(count) = block_stream.count() {
info!(
"#{} blocks were imported (#{} left)",
link.imported_blocks,
count - link.imported_blocks
)
} else {
info!(
"{} blocks were imported, other are still being processed",
link.imported_blocks,
)
}
}
}
Ok(signed_block) => import_block_to_queue(signed_block, queue, force),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we need to check how many blocks are already in the queue and wait if there's more than say 1024

Copy link
Contributor Author

@pscott pscott May 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I tried implementing this in 17f4c74, please let me know what you think! :)

Err(e) => {
let read_block_count = block_stream.read_block_count();
warn!("Error reading block data at {}: {}", read_block_count, e);
// We've encountered an error, we can stop here.
return std::task::Poll::Ready(Ok(()));
}
}

cx.waker().wake_by_ref();
return std::task::Poll::Pending;
},
}
Poll::Pending => unreachable!("BlockStream.poll_next() should never return Poll::Pending; qed")
}

let blocks_before = link.imported_blocks;
queue.poll_actions(cx, &mut link);

if link.has_error {
info!(
"Stopping after #{} blocks because of an error",
link.imported_blocks,
);
// We've encountered an error, we can stop here.
return std::task::Poll::Ready(Ok(()));
}

let read_block_count = block_stream.read_block_count();
let count = block_stream.count();
log_importing_status_updates(count, read_block_count, blocks_before, link.imported_blocks);


cx.waker().wake_by_ref();
return std::task::Poll::Pending;
});
Ok(Box::pin(import))
}
Expand Down