Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 69 additions & 5 deletions node/network/collator-protocol/src/collator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use polkadot_subsystem::{
AllMessages, CollatorProtocolMessage, RuntimeApiMessage, RuntimeApiRequest,
NetworkBridgeMessage,
},
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, PeerSet, NetworkBridgeEvent, RequestId,
Expand All @@ -38,6 +39,54 @@ use polkadot_node_subsystem_util::{
request_validator_groups_ctx,
};

/// Collator-side metrics for the collator protocol.
///
/// Wraps an `Option` so the subsystem can run with metrics disabled
/// (`None`, as produced by `Metrics::default()`); every recording method
/// is then a no-op.
#[derive(Clone, Default)]
pub(super) struct Metrics(Option<MetricsInner>);

impl Metrics {
	/// Note that a collation advertisement was sent to validators.
	fn on_advertisment_made(&self) {
		if let Some(metrics) = self.0.as_ref() {
			metrics.advertisments_made.inc();
		}
	}

	/// Note that a previously requested collation was sent to a validator.
	fn on_collation_sent(&self) {
		if let Some(metrics) = self.0.as_ref() {
			metrics.collations_sent.inc();
		}
	}
}

/// The actual prometheus counters behind [`Metrics`].
///
/// NOTE(review): "advertisments" is a misspelling of "advertisements", but it
/// is used both in the field name here and in the exported metric name below;
/// renaming it would change the public metric name, so it is kept as-is.
#[derive(Clone)]
struct MetricsInner {
/// Total number of collation advertisements sent to validators.
advertisments_made: prometheus::Counter<prometheus::U64>,
/// Total number of collations sent to validators in response to requests.
collations_sent: prometheus::Counter<prometheus::U64>,
}

impl metrics::Metrics for Metrics {
/// Register the collator-side counters with the given prometheus registry
/// and return a populated (`Some`) metrics handle.
///
/// Returns an error if registration fails (e.g. a metric with the same
/// name is presumably already registered — TODO confirm against the
/// `prometheus::register` contract).
fn try_register(registry: &prometheus::Registry)
-> std::result::Result<Self, prometheus::PrometheusError>
{
let metrics = MetricsInner {
// NOTE(review): the metric name and help text inherit the
// "advertisments" misspelling; fixing it would change the
// exported metric name, so it is left untouched here.
advertisments_made: prometheus::register(
prometheus::Counter::new(
"parachain_advertisments_made_total",
"A number of advertisments sent to validators.",
)?,
registry,
)?,
collations_sent: prometheus::register(
prometheus::Counter::new(
"parachain_collations_sent_total",
"A number of collations sent to validators.",
)?,
registry,
)?,
};

Ok(Metrics(Some(metrics)))
}
}

#[derive(Default)]
struct State {
/// Our id.
Expand Down Expand Up @@ -69,6 +118,9 @@ struct State {
/// Entries in this map will be cleared as validator groups in `our_validator_groups`
/// go out of scope with their respective deactivated leafs.
known_validators: HashMap<PeerId, ValidatorId>,

/// Metrics.
metrics: Metrics,
}

/// Distribute a collation.
Expand Down Expand Up @@ -287,6 +339,8 @@ where
)
)).await?;

state.metrics.on_advertisment_made();

Ok(())
}

Expand Down Expand Up @@ -367,6 +421,7 @@ where
/// Issue a response to a previously requested collation.
async fn send_collation<Context>(
ctx: &mut Context,
state: &mut State,
request_id: RequestId,
origin: PeerId,
receipt: CandidateReceipt,
Expand All @@ -388,6 +443,8 @@ where
)
)).await?;

state.metrics.on_collation_sent();

Ok(())
}

Expand Down Expand Up @@ -421,7 +478,7 @@ where
Some(our_para_id) => {
if our_para_id == para_id {
if let Some(collation) = state.collations.get(&relay_parent).cloned() {
send_collation(ctx, request_id, origin, collation.0, collation.1).await?;
send_collation(ctx, state, request_id, origin, collation.0, collation.1).await?;
}
} else {
warn!(
Expand Down Expand Up @@ -555,14 +612,21 @@ async fn handle_our_view_change(
}

/// The collator protocol collator side main loop.
pub(crate) async fn run<Context>(mut ctx: Context, our_id: CollatorId) -> Result<()>
pub(crate) async fn run<Context>(
mut ctx: Context,
our_id: CollatorId,
metrics: Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
use FromOverseer::*;
use OverseerSignal::*;

let mut state = State::default();
let mut state = State {
metrics,
..Default::default()
};

state.our_id = our_id;

Expand Down Expand Up @@ -597,7 +661,7 @@ mod tests {
};
use polkadot_subsystem::ActiveLeavesUpdate;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers::{self as test_helpers};
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::ObservedRole;

#[derive(Default)]
Expand Down Expand Up @@ -719,7 +783,7 @@ mod tests {

let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());

let subsystem = run(context, collator_id);
let subsystem = run(context, collator_id, Metrics::default());

let test_fut = test(TestHarness { virtual_overseer });

Expand Down
39 changes: 20 additions & 19 deletions node/network/collator-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ enum Error {
RuntimeApi(RuntimeApiError),
#[from]
UtilError(util::Error),
#[from]
Prometheus(prometheus::PrometheusError),
}

type Result<T> = std::result::Result<T, Error>;

enum ProtocolSide {
Validator,
Collator(CollatorId),
Validator(validator_side::Metrics),
Collator(CollatorId, collator_side::Metrics),
}

/// The collator protocol subsystem.
Expand All @@ -71,10 +73,12 @@ impl CollatorProtocolSubsystem {
/// Start the collator protocol.
/// If `id` is `Some` this is a collator side of the protocol.
/// If `id` is `None` this is a validator side of the protocol.
pub fn new(id: Option<CollatorId>) -> Self {
/// Caller must provide a registry for prometheus metrics.
pub fn new(id: Option<CollatorId>, registry: Option<&prometheus::Registry>) -> Self {
use metrics::Metrics;
let protocol_side = match id {
Some(id) => ProtocolSide::Collator(id),
None => ProtocolSide::Validator,
Some(id) => ProtocolSide::Collator(id, collator_side::Metrics::register(registry)),
None => ProtocolSide::Validator(validator_side::Metrics::register(registry)),
};

Self {
Expand All @@ -87,28 +91,25 @@ impl CollatorProtocolSubsystem {
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
match self.protocol_side {
ProtocolSide::Validator => validator_side::run(ctx, REQUEST_TIMEOUT).await,
ProtocolSide::Collator(id) => collator_side::run(ctx, id).await,
ProtocolSide::Validator(metrics) => validator_side::run(
ctx,
REQUEST_TIMEOUT,
metrics,
).await,
ProtocolSide::Collator(id, metrics) => collator_side::run(
ctx,
id,
metrics,
).await,
}
}
}

/// Collator protocol metrics.
#[derive(Default, Clone)]
pub struct Metrics;

impl metrics::Metrics for Metrics {
fn try_register(_registry: &prometheus::Registry)
-> std::result::Result<Self, prometheus::PrometheusError> {
Ok(Metrics)
}
}

impl<Context> Subsystem<Context> for CollatorProtocolSubsystem
where
Context: SubsystemContext<Message = CollatorProtocolMessage> + Sync + Send,
{
type Metrics = Metrics;
type Metrics = ();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand why we expose the Metrics type in the trait if it's ok to just fake it and use a different type internally. I assume this is fine as @ordian has already approved this PR, but it may be worth considering whether we could remove type Metrics from trait Subsystem.

If on the other hand there's a reason to publish this type, then we really should be distributing it via an enum.


fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem {
Expand Down
76 changes: 69 additions & 7 deletions node/network/collator-protocol/src/validator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use polkadot_subsystem::{
messages::{
AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage,
},
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, ReputationChange as Rep, RequestId,
Expand All @@ -48,6 +49,47 @@ const COST_REQUEST_TIMED_OUT: Rep = Rep::new(-20, "A collation request has timed
const COST_REPORT_BAD: Rep = Rep::new(-50, "A collator was reported by another subsystem");
const BENEFIT_NOTIFY_GOOD: Rep = Rep::new(50, "A collator was noted good by another subsystem");

/// Validator-side metrics for the collator protocol.
///
/// Wraps an `Option` so the subsystem can run with metrics disabled
/// (`None`, as produced by `Metrics::default()`); every recording method
/// is then a no-op.
#[derive(Clone, Default)]
pub(super) struct Metrics(Option<MetricsInner>);

impl Metrics {
	/// Record the outcome of a collation request as a labelled
	/// counter increment ("succeeded" or "failed").
	fn on_request(&self, succeeded: bool) {
		if let Some(metrics) = self.0.as_ref() {
			let outcome = if succeeded { "succeeded" } else { "failed" };
			metrics.collation_requests.with_label_values(&[outcome]).inc();
		}
	}
}

#[derive(Clone)]
struct MetricsInner {
collation_requests: prometheus::CounterVec<prometheus::U64>,
}

impl metrics::Metrics for Metrics {
	/// Register the validator-side counters with the given prometheus
	/// registry and return a populated (`Some`) metrics handle.
	///
	/// Returns an error if registration with the registry fails.
	fn try_register(registry: &prometheus::Registry)
		-> std::result::Result<Self, prometheus::PrometheusError>
	{
		let metrics = MetricsInner {
			collation_requests: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_collation_requests_total",
						"Number of collations requested from Collators.",
					),
					// A single label whose *values* are the request outcomes
					// ("succeeded"/"failed"), matching `on_request`'s
					// `with_label_values(&[..])` calls with one value.
					// The original listed the two outcome values as two label
					// *names*, which makes every `with_label_values` call fail
					// with an inconsistent-cardinality error, so the counter
					// could never be incremented.
					&["success"],
				)?,
				registry,
			)?,
		};

		Ok(Metrics(Some(metrics)))
	}
}

#[derive(Debug)]
enum CollationRequestResult {
Received(RequestId),
Expand Down Expand Up @@ -134,6 +176,14 @@ struct State {

/// Possessed collations.
collations: HashMap<(Hash, ParaId), Vec<(CollatorId, CandidateReceipt, PoV)>>,

/// Leaves have recently moved out of scope.
/// These are looked into when we receive previously requested collations that we
/// are no longer interested in.
recently_removed_heads: HashSet<Hash>,

/// Metrics.
metrics: Metrics,
}

/// Another subsystem has requested to fetch collations on a particular leaf for some para.
Expand Down Expand Up @@ -291,6 +341,7 @@ where
let _ = per_request.received.send(());
if let Some(collator_id) = state.known_collators.get(&origin) {
let _ = per_request.result.send((receipt.clone(), pov.clone()));
state.metrics.on_request(true);

state.collations
.entry((relay_parent, para_id))
Expand All @@ -300,11 +351,11 @@ where
}
}
} else {
// TODO: https://github.com/paritytech/polkadot/issues/1694
// This is tricky. If our chain has moved on, we have already canceled
// the relevant request and removed it from the map; so and we are not expecting
// this reply although technically it is not a malicious behaviur.
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?;
// If this collation is not simply a delayed reply to a request we made
// before our view moved on, modify the peer's reputation.
if !state.recently_removed_heads.contains(&relay_parent) {
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?;
}
}

Ok(())
Expand Down Expand Up @@ -481,7 +532,11 @@ async fn handle_our_view_change(
.cloned()
.collect::<Vec<_>>();

// Update the set of recently removed chain heads.
state.recently_removed_heads.clear();

for removed in removed.into_iter() {
state.recently_removed_heads.insert(removed.clone());
remove_relay_parent(state, removed).await?;
}

Expand All @@ -497,6 +552,8 @@ async fn request_timed_out<Context>(
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
state.metrics.on_request(false);

// We have to go backwards in the map, again.
if let Some(key) = find_val_in_map(&state.requested_collations, &id) {
if let Some(_) = state.requested_collations.remove(&key) {
Expand Down Expand Up @@ -595,7 +652,11 @@ where
}

/// The main run loop.
pub(crate) async fn run<Context>(mut ctx: Context, request_timeout: Duration) -> Result<()>
pub(crate) async fn run<Context>(
mut ctx: Context,
request_timeout: Duration,
metrics: Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
Expand All @@ -604,6 +665,7 @@ where

let mut state = State {
request_timeout,
metrics,
..Default::default()
};

Expand Down Expand Up @@ -707,7 +769,7 @@ mod tests {

let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());

let subsystem = run(context, Duration::from_millis(50));
let subsystem = run(context, Duration::from_millis(50), Metrics::default());

let test_fut = test(TestHarness { virtual_overseer });

Expand Down