Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Metrics
  • Loading branch information
montekki committed Sep 21, 2020
commit fff2ff7c4b7952700b2fb5163e69f7fc21d8d89c
74 changes: 69 additions & 5 deletions node/network/collator-protocol/src/collator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use polkadot_subsystem::{
AllMessages, CollatorProtocolMessage, RuntimeApiMessage, RuntimeApiRequest,
NetworkBridgeMessage,
},
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, PeerSet, NetworkBridgeEvent, RequestId,
Expand All @@ -38,6 +39,54 @@ use polkadot_node_subsystem_util::{
request_validator_groups_ctx,
};

#[derive(Clone, Default)]
pub(super) struct Metrics(Option<MetricsInner>);

impl Metrics {
fn on_advertisment_made(&self) {
if let Some(metrics) = &self.0 {
metrics.advertisments_made.inc();
}
}

fn on_collation_sent(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_sent.inc();
}
}
}

/// The registered collator-side counters held inside `Metrics`.
#[derive(Clone)]
struct MetricsInner {
/// Incremented by `Metrics::on_advertisment_made`.
advertisments_made: prometheus::Counter<prometheus::U64>,
/// Incremented by `Metrics::on_collation_sent`.
collations_sent: prometheus::Counter<prometheus::U64>,
}

impl metrics::Metrics for Metrics {
    /// Builds both collator-side counters, registers each with `registry`, and
    /// returns a handle that records into them.
    ///
    /// # Errors
    ///
    /// Any error from constructing or registering a counter is propagated via `?`.
    fn try_register(registry: &prometheus::Registry)
        -> std::result::Result<Self, prometheus::PrometheusError>
    {
        // NOTE(review): the "advertisments" spelling is part of the exported metric
        // name and help text, so it is reproduced verbatim here.
        let advertisments_made = prometheus::register(
            prometheus::Counter::new(
                "advertisments_made_total",
                "A number of advertisments sent to validators.",
            )?,
            registry,
        )?;

        let collations_sent = prometheus::register(
            prometheus::Counter::new(
                "collations_sent_total",
                "A number of collations sent to validators.",
            )?,
            registry,
        )?;

        Ok(Self(Some(MetricsInner { advertisments_made, collations_sent })))
    }
}

#[derive(Default)]
struct State {
/// Our id.
Expand Down Expand Up @@ -69,6 +118,9 @@ struct State {
/// Entries in this map will be cleared as validator groups in `our_validator_groups`
/// go out of scope with their respective deactivated leafs.
known_validators: HashMap<PeerId, ValidatorId>,

/// Metrics.
metrics: Metrics,
}

/// Distribute a collation.
Expand Down Expand Up @@ -287,6 +339,8 @@ where
)
)).await?;

state.metrics.on_advertisment_made();

Ok(())
}

Expand Down Expand Up @@ -367,6 +421,7 @@ where
/// Issue a response to a previously requested collation.
async fn send_collation<Context>(
ctx: &mut Context,
state: &mut State,
request_id: RequestId,
origin: PeerId,
receipt: CandidateReceipt,
Expand All @@ -388,6 +443,8 @@ where
)
)).await?;

state.metrics.on_collation_sent();

Ok(())
}

Expand Down Expand Up @@ -421,7 +478,7 @@ where
Some(our_para_id) => {
if our_para_id == para_id {
if let Some(collation) = state.collations.get(&relay_parent).cloned() {
send_collation(ctx, request_id, origin, collation.0, collation.1).await?;
send_collation(ctx, state, request_id, origin, collation.0, collation.1).await?;
}
} else {
warn!(
Expand Down Expand Up @@ -555,14 +612,21 @@ async fn handle_our_view_change(
}

/// The collator protocol collator side main loop.
pub(crate) async fn run<Context>(mut ctx: Context, our_id: CollatorId) -> Result<()>
pub(crate) async fn run<Context>(
mut ctx: Context,
our_id: CollatorId,
metrics: Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
use FromOverseer::*;
use OverseerSignal::*;

let mut state = State::default();
let mut state = State {
metrics,
..Default::default()
};

state.our_id = our_id;

Expand Down Expand Up @@ -597,7 +661,7 @@ mod tests {
};
use polkadot_subsystem::ActiveLeavesUpdate;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers::{self as test_helpers};
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::ObservedRole;

#[derive(Default)]
Expand Down Expand Up @@ -719,7 +783,7 @@ mod tests {

let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());

let subsystem = run(context, collator_id);
let subsystem = run(context, collator_id, Metrics::default());

let test_fut = test(TestHarness { virtual_overseer });

Expand Down
55 changes: 35 additions & 20 deletions node/network/collator-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,28 @@ enum Error {
RuntimeApi(RuntimeApiError),
#[from]
UtilError(util::Error),
#[from]
Prometheus(prometheus::PrometheusError),
}

type Result<T> = std::result::Result<T, Error>;

enum ProtocolSide {
Validator,
Collator(CollatorId),
Validator(validator_side::Metrics),
Collator(CollatorId, collator_side::Metrics),
}

/// Metrics.
///
/// A unit struct: it carries no collectors of its own.
#[derive(Clone, Default)]
pub struct Metrics;

// TODO: So how do we tie this to metrics in modules?
impl metrics::Metrics for Metrics {
    /// Registers nothing: `_registry` is ignored and the call always returns `Ok`.
    fn try_register(_registry: &prometheus::Registry)
        -> std::result::Result<Self, prometheus::PrometheusError>
    {
        Ok(Metrics)
    }
}

/// The collator protocol subsystem.
Expand All @@ -71,39 +86,39 @@ impl CollatorProtocolSubsystem {
/// Start the collator protocol.
/// If `id` is `Some` this is a collator side of the protocol.
/// If `id` is `None` this is a validator side of the protocol.
pub fn new(id: Option<CollatorId>) -> Self {
/// Caller must provide a registry for prometheus metrics.
pub fn new(id: Option<CollatorId>, registry: &prometheus::Registry)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think `&Registry` is not always available: `real_overseer` accepts `Option<&Registry>`. We could do the same and call the `register` function instead of `try_register`.

Wouldn't it be better to make ProtocolSide public and pass it here instead?

ProtocolSide could have two constructors:

impl ProtocolSide {
    pub fn validator(registry: Option<&Registry>) -> Result<Self> {
        ...
    }
    pub fn collator(id: CollatorId, registry: Option<&Registry>) -> Result<Self> {
        ...
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original PR there were grumbles against it.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But how was it resolved? We still are making a choice at startup depending on whether id is None or not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I see no other options

-> std::result::Result<Self, prometheus::PrometheusError> {
use metrics::Metrics;
let protocol_side = match id {
Some(id) => ProtocolSide::Collator(id),
None => ProtocolSide::Validator,
Some(id) => ProtocolSide::Collator(id, collator_side::Metrics::try_register(registry)?),
None => ProtocolSide::Validator(validator_side::Metrics::try_register(registry)?),
};

Self {
Ok(Self {
protocol_side,
}
})
}

async fn run<Context>(self, ctx: Context) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
match self.protocol_side {
ProtocolSide::Validator => validator_side::run(ctx, REQUEST_TIMEOUT).await,
ProtocolSide::Collator(id) => collator_side::run(ctx, id).await,
ProtocolSide::Validator(metrics) => validator_side::run(
ctx,
REQUEST_TIMEOUT,
metrics,
).await,
ProtocolSide::Collator(id, metrics) => collator_side::run(
ctx,
id,
metrics,
).await,
}
}
}

/// Collator protocol metrics.
///
/// A unit struct: it carries no collectors of its own.
#[derive(Clone, Default)]
pub struct Metrics;

impl metrics::Metrics for Metrics {
    /// Registers nothing: `_registry` is ignored and the call always returns `Ok`.
    fn try_register(_registry: &prometheus::Registry)
        -> std::result::Result<Self, prometheus::PrometheusError>
    {
        Ok(Self)
    }
}

impl<Context> Subsystem<Context> for CollatorProtocolSubsystem
where
Context: SubsystemContext<Message = CollatorProtocolMessage> + Sync + Send,
Expand Down
58 changes: 56 additions & 2 deletions node/network/collator-protocol/src/validator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use polkadot_subsystem::{
messages::{
AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage,
},
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, ReputationChange as Rep, RequestId,
Expand All @@ -48,6 +49,47 @@ const COST_REQUEST_TIMED_OUT: Rep = Rep::new(-20, "A collation request has timed
const COST_REPORT_BAD: Rep = Rep::new(-50, "A collator was reported by another subsystem");
const BENEFIT_NOTIFY_GOOD: Rep = Rep::new(50, "A collator was noted good by another subsystem");

/// Validator-side metrics handle.
///
/// Wraps an optional [`MetricsInner`]. The `Default` value holds `None`, in which
/// case recording is a no-op.
#[derive(Clone, Default)]
pub(super) struct Metrics(Option<MetricsInner>);

impl Metrics {
    /// Counts one collation request, using the label value `"succeeded"` when
    /// `succeeded` is `true` and `"failed"` otherwise.
    ///
    /// Does nothing when no metrics are present.
    fn on_request(&self, succeeded: bool) {
        let inner = match &self.0 {
            Some(inner) => inner,
            None => return,
        };

        let outcome = if succeeded { "succeeded" } else { "failed" };
        inner.collation_requests.with_label_values(&[outcome]).inc();
    }
}

/// The registered validator-side collectors held inside `Metrics`.
#[derive(Clone)]
struct MetricsInner {
/// Counter vector incremented by `Metrics::on_request`, which selects the series with a
/// `"succeeded"` or `"failed"` label value.
collation_requests: prometheus::CounterVec<prometheus::U64>,
}

impl metrics::Metrics for Metrics {
    /// Builds the validator-side collation-request counter vector, registers it with
    /// `registry`, and returns a handle that records into it.
    ///
    /// The vector has a single label dimension, `result`. `Metrics::on_request`
    /// supplies exactly one label value for it (`"succeeded"` or `"failed"`).
    ///
    /// # Errors
    ///
    /// Any error from constructing or registering the counter vector is propagated
    /// via `?`.
    fn try_register(registry: &prometheus::Registry)
        -> std::result::Result<Self, prometheus::PrometheusError>
    {
        let metrics = MetricsInner {
            collation_requests: prometheus::register(
                prometheus::CounterVec::new(
                    prometheus::Opts::new(
                        "collation_requests_total",
                        "Number of collations requested from Collators.",
                    ),
                    // This slice lists label *names*, not label values. It must have
                    // the same length as the slice passed to `with_label_values`,
                    // otherwise that call panics on a cardinality mismatch.
                    &["result"],
                )?,
                registry,
            )?
        };

        Ok(Metrics(Some(metrics)))
    }
}

#[derive(Debug)]
enum CollationRequestResult {
Received(RequestId),
Expand Down Expand Up @@ -134,6 +176,9 @@ struct State {

/// Possessed collations.
collations: HashMap<(Hash, ParaId), Vec<(CollatorId, CandidateReceipt, PoV)>>,

/// Metrics.
metrics: Metrics,
}

/// Another subsystem has requested to fetch collations on a particular leaf for some para.
Expand Down Expand Up @@ -291,6 +336,7 @@ where
let _ = per_request.received.send(());
if let Some(collator_id) = state.known_collators.get(&origin) {
let _ = per_request.result.send((receipt.clone(), pov.clone()));
state.metrics.on_request(true);

state.collations
.entry((relay_parent, para_id))
Expand All @@ -304,6 +350,7 @@ where
// This is tricky. If our chain has moved on, we have already canceled
// the relevant request and removed it from the map, so we are not expecting
// this reply, although technically it is not malicious behaviour.
state.metrics.on_request(false);
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?;
}

Expand Down Expand Up @@ -497,6 +544,8 @@ async fn request_timed_out<Context>(
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
state.metrics.on_request(false);

// We have to go backwards in the map, again.
if let Some(key) = find_val_in_map(&state.requested_collations, &id) {
if let Some(_) = state.requested_collations.remove(&key) {
Expand Down Expand Up @@ -595,7 +644,11 @@ where
}

/// The main run loop.
pub(crate) async fn run<Context>(mut ctx: Context, request_timeout: Duration) -> Result<()>
pub(crate) async fn run<Context>(
mut ctx: Context,
request_timeout: Duration,
metrics: Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
Expand All @@ -604,6 +657,7 @@ where

let mut state = State {
request_timeout,
metrics,
..Default::default()
};

Expand Down Expand Up @@ -707,7 +761,7 @@ mod tests {

let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());

let subsystem = run(context, Duration::from_millis(50));
let subsystem = run(context, Duration::from_millis(50), Metrics::default());

let test_fut = test(TestHarness { virtual_overseer });

Expand Down