Implement style remarks
cmichi committed Feb 27, 2019
commit 4b94dfde1754f462e8dc86fe00d12dda15bbfb1b
core/cli/src/lib.rs (2 changes: 1 addition & 1 deletion)
@@ -403,7 +403,7 @@ where
// Override telemetry
if cli.no_telemetry {
config.telemetry_endpoints = None;
-	} else if cli.telemetry_endpoints.len() > 0 {
+	} else if !cli.telemetry_endpoints.is_empty() {
config.telemetry_endpoints = Some(TelemetryEndpoints::new(cli.telemetry_endpoints));
}
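The hunk above swaps a length comparison for `is_empty()`, the idiomatic emptiness check in Rust; Clippy flags the old form via its `len_zero` lint. A minimal, self-contained sketch of the idiom (not code from this PR):

	fn main() {
		let endpoints: Vec<String> = Vec::new();

		// Flagged by Clippy's `len_zero` lint:
		if endpoints.len() > 0 {
			println!("have telemetry endpoints");
		}

		// Idiomatic equivalent: states intent directly and works for any
		// collection that exposes `is_empty()`.
		if !endpoints.is_empty() {
			println!("have telemetry endpoints");
		}
	}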

core/consensus/aura/src/lib.rs (33 changes: 20 additions & 13 deletions)
@@ -267,15 +267,17 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
e
);
telemetry!(CONSENSUS_WARN; "aura.unable_fetching_authorities";
"slot" => ?chain_head.hash(), "err" => ?e);
"slot" => ?chain_head.hash(), "err" => ?e
);
return Box::new(future::ok(()));
}
};

if self.sync_oracle.is_offline() && authorities.len() > 1 {
debug!(target: "aura", "Skipping proposal slot. Waiting for the network.");
telemetry!(CONSENSUS_DEBUG; "aura.skipping_proposal_slot";
"authorities_len" => authorities.len());
"authorities_len" => authorities.len()
);
return Box::new(future::ok(()));
}

@@ -288,15 +290,17 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
timestamp
);
telemetry!(CONSENSUS_DEBUG; "aura.starting_authorship";
"slot_num" => slot_num, "timestamp" => timestamp);
"slot_num" => slot_num, "timestamp" => timestamp
);

// we are the slot author. make a block and sign it.
let proposer = match env.init(&chain_head, &authorities) {
Ok(p) => p,
Err(e) => {
warn!("Unable to author block in slot {:?}: {:?}", slot_num, e);
telemetry!(CONSENSUS_WARN; "aura.unable_authoring_block";
"slot" => slot_num, "err" => ?e);
"slot" => slot_num, "err" => ?e
);
return Box::new(future::ok(()))
}
};
@@ -325,7 +329,8 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
slot_num
);
telemetry!(CONSENSUS_INFO; "aura.discarding_proposal_took_too_long";
"slot" => slot_num);
"slot" => slot_num
);
return
}

@@ -362,13 +367,15 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
telemetry!(CONSENSUS_INFO; "aura.pre_sealed_block";
"header_num" => ?header_num,
"hash_now" => ?import_block.post_header().hash(),
"hash_previously" => ?pre_hash);
"hash_previously" => ?pre_hash
);

if let Err(e) = block_import.import_block(import_block, None) {
warn!(target: "aura", "Error with block built on {:?}: {:?}",
parent_hash, e);
telemetry!(CONSENSUS_WARN; "aura.err_with_block_built_on";
"hash" => ?parent_hash, "err" => ?e);
"hash" => ?parent_hash, "err" => ?e
);
}
})
.map_err(|e| consensus_common::ErrorKind::ClientImport(format!("{:?}", e)).into())
@@ -474,7 +481,8 @@ impl<C, E> AuraVerifier<C, E>
diff
);
telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block";
"diff" => ?diff);
"diff" => ?diff
);
thread::sleep(Duration::from_secs(diff));
Ok(())
},
@@ -523,8 +531,7 @@ impl<C, E> AuraVerifier<C, E>
"halting for block {} seconds in the future",
diff
);
-	telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block";
-		"diff" => ?diff);
+	telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; "diff" => ?diff);
thread::sleep(Duration::from_secs(diff));
Ok(())
},
@@ -610,8 +617,7 @@ impl<B: Block, C, E> Verifier<B> for AuraVerifier<C, E> where
}

trace!(target: "aura", "Checked {:?}; importing.", pre_header);
-	telemetry!(CONSENSUS_TRACE; "aura.checked_and_importing";
-		"pre_header" => ?pre_header);
+	telemetry!(CONSENSUS_TRACE; "aura.checked_and_importing"; "pre_header" => ?pre_header);

extra_verification.into_future().wait()?;

@@ -632,7 +638,8 @@ impl<B: Block, C, E> Verifier<B> for AuraVerifier<C, E> where
CheckedHeader::Deferred(a, b) => {
debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
telemetry!(CONSENSUS_DEBUG; "aura.header_too_far_in_future";
"hash" => ?hash, "a" => ?a, "b" => ?b);
"hash" => ?hash, "a" => ?a, "b" => ?b
);
Err(format!("Header {:?} rejected: too far in the future", hash))
}
}
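Every aura hunk above applies the same formatting rule: a `telemetry!` invocation that wraps across lines gets its closing `);` on a line of its own, while an invocation that fits on one line (as in the two collapsed calls) stays on one. A sketch of the convention using a hypothetical `log_event` helper, since the real `telemetry!` macro needs its surrounding crate to compile:

	fn log_event(msg: &str, fields: &[(&str, String)]) {
		println!("{} {:?}", msg, fields);
	}

	fn main() {
		// Fits on one line: keep it on one line.
		log_event("aura.checked_and_importing", &[]);

		// Wraps: closing parenthesis on its own line, so appending a field
		// later shows up as a one-line change in review.
		log_event(
			"aura.pre_sealed_block",
			&[
				("header_num", format!("{:?}", 42)),
				("hash_now", format!("{:?}", "0xabc")),
			],
		);
	}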
core/finality-grandpa/src/lib.rs (21 changes: 14 additions & 7 deletions)
@@ -298,27 +298,31 @@ impl<Block: BlockT> GossipValidator<Block> {
if set_id < rounds.set_id {
trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_set_id";
"set_id" => ?set_id, "ours" => ?rounds.set_id);
"set_id" => ?set_id, "ours" => ?rounds.set_id
);
return true;
} else if set_id == rounds.set_id + 1 {
// allow a few first rounds of future set.
if round > MESSAGE_ROUND_TOLERANCE {
trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_msg_too_far_in_future_set";
"round" => ?round, "ours" => ?rounds.set_id);
"round" => ?round, "ours" => ?rounds.set_id
);
return true;
}
} else if set_id == rounds.set_id {
if round < rounds.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) {
trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, rounds.min_live_round, rounds.max_round);
telemetry!(CONSENSUS_TRACE; "afg.msg_round_oob";
"round" => ?round, "our_min_live_round" => ?rounds.min_live_round, "our_max_round" => ?rounds.max_round);
"round" => ?round, "our_min_live_round" => ?rounds.min_live_round, "our_max_round" => ?rounds.max_round
);
return true;
}
} else {
trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_msg_in_invalid_future_set";
"set_id" => ?set_id, "ours" => ?rounds.set_id);
"set_id" => ?set_id, "ours" => ?rounds.set_id
);
return true;
}
false
@@ -511,7 +515,8 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B
fn announce(&self, round: u64, _set_id: u64, block: B::Hash) {
debug!(target: "afg", "Announcing block {} to peers which we voted on in round {}", block, round);
telemetry!(CONSENSUS_DEBUG; "afg.announcing_blocks_to_voted_peers";
"block" => ?block, "round" => ?round);
"block" => ?block, "round" => ?round
);
self.service.announce_block(block)
}
}
@@ -570,7 +575,8 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>(
let genesis_authorities = api.runtime_api()
.grandpa_authorities(&BlockId::number(Zero::zero()))?;
telemetry!(CONSENSUS_DEBUG; "afg.loading_authorities";
"authorities_len" => ?genesis_authorities.len());
"authorities_len" => ?genesis_authorities.len()
);

let authority_set = SharedAuthoritySet::genesis(genesis_authorities);
let encoded = authority_set.inner().read().encode();
@@ -725,7 +731,8 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
let (env, last_round_number, last_state, authority_set_change) = params;
debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id);
telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter";
"name" => ?config.name(), "set_id" => ?env.set_id);
"name" => ?config.name(), "set_id" => ?env.set_id
);

let chain_info = match client.info() {
Ok(i) => i,
core/telemetry/src/lib.rs (24 changes: 11 additions & 13 deletions)
@@ -76,16 +76,15 @@ impl<D: Drain> Drain for Multiply<D> {
let mut oks = Vec::new();
let mut errs = Vec::new();

-	let mut iter = self.0.iter();
-	while let Some(l) = iter.next() {
+	self.0.iter().for_each(|l| {
let res: Result<<D as Drain>::Ok, <D as Drain>::Err> = (*l).log(record, logger_values);
match res {
Ok(o) => oks.push(o),
Err(e) => errs.push(e),
}
-	}
+	});

-	if errs.len() > 0 {
+	if !errs.is_empty() {
result::Result::Err(errs)
} else {
result::Result::Ok(oks)
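The removed lines above (and the matching closing brace) replace a hand-driven `while let Some(l) = iter.next()` loop with `for_each` on the iterator; Clippy flags the manual form via its `while_let_on_iterator` lint. A minimal sketch of the rewrite (not code from this PR):

	fn main() {
		let drains = vec!["stdout", "file", "websocket"];

		// Before: driving the iterator by hand.
		let mut iter = drains.iter();
		while let Some(d) = iter.next() {
			println!("{}", d);
		}

		// After: `for_each` expresses the same traversal directly.
		drains.iter().for_each(|d| println!("{}", d));
	}

One trade-off worth noting: inside a `for_each` closure you cannot use `?`, `break`, or an early `return` from the enclosing function, so a plain `for` loop remains preferable when the body needs control flow.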
@@ -99,9 +98,7 @@ pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard
let mut out_syncs = Vec::new();

// Set up a filter/drain for each endpoint
-	let endpoints = config.endpoints.0;
-	let mut iter = endpoints.iter();
-	while let Some((url, verbosity)) = iter.next() {
+	config.endpoints.0.iter().for_each(|(url, verbosity)| {
let writer = TelemetryWriter::new(Arc::new(url.to_owned()));
let out_sync = writer.out.clone();
out_syncs.push(out_sync);
Expand All @@ -117,7 +114,7 @@ pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard

let filter = Box::new(filter) as Box<slog::Filter<_, _>>;
endpoint_drains.push(filter);
-	}
+	});

// Set up logging to all endpoints
let drain = slog_async::Async::new(Multiply::new(endpoint_drains).fuse());
@@ -129,8 +126,7 @@ pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard

// Spawn a thread for each endpoint
let on_connect = Arc::new(config.on_connect);
-	let mut iter = endpoints.iter();
-	while let Some((url, verbosity)) = iter.next() {
+	config.endpoints.0.iter().for_each(|(url, verbosity)| {
let url_ = url.clone();
let inner_url = Arc::new(url.to_owned());
let inner_verbosity = Arc::new(verbosity.to_owned());
Expand All @@ -157,7 +153,7 @@ pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard
thread::sleep(time::Duration::from_millis(5000));
}
});
-	}
+	});

return logger_guard;
}
Expand All @@ -182,8 +178,10 @@ struct Connection {
}

impl Connection {
-	fn new(out: ws::Sender, out_sync: Arc<Mutex<Option<ws::Sender>>>,
-		on_connect: Arc<Box<Fn() + Send + Sync + 'static>>, url: Arc<String>) -> Self {
+	fn new(out: ws::Sender,
+		out_sync: Arc<Mutex<Option<ws::Sender>>>,
+		on_connect: Arc<Box<Fn() + Send + Sync + 'static>>,
+		url: Arc<String>) -> Self {
Connection {
out,
out_sync,
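This last hunk reflows the constructor so each parameter sits on its own line. A sketch of that layout with simplified types standing in for `ws::Sender` and the `Arc`-wrapped callback:

	struct Connection {
		out: String,
		url: String,
	}

	impl Connection {
		// One parameter per line: a future parameter lands as a one-line diff.
		fn new(out: String,
			url: String) -> Self {
			Connection { out, url }
		}
	}

	fn main() {
		let _c = Connection::new("sender".into(), "wss://telemetry.example".into());
	}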