Merged

33 commits
7ce7057
feat(kad): add limit option for getting providers
dignifiedquire Jun 16, 2022
769ce4b
feat(kad): report get_providers call event based
dignifiedquire Jun 16, 2022
f68c903
change `GetRecord`api
dignifiedquire Sep 27, 2022
baf3e60
apply cr
dignifiedquire Oct 4, 2022
5db1bfa
tests and fixups for getclosestpeers
dignifiedquire Nov 11, 2022
91b5f59
remove KademliaCaching
dignifiedquire Nov 11, 2022
30eb56c
apply cr: move to nonzerousize
dignifiedquire Nov 11, 2022
0720deb
fix example
dignifiedquire Nov 11, 2022
a0b26b0
happy clippy
dignifiedquire Nov 11, 2022
e0e79fd
protocols/kad: Refactor step tracking
mxinden Nov 11, 2022
a4f0210
cr feedback round 1
dignifiedquire Nov 12, 2022
4e448c4
cr: improve api for GetRecordOk and GetProvidersOk
dignifiedquire Nov 12, 2022
96a952e
fixup rebase of examples
dignifiedquire Nov 12, 2022
2724afd
switch to counter for number of observed record
dignifiedquire Nov 12, 2022
83a2a86
bring back kademliacaching
dignifiedquire Nov 12, 2022
b483983
examples/file-sharing: Revert usage of HashSet
mxinden Nov 17, 2022
aa8a6ce
examples/file-sharing: Finish query once provider is found
mxinden Nov 17, 2022
3912acc
protocols/kad: Remove pub(crate) from replication_factor
mxinden Nov 17, 2022
e339cdf
protocols/kad: Refactor get_record
mxinden Nov 17, 2022
3ced598
protocols/kad: Refactor get_providers step instantiation
mxinden Nov 17, 2022
ac2e525
protocols/kad: Remove pub from ProgressStep methods
mxinden Nov 17, 2022
5ca8d70
remove unused as_intermediary_result
dignifiedquire Nov 18, 2022
bd05da6
fix test cr comments
dignifiedquire Nov 18, 2022
c7c4341
fixup: rebase
dignifiedquire Nov 18, 2022
5ecf9cb
use get instead of checked_add
dignifiedquire Nov 18, 2022
055636e
Update protocols/kad/src/behaviour.rs
dignifiedquire Nov 18, 2022
d5cb7e9
Merge branch 'master' into feat-kad-count
dignifiedquire Nov 22, 2022
0e443d4
Merge remote-tracking branch 'upstream/master' into feat-kad-count
dignifiedquire Nov 24, 2022
c9c00df
add changelog
dignifiedquire Nov 24, 2022
05e29d4
Update protocols/kad/CHANGELOG.md
dignifiedquire Nov 24, 2022
1223f02
Update protocols/kad/CHANGELOG.md
dignifiedquire Nov 24, 2022
9c28d98
Update protocols/kad/CHANGELOG.md
dignifiedquire Nov 24, 2022
cfd5461
Merge branch 'master' into feat-kad-count
dignifiedquire Nov 24, 2022
change `GetRecord` api
dignifiedquire committed Nov 18, 2022
commit f68c90310cf72cd3628849d08be4f309ead6fbf9
225 changes: 116 additions & 109 deletions protocols/kad/src/behaviour.rs
@@ -452,6 +452,11 @@ where
}
}

/// Returns the currently configured `replication_factor`.
Member:
Suggested change
/// Returns the currently configured `replication_factor`.

Thinking about this some more, this is only needed in tests. In those tests, we can get the value from the Default implementation of KademliaConfig.

I don't think we should introduce a public facing method just for testing.

Member Author:
fixed

pub fn replication_factor(&self) -> NonZeroUsize {
self.queries.config().replication_factor
}

Member:
Is this method necessary? We don't provide getters for any of the other config parameters.

Given that it is a static value and given that the user can set it at construction time, why is this getter needed?

Member Author:
only needed internally, adjusted visibility
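
As an illustration of the reviewer's point (not part of the diff, and using only public libp2p-kad items): a test can take the expected replication factor from the defaults the behaviour was configured with instead of a getter on Kademlia; K_VALUE is the library's default replication factor.

use std::num::NonZeroUsize;

use libp2p::kad::{KademliaConfig, K_VALUE};

#[test]
fn replication_factor_from_defaults() {
    // The behaviour under test is assumed to be built from this default
    // config (i.e. `set_replication_factor` was not called), so the expected
    // replication factor is simply `K_VALUE` (20); no getter is required.
    let _config = KademliaConfig::default();
    let expected: NonZeroUsize = K_VALUE;
    assert_eq!(expected.get(), 20);
}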

/// Gets an iterator over immutable references to all running queries.
pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
self.queries.iter().filter_map(|query| {
@@ -682,37 +687,50 @@ where
///
/// The result of this operation is delivered in a
/// [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetRecord}`].
pub fn get_record(&mut self, key: record::Key, quorum: Quorum) -> QueryId {
let quorum = quorum.eval(self.queries.config().replication_factor);
let mut records = Vec::with_capacity(quorum.get());

if let Some(record) = self.store.get(&key) {
pub fn get_record(&mut self, key: record::Key) -> QueryId {
let record = if let Some(record) = self.store.get(&key) {
if record.is_expired(Instant::now()) {
self.store.remove(&key)
self.store.remove(&key);
None
} else {
records.push(PeerRecord {
Some(PeerRecord {
peer: None,
record: record.into_owned(),
});
})
}
}
} else {
None
};

let done = records.len() >= quorum.get();
let target = kbucket::Key::new(key.clone());
let info = QueryInfo::GetRecord {
key,
records,
quorum,
count: 1,
record_to_cache: record.as_ref().map(|r| r.record.clone()),
cache_candidates: BTreeMap::new(),
};
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*)
Member:
Suggested change
let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*)
let id = self.queries.add_iter_closest(target.clone(), peers, inner);

Is this reference relevant?

Member Author:
removed, not sure what this was from


// Instantly finish the query if we already have enough records.
if done {
self.queries.get_mut(&id).expect("by (*)").finish();
}
// No queries were actually done for the results yet.
let stats = QueryStats::empty();

self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetRecord(Ok(GetRecordOk {
record,
cache_candidates: Default::default(),
})),
step: ProgressStep {
count: 1,
last: false,
},
stats,
},
));

id
}
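
With the quorum argument gone, the caller decides when it has seen enough results and may finish the query itself. A rough sketch of the caller side under the API as of this commit (assuming a Kademlia<MemoryStore> behaviour; event handling elided):

use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{record::Key, Kademlia, QueryId};

/// Start a lookup. Results now arrive as a stream of
/// `OutboundQueryProgressed` events rather than a single completed event.
fn start_lookup(kademlia: &mut Kademlia<MemoryStore>, key: Key) -> QueryId {
    // No quorum argument any more; the caller decides when it is satisfied.
    kademlia.get_record(key)
}

/// Finish the query early once enough records have been observed,
/// reproducing the old "quorum reached" behaviour on the caller side.
fn stop_lookup(kademlia: &mut Kademlia<MemoryStore>, id: QueryId) {
    if let Some(mut query) = kademlia.query_mut(&id) {
        query.finish();
    }
}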
@@ -1302,22 +1320,20 @@ where
count,
providers_found,
..
} => {
Some(KademliaEvent::OutboundQueryProgressed {
id: query_id,
stats: result.stats,
result: QueryResult::GetProviders(Ok(GetProvidersOk {
key,
providers: Default::default(),
providers_so_far: providers_found,
closest_peers: result.peers.collect(),
})),
step: ProgressStep {
count, // FIXME: count?
last: true,
},
})
}
} => Some(KademliaEvent::OutboundQueryProgressed {
id: query_id,
stats: result.stats,
result: QueryResult::GetProviders(Ok(GetProvidersOk {
key,
providers: Default::default(),
providers_so_far: providers_found,
closest_peers: result.peers.collect(),
})),
step: ProgressStep {
count: count + 1,
last: true,
},
}),
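
Since providers are now reported as the query progresses, a caller that only needs one provider can end the query as soon as the first one arrives, which is what the file-sharing example in this PR is changed to do. A rough sketch against the struct shape of GetProvidersOk as of this commit (the type is reworked again later in the review):

use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryResult};

/// Stop a `get_providers` query as soon as the first provider is known.
fn on_kademlia_event(kademlia: &mut Kademlia<MemoryStore>, event: KademliaEvent) {
    if let KademliaEvent::OutboundQueryProgressed {
        id,
        result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })),
        ..
    } = event
    {
        if !providers.is_empty() {
            // One provider is enough for this caller; finish the query early.
            if let Some(mut query) = kademlia.query_mut(&id) {
                query.finish();
            }
        }
    }
}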

QueryInfo::AddProvider {
context,
@@ -1364,16 +1380,15 @@ where

QueryInfo::GetRecord {
key,
records,
quorum,
count,
record_to_cache,
cache_candidates,
} => {
let results = if records.len() >= quorum.get() {
let results = if let Some(record) = record_to_cache {
// [not empty]
if quorum.get() == 1 && !cache_candidates.is_empty() {
if !cache_candidates.is_empty() {
// Cache the record at the closest node(s) to the key that
// did not return the record.
let record = records.first().expect("[not empty]").record.clone();
let quorum = NonZeroUsize::new(1).expect("1 > 0");
let context = PutRecordContext::Cache;
let info = QueryInfo::PutRecord {
@@ -1390,26 +1405,23 @@ where
.add_fixed(cache_candidates.values().copied(), inner);
}
Ok(GetRecordOk {
records,
record: Default::default(),
cache_candidates,
})
} else if records.is_empty() {
} else {
Err(GetRecordError::NotFound {
key,
closest_peers: result.peers.collect(),
})
} else {
Err(GetRecordError::QuorumFailed {
key,
records,
quorum,
})
};
Some(KademliaEvent::OutboundQueryProgressed {
id: query_id,
stats: result.stats,
result: QueryResult::GetRecord(results),
step: ProgressStep::single(),
step: ProgressStep {
count: count + 1,
last: true,
},
})
}

@@ -1598,31 +1610,32 @@ where
}
}

QueryInfo::GetRecord {
key,
records,
quorum,
..
} => Some(KademliaEvent::OutboundQueryProgressed {
id: query_id,
stats: result.stats,
result: QueryResult::GetRecord(Err(GetRecordError::Timeout {
key,
records,
quorum,
})),
step: ProgressStep::single(),
}),
QueryInfo::GetRecord { key, count, .. } => {
Some(KademliaEvent::OutboundQueryProgressed {
id: query_id,
stats: result.stats,
result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
step: ProgressStep {
count: count + 1,
last: true,
},
})
}

QueryInfo::GetProviders { key, .. } => Some(KademliaEvent::OutboundQueryProgressed {
id: query_id,
stats: result.stats,
result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
key,
closest_peers: result.peers.collect(),
})),
step: ProgressStep::single(),
}),
QueryInfo::GetProviders { key, count, .. } => {
Some(KademliaEvent::OutboundQueryProgressed {
id: query_id,
stats: result.stats,
result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
key,
closest_peers: result.peers.collect(),
})),
step: ProgressStep {
count: count + 1,
last: true,
},
})
}
}
}

@@ -2220,40 +2233,39 @@ where
user_data,
} => {
if let Some(query) = self.queries.get_mut(&user_data) {
let stats = query.stats().clone();
if let QueryInfo::GetRecord {
key,
records,
quorum,
ref mut count,
cache_candidates,
ref mut record_to_cache,
} = &mut query.inner.info
{
if let Some(record) = record {
records.push(PeerRecord {
if record_to_cache.is_none() {
*record_to_cache = Some(record.clone());
}
Member:
Say we discover multiple records for the same key. With the above, only the first record is replicated. libp2p-kad cannot know which of the records is the one to be cached, i.e. it cannot make a qualified judgement on which of the records is the best one.

How about removing this automated publishing mechanism and depending on the user to call Kademlia::put_record_to instead?

/// Stores a record at specific peers, without storing it locally.
///
/// The given [`Quorum`] is understood in the context of the total
/// number of distinct peers given.
///
/// If the record's expiration is `None`, the configured record TTL is used.
///
/// > **Note**: This is not a regular Kademlia DHT operation. It may be
/// > used to selectively update or store a record to specific peers
/// > for the purpose of e.g. making sure these peers have the latest
/// > "version" of a record or to "cache" a record at further peers
/// > to increase the lookup success rate on the DHT for other peers.
/// >
/// > In particular, if lookups are performed with a quorum > 1 multiple
/// > possibly different records may be returned and the standard Kademlia
/// > procedure of "caching" (i.e. storing) a found record at the closest
/// > node to the key that _did not_ return it cannot be employed
/// > transparently. In that case, client code can explicitly choose
/// > which record to store at which peers for analogous write-back
/// > caching or for other reasons.
pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId

Either way, the comment on put_record_to needs updating.

Member Author:
I have removed the caching, but I am confused why the behaviour of put_record_to changed if I only changed the behaviour of get_record. Does that rely on that caching?

Member:
According to the Kademlia paper:

For caching purposes, once a lookup succeeds, the requesting node stores the (key, value) pair at the closest node it observed to the key that did not return the value.

Before this pull request, when we did a Kademlia::get_record with a quorum of 1, we could replicate the (key, value) pair to the closest node that did not have the pair. But if we did a quorum of >1, the Kademlia::get_record query would potentially discover more than one value for the given key. Which value would you replicate to the closest node that did not have the value? libp2p-kad cannot make that choice. With a quorum of >1, Kademlia::get_record would thus not replicate the pair to the closest node that did not have the value. Instead it relied on the user first picking the right value and then calling Kademlia::put_record_to.

This pull request removes the quorum mechanism. To keep things simple, I suggest also removing the automated replication to the closest node that does not have the value, and instead relying entirely on the user to call put_record_to.

Does that make sense @dignifiedquire? Happy to expand further.

Member Author:
I believe d8ac894 does what you requested
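
To make the suggested flow concrete, a minimal sketch of caller-driven write-back caching via put_record_to (not part of the diff; it assumes the caller kept the record it judged correct and the cache_candidates from the finished query):

use std::collections::BTreeMap;

use libp2p::kad::kbucket::Distance;
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{Kademlia, Quorum, Record};
use libp2p::PeerId;

/// Store the record the caller picked at the closest peers that were queried
/// but did not return it.
fn cache_chosen_record(
    kademlia: &mut Kademlia<MemoryStore>,
    record: Record,
    cache_candidates: BTreeMap<Distance, PeerId>,
) {
    // `put_record_to` stores the record at exactly these peers without
    // storing it locally; the quorum is relative to the peers given.
    kademlia.put_record_to(record, cache_candidates.into_values(), Quorum::One);
}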

let record = PeerRecord {
peer: Some(source),
record,
});
};

let quorum = quorum.get();
if records.len() >= quorum {
// Desired quorum reached. The query may finish. See
// [`Query::try_finish`] for details.
let peers = records
.iter()
.filter_map(|PeerRecord { peer, .. }| peer.as_ref())
.cloned()
.collect::<Vec<_>>();
let finished = query.try_finish(peers.iter());
if !finished {
debug!(
"GetRecord query ({:?}) reached quorum ({}/{}) with \
response from peer {} but could not yet finish.",
user_data,
peers.len(),
quorum,
source,
);
}
}
*count += 1;
self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::OutboundQueryProgressed {
id: user_data,
result: QueryResult::GetRecord(Ok(GetRecordOk {
record: Some(record),
cache_candidates: cache_candidates.clone(),
})),
step: ProgressStep {
count: *count,
last: false,
},
stats,
},
));
} else {
log::trace!("Record with key {:?} not found at {}", key, source);
if let KademliaCaching::Enabled { max_peers } = self.caching {
@@ -2663,8 +2675,8 @@ pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
/// The successful result of [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub struct GetRecordOk {
/// The records found, including the peer that returned them.
pub records: Vec<PeerRecord>,
/// The record found in this iteration, including the peer that returned them.
Member:
Suggested change
/// The record found in this iteration, including the peer that returned them.
/// The record found in this iteration, including the peer that returned it.

pub record: Option<PeerRecord>,
Member:
I find it confusing that this is an Option. At least we need to document what None means here. Or we refactor GetRecordOk:

enum GetRecordOk {
  FoundRecord (PeerRecord),
  FinishedWithNoAdditionalRecord,
}

Member Author:
good idea, fixed
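
A sketch of how a consumer would handle the refactored result type (variant names follow the suggestion above, which the PR ends up adopting; the payload of the finished variant is left open here):

use libp2p::kad::{GetRecordOk, KademliaEvent, QueryResult};

/// Handle progressed `get_record` queries once `GetRecordOk` is an enum.
fn on_get_record_progress(event: KademliaEvent) {
    if let KademliaEvent::OutboundQueryProgressed {
        result: QueryResult::GetRecord(Ok(ok)),
        step,
        ..
    } = event
    {
        match ok {
            GetRecordOk::FoundRecord(peer_record) => {
                // Exactly one record per event; `step` tells the caller how
                // many results have been reported so far.
                println!("record {:?} at step {:?}", peer_record.record.key, step.count);
            }
            // The query finished without yielding another record;
            // `step.last` is true in that case.
            _ => {}
        }
    }
}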

/// If caching is enabled, these are the peers closest
/// _to the record key_ (not the local node) that were queried but
/// did not return the record, sorted by distance to the record key
@@ -2692,11 +2704,7 @@ pub enum GetRecordError {
quorum: NonZeroUsize,
},
#[error("the request timed out")]
Timeout {
key: record::Key,
records: Vec<PeerRecord>,
quorum: NonZeroUsize,
},
Timeout { key: record::Key },
}

impl GetRecordError {
@@ -2982,7 +2990,7 @@ pub enum QueryInfo {
/// A query initiated by [`Kademlia::get_closest_peers`].
Member:
Suggested change
/// A query initiated by [`Kademlia::get_closest_peers`].
/// A (repeated) query initiated by [`Kademlia::get_closest_peers`].

To be consistent with the comment updates below.

Member Author:
fixed

GetClosestPeers { key: Vec<u8> },

/// A query initiated by [`Kademlia::get_providers`].
/// A (repeated) query initiated by [`Kademlia::get_providers`].
GetProviders {
/// The key for which to search for providers.
key: record::Key,
@@ -3013,15 +3021,14 @@ pub enum QueryInfo {
context: PutRecordContext,
},

/// A query initiated by [`Kademlia::get_record`].
/// A (repeated) query initiated by [`Kademlia::get_record`].
GetRecord {
/// The key to look for.
key: record::Key,
/// The records with the id of the peer that returned them. `None` when
/// the record was found in the local store.
records: Vec<PeerRecord>,
/// The number of records to look for.
quorum: NonZeroUsize,
/// Current index of events.
count: usize,
Member:
In my eyes tracking this as a usize is error prone and non-descriptive. Why not use ProgressStep here?

Done via cc1b2bd in case you want to cherry-pick.

Member Author:
fixed
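
For reference, the shape ProgressStep takes in that refactor is roughly the following (an illustrative sketch; field and method names follow the commit referenced above and may differ in detail):

use std::num::NonZeroUsize;

/// Tracks which intermediary result of a query is being reported and whether
/// it is the final one.
#[derive(Clone, Debug)]
pub struct ProgressStep {
    pub count: NonZeroUsize,
    pub last: bool,
}

impl ProgressStep {
    /// The first result of a query.
    pub fn first() -> Self {
        Self {
            count: NonZeroUsize::new(1).expect("1 > 0"),
            last: false,
        }
    }

    /// The result following `self`.
    pub fn next(&self) -> Self {
        debug_assert!(!self.last);
        Self {
            count: NonZeroUsize::new(self.count.get() + 1).expect("adding 1 keeps it non-zero"),
            last: false,
        }
    }

    /// Mark this step as the final one.
    pub fn last(&self) -> Self {
        Self {
            count: self.count,
            last: true,
        }
    }
}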

/// The record that is to be cached in the `cache_candidates`.
record_to_cache: Option<Record>,
/// The peers closest to the `key` that were queried but did not return a record,
/// i.e. the peers that are candidates for caching the record.
cache_candidates: BTreeMap<kbucket::Distance, PeerId>,