2 changes: 2 additions & 0 deletions Cargo.lock

Generated file; the Cargo.lock diff is not rendered.

22 changes: 11 additions & 11 deletions node/core/candidate-selection/src/lib.rs
@@ -282,18 +282,18 @@ impl CandidateSelectionJob {
candidate_receipt
);

let succeeded =
let result =
if let Err(err) = forward_invalidity_note(received_from, &mut self.sender).await {
log::warn!(
target: TARGET,
"failed to forward invalidity note: {:?}",
err
);
false
Err(())
} else {
true
Ok(())
};
self.metrics.on_invalid_selection(succeeded);
self.metrics.on_invalid_selection(result);
}
}

@@ -363,11 +363,11 @@ async fn second_candidate(
{
Err(err) => {
log::warn!(target: TARGET, "failed to send a seconding message");
metrics.on_second(false);
metrics.on_second(Err(()));
Err(err.into())
}
Ok(_) => {
metrics.on_second(true);
metrics.on_second(Ok(()));
Ok(())
}
}
@@ -391,21 +391,21 @@ struct MetricsInner {
invalid_selections: prometheus::CounterVec<prometheus::U64>,
}

/// Candidate backing metrics.
/// Candidate selection metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
fn on_second(&self, succeeded: bool) {
fn on_second(&self, result: Result<(), ()>) {
if let Some(metrics) = &self.0 {
let label = if succeeded { "succeeded" } else { "failed" };
let label = if result.is_ok() { "succeeded" } else { "failed" };
metrics.seconds.with_label_values(&[label]).inc();
}
}

fn on_invalid_selection(&self, succeeded: bool) {
fn on_invalid_selection(&self, result: Result<(), ()>) {
if let Some(metrics) = &self.0 {
let label = if succeeded { "succeeded" } else { "failed" };
let label = if result.is_ok() { "succeeded" } else { "failed" };
metrics.invalid_selections.with_label_values(&[label]).inc();
}
}
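For context on the candidate-selection hunks above: the metrics helpers now take a plain `Result<(), ()>` and translate it into a `"succeeded"` / `"failed"` label on a `prometheus::CounterVec`. The registration of those counter vectors sits outside the visible hunks; the sketch below only illustrates the general shape, with the metric name and help text invented for the example rather than copied from the file.

```rust
use polkadot_node_subsystem_util::metrics::prometheus;

// Hypothetical registration of a counter vector with a single "success" label
// dimension, matching the `with_label_values(&["succeeded"])` / `&["failed"]`
// calls in the hunks above.
fn register_seconds(
	registry: &prometheus::Registry,
) -> std::result::Result<prometheus::CounterVec<prometheus::U64>, prometheus::PrometheusError> {
	prometheus::register(
		prometheus::CounterVec::new(
			prometheus::Opts::new(
				"parachain_candidate_seconding_total", // placeholder metric name
				"Candidates seconded, labelled by success.",
			),
			&["success"],
		)?,
		registry,
	)
}
```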
15 changes: 7 additions & 8 deletions node/core/provisioner/src/lib.rs
@@ -196,9 +196,9 @@ impl ProvisioningJob {
.await
{
log::warn!(target: "provisioner", "failed to assemble or send inherent data: {:?}", err);
self.metrics.on_inherent_data_request(false);
self.metrics.on_inherent_data_request(Err(()));
} else {
self.metrics.on_inherent_data_request(true);
self.metrics.on_inherent_data_request(Ok(()));
}
}
ToJob::Provisioner(RequestBlockAuthorshipData(_, sender)) => {
@@ -467,17 +467,16 @@ struct MetricsInner {
inherent_data_requests: prometheus::CounterVec<prometheus::U64>,
}

/// Candidate backing metrics.
/// Provisioner metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
fn on_inherent_data_request(&self, succeeded: bool) {
fn on_inherent_data_request(&self, response: Result<(), ()>) {
if let Some(metrics) = &self.0 {
if succeeded {
metrics.inherent_data_requests.with_label_values(&["succeded"]).inc();
} else {
metrics.inherent_data_requests.with_label_values(&["failed"]).inc();
match response {
Ok(()) => metrics.inherent_data_requests.with_label_values(&["succeded"]).inc(),
Err(()) => metrics.inherent_data_requests.with_label_values(&["failed"]).inc(),
}
}
}
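The provisioner hunk follows the same pattern, dispatching on the `Result` with a `match` instead of an `if`; the `"succeded"` label spelling is left exactly as it appears in the merged code. As a standalone illustration of how the two label values behave, here is a small sketch against the plain `prometheus` crate (metric name and label name are invented for the example):

```rust
use prometheus::{CounterVec, Opts, Registry};

fn main() -> Result<(), prometheus::Error> {
	let registry = Registry::new();
	// One counter vector with a single label dimension; each distinct label value
	// becomes its own time series under the same metric.
	let requests = CounterVec::new(
		Opts::new("inherent_data_requests_total", "Inherent data requests."),
		&["success"],
	)?;
	registry.register(Box::new(requests.clone()))?;

	requests.with_label_values(&["succeded"]).inc(); // spelling as in the merged code
	requests.with_label_values(&["failed"]).inc();
	Ok(())
}
```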
69 changes: 58 additions & 11 deletions node/network/availability-distribution/src/lib.rs
@@ -49,6 +49,9 @@ use polkadot_subsystem::{
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
SubsystemContext, SubsystemError,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, ReputationChange as Rep, PeerId,
NetworkBridgeEvent,
@@ -292,6 +295,7 @@ async fn handle_network_msg<Context>(
ctx: &mut Context,
keystore: KeyStorePtr,
state: &mut ProtocolState,
metrics: &Metrics,
bridge_message: NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>,
) -> Result<()>
where
@@ -307,18 +311,18 @@
state.peer_views.remove(&peerid);
}
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
handle_peer_view_change(ctx, state, peerid, view).await?;
handle_peer_view_change(ctx, state, peerid, view, metrics).await?;
}
NetworkBridgeEvent::OurViewChange(view) => {
handle_our_view_change(ctx, keystore, state, view).await?;
handle_our_view_change(ctx, keystore, state, view, metrics).await?;
}
NetworkBridgeEvent::PeerMessage(remote, msg) => {
let gossiped_availability = match msg {
protocol_v1::AvailabilityDistributionMessage::Chunk(candidate_hash, chunk) =>
AvailabilityGossipMessage { candidate_hash, erasure_chunk: chunk }
};

process_incoming_peer_message(ctx, state, remote, gossiped_availability).await?;
process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics).await?;
}
}
Ok(())
@@ -331,6 +335,7 @@ async fn handle_our_view_change<Context>(
keystore: KeyStorePtr,
state: &mut ProtocolState,
view: View,
metrics: &Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
@@ -426,7 +431,7 @@ where
erasure_chunk,
};

send_tracked_gossip_message_to_peers(ctx, per_candidate, peers, message).await?;
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await?;
}
}

@@ -442,31 +447,34 @@
async fn send_tracked_gossip_message_to_peers<Context>(
ctx: &mut Context,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peers: Vec<PeerId>,
message: AvailabilityGossipMessage,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
send_tracked_gossip_messages_to_peers(ctx, per_candidate, peers, iter::once(message)).await
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await
}

#[inline(always)]
async fn send_tracked_gossip_messages_to_peer<Context>(
ctx: &mut Context,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peer: PeerId,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
send_tracked_gossip_messages_to_peers(ctx, per_candidate, vec![peer], message_iter).await
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter).await
}

async fn send_tracked_gossip_messages_to_peers<Context>(
ctx: &mut Context,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peers: Vec<PeerId>,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()>
@@ -503,6 +511,8 @@ where
))
.await
.map_err::<Error, _>(Into::into)?;

metrics.on_chunk_distributed();
}

Ok(())
@@ -515,6 +525,7 @@ async fn handle_peer_view_change<Context>(
state: &mut ProtocolState,
origin: PeerId,
view: View,
metrics: &Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
@@ -560,7 +571,7 @@ where
.cloned()
.collect::<HashSet<_>>();

send_tracked_gossip_messages_to_peer(ctx, per_candidate, origin.clone(), messages).await?;
send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages).await?;
}
Ok(())
}
@@ -588,6 +599,7 @@ async fn process_incoming_peer_message<Context>(
state: &mut ProtocolState,
origin: PeerId,
message: AvailabilityGossipMessage,
metrics: &Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
@@ -694,22 +706,24 @@ where
.collect::<Vec<_>>();

// gossip that message to interested peers
send_tracked_gossip_message_to_peers(ctx, per_candidate, peers, message).await
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await
}

/// The bitfield distribution subsystem.
pub struct AvailabilityDistributionSubsystem {
/// Pointer to a keystore, which is required for determining this nodes validator index.
keystore: KeyStorePtr,
/// Prometheus metrics.
metrics: Metrics,
}

impl AvailabilityDistributionSubsystem {
/// Number of ancestors to keep around for the relay-chain heads.
const K: usize = 3;

/// Create a new instance of the availability distribution.
pub fn new(keystore: KeyStorePtr) -> Self {
Self { keystore }
pub fn new(keystore: KeyStorePtr, metrics: Metrics) -> Self {
Self { keystore, metrics }
}

/// Start processing work as passed on from the Overseer.
@@ -729,7 +743,8 @@ impl AvailabilityDistributionSubsystem {
&mut ctx,
self.keystore.clone(),
&mut state,
event
&self.metrics,
event,
).await {
warn!(
target: TARGET,
@@ -1073,5 +1088,37 @@
}


#[derive(Clone)]
struct MetricsInner {
gossipped_availability_chunks: prometheus::Counter<prometheus::U64>,
}

/// Availability Distribution metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
fn on_chunk_distributed(&self) {
if let Some(metrics) = &self.0 {
metrics.gossipped_availability_chunks.inc();
}
}
}

impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
gossipped_availability_chunks: prometheus::register(
prometheus::Counter::new(
"parachain_gossipped_availability_chunks_total",
"Number of availability chunks gossipped to other peers."
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}

#[cfg(test)]
mod tests;
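The new `Metrics` type is threaded as `&Metrics` through every gossip path so `on_chunk_distributed` can be incremented where chunks are actually sent, and the subsystem constructor now takes the metrics by value. The service-side wiring is not part of this diff; a sketch of what it could look like, assuming it lives where `KeyStorePtr`, `AvailabilityDistributionSubsystem`, and this file's `Metrics` are already in scope:

```rust
// The util trait is imported anonymously so `Metrics::try_register` resolves.
use polkadot_node_subsystem_util::metrics::{prometheus, Metrics as _};

fn make_subsystem(
	keystore: KeyStorePtr,
	registry: Option<&prometheus::Registry>,
) -> std::result::Result<AvailabilityDistributionSubsystem, prometheus::PrometheusError> {
	// Register real counters when the node exposes a prometheus registry,
	// otherwise fall back to the no-op `Metrics(None)` default.
	let metrics = match registry {
		Some(registry) => Metrics::try_register(registry)?,
		None => Metrics::default(),
	};
	Ok(AvailabilityDistributionSubsystem::new(keystore, metrics))
}
```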
2 changes: 1 addition & 1 deletion node/network/availability-distribution/src/tests.rs
@@ -72,7 +72,7 @@ fn test_harness<T: Future<Output = ()>>(

let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());

let subsystem = AvailabilityDistributionSubsystem::new(keystore);
let subsystem = AvailabilityDistributionSubsystem::new(keystore, Default::default());
let subsystem = subsystem.run(context);

let test_fut = test(TestHarness { virtual_overseer });
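The harness hands the subsystem `Default::default()`, i.e. `Metrics(None)`: every recording helper is then a no-op, so the tests need no prometheus registry. A hypothetical extra test (assuming the usual `use super::*;` at the top of `tests.rs`) would just confirm that:

```rust
#[test]
fn default_metrics_are_a_no_op() {
	// `Metrics::default()` wraps `None`: no registry is attached, so helpers
	// such as `on_chunk_distributed` record nothing and must not panic.
	let metrics = Metrics::default();
	metrics.on_chunk_distributed();
}
```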
2 changes: 1 addition & 1 deletion node/network/bitfield-distribution/Cargo.toml
@@ -13,7 +13,7 @@ codec = { package="parity-scale-codec", version = "1.3.4" }
node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { package = "polkadot-node-subsystem-util", path = "../../subsystem-util" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-network-bridge = { path = "../../network/bridge" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }