diff --git a/core-primitives/enclave-api/ffi/src/lib.rs b/core-primitives/enclave-api/ffi/src/lib.rs index 67f2a174a0..73b03d64a7 100644 --- a/core-primitives/enclave-api/ffi/src/lib.rs +++ b/core-primitives/enclave-api/ffi/src/lib.rs @@ -66,13 +66,6 @@ extern "C" { parentchain_id_size: u32, ) -> sgx_status_t; - pub fn trigger_parentchain_block_import( - eid: sgx_enclave_id_t, - retval: *mut sgx_status_t, - parentchain_id: *const u8, - parentchain_id_size: u32, - ) -> sgx_status_t; - pub fn execute_trusted_calls(eid: sgx_enclave_id_t, retval: *mut sgx_status_t) -> sgx_status_t; pub fn sync_parentchain( @@ -86,6 +79,7 @@ extern "C" { events_proofs_size: usize, parentchain_id: *const u8, parentchain_id_size: u32, + is_syncing: c_int, ) -> sgx_status_t; pub fn set_nonce( diff --git a/core-primitives/enclave-api/src/enclave_base.rs b/core-primitives/enclave-api/src/enclave_base.rs index abf676cd62..4e2f6e3791 100644 --- a/core-primitives/enclave-api/src/enclave_base.rs +++ b/core-primitives/enclave-api/src/enclave_base.rs @@ -57,11 +57,6 @@ pub trait EnclaveBase: Send + Sync + 'static { parentchain_id: &ParentchainId, ) -> EnclaveResult<()>; - /// Trigger the import of parentchain block explicitly. Used when initializing a light-client - /// with a triggered import dispatcher. - fn trigger_parentchain_block_import(&self, parentchain_id: &ParentchainId) - -> EnclaveResult<()>; - fn set_nonce(&self, nonce: u32, parentchain_id: ParentchainId) -> EnclaveResult<()>; fn set_node_metadata( @@ -212,27 +207,6 @@ mod impl_ffi { Ok(()) } - fn trigger_parentchain_block_import( - &self, - parentchain_id: &ParentchainId, - ) -> EnclaveResult<()> { - let mut retval = sgx_status_t::SGX_SUCCESS; - let parentchain_id_enc = parentchain_id.encode(); - - let result = unsafe { - ffi::trigger_parentchain_block_import( - self.eid, - &mut retval, - parentchain_id_enc.as_ptr(), - parentchain_id_enc.len() as u32, - ) - }; - - ensure!(result == sgx_status_t::SGX_SUCCESS, Error::Sgx(result)); - ensure!(retval == sgx_status_t::SGX_SUCCESS, Error::Sgx(retval)); - - Ok(()) - } fn set_nonce(&self, nonce: u32, parentchain_id: ParentchainId) -> EnclaveResult<()> { let mut retval = sgx_status_t::SGX_SUCCESS; diff --git a/core-primitives/enclave-api/src/sidechain.rs b/core-primitives/enclave-api/src/sidechain.rs index 1203a2d234..364262a99c 100644 --- a/core-primitives/enclave-api/src/sidechain.rs +++ b/core-primitives/enclave-api/src/sidechain.rs @@ -32,6 +32,7 @@ pub trait Sidechain: Send + Sync + 'static { events: &[Vec], events_proofs: &[StorageProof], parentchain_id: &ParentchainId, + is_syncing: bool, ) -> EnclaveResult<()>; fn execute_trusted_calls(&self) -> EnclaveResult<()>; @@ -56,6 +57,7 @@ mod impl_ffi { events: &[Vec], events_proofs: &[StorageProof], parentchain_id: &ParentchainId, + is_syncing: bool, ) -> EnclaveResult<()> { let mut retval = sgx_status_t::SGX_SUCCESS; let blocks_enc = blocks.encode(); @@ -75,6 +77,7 @@ mod impl_ffi { events_proofs_enc.len(), parentchain_id_enc.as_ptr(), parentchain_id_enc.len() as u32, + is_syncing.into(), ) }; diff --git a/core/parentchain/block-import-dispatcher/src/immediate_dispatcher.rs b/core/parentchain/block-import-dispatcher/src/immediate_dispatcher.rs index 0b8e6dbd30..a58383bf05 100644 --- a/core/parentchain/block-import-dispatcher/src/immediate_dispatcher.rs +++ b/core/parentchain/block-import-dispatcher/src/immediate_dispatcher.rs @@ -47,7 +47,14 @@ impl DispatchBlockImport where BlockImporter: ImportParentchainBlocks, { - fn dispatch_import(&self, blocks: Vec, events: Vec>) -> Result<()> { + fn dispatch_import( + &self, + blocks: Vec, + events: Vec>, + _is_syncing: bool, + ) -> Result<()> { + // _is_syncing does not matter for the immediate dispatcher, behavoiur is the same. Immediate block import. + debug!("Importing {} parentchain blocks", blocks.len()); self.block_importer.import_parentchain_blocks(blocks, events)?; debug!("Notifying {} observers of import", self.import_event_observers.len()); @@ -93,7 +100,7 @@ mod tests { counter_clone.increment(); }); - dispatcher.dispatch_import(vec![1u32, 2u32], vec![]).unwrap(); + dispatcher.dispatch_import(vec![1u32, 2u32], vec![], false).unwrap(); assert_eq!(1, notification_counter.get_counter()); } diff --git a/core/parentchain/block-import-dispatcher/src/lib.rs b/core/parentchain/block-import-dispatcher/src/lib.rs index a368066283..2385075644 100644 --- a/core/parentchain/block-import-dispatcher/src/lib.rs +++ b/core/parentchain/block-import-dispatcher/src/lib.rs @@ -45,7 +45,12 @@ pub trait DispatchBlockImport { /// Dispatch blocks to be imported. /// /// The blocks may be imported immediately, get queued, delayed or grouped. - fn dispatch_import(&self, blocks: Vec, events: Vec>) -> Result<()>; + fn dispatch_import( + &self, + blocks: Vec, + events: Vec>, + is_syncing: bool, + ) -> Result<()>; } /// Wrapper for the actual dispatchers. Allows to define one global type for @@ -96,15 +101,20 @@ where TriggeredDispatcher: DispatchBlockImport, ImmediateDispatcher: DispatchBlockImport, { - fn dispatch_import(&self, blocks: Vec, events: Vec>) -> Result<()> { + fn dispatch_import( + &self, + blocks: Vec, + events: Vec>, + is_syncing: bool, + ) -> Result<()> { match self { BlockImportDispatcher::TriggeredDispatcher(dispatcher) => { log::trace!("TRIGGERED DISPATCHER MATCH"); - dispatcher.dispatch_import(blocks, events) + dispatcher.dispatch_import(blocks, events, is_syncing) }, BlockImportDispatcher::ImmediateDispatcher(dispatcher) => { log::trace!("IMMEDIATE DISPATCHER MATCH"); - dispatcher.dispatch_import(blocks, events) + dispatcher.dispatch_import(blocks, events, is_syncing) }, BlockImportDispatcher::EmptyDispatcher => { log::trace!("EMPTY DISPATCHER DISPATCHER MATCH"); diff --git a/core/parentchain/block-import-dispatcher/src/triggered_dispatcher.rs b/core/parentchain/block-import-dispatcher/src/triggered_dispatcher.rs index d4ec052a2f..77812331b8 100644 --- a/core/parentchain/block-import-dispatcher/src/triggered_dispatcher.rs +++ b/core/parentchain/block-import-dispatcher/src/triggered_dispatcher.rs @@ -100,17 +100,28 @@ where &self, blocks: Vec, events: Vec, + is_syncing: bool, ) -> Result<()> { let parentchain_id = self.block_importer.parentchain_id(); trace!( - "[{:?}] Pushing parentchain block(s) and event(s) ({}) ({}) to import queue", + "[{:?}] Triggered dispatcher received block(s) and event(s) ({}) ({})", parentchain_id, blocks.len(), events.len() ); - // Push all the blocks to be dispatched into the queue. - self.events_queue.push_multiple(events).map_err(Error::ImportQueue)?; - self.import_queue.push_multiple(blocks).map_err(Error::ImportQueue) + if is_syncing { + trace!( + "[{:?}] Triggered is in sync mode, immediately importing blocks and events", + parentchain_id + ); + self.block_importer + .import_parentchain_blocks(blocks, events) + .map_err(Error::BlockImport) + } else { + trace!("[{:?}] pushing blocks and events to import queues", parentchain_id); + self.events_queue.push_multiple(events).map_err(Error::ImportQueue)?; + self.import_queue.push_multiple(blocks).map_err(Error::ImportQueue) + } } } @@ -167,6 +178,7 @@ where &self, predicate: impl Fn(&BlockImporter::SignedBlockType) -> bool, ) -> Result> { + trace!("Import of parentchain blocks and events has been triggered"); let blocks_to_import = self.import_queue.pop_until(predicate).map_err(Error::ImportQueue)?; @@ -232,7 +244,11 @@ mod tests { let dispatcher = test_fixtures(); dispatcher - .dispatch_import(vec![1, 2, 3, 4, 5], vec![vec![1], vec![2], vec![3], vec![4], vec![5]]) + .dispatch_import( + vec![1, 2, 3, 4, 5], + vec![vec![1], vec![2], vec![3], vec![4], vec![5]], + false, + ) .unwrap(); assert!(dispatcher.block_importer.get_all_imported_blocks().is_empty()); @@ -248,10 +264,14 @@ mod tests { let dispatcher = test_fixtures(); dispatcher - .dispatch_import(vec![1, 2, 3, 4, 5], vec![vec![1], vec![2], vec![3], vec![4], vec![5]]) + .dispatch_import( + vec![1, 2, 3, 4, 5], + vec![vec![1], vec![2], vec![3], vec![4], vec![5]], + false, + ) .unwrap(); dispatcher - .dispatch_import(vec![6, 7, 8], vec![vec![6], vec![7], vec![8]]) + .dispatch_import(vec![6, 7, 8], vec![vec![6], vec![7], vec![8]], false) .unwrap(); assert!(dispatcher.block_importer.get_all_imported_blocks().is_empty()); @@ -266,7 +286,7 @@ mod tests { fn triggering_import_all_empties_queue() { let dispatcher = test_fixtures(); - dispatcher.dispatch_import(vec![1, 2, 3, 4, 5], vec![]).unwrap(); + dispatcher.dispatch_import(vec![1, 2, 3, 4, 5], vec![], false).unwrap(); let latest_imported = dispatcher.import_all().unwrap().unwrap(); assert_eq!(latest_imported, 5); @@ -278,7 +298,7 @@ mod tests { fn triggering_import_all_on_empty_queue_imports_none() { let dispatcher = test_fixtures(); - dispatcher.dispatch_import(vec![], vec![]).unwrap(); + dispatcher.dispatch_import(vec![], vec![], false).unwrap(); let maybe_latest_imported = dispatcher.import_all().unwrap(); assert!(maybe_latest_imported.is_none()); @@ -295,7 +315,11 @@ mod tests { let dispatcher = test_fixtures(); dispatcher - .dispatch_import(vec![1, 2, 3, 4, 5], vec![vec![1], vec![2], vec![3], vec![4], vec![5]]) + .dispatch_import( + vec![1, 2, 3, 4, 5], + vec![vec![1], vec![2], vec![3], vec![4], vec![5]], + false, + ) .unwrap(); let latest_imported = dispatcher.import_until(|i: &SignedBlockType| i == &4).unwrap().unwrap(); @@ -311,7 +335,11 @@ mod tests { let dispatcher = test_fixtures(); dispatcher - .dispatch_import(vec![1, 2, 3, 4, 5], vec![vec![1], vec![2], vec![3], vec![4], vec![5]]) + .dispatch_import( + vec![1, 2, 3, 4, 5], + vec![vec![1], vec![2], vec![3], vec![4], vec![5]], + false, + ) .unwrap(); let maybe_latest_imported = dispatcher.import_until(|i: &SignedBlockType| i == &8).unwrap(); @@ -328,7 +356,7 @@ mod tests { fn trigger_import_all_but_latest_works() { let dispatcher = test_fixtures(); - dispatcher.dispatch_import(vec![1, 2, 3, 4, 5], vec![]).unwrap(); + dispatcher.dispatch_import(vec![1, 2, 3, 4, 5], vec![], false).unwrap(); dispatcher.import_all_but_latest().unwrap(); assert_eq!(dispatcher.block_importer.get_all_imported_blocks(), vec![1, 2, 3, 4]); diff --git a/enclave-runtime/Enclave.edl b/enclave-runtime/Enclave.edl index 098a346b09..6d865f6ef0 100644 --- a/enclave-runtime/Enclave.edl +++ b/enclave-runtime/Enclave.edl @@ -63,17 +63,14 @@ enclave { [in, size=parentchain_id_size] uint8_t* parentchain_id, uint32_t parentchain_id_size ); - public sgx_status_t trigger_parentchain_block_import( - [in, size=parentchain_id_size] uint8_t* parentchain_id, uint32_t parentchain_id_size - ); - public sgx_status_t execute_trusted_calls(); public sgx_status_t sync_parentchain( [in, size=blocks_size] uint8_t* blocks, size_t blocks_size, [in, size=events_size] uint8_t* events, size_t events_size, [in, size=events_proofs_size] uint8_t* events_proofs, size_t events_proofs_size, - [in, size=parentchain_id_size] uint8_t* parentchain_id, uint32_t parentchain_id_size + [in, size=parentchain_id_size] uint8_t* parentchain_id, uint32_t parentchain_id_size, + int is_syncing ); public sgx_status_t set_nonce( diff --git a/enclave-runtime/src/lib.rs b/enclave-runtime/src/lib.rs index 881abb0e69..b71362e930 100644 --- a/enclave-runtime/src/lib.rs +++ b/enclave-runtime/src/lib.rs @@ -48,12 +48,8 @@ use crate::{ }, }; use codec::Decode; -use itc_parentchain::{ - block_import_dispatcher::{ - triggered_dispatcher::TriggerParentchainBlockImport, DispatchBlockImport, - }, - primitives::ParentchainId, -}; +use core::ffi::c_int; +use itc_parentchain::{block_import_dispatcher::DispatchBlockImport, primitives::ParentchainId}; use itp_component_container::ComponentGetter; use itp_import_queue::PushToQueue; use itp_node_api::metadata::NodeMetadata; @@ -437,6 +433,7 @@ pub unsafe extern "C" fn sync_parentchain( events_proofs_to_sync_size: usize, parentchain_id: *const u8, parentchain_id_size: u32, + is_syncing: c_int, ) -> sgx_status_t { if let Err(e) = sync_parentchain_internal( blocks_to_sync, @@ -447,6 +444,7 @@ pub unsafe extern "C" fn sync_parentchain( events_proofs_to_sync_size, parentchain_id, parentchain_id_size, + is_syncing == 1, ) { error!("Error synching parentchain: {:?}", e); } @@ -464,6 +462,7 @@ unsafe fn sync_parentchain_internal( events_proofs_to_sync_size: usize, parentchain_id: *const u8, parentchain_id_size: u32, + is_syncing: bool, ) -> Result<()> { let blocks_to_sync = Vec::::decode_raw(blocks_to_sync, blocks_to_sync_size)?; let events_proofs_to_sync = @@ -483,6 +482,7 @@ unsafe fn sync_parentchain_internal( blocks_to_sync, events_to_sync, &parentchain_id, + is_syncing, ) } @@ -498,6 +498,7 @@ fn dispatch_parentchain_blocks_for_import blocks_to_sync: Vec, events_to_sync: Vec>, id: &ParentchainId, + is_syncing: bool, ) -> Result<()> { if WorkerModeProvider::worker_mode() == WorkerMode::Teeracle { trace!("Not importing any parentchain blocks"); @@ -512,27 +513,51 @@ fn dispatch_parentchain_blocks_for_import match id { ParentchainId::Integritee => { if let Ok(handler) = GLOBAL_INTEGRITEE_SOLOCHAIN_HANDLER_COMPONENT.get() { - handler.import_dispatcher.dispatch_import(blocks_to_sync, events_to_sync)?; + handler.import_dispatcher.dispatch_import( + blocks_to_sync, + events_to_sync, + is_syncing, + )?; } else if let Ok(handler) = GLOBAL_INTEGRITEE_PARACHAIN_HANDLER_COMPONENT.get() { - handler.import_dispatcher.dispatch_import(blocks_to_sync, events_to_sync)?; + handler.import_dispatcher.dispatch_import( + blocks_to_sync, + events_to_sync, + is_syncing, + )?; } else { return Err(Error::NoIntegriteeParentchainAssigned) }; }, ParentchainId::TargetA => { if let Ok(handler) = GLOBAL_TARGET_A_SOLOCHAIN_HANDLER_COMPONENT.get() { - handler.import_dispatcher.dispatch_import(blocks_to_sync, events_to_sync)?; + handler.import_dispatcher.dispatch_import( + blocks_to_sync, + events_to_sync, + is_syncing, + )?; } else if let Ok(handler) = GLOBAL_TARGET_A_PARACHAIN_HANDLER_COMPONENT.get() { - handler.import_dispatcher.dispatch_import(blocks_to_sync, events_to_sync)?; + handler.import_dispatcher.dispatch_import( + blocks_to_sync, + events_to_sync, + is_syncing, + )?; } else { return Err(Error::NoTargetAParentchainAssigned) }; }, ParentchainId::TargetB => { if let Ok(handler) = GLOBAL_TARGET_B_SOLOCHAIN_HANDLER_COMPONENT.get() { - handler.import_dispatcher.dispatch_import(blocks_to_sync, events_to_sync)?; + handler.import_dispatcher.dispatch_import( + blocks_to_sync, + events_to_sync, + is_syncing, + )?; } else if let Ok(handler) = GLOBAL_TARGET_B_PARACHAIN_HANDLER_COMPONENT.get() { - handler.import_dispatcher.dispatch_import(blocks_to_sync, events_to_sync)?; + handler.import_dispatcher.dispatch_import( + blocks_to_sync, + events_to_sync, + is_syncing, + )?; } else { return Err(Error::NoTargetBParentchainAssigned) }; @@ -579,92 +604,6 @@ fn validate_events( Ok(()) } -/// Triggers the import of parentchain blocks when using a queue to sync parentchain block import -/// with sidechain block production. -/// -/// This trigger is only useful in combination with a `TriggeredDispatcher` and sidechain. In case no -/// sidechain and the `ImmediateDispatcher` are used, this function is obsolete. -#[no_mangle] -pub unsafe extern "C" fn trigger_parentchain_block_import( - parentchain_id: *const u8, - parentchain_id_size: u32, -) -> sgx_status_t { - let parentchain_id = - match ParentchainId::decode_raw(parentchain_id, parentchain_id_size as usize) { - Ok(id) => id, - Err(e) => { - error!("Could not decode parentchain id: {:?}", e); - return sgx_status_t::SGX_ERROR_UNEXPECTED - }, - }; - - match internal_trigger_parentchain_block_import(&parentchain_id) { - Ok(()) => sgx_status_t::SGX_SUCCESS, - Err(e) => { - error!("Failed to trigger import of parentchain blocks: {:?}", e); - sgx_status_t::SGX_ERROR_UNEXPECTED - }, - } -} - -fn internal_trigger_parentchain_block_import(id: &ParentchainId) -> Result<()> { - let _maybe_latest_block = match id { - ParentchainId::Integritee => { - if let Ok(handler) = GLOBAL_INTEGRITEE_SOLOCHAIN_HANDLER_COMPONENT.get() { - handler - .import_dispatcher - .triggered_dispatcher() - .ok_or(Error::ExpectedTriggeredImportDispatcher)? - .import_all()? - } else if let Ok(handler) = GLOBAL_INTEGRITEE_PARACHAIN_HANDLER_COMPONENT.get() { - handler - .import_dispatcher - .triggered_dispatcher() - .ok_or(Error::ExpectedTriggeredImportDispatcher)? - .import_all()? - } else { - return Err(Error::NoIntegriteeParentchainAssigned) - } - }, - ParentchainId::TargetA => { - if let Ok(handler) = GLOBAL_TARGET_A_SOLOCHAIN_HANDLER_COMPONENT.get() { - handler - .import_dispatcher - .triggered_dispatcher() - .ok_or(Error::ExpectedTriggeredImportDispatcher)? - .import_all()? - } else if let Ok(handler) = GLOBAL_TARGET_A_PARACHAIN_HANDLER_COMPONENT.get() { - handler - .import_dispatcher - .triggered_dispatcher() - .ok_or(Error::ExpectedTriggeredImportDispatcher)? - .import_all()? - } else { - return Err(Error::NoTargetAParentchainAssigned) - } - }, - ParentchainId::TargetB => { - if let Ok(handler) = GLOBAL_TARGET_B_SOLOCHAIN_HANDLER_COMPONENT.get() { - handler - .import_dispatcher - .triggered_dispatcher() - .ok_or(Error::ExpectedTriggeredImportDispatcher)? - .import_all()? - } else if let Ok(handler) = GLOBAL_TARGET_B_PARACHAIN_HANDLER_COMPONENT.get() { - handler - .import_dispatcher - .triggered_dispatcher() - .ok_or(Error::ExpectedTriggeredImportDispatcher)? - .import_all()? - } else { - return Err(Error::NoTargetBParentchainAssigned) - } - }, - }; - - Ok(()) -} - // This is required, because `ring` / `ring-xous` would not compile without it non-release (debug) mode. // See #1200 for more details. #[cfg(debug_assertions)] diff --git a/service/src/main_impl.rs b/service/src/main_impl.rs index e4a61713f5..31121e770f 100644 --- a/service/src/main_impl.rs +++ b/service/src/main_impl.rs @@ -354,9 +354,6 @@ fn start_worker( let tokio_handle = tokio_handle_getter.get_handle(); - #[cfg(feature = "teeracle")] - let teeracle_tokio_handle = tokio_handle.clone(); - // ------------------------------------------------------------------------ // Get the public key of our TEE. let tee_accountid = enclave_account(enclave.as_ref()); @@ -422,7 +419,7 @@ fn start_worker( &config, enclave.clone(), sidechain_storage.clone(), - tokio_handle, + &tokio_handle, ); } @@ -509,34 +506,41 @@ fn start_worker( initialization_handler.registered_on_parentchain(); - // ------------------------------------------------------------------------ - // initialize teeracle interval - #[cfg(feature = "teeracle")] - if WorkerModeProvider::worker_mode() == WorkerMode::Teeracle { - schedule_periodic_reregistration_thread( - send_register_xt, - run_config.reregister_teeracle_interval(), - ); + match WorkerModeProvider::worker_mode() { + WorkerMode::Teeracle => { + // ------------------------------------------------------------------------ + // initialize teeracle interval + #[cfg(feature = "teeracle")] + schedule_periodic_reregistration_thread( + send_register_xt, + run_config.reregister_teeracle_interval(), + ); - start_periodic_market_update( - &integritee_rpc_api, - run_config.teeracle_update_interval(), - enclave.as_ref(), - &teeracle_tokio_handle, - ); - } + #[cfg(feature = "teeracle")] + start_periodic_market_update( + &integritee_rpc_api, + run_config.teeracle_update_interval(), + enclave.as_ref(), + &tokio_handle, + ); + }, + WorkerMode::OffChainWorker => { + println!("*** [+] Finished initializing light client, syncing parentchain..."); - if WorkerModeProvider::worker_mode() != WorkerMode::Teeracle { - println!("*** [+] Finished initializing light client, syncing parentchain..."); + // Syncing all parentchain blocks, this might take a while.. + let last_synced_header = + parentchain_handler.sync_parentchain(last_synced_header, true).unwrap(); - // Syncing all parentchain blocks, this might take a while.. - let mut last_synced_header = - parentchain_handler.sync_parentchain(last_synced_header).unwrap(); + start_parentchain_header_subscription_thread(parentchain_handler, last_synced_header); + + info!("skipping shard vault check because not yet supported for offchain worker"); + }, + WorkerMode::Sidechain => { + println!("*** [+] Finished initializing light client, syncing parentchain..."); - // ------------------------------------------------------------------------ - // Initialize the sidechain - if WorkerModeProvider::worker_mode() == WorkerMode::Sidechain { - last_synced_header = sidechain_init_block_production( + // ------------------------------------------------------------------------ + // Initialize the sidechain + let last_synced_header = sidechain_init_block_production( enclave.clone(), ®ister_enclave_xt_header, we_are_primary_validateer, @@ -545,44 +549,17 @@ fn start_worker( &last_synced_header, ) .unwrap(); - } - // ------------------------------------------------------------------------ - // start parentchain syncing loop (subscribe to header updates) - thread::Builder::new() - .name("parentchain_sync_loop".to_owned()) - .spawn(move || { - if let Err(e) = - subscribe_to_parentchain_new_headers(parentchain_handler, last_synced_header) - { - error!("Parentchain block syncing terminated with a failure: {:?}", e); - } - println!("[!] Parentchain block syncing has terminated"); - }) - .unwrap(); + start_parentchain_header_subscription_thread(parentchain_handler, last_synced_header); - if WorkerModeProvider::worker_mode() == WorkerMode::OffChainWorker { - info!("skipping shard vault check because not yet supported for offchain worker"); - } else if let Ok(shard_vault) = enclave.get_ecc_vault_pubkey(shard) { - println!( - "[Integritee] shard vault account is already initialized in state: {}", - shard_vault.to_ss58check() - ); - } else if we_are_primary_validateer { - println!("[Integritee] initializing proxied shard vault account now"); - enclave.init_proxied_shard_vault(shard, &ParentchainId::Integritee).unwrap(); - println!( - "[Integritee] initialized shard vault account: : {}", - enclave.get_ecc_vault_pubkey(shard).unwrap().to_ss58check() - ); - } else { - panic!("[Integritee] no vault account has been initialized and we are not the primary worker"); - } - } + init_provided_shard_vault(shard, &enclave, we_are_primary_validateer); - // ------------------------------------------------------------------------ - if WorkerModeProvider::worker_mode() == WorkerMode::Sidechain { - spawn_worker_for_shard_polling(shard, integritee_rpc_api.clone(), initialization_handler); + spawn_worker_for_shard_polling( + shard, + integritee_rpc_api.clone(), + initialization_handler, + ); + }, } if let Some(url) = config.target_a_parentchain_rpc_endpoint() { @@ -619,6 +596,30 @@ fn start_worker( } } +fn init_provided_shard_vault( + shard: &ShardIdentifier, + enclave: &Arc, + we_are_primary_validateer: bool, +) { + if let Ok(shard_vault) = enclave.get_ecc_vault_pubkey(shard) { + println!( + "[Integritee] shard vault account is already initialized in state: {}", + shard_vault.to_ss58check() + ); + } else if we_are_primary_validateer { + println!("[Integritee] initializing proxied shard vault account now"); + enclave.init_proxied_shard_vault(shard, &ParentchainId::Integritee).unwrap(); + println!( + "[Integritee] initialized shard vault account: : {}", + enclave.get_ecc_vault_pubkey(shard).unwrap().to_ss58check() + ); + } else { + panic!( + "[Integritee] no vault account has been initialized and we are not the primary worker" + ); + } +} + fn init_target_parentchain( enclave: &Arc, tee_account_id: &AccountId32, @@ -651,23 +652,9 @@ fn init_target_parentchain( // Syncing all parentchain blocks, this might take a while.. let last_synched_header = - parentchain_handler.sync_parentchain(last_synched_header).unwrap(); - - // start parentchain syncing loop (subscribe to header updates) - thread::Builder::new() - .name(format!("{:?}_parentchain_sync_loop", parentchain_id)) - .spawn(move || { - if let Err(e) = - subscribe_to_parentchain_new_headers(parentchain_handler, last_synched_header) - { - error!( - "[{:?}] parentchain block syncing terminated with a failure: {:?}", - parentchain_id, e - ); - } - println!("[!] [{:?}] parentchain block syncing has terminated", parentchain_id); - }) - .unwrap(); + parentchain_handler.sync_parentchain(last_synched_header, true).unwrap(); + + start_parentchain_header_subscription_thread(parentchain_handler, last_synched_header) } println!("[{:?}] initializing proxied shard vault account now", parentchain_id); enclave.init_proxied_shard_vault(shard, &parentchain_id).unwrap(); @@ -894,6 +881,27 @@ fn send_extrinsic( } } +fn start_parentchain_header_subscription_thread( + parentchain_handler: Arc>, + last_synced_header: Header, +) { + let parentchain_id = *parentchain_handler.parentchain_id(); + thread::Builder::new() + .name(format!("{:?}_parentchain_sync_loop", parentchain_id)) + .spawn(move || { + if let Err(e) = + subscribe_to_parentchain_new_headers(parentchain_handler, last_synced_header) + { + error!( + "[{:?}] parentchain block syncing terminated with a failure: {:?}", + parentchain_id, e + ); + } + println!("[!] [{:?}] parentchain block syncing has terminated", parentchain_id); + }) + .unwrap(); +} + /// Subscribe to the node API finalized heads stream and trigger a parent chain sync /// upon receiving a new header. fn subscribe_to_parentchain_new_headers( @@ -918,7 +926,7 @@ fn subscribe_to_parentchain_new_headers( parentchain_id, new_header.number ); - last_synced_header = parentchain_handler.sync_parentchain(last_synced_header)?; + last_synced_header = parentchain_handler.sync_parentchain(last_synced_header, false)?; } } diff --git a/service/src/parentchain_handler.rs b/service/src/parentchain_handler.rs index 58820d4bf0..41f843fe0e 100644 --- a/service/src/parentchain_handler.rs +++ b/service/src/parentchain_handler.rs @@ -42,10 +42,11 @@ pub trait HandleParentchain { /// Fetches the parentchain blocks to sync from the parentchain and feeds them to the enclave. /// Returns the latest synced block header. - fn sync_parentchain(&self, last_synced_header: Header) -> ServiceResult
; - - /// Triggers the import of the synced parentchain blocks inside the enclave. - fn trigger_parentchain_block_import(&self) -> ServiceResult<()>; + fn sync_parentchain( + &self, + last_synced_header: Header, + is_syncing: bool, + ) -> ServiceResult
; /// Syncs and directly imports parentchain blocks from the latest synced header /// until the specified until_header. @@ -140,7 +141,11 @@ where .init_parentchain_components(self.parentchain_init_params.clone())?) } - fn sync_parentchain(&self, last_synced_header: Header) -> ServiceResult
{ + fn sync_parentchain( + &self, + last_synced_header: Header, + is_syncing: bool, + ) -> ServiceResult
{ let id = self.parentchain_id(); trace!("[{:?}] Getting current head", id); let curr_block = self @@ -186,6 +191,7 @@ where events_chunk_to_sync.as_slice(), events_proofs_chunk_to_sync.as_slice(), self.parentchain_id(), + is_syncing, )?; let api_client_until_synced_header = block_chunk_to_sync @@ -204,11 +210,6 @@ where } } - fn trigger_parentchain_block_import(&self) -> ServiceResult<()> { - trace!("[{:?}] trigger parentchain block import", self.parentchain_id()); - Ok(self.enclave_api.trigger_parentchain_block_import(self.parentchain_id())?) - } - fn sync_and_import_parentchain_until( &self, last_synced_header: &Header, @@ -225,12 +226,10 @@ where let mut last_synced_header = last_synced_header.clone(); while last_synced_header.number() < until_header.number() { - last_synced_header = self.sync_parentchain(last_synced_header)?; - trace!("[{:?}] synced block number: {}", id, last_synced_header.number); + last_synced_header = self.sync_parentchain(last_synced_header, true)?; + println!("[{:?}] synced block number: #{}", id, last_synced_header.number); std::thread::sleep(std::time::Duration::from_secs(1)); } - self.trigger_parentchain_block_import()?; - Ok(last_synced_header) } } diff --git a/service/src/sidechain_setup.rs b/service/src/sidechain_setup.rs index be1ea3771f..d13b4b5fb0 100644 --- a/service/src/sidechain_setup.rs +++ b/service/src/sidechain_setup.rs @@ -40,7 +40,7 @@ pub(crate) fn sidechain_start_untrusted_rpc_server( config: &Config, enclave: Arc, sidechain_storage: Arc, - tokio_handle: Handle, + tokio_handle: &Handle, ) where Enclave: DirectRequest + Clone, SidechainStorage: BlockPruner + FetchBlocks + Sync + Send + 'static, diff --git a/service/src/tests/mocks/enclave_api_mock.rs b/service/src/tests/mocks/enclave_api_mock.rs index 3c24bdc1c2..115a27ea56 100644 --- a/service/src/tests/mocks/enclave_api_mock.rs +++ b/service/src/tests/mocks/enclave_api_mock.rs @@ -69,10 +69,6 @@ impl EnclaveBase for EnclaveMock { unimplemented!() } - fn trigger_parentchain_block_import(&self, _: &ParentchainId) -> EnclaveResult<()> { - unimplemented!() - } - fn set_nonce(&self, _: u32, _: ParentchainId) -> EnclaveResult<()> { unimplemented!() } @@ -105,6 +101,7 @@ impl Sidechain for EnclaveMock { _events: &[Vec], _events_proofs: &[StorageProof], _: &ParentchainId, + _: bool, ) -> EnclaveResult<()> { Ok(()) }