Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7ce7057
feat(kad): add limit option for getting providers
dignifiedquire Jun 16, 2022
769ce4b
feat(kad): report get_providers call event based
dignifiedquire Jun 16, 2022
f68c903
change `GetRecord`api
dignifiedquire Sep 27, 2022
baf3e60
apply cr
dignifiedquire Oct 4, 2022
5db1bfa
tests and fixups for getclosestpeers
dignifiedquire Nov 11, 2022
91b5f59
remove KademliaCaching
dignifiedquire Nov 11, 2022
30eb56c
apply cr: move to nonzerousize
dignifiedquire Nov 11, 2022
0720deb
fix example
dignifiedquire Nov 11, 2022
a0b26b0
happy clippy
dignifiedquire Nov 11, 2022
e0e79fd
protocols/kad: Refactor step tracking
mxinden Nov 11, 2022
a4f0210
cr feedback round 1
dignifiedquire Nov 12, 2022
4e448c4
cr: improve api for GetRecordOk and GetProvidersOk
dignifiedquire Nov 12, 2022
96a952e
fixup rebase of examples
dignifiedquire Nov 12, 2022
2724afd
switch to counter for number of observed record
dignifiedquire Nov 12, 2022
83a2a86
bring back kademliacaching
dignifiedquire Nov 12, 2022
b483983
examples/file-sharing: Revert usage of HashSet
mxinden Nov 17, 2022
aa8a6ce
examples/file-sharing: Finish query once provider is found
mxinden Nov 17, 2022
3912acc
protocols/kad: Remove pub(crate) from replication_factor
mxinden Nov 17, 2022
e339cdf
protocols/kad: Refactor get_record
mxinden Nov 17, 2022
3ced598
protocols/kad: Refactor get_providers step instantiation
mxinden Nov 17, 2022
ac2e525
protocols/kad: Remove pub from ProgressStep methods
mxinden Nov 17, 2022
5ca8d70
remove unused as_intermediary_result
dignifiedquire Nov 18, 2022
bd05da6
fix test cr comments
dignifiedquire Nov 18, 2022
c7c4341
fixup: rebase
dignifiedquire Nov 18, 2022
5ecf9cb
use get instead of checked_add
dignifiedquire Nov 18, 2022
055636e
Update protocols/kad/src/behaviour.rs
dignifiedquire Nov 18, 2022
d5cb7e9
Merge branch 'master' into feat-kad-count
dignifiedquire Nov 22, 2022
0e443d4
Merge remote-tracking branch 'upstream/master' into feat-kad-count
dignifiedquire Nov 24, 2022
c9c00df
add changelog
dignifiedquire Nov 24, 2022
05e29d4
Update protocols/kad/CHANGELOG.md
dignifiedquire Nov 24, 2022
1223f02
Update protocols/kad/CHANGELOG.md
dignifiedquire Nov 24, 2022
9c28d98
Update protocols/kad/CHANGELOG.md
dignifiedquire Nov 24, 2022
cfd5461
Merge branch 'master' into feat-kad-count
dignifiedquire Nov 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
cr feedback round 1
  • Loading branch information
dignifiedquire committed Nov 18, 2022
commit a4f0210ced4ad7c24e3218469cdf19ef74e00f81
143 changes: 46 additions & 97 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use libp2p::{
swarm::{NetworkBehaviour, SwarmEvent},
PeerId, Swarm,
};
use libp2p_kad::GetProvidersOk;
use libp2p_kad::{GetProvidersOk, GetRecordOk};
use std::error::Error;

#[async_std::main]
Expand Down Expand Up @@ -111,120 +111,69 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Kick it off.
loop {
select! {
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {address:?}");
},
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, multiaddr) in list {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {:?}", address);
},
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {
for (peer_id, multiaddr) in list {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
}
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => {
match result {
QueryResult::GetProviders(Ok(GetProvidersOk { key, providers, .. })) => {
for peer in providers {
println!(
"Peer {:?} provides key {:?}",
peer,
std::str::from_utf8(key.as_ref()).unwrap()
);
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => {
match result {
QueryResult::GetProviders(Ok(GetProvidersOk { key, providers, .. })) => {
for peer in providers {
println!(
"Peer {:?} provides key {:?}",
peer,
std::str::from_utf8(key.as_ref()).unwrap()
);
}
}
}
QueryResult::GetProviders(Err(err)) => {
eprintln!("Failed to get providers: {:?}", err);
}
QueryResult::GetRecord(Ok(ok)) => {
for PeerRecord {
record: Record { key, value, .. },
..
} in ok.records
{
QueryResult::GetProviders(Err(err)) => {
eprintln!("Failed to get providers: {:?}", err);
}
QueryResult::GetRecord(Ok(
GetRecordOk::FoundRecord(PeerRecord {
record: Record { key, value, .. },
..
})
)) => {
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(),
);
}
}
QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {:?}", err);
}
QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
println!(
"Successfully put record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::PutRecord(Err(err)) => {
eprintln!("Failed to put record: {:?}", err);
}
QueryResult::StartProviding(Ok(AddProviderOk { key })) => {
println!(
"Successfully put provider record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::StartProviding(Err(err)) => {
eprintln!("Failed to put provider record: {:?}", err);
}
_ => {}
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => {
match result {
QueryResult::GetProviders(Ok(GetProvidersOk { key, providers, .. })) => {
for peer in providers {
QueryResult::GetRecord(Ok(_)) => {}
QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {:?}", err);
}
QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
println!(
"Peer {:?} provides key {:?}",
peer,
"Successfully put record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
}
QueryResult::GetProviders(Err(err)) => {
eprintln!("Failed to get providers: {:?}", err);
}
QueryResult::GetRecord(Ok(ok)) => {
for PeerRecord {
record: Record { key, value, .. },
..
} in ok.records
{
QueryResult::PutRecord(Err(err)) => {
eprintln!("Failed to put record: {:?}", err);
}
QueryResult::StartProviding(Ok(AddProviderOk { key })) => {
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(),
"Successfully put provider record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::StartProviding(Err(err)) => {
eprintln!("Failed to put provider record: {:?}", err);
}
_ => {}
}
QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {:?}", err);
}
QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
println!(
"Successfully put record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::PutRecord(Err(err)) => {
eprintln!("Failed to put record: {:?}", err);
}
QueryResult::StartProviding(Ok(AddProviderOk { key })) => {
println!(
"Successfully put provider record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::StartProviding(Err(err)) => {
eprintln!("Failed to put provider record: {:?}", err);
}
_ => {}
}
_ => {}
}
_ => {}
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions misc/metrics/src/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use libp2p_kad::GetProvidersOk;
use libp2p_kad::{GetProvidersOk, GetRecordOk};
use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
Expand Down Expand Up @@ -181,9 +181,10 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {

match result {
libp2p_kad::QueryResult::GetRecord(result) => match result {
Ok(ok) => self
.query_result_get_record_ok
.observe(ok.record.as_ref().map(|_| 1).unwrap_or_default() as f64),
Ok(GetRecordOk::FoundRecord(_)) => {
self.query_result_get_record_ok.observe(1.);
}
Ok(GetRecordOk::NoAdditionalRecord) => {}
Err(error) => {
self.query_result_get_record_error
.get_or_create(&error.into())
Expand Down
27 changes: 17 additions & 10 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ where
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetRecord(Ok(GetRecordOk { record })),
result: QueryResult::GetRecord(Ok(record.into())),
step,
stats,
},
Expand Down Expand Up @@ -1348,9 +1348,7 @@ where
step.last = true;

let results = if found_a_record {
Ok(GetRecordOk {
record: Default::default(),
})
Ok(GetRecordOk::NoAdditionalRecord)
} else {
Err(GetRecordError::NotFound {
key,
Expand Down Expand Up @@ -2194,9 +2192,9 @@ where
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::OutboundQueryProgressed {
id: user_data,
result: QueryResult::GetRecord(Ok(GetRecordOk {
record: Some(record),
})),
result: QueryResult::GetRecord(Ok(
GetRecordOk::FoundRecord(record),
)),
step: step.clone(),
stats,
},
Expand Down Expand Up @@ -2616,9 +2614,18 @@ pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;

/// The successful result of [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub struct GetRecordOk {
/// The record found in this iteration, including the peer that returned it.
pub record: Option<PeerRecord>,
pub enum GetRecordOk {
FoundRecord(PeerRecord),
NoAdditionalRecord,
}

impl From<Option<PeerRecord>> for GetRecordOk {
fn from(r: Option<PeerRecord>) -> Self {
match r {
Some(r) => GetRecordOk::FoundRecord(r),
None => GetRecordOk::NoAdditionalRecord,
}
}
}

/// The error result of [`Kademlia::get_record`].
Expand Down
30 changes: 16 additions & 14 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ fn query_iter() {

match swarms[0].behaviour_mut().query(&qid) {
Some(q) => match q.info() {
QueryInfo::GetClosestPeers { key, count } => {
QueryInfo::GetClosestPeers { key, step } => {
assert_eq!(&key[..], search_target.to_bytes().as_slice());
assert_eq!(*count, 0); // no result reported yet
assert_eq!(usize::from(step.count), 1);
}
i => panic!("Unexpected query info: {:?}", i),
},
Expand Down Expand Up @@ -768,17 +768,20 @@ fn get_record() {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetRecord(Ok(GetRecordOk { record: r })),
result: QueryResult::GetRecord(Ok(r)),
step: ProgressStep { count, last },
..
},
))) => {
assert_eq!(id, qid);
if usize::from(count) == 1 {
assert!(r.is_none());
assert!(matches!(r, GetRecordOk::NoAdditionalRecord));
}
if usize::from(count) == 2 {
assert_eq!(r.expect("missing record").record, record);
assert!(matches!(r, GetRecordOk::FoundRecord(_)));
if let GetRecordOk::FoundRecord(r) = r {
assert_eq!(r.record, record);
}
}
if last {
assert_eq!(usize::from(count), 3);
Expand Down Expand Up @@ -827,13 +830,13 @@ fn get_record_many() {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetRecord(Ok(GetRecordOk { record: r, .. })),
result: QueryResult::GetRecord(Ok(r)),
step: ProgressStep { count: _, last },
..
},
))) => {
assert_eq!(id, qid);
if let Some(r) = r {
if let GetRecordOk::FoundRecord(r) = r {
assert_eq!(r.record, record);
records.push(r);
}
Expand Down Expand Up @@ -1149,11 +1152,10 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
);
}
match result {
Ok(GetRecordOk { record, .. }) => {
if let Some(record) = record {
assert_eq!(record.peer, Some(addr_trudy));
}
Ok(GetRecordOk::FoundRecord(r)) => {
assert_eq!(r.peer, Some(addr_trudy));
}
Ok(_) => {}
Err(e) => panic!("{:?}", e),
}
}
Expand All @@ -1177,8 +1179,8 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
.queries
.iter()
.for_each(|q| match &q.inner.info {
QueryInfo::GetRecord { count, .. } => {
assert_eq!(*count, 2);
QueryInfo::GetRecord { step, .. } => {
assert_eq!(usize::from(step.count), 3);
}
i => panic!("Unexpected query info: {:?}", i),
});
Expand All @@ -1202,7 +1204,7 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
}
match result {
Ok(ok) => {
if let Some(record) = ok.record {
if let GetRecordOk::FoundRecord(record) = ok {
records.push(record);
}
if records.len() == 1 {
Expand Down
3 changes: 2 additions & 1 deletion protocols/kad/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ pub use behaviour::{
QueryResult, QueryStats,
};
pub use behaviour::{
Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, KademliaStoreInserts, Quorum,
Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, KademliaStoreInserts,
ProgressStep, Quorum,
};
pub use protocol::KadConnectionType;
pub use query::QueryId;
Expand Down