diff --git a/core/parentchain/block-import-dispatcher/src/triggered_dispatcher.rs b/core/parentchain/block-import-dispatcher/src/triggered_dispatcher.rs index 0d335db780..d0cdb2fe70 100644 --- a/core/parentchain/block-import-dispatcher/src/triggered_dispatcher.rs +++ b/core/parentchain/block-import-dispatcher/src/triggered_dispatcher.rs @@ -163,6 +163,7 @@ where &self, predicate: impl Fn(&BlockImporter::SignedBlockType) -> bool, ) -> Result> { + trace!("Import of parentchain blocks and events has been triggered"); let blocks_to_import = self.import_queue.pop_until(predicate).map_err(Error::ImportQueue)?; @@ -174,7 +175,7 @@ where let latest_imported_block = blocks_to_import.last().map(|b| (*b).clone()); trace!( - "Import of parentchain blocks and events has been triggered, importing {} blocks and {} events from queue", + "Importing {} blocks and {} events from queue", blocks_to_import.len(), events_to_import.len(), ); diff --git a/service/src/account_funding.rs b/service/src/account_funding.rs index 787072fd60..2b22de4433 100644 --- a/service/src/account_funding.rs +++ b/service/src/account_funding.rs @@ -146,7 +146,7 @@ fn bootstrap_funds_from_alice( "funding amount is too high: please change EXISTENTIAL_DEPOSIT_FACTOR_FOR_INIT_FUNDS ({:?})", funding_amount ); - return Err(Error::ApplicationSetup) + return Err(Error::ApplicationSetup("Alice funds too low".into())) } let mut alice_signer_api = api.clone(); diff --git a/service/src/error.rs b/service/src/error.rs index c99f51e6fc..8004e6474f 100644 --- a/service/src/error.rs +++ b/service/src/error.rs @@ -38,8 +38,8 @@ pub enum Error { Serialization(#[from] serde_json::Error), #[error("{0}")] FromUtf8(#[from] std::string::FromUtf8Error), - #[error("Application setup error!")] - ApplicationSetup, + #[error("Application setup error: {0}")] + ApplicationSetup(String), #[error("Failed to find any peer worker")] NoPeerWorkerFound, #[error("No worker for shard {0} found on parentchain")] @@ -50,6 +50,8 @@ pub enum Error { MissingGenesisHeader, #[error("Could not find last finalized block of the parentchain")] MissingLastFinalizedBlock, + #[error("Error during the block sync process: {0}")] + BlockSync(String), #[error("{0}")] Custom(Box), } diff --git a/service/src/main.rs b/service/src/main.rs index 3fd1f3401c..eeb02fa840 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -387,9 +387,6 @@ fn start_worker( let tokio_handle = tokio_handle_getter.get_handle(); - #[cfg(feature = "teeracle")] - let teeracle_tokio_handle = tokio_handle.clone(); - // ------------------------------------------------------------------------ // Get the public key of our TEE. let tee_accountid = enclave_account(enclave.as_ref()); @@ -455,7 +452,7 @@ fn start_worker( &config, enclave.clone(), sidechain_storage.clone(), - tokio_handle, + &tokio_handle, ); } @@ -542,34 +539,39 @@ fn start_worker( initialization_handler.registered_on_parentchain(); - // ------------------------------------------------------------------------ - // initialize teeracle interval - #[cfg(feature = "teeracle")] - if WorkerModeProvider::worker_mode() == WorkerMode::Teeracle { - schedule_periodic_reregistration_thread( - send_register_xt, - run_config.reregister_teeracle_interval(), - ); + match WorkerModeProvider::worker_mode() { + WorkerMode::Teeracle => { + // ------------------------------------------------------------------------ + // initialize teeracle interval + #[cfg(feature = "teeracle")] + schedule_periodic_reregistration_thread( + send_register_xt, + run_config.reregister_teeracle_interval(), + ); - start_periodic_market_update( - &integritee_rpc_api, - run_config.teeracle_update_interval(), - enclave.as_ref(), - &teeracle_tokio_handle, - ); - } + #[cfg(feature = "teeracle")] + start_periodic_market_update( + &integritee_rpc_api, + run_config.teeracle_update_interval(), + enclave.as_ref(), + &tokio_handle, + ); + }, + WorkerMode::OffChainWorker => { + println!("*** [+] Finished initializing light client, syncing parentchain..."); - if WorkerModeProvider::worker_mode() != WorkerMode::Teeracle { - println!("*** [+] Finished initializing light client, syncing parentchain..."); + // Syncing all parentchain blocks, this might take a while.. + let last_synced_header = + parentchain_handler.sync_parentchain(last_synced_header).unwrap(); - // Syncing all parentchain blocks, this might take a while.. - let mut last_synced_header = - parentchain_handler.sync_parentchain(last_synced_header).unwrap(); + start_parentchain_header_subscription_thread(parentchain_handler, last_synced_header); + }, + WorkerMode::Sidechain => { + println!("*** [+] Finished initializing light client, syncing parentchain..."); - // ------------------------------------------------------------------------ - // Initialize the sidechain - if WorkerModeProvider::worker_mode() == WorkerMode::Sidechain { - last_synced_header = sidechain_init_block_production( + // ------------------------------------------------------------------------ + // Initialize the sidechain + let last_synced_header = sidechain_init_block_production( enclave.clone(), ®ister_enclave_xt_header, we_are_primary_validateer, @@ -578,26 +580,15 @@ fn start_worker( &last_synced_header, ) .unwrap(); - } - // ------------------------------------------------------------------------ - // start parentchain syncing loop (subscribe to header updates) - thread::Builder::new() - .name("parentchain_sync_loop".to_owned()) - .spawn(move || { - if let Err(e) = - subscribe_to_parentchain_new_headers(parentchain_handler, last_synced_header) - { - error!("Parentchain block syncing terminated with a failure: {:?}", e); - } - println!("[!] Parentchain block syncing has terminated"); - }) - .unwrap(); - } + start_parentchain_header_subscription_thread(parentchain_handler, last_synced_header); - // ------------------------------------------------------------------------ - if WorkerModeProvider::worker_mode() == WorkerMode::Sidechain { - spawn_worker_for_shard_polling(shard, integritee_rpc_api.clone(), initialization_handler); + spawn_worker_for_shard_polling( + shard, + integritee_rpc_api.clone(), + initialization_handler, + ); + }, } if let Some(url) = config.target_a_parentchain_rpc_endpoint() { @@ -666,20 +657,7 @@ fn init_target_parentchain( parentchain_handler.sync_parentchain(last_synched_header).unwrap(); // start parentchain syncing loop (subscribe to header updates) - thread::Builder::new() - .name(format!("{:?}_parentchain_sync_loop", parentchain_id)) - .spawn(move || { - if let Err(e) = - subscribe_to_parentchain_new_headers(parentchain_handler, last_synched_header) - { - error!( - "[{:?}] parentchain block syncing terminated with a failure: {:?}", - parentchain_id, e - ); - } - println!("[!] [{:?}] parentchain block syncing has terminated", parentchain_id); - }) - .unwrap(); + start_parentchain_header_subscription_thread(parentchain_handler, last_synched_header) } // Subscribe to events and print them. @@ -1042,6 +1020,27 @@ fn send_extrinsic( } } +fn start_parentchain_header_subscription_thread( + parentchain_handler: Arc>, + last_synced_header: Header, +) { + let parentchain_id = *parentchain_handler.parentchain_id(); + thread::Builder::new() + .name(format!("{:?}_parentchain_sync_loop", parentchain_id)) + .spawn(move || { + if let Err(e) = + subscribe_to_parentchain_new_headers(parentchain_handler, last_synced_header) + { + error!( + "[{:?}] parentchain block syncing terminated with a failure: {:?}", + parentchain_id, e + ); + } + println!("[!] [{:?}] parentchain block syncing has terminated", parentchain_id); + }) + .unwrap(); +} + /// Subscribe to the node API finalized heads stream and trigger a parent chain sync /// upon receiving a new header. fn subscribe_to_parentchain_new_headers( diff --git a/service/src/parentchain_handler.rs b/service/src/parentchain_handler.rs index 8b11e92b87..3990822a57 100644 --- a/service/src/parentchain_handler.rs +++ b/service/src/parentchain_handler.rs @@ -48,11 +48,11 @@ pub trait HandleParentchain { fn trigger_parentchain_block_import(&self) -> ServiceResult<()>; /// Syncs and directly imports parentchain blocks from the latest synced header - /// until the specified until_header. + /// until the specified sync_target. fn sync_and_import_parentchain_until( &self, last_synced_header: &Header, - until_header: &Header, + sync_target: &Header, ) -> ServiceResult
; } @@ -149,59 +149,34 @@ where .ok_or(Error::MissingLastFinalizedBlock)?; let curr_block_number = curr_block.block.header().number(); + if last_synced_header.number == curr_block_number { + println!( + "[{:?}] No sync necessary, we are already up to date with block #{}", + id, last_synced_header.number, + ); + return Ok(last_synced_header) + } + println!( - "[{:?}] Syncing blocks from {} to {}", + "[{:?}] Syncing blocks from #{} to #{}", id, last_synced_header.number, curr_block_number ); - let mut until_synced_header = last_synced_header; - loop { - let block_chunk_to_sync = self.parentchain_api.get_blocks( - until_synced_header.number + 1, - min(until_synced_header.number + BLOCK_SYNC_BATCH_SIZE, curr_block_number), - )?; - println!("[+] [{:?}] Found {} block(s) to sync", id, block_chunk_to_sync.len()); - if block_chunk_to_sync.is_empty() { - return Ok(until_synced_header) - } - - let events_chunk_to_sync: Vec> = block_chunk_to_sync - .iter() - .map(|block| { - self.parentchain_api.get_events_for_block(Some(block.block.header.hash())) - }) - .collect::, _>>()?; - - println!("[+] [{:?}] Found {} event vector(s) to sync", id, events_chunk_to_sync.len()); - - let events_proofs_chunk_to_sync: Vec = block_chunk_to_sync - .iter() - .map(|block| { - self.parentchain_api.get_events_value_proof(Some(block.block.header.hash())) - }) - .collect::, _>>()?; - - self.enclave_api.sync_parentchain( - block_chunk_to_sync.as_slice(), - events_chunk_to_sync.as_slice(), - events_proofs_chunk_to_sync.as_slice(), - self.parentchain_id(), + let mut synced_until = last_synced_header; + while synced_until.number() < &curr_block_number { + synced_until = self.sync_blocks( + synced_until.number + 1, + // while statement tests that synced_until.number < curr_block_number + min(synced_until.number + BLOCK_SYNC_BATCH_SIZE, curr_block_number), )?; - let api_client_until_synced_header = block_chunk_to_sync - .last() - .map(|b| b.block.header.clone()) - .ok_or(Error::EmptyChunk)?; println!( "[{:?}] Synced {} out of {} finalized parentchain blocks", - id, until_synced_header.number, curr_block_number, + id, synced_until.number, curr_block_number, ); - - // #TODO: #1451: fix api/client types - until_synced_header = - Header::decode(&mut api_client_until_synced_header.encode().as_slice()) - .expect("Can decode previously encoded header; qed"); } + + Ok(synced_until) } fn trigger_parentchain_block_import(&self) -> ServiceResult<()> { @@ -212,24 +187,102 @@ where fn sync_and_import_parentchain_until( &self, last_synced_header: &Header, - until_header: &Header, + sync_target: &Header, ) -> ServiceResult
{ let id = self.parentchain_id(); - trace!( - "[{:?}] last synced block number: {}. synching until {}", - id, - last_synced_header.number, - until_header.number + println!( + "[{:?}] last synced block number: #{}. syncing until #{}", + id, last_synced_header.number, sync_target.number ); let mut last_synced_header = last_synced_header.clone(); - while last_synced_header.number() < until_header.number() { - last_synced_header = self.sync_parentchain(last_synced_header)?; - trace!("[{:?}] synced block number: {}", id, last_synced_header.number); + while last_synced_header.number() < sync_target.number() { + let curr_block_number = self + .parentchain_api + .last_finalized_block()? + .ok_or(Error::MissingLastFinalizedBlock)? + .block + .header() + .number; + + if curr_block_number < sync_target.number + && curr_block_number < last_synced_header.number + 1 + { + // Skip the rest of the loop and wait if we have synced as much + // as possible, but haven't reached the sync target yet. + println!( + "[{:?}] sync target #{} is not finalized (#{}), wait a sec ...", + id, sync_target.number, curr_block_number + ); + std::thread::sleep(std::time::Duration::from_secs(1)); + continue + } + + // min(sync_target, last_synced.number + chunk_size, current_parentchain_finalized_block) + let chunk_target = min( + min(last_synced_header.number + BLOCK_SYNC_BATCH_SIZE, curr_block_number), + sync_target.number, + ); + + // Tested above that last_synced_header.number < current_block_number (i.e. chunk_target). + last_synced_header = self.sync_blocks(last_synced_header.number + 1, chunk_target)?; + println!("[{:?}] synced block number: #{}", id, last_synced_header.number); + + // Verify and import blocks into the light client. This can't be done after the loop + // because the import is mandatory to remove them from RAM. When we register on + // a production system that has already many blocks, this might lead to an OOM if we + // import them all at once after the loop, see #1462. + self.trigger_parentchain_block_import()?; } - self.trigger_parentchain_block_import()?; Ok(last_synced_header) } } + +impl ParentchainHandler +where + EnclaveApi: Sidechain + EnclaveBase, +{ + fn sync_blocks(&self, from: u32, to: u32) -> ServiceResult
{ + let id = self.parentchain_id(); + + if from > to { + return Err(Error::BlockSync(format!( + "[{:?}] Block range to import must be positive, i.e., `from` can't be bigger than `to`. {} > {}", + id, from, to + ))) + } + + let blocks = self.parentchain_api.get_blocks(from, to)?; + debug!("[+] [{:?}] Found {} block(s) to sync", id, blocks.len()); + + let events: Vec> = blocks + .iter() + .map(|block| self.parentchain_api.get_events_for_block(Some(block.block.header.hash()))) + .collect::, _>>()?; + + debug!("[+] [{:?}] Found {} event vector(s) to sync", id, events.len()); + + let events_proofs: Vec = blocks + .iter() + .map(|block| { + self.parentchain_api.get_events_value_proof(Some(block.block.header.hash())) + }) + .collect::, _>>()?; + + self.enclave_api.sync_parentchain( + blocks.as_slice(), + events.as_slice(), + events_proofs.as_slice(), + self.parentchain_id(), + )?; + + let last_synced_header = + blocks.last().map(|b| b.block.header.clone()).ok_or(Error::EmptyChunk)?; + + // #TODO: #1451: fix api/client types + Ok(Header::decode(&mut last_synced_header.encode().as_slice()) + .expect("Can decode previously encoded header; qed")) + } +} diff --git a/service/src/sidechain_setup.rs b/service/src/sidechain_setup.rs index 9827cd53a2..822e6ea3b8 100644 --- a/service/src/sidechain_setup.rs +++ b/service/src/sidechain_setup.rs @@ -40,7 +40,7 @@ pub(crate) fn sidechain_start_untrusted_rpc_server( config: &Config, enclave: Arc, sidechain_storage: Arc, - tokio_handle: Handle, + tokio_handle: &Handle, ) where Enclave: DirectRequest + Clone, SidechainStorage: BlockPruner + FetchBlocks + Sync + Send + 'static,