-
Notifications
You must be signed in to change notification settings - Fork 978
copy-tracking: refactor async stream for classifying diff entries #9087
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ilyagr
wants to merge
2
commits into
main
Choose a base branch
from
ig/refactor-copy-diff-stream
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+114
−147
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,11 +23,7 @@ use std::task::ready; | |
|
|
||
| use futures::Stream; | ||
| use futures::StreamExt as _; | ||
| use futures::future::BoxFuture; | ||
| use futures::future::ready; | ||
| use futures::future::try_join_all; | ||
| use futures::stream::Fuse; | ||
| use futures::stream::FuturesOrdered; | ||
| use indexmap::IndexMap; | ||
| use indexmap::IndexSet; | ||
| use pollster::FutureExt as _; | ||
|
|
@@ -366,153 +362,114 @@ impl CopyHistoryTreeDiffEntry { | |
| } | ||
| } | ||
|
|
||
| /// Should keep the memory usage of `copy_history_diff_stream` bounded and allow | ||
| /// some concurrency even if files with changes are rare among all files in the | ||
| /// tree. | ||
| /// | ||
| /// Could be adjusted if we find the value too low or that this doesn't bound | ||
| /// memory use enough (seems unlikely to be over a megabyte with a rough | ||
| /// estimate) | ||
| pub const RECOMMENDED_CONCURRENCY_BUFFER_SIZE: usize = 1024; | ||
|
|
||
| /// Adapts a `TreeDiffStream` to follow copies / renames. | ||
| pub struct CopyHistoryDiffStream<'a> { | ||
| inner: Fuse<TreeDiffStream<'a>>, | ||
| /// | ||
| /// Generally prefer `MergedTree::diff_stream_with_copy_history()` instead of | ||
| /// calling this directly. | ||
| /// | ||
| /// For `concurrency_buffer_size`, it is recommended to either use 1 (to disable | ||
| /// concurrency) or at least [`RECOMMENDED_CONCURRENCY_BUFFER_SIZE`], unless | ||
| /// this takes up too much memory. The number of concurrent calls into the store | ||
| /// will generally be much lower than `concurrency_buffer_size` since entries | ||
| /// that do not require any processing take up space in this queue. | ||
| pub fn copy_history_diff_stream<'a>( | ||
| inner: TreeDiffStream<'a>, | ||
| before_tree: &'a MergedTree, | ||
| after_tree: &'a MergedTree, | ||
| pending: FuturesOrdered<BoxFuture<'static, CopyHistoryTreeDiffEntry>>, | ||
| } | ||
|
|
||
| impl<'a> CopyHistoryDiffStream<'a> { | ||
| /// Creates an iterator over the differences between two trees, taking copy | ||
| /// history into account. Generally prefer | ||
| /// `MergedTree::diff_stream_with_copy_history()` instead of calling this | ||
| /// directly. | ||
| pub fn new( | ||
| inner: TreeDiffStream<'a>, | ||
| before_tree: &'a MergedTree, | ||
| after_tree: &'a MergedTree, | ||
| ) -> Self { | ||
| Self { | ||
| inner: inner.fuse(), | ||
| before_tree, | ||
| after_tree, | ||
| pending: FuturesOrdered::new(), | ||
| } | ||
| } | ||
| concurrency_buffer_size: usize, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: This could be an Option to represent the intention here with |
||
| ) -> impl Stream<Item = CopyHistoryTreeDiffEntry> + 'a { | ||
| let before_tree = before_tree.clone(); | ||
| let after_tree = after_tree.clone(); | ||
| inner | ||
| .map(move |entry| resolve_diff_entry_copies(before_tree.clone(), after_tree.clone(), entry)) | ||
| .buffered(concurrency_buffer_size) | ||
| .flat_map(futures::stream::iter) | ||
| } | ||
|
|
||
| impl Stream for CopyHistoryDiffStream<'_> { | ||
| type Item = CopyHistoryTreeDiffEntry; | ||
|
|
||
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| loop { | ||
| // First, check if we have newly-finished futures. If this returns Pending, we | ||
| // intentionally fall through to poll `self.inner`. | ||
| if let Poll::Ready(Some(next)) = self.pending.poll_next_unpin(cx) { | ||
| return Poll::Ready(Some(next)); | ||
| } | ||
|
|
||
| // If we didn't have queued results above, we want to check our wrapped stream | ||
| // for the next non-copy-matched diff entry. | ||
| let next_diff_entry = match ready!(self.inner.poll_next_unpin(cx)) { | ||
| Some(diff_entry) => diff_entry, | ||
| None if self.pending.is_empty() => return Poll::Ready(None), | ||
| _ => return Poll::Pending, | ||
| }; | ||
|
|
||
| let Ok(Diff { before, after }) = &next_diff_entry.values else { | ||
| self.pending | ||
| .push_back(Box::pin(ready(CopyHistoryTreeDiffEntry::normal( | ||
| next_diff_entry, | ||
| )))); | ||
| continue; | ||
| }; | ||
|
|
||
| // Don't try copy-tracing if we have conflicts on either side. | ||
| // | ||
| // TODO: consider accepting conflicts if the copy IDs can be resolved. | ||
| let Some(before) = before.as_resolved() else { | ||
| self.pending | ||
| .push_back(Box::pin(ready(CopyHistoryTreeDiffEntry::normal( | ||
| next_diff_entry, | ||
| )))); | ||
| continue; | ||
| }; | ||
| let Some(after) = after.as_resolved() else { | ||
| self.pending | ||
| .push_back(Box::pin(ready(CopyHistoryTreeDiffEntry::normal( | ||
| next_diff_entry, | ||
| )))); | ||
| continue; | ||
| }; | ||
| /// Classifies a single `TreeDiffEntry`, performing copy-tracing when needed. | ||
| /// Returns a `Vec` because in some cases (different copy IDs at the same path) | ||
| /// a single TreeDiffEntry decomposes into a deletion + copy-traced creation. | ||
| async fn resolve_diff_entry_copies( | ||
| before_tree: MergedTree, | ||
| after_tree: MergedTree, | ||
| diff_entry: TreeDiffEntry, | ||
| ) -> Vec<CopyHistoryTreeDiffEntry> { | ||
| let Ok(ref diff) = diff_entry.values else { | ||
| return vec![CopyHistoryTreeDiffEntry::normal(diff_entry)]; | ||
| }; | ||
|
|
||
| match (before, after) { | ||
| // If we have files with matching copy_ids, no need to do copy-tracing. | ||
| ( | ||
| Some(TreeValue::File { copy_id: id1, .. }), | ||
| Some(TreeValue::File { copy_id: id2, .. }), | ||
| ) if id1 == id2 => { | ||
| self.pending | ||
| .push_back(Box::pin(ready(CopyHistoryTreeDiffEntry::normal( | ||
| next_diff_entry, | ||
| )))); | ||
| } | ||
| // Don't try copy-tracing if we have conflicts on either side. | ||
| // | ||
| // TODO: consider handling conflicts, especially in the simpler case where the | ||
| // corresponding "copy ID conflict" can be resolved. | ||
| let (Some(before), Some(after)) = (diff.before.as_resolved(), diff.after.as_resolved()) else { | ||
| return vec![CopyHistoryTreeDiffEntry::normal(diff_entry)]; | ||
| }; | ||
|
|
||
| (other, Some(f @ TreeValue::File { .. })) => { | ||
| if let Some(other) = other { | ||
| // For files with non-matching copy-ids, or for a non-file that changes to a | ||
| // file, mark the first as deleted and do copy-tracing on the second. | ||
| // | ||
| // NOTE[deletion-diff-entry]: this may emit two diff entries, where the old | ||
| // diffstream would contain only one (even with gix's heuristic-based copy | ||
| // detection). | ||
| // | ||
| // This may be desirable in some cases (such as replacing a file X with a | ||
| // copy of some other file Y; the deletion entry makes it more clear that | ||
| // the original X was replaced by a formerly unrelated file). It is less | ||
| // desirable in cases where the new file shares some actual relation to the | ||
| // old one. | ||
| // | ||
| // We plan to improve this in the near future, but for now we'll keep the | ||
| // simpler implementation since this behavior is not visible outside of | ||
| // tests yet. | ||
| self.pending | ||
| .push_back(Box::pin(ready(CopyHistoryTreeDiffEntry { | ||
| target_path: next_diff_entry.path.clone(), | ||
| diffs: Ok(Merge::resolved(CopyHistoryDiffTerm { | ||
| target_value: None, | ||
| sources: vec![( | ||
| CopyHistorySource::Normal, | ||
| Merge::resolved(Some(other.clone())), | ||
| )], | ||
| })), | ||
| }))); | ||
| } | ||
|
|
||
| let future = tree_diff_entry_from_copies( | ||
| self.before_tree.clone(), | ||
| self.after_tree.clone(), | ||
| f.clone(), | ||
| next_diff_entry.path.clone(), | ||
| ); | ||
| self.pending.push_back(Box::pin(future)); | ||
| } | ||
| match (before, after) { | ||
| // If we have files with matching copy_ids, no need to do copy-tracing. | ||
| ( | ||
| Some(TreeValue::File { copy_id: id1, .. }), | ||
| Some(TreeValue::File { copy_id: id2, .. }), | ||
| ) if id1 == id2 => vec![CopyHistoryTreeDiffEntry::normal(diff_entry)], | ||
|
|
||
| // New file with copy history — needs copy-tracing. | ||
| (None, Some(f @ TreeValue::File { .. })) => { | ||
| let f = f.clone(); | ||
| vec![CopyHistoryTreeDiffEntry { | ||
| target_path: diff_entry.path, | ||
| diffs: diffs_from_copies(before_tree, after_tree, f).await, | ||
| }] | ||
| } | ||
|
|
||
| // Anything else (e.g. file => non-file non-tree), issue a simple diff entry. | ||
| // | ||
| // NOTE[deletion-diff-entry2]: this is another point where a spurious deletion entry | ||
| // can be generated; we have a planned fix in the works. | ||
| _ => self | ||
| .pending | ||
| .push_back(Box::pin(ready(CopyHistoryTreeDiffEntry::normal( | ||
| next_diff_entry, | ||
| )))), | ||
| } | ||
| // For files with non-matching copy-ids, or for a non-file that changes to a | ||
| // file, mark the first as deleted and do copy-tracing on the second. | ||
| // | ||
| // NOTE[deletion-diff-entry]: this may emit two diff entries, where the old | ||
| // diffstream would contain only one (even with gix's heuristic-based copy | ||
| // detection). | ||
| // | ||
| // This may be desirable in some cases (such as replacing a file X with a | ||
| // copy of some other file Y; the deletion entry makes it more clear that | ||
| // the original X was replaced by a formerly unrelated file). It is less | ||
| // desirable in cases where the new file shares some actual relation to the | ||
| // old one. | ||
| // | ||
| // We plan to improve this in the near future, but for now we'll keep the | ||
| // simpler implementation since this behavior is not visible outside of | ||
| // tests yet. | ||
| (Some(other), Some(f @ TreeValue::File { .. })) => { | ||
| let other = other.clone(); | ||
| let f = f.clone(); | ||
| vec![ | ||
| CopyHistoryTreeDiffEntry { | ||
| target_path: diff_entry.path.clone(), | ||
| diffs: Ok(Merge::resolved(CopyHistoryDiffTerm { | ||
| target_value: None, | ||
| sources: vec![(CopyHistorySource::Normal, Merge::resolved(Some(other)))], | ||
| })), | ||
| }, | ||
| CopyHistoryTreeDiffEntry { | ||
| target_path: diff_entry.path, | ||
| diffs: diffs_from_copies(before_tree, after_tree, f).await, | ||
| }, | ||
| ] | ||
| } | ||
| } | ||
| } | ||
|
|
||
| async fn tree_diff_entry_from_copies( | ||
| before_tree: MergedTree, | ||
| after_tree: MergedTree, | ||
| file: TreeValue, | ||
| target_path: RepoPathBuf, | ||
| ) -> CopyHistoryTreeDiffEntry { | ||
| CopyHistoryTreeDiffEntry { | ||
| target_path, | ||
| diffs: diffs_from_copies(before_tree, after_tree, file).await, | ||
| // Anything else (e.g. file => non-file non-tree), issue a simple diff entry. | ||
| // | ||
| // NOTE[deletion-diff-entry2]: this is another point where a spurious deletion entry | ||
| // can be generated; we have a planned fix in the works. | ||
| _ => vec![CopyHistoryTreeDiffEntry::normal(diff_entry)], | ||
| } | ||
| } | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think this needs to clarify that only certain backends will get bitten by this, e.g ersc and Google atm otherwise this looks a bit misleading.