Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ pub enum BehaviourOut<B: BlockT> {
messages: Vec<(ConsensusEngineId, Bytes)>,
},

/// Event generated by a DHT.
Dht(DhtEvent),
/// Events generated by a DHT as a response to get_value or put_value requests as well as the
/// request duration.
Dht(DhtEvent, Duration),
}

impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
Expand Down Expand Up @@ -454,17 +455,17 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
DiscoveryOut::Discovered(peer_id) => {
self.substrate.add_discovered_nodes(iter::once(peer_id));
}
DiscoveryOut::ValueFound(results) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueFound(results)));
DiscoveryOut::ValueFound(results, duration) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueFound(results), duration));
}
DiscoveryOut::ValueNotFound(key) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueNotFound(key)));
DiscoveryOut::ValueNotFound(key, duration) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueNotFound(key), duration));
}
DiscoveryOut::ValuePut(key) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePut(key)));
DiscoveryOut::ValuePut(key, duration) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePut(key), duration));
}
DiscoveryOut::ValuePutFailed(key) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key)));
DiscoveryOut::ValuePutFailed(key, duration) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key), duration));
}
DiscoveryOut::RandomKademliaStarted(protocols) => {
for protocol in protocols {
Expand Down
34 changes: 21 additions & 13 deletions client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl DiscoveryBehaviour {
for k in self.kademlias.values_mut() {
if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
warn!(target: "sub-libp2p", "Libp2p => Failed to put record: {:?}", e);
self.pending_events.push_back(DiscoveryOut::ValuePutFailed(key.clone()));
self.pending_events.push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
}
}
}
Expand Down Expand Up @@ -379,17 +379,25 @@ pub enum DiscoveryOut {
/// the `identify` protocol.
UnroutablePeer(PeerId),

/// The DHT yielded results for the record request, grouped in (key, value) pairs.
ValueFound(Vec<(record::Key, Vec<u8>)>),
/// The DHT yielded results for the record request.
///
/// Returning the result grouped in (key, value) pairs as well as the request duration..
ValueFound(Vec<(record::Key, Vec<u8>)>, Duration),

/// The record requested was not found in the DHT.
ValueNotFound(record::Key),
///
/// Returning the corresponding key as well as the request duration.
ValueNotFound(record::Key, Duration),

/// The record with a given key was successfully inserted into the DHT.
ValuePut(record::Key),
///
/// Returning the corresponding key as well as the request duration.
ValuePut(record::Key, Duration),

/// Inserting a value into the DHT failed.
ValuePutFailed(record::Key),
///
/// Returning the corresponding key as well as the request duration.
ValuePutFailed(record::Key, Duration),

/// Started a random Kademlia query for each DHT identified by the given `ProtocolId`s.
RandomKademliaStarted(Vec<ProtocolId>),
Expand Down Expand Up @@ -620,36 +628,36 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}
}
KademliaEvent::QueryResult { result: QueryResult::GetRecord(res), .. } => {
KademliaEvent::QueryResult { result: QueryResult::GetRecord(res), stats, .. } => {
let ev = match res {
Ok(ok) => {
let results = ok.records
.into_iter()
.map(|r| (r.record.key, r.record.value))
.collect();

DiscoveryOut::ValueFound(results)
DiscoveryOut::ValueFound(results, stats.duration().unwrap_or_else(Default::default))
}
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
trace!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key())
DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
Err(e) => {
warn!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key())
DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::QueryResult { result: QueryResult::PutRecord(res), .. } => {
KademliaEvent::QueryResult { result: QueryResult::PutRecord(res), stats, .. } => {
let ev = match res {
Ok(ok) => DiscoveryOut::ValuePut(ok.key),
Ok(ok) => DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_else(Default::default)),
Err(e) => {
warn!(target: "sub-libp2p",
"Libp2p => Failed to put record: {:?}", e);
DiscoveryOut::ValuePutFailed(e.into_key())
DiscoveryOut::ValuePutFailed(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
Expand Down
28 changes: 26 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
ExHashT, NetworkStateInfo,
behaviour::{Behaviour, BehaviourOut},
config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
DhtEvent,
discovery::DiscoveryConfig,
error::Error,
network_state::{
Expand Down Expand Up @@ -1119,6 +1120,7 @@ struct Metrics {
incoming_connections_total: Counter<U64>,
is_major_syncing: Gauge<U64>,
issued_light_requests: Counter<U64>,
kademlia_query_duration: HistogramVec,
kademlia_random_queries_total: CounterVec<U64>,
kademlia_records_count: GaugeVec<U64>,
kademlia_records_sizes_total: GaugeVec<U64>,
Expand Down Expand Up @@ -1196,6 +1198,17 @@ impl Metrics {
"issued_light_requests",
"Number of light client requests that our node has issued.",
)?, registry)?,
kademlia_query_duration: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_kademlia_query_duration",
"Duration of Kademlia queries per protocol and query type"
),
buckets: prometheus_endpoint::exponential_buckets(0.5, 2.0, 10)
.expect("parameters are always valid values; qed"),
},
&["type"]
)?, registry)?,
kademlia_random_queries_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_kademlia_random_queries_total",
Expand Down Expand Up @@ -1508,8 +1521,19 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
messages,
});
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Dht(ev))) => {
this.event_streams.send(Event::Dht(ev));
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration))) => {
if let Some(metrics) = this.metrics.as_ref() {
let query_type = match event {
DhtEvent::ValueFound(_) => "value-found",
DhtEvent::ValueNotFound(_) => "value-not-found",
DhtEvent::ValuePut(_) => "value-put",
DhtEvent::ValuePutFailed(_) => "value-put-failed",
};
metrics.kademlia_query_duration.with_label_values(&[query_type])
.observe(duration.as_secs_f64());
}

this.event_streams.send(Event::Dht(event));
},
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, num_established }) => {
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
Expand Down