Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1825801
Add initial support for batched operations for suggest and index CLI …
juhoinkinen Jan 7, 2023
933990b
Use common helper function for output of suggest and index commands
juhoinkinen Jan 14, 2023
5064a48
Ensure tests fail if 'text' with wrong type ends up to dummy backend
juhoinkinen Jan 16, 2023
47423e5
Fix index command by using DocumentList in batch functions instead of…
juhoinkinen Jan 16, 2023
37ece6c
Add suggest-batch REST method
juhoinkinen Jan 14, 2023
294baab
Use common SuggestParameters via reference in suggest method too
juhoinkinen Jan 19, 2023
49931f8
Fix & improve testing of suggest CLI cmd with file input; adapt the c…
juhoinkinen Jan 23, 2023
74a9b80
Fix import order
juhoinkinen Jan 23, 2023
4919e03
Add tests for applying transform in suggest calls
juhoinkinen Jan 23, 2023
741494a
Apply transform to document batch on project level
juhoinkinen Jan 23, 2023
e6cf0d3
Use common _suggest function in REST methods
juhoinkinen Jan 24, 2023
dc7e868
Define error function for not supported language
juhoinkinen Jan 24, 2023
ba1a951
Merge branch 'master' into issue579-batch-suggest-operation
juhoinkinen Jan 24, 2023
b6558f8
Add optional id field to suggest-batch REST method
juhoinkinen Jan 24, 2023
47da0e1
Remove superfluous implementation of _suggest_batch fn in dummy backend
juhoinkinen Jan 25, 2023
1ebedde
Refactor to address complexity issue by CodeClimate
juhoinkinen Jan 26, 2023
45d6ed1
Refactor again
juhoinkinen Jan 27, 2023
0767d10
Fix typo
juhoinkinen Jan 27, 2023
e49e91f
Add swagger tests for 404 & 503 cases in suggest-batch request
juhoinkinen Jan 27, 2023
b8a411f
Remove debug message showing number of documents in suggest batch
juhoinkinen Jan 27, 2023
77d5266
Support "-" as file path in suggest CLI command for stdin
juhoinkinen Jan 28, 2023
53e39d2
Open text documents using generator instead of list in CLI suggest fn
juhoinkinen Jan 30, 2023
1a5af59
Implement minibatching of documents in suggest fn in project module
juhoinkinen Jan 30, 2023
f32621c
Revert commits for REST API changes as it's better to have own PR for…
juhoinkinen Jan 30, 2023
bdac84e
Remove unnecessary stdin mocking from CLI suggest tests
juhoinkinen Jan 30, 2023
395b26f
Add test for CLI suggest cmd for giving both real file and "-"
juhoinkinen Jan 30, 2023
63edc53
Rename dummy loop variable to _
juhoinkinen Jan 30, 2023
64bc203
Return generator of suggestions instead of list from suggest_batch
juhoinkinen Jan 30, 2023
017b9d1
Add BatchingDocumentCorpus
juhoinkinen Feb 1, 2023
49c06a6
Use BatchingDocumentCorpus in suggest, index, eval & optimize CLI cmds
juhoinkinen Feb 1, 2023
365dbcd
Add document batching method to DocumentCorpus class & basic test for it
juhoinkinen Feb 1, 2023
ba31310
Add evaluate_many() method to EvaluationBatch and use it in CLI cmds
juhoinkinen Feb 1, 2023
b4c059e
Turn DocumentCorpus.doc_batches() method to property with a constant …
juhoinkinen Feb 3, 2023
9e1c272
Remove single-text versions of suggest methods
juhoinkinen Feb 3, 2023
ec16a21
Refine document batching test
juhoinkinen Feb 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use BatchingDocumentCorpus in suggest, index, eval & optimize CLI cmds
  • Loading branch information
juhoinkinen committed Feb 1, 2023
commit 49c06a6e7dea0a0521b9fa1df3162c34fb1e446a
42 changes: 25 additions & 17 deletions annif/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ def validate_backend_params(backend, beparam, project):
)


BATCH_MAX_LIMIT = 15
FILTER_BATCH_MAX_LIMIT = 15


def generate_filter_batches(subjects):
import annif.eval

filter_batches = collections.OrderedDict()
for limit in range(1, BATCH_MAX_LIMIT + 1):
for limit in range(1, FILTER_BATCH_MAX_LIMIT + 1):
for threshold in [i * 0.05 for i in range(20)]:
hit_filter = SuggestionFilter(subjects, limit, threshold)
batch = annif.eval.EvaluationBatch(subjects)
Expand Down Expand Up @@ -403,7 +403,7 @@ def run_suggest(

if paths and not (len(paths) == 1 and paths[0] == "-"):
docs = open_text_documents(paths, docs_limit)
subject_sets = project.suggest_batch(docs, backend_params)
subject_sets = project.suggest_corpus(docs, backend_params)
for (
subjects,
path,
Expand Down Expand Up @@ -452,7 +452,7 @@ def run_index(
documents = annif.corpus.DocumentDirectory(
directory, None, None, require_subjects=False
)
subject_sets = project.suggest_batch(documents, backend_params)
subject_sets = project.suggest_corpus(documents, backend_params)

for (docfilename, _), subjects in zip(documents, subject_sets):
subjectfilename = re.sub(r"\.txt$", suffix, docfilename)
Expand Down Expand Up @@ -550,7 +550,7 @@ def run_eval(
"cannot open results-file for writing: " + str(e)
)
docs = open_documents(paths, project.subjects, project.vocab_lang, docs_limit)

corpus = annif.corpus.BatchingDocumentCorpus(docs)
jobs, pool_class = annif.parallel.get_pool(jobs)

project.initialize(parallel=True)
Expand All @@ -559,8 +559,11 @@ def run_eval(
)

with pool_class(jobs) as pool:
for hits, subject_set in pool.imap_unordered(psmap.suggest, docs.documents):
eval_batch.evaluate(hits[project_id], subject_set)
for hit_sets, subject_sets in pool.imap_unordered(
psmap.suggest_batch, corpus.doc_batches(project.DOC_BATCH_SIZE)
):
for hits, subject_set in zip(hit_sets[project_id], subject_sets):
eval_batch.evaluate(hits, subject_set)

template = "{0:<30}\t{1}"
metrics = eval_batch.results(
Expand Down Expand Up @@ -606,16 +609,21 @@ def run_optimize(project_id, paths, docs_limit, backend_param):

ndocs = 0
docs = open_documents(paths, project.subjects, project.vocab_lang, docs_limit)
for doc in docs.documents:
raw_hits = project.suggest(doc.text, backend_params)
hits = raw_hits.filter(project.subjects, limit=BATCH_MAX_LIMIT)
assert isinstance(hits, ListSuggestionResult), (
"Optimize should only be done with ListSuggestionResult "
+ "as it would be very slow with VectorSuggestionResult."
)
for hit_filter, batch in filter_batches.values():
batch.evaluate(hit_filter(hits), doc.subject_set)
ndocs += 1
corpus = annif.corpus.BatchingDocumentCorpus(docs)

for docs_batch in corpus.doc_batches(project.DOC_BATCH_SIZE):
texts, subject_sets = zip(*[(doc.text, doc.subject_set) for doc in docs_batch])

raw_hit_sets = project.suggest_batch(texts, backend_params)
for raw_hits, subject_set in zip(raw_hit_sets, subject_sets):
hits = raw_hits.filter(project.subjects, limit=FILTER_BATCH_MAX_LIMIT)
assert isinstance(hits, ListSuggestionResult), (
"Optimize should only be done with ListSuggestionResult "
+ "as it would be very slow with VectorSuggestionResult."
)
for hit_filter, filter_batch in filter_batches.values():
filter_batch.evaluate(hit_filter(hits), subject_set)
ndocs += 1

click.echo("\t".join(("Limit", "Thresh.", "Prec.", "Rec.", "F1")))

Expand Down
14 changes: 14 additions & 0 deletions annif/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import multiprocessing
import multiprocessing.dummy
from collections import defaultdict

# Start method for processes created by the multiprocessing module.
# A value of None means using the platform-specific default.
Expand Down Expand Up @@ -48,6 +49,19 @@ def suggest(self, doc):
)
return (filtered_hits, doc.subject_set)

def suggest_batch(self, batch):
filtered_hit_sets = defaultdict(list)
texts, subject_sets = zip(*[(doc.text, doc.subject_set) for doc in batch])

for project_id in self.project_ids:
project = self.registry.get_project(project_id)
hit_sets = project.suggest_batch(texts, self.backend_params)
for hits in hit_sets:
filtered_hit_sets[project_id].append(
hits.filter(project.subjects, self.limit, self.threshold)
)
return (filtered_hit_sets, subject_sets)


def get_pool(n_jobs):
"""return a suitable multiprocessing pool class, and the correct jobs
Expand Down
37 changes: 16 additions & 21 deletions annif/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class AnnifProject(DatadirMixin):

# default values for configuration settings
DEFAULT_ACCESS = "public"
MINIBATCH_SIZE = 32
DOC_BATCH_SIZE = 32

def __init__(self, project_id, config, datadir, registry):
DatadirMixin.__init__(self, datadir, "projects", project_id)
Expand Down Expand Up @@ -119,22 +119,11 @@ def _suggest_with_backend(self, text, backend_params):
logger.debug("Got %d hits from backend %s", len(hits), self.backend.backend_id)
return hits

def _batched(self, iterable, n):
# From https://docs.python.org/3/library/itertools.html#itertools-recipes
it = iter(iterable)
while True:
batch = list(itertools.islice(it, n))
if not batch:
return
yield batch

def _suggest_batch_with_backend(self, corpus, backend_params):
def _suggest_batch_with_backend(self, texts, backend_params):
if backend_params is None:
backend_params = {}
beparams = backend_params.get(self.backend.backend_id, {})
for docs_minibatch in self._batched(corpus.documents, self.MINIBATCH_SIZE):
texts = [doc.text for doc in docs_minibatch]
yield self.backend.suggest_batch(texts, beparams)
return self.backend.suggest_batch(texts, beparams)

@property
def analyzer(self):
Expand Down Expand Up @@ -234,18 +223,24 @@ def suggest(self, text, backend_params=None):
logger.debug("%d hits from backend", len(hits))
return hits

def suggest_batch(self, corpus, backend_params=None):
"""Suggest subjects for the given documents using batches of documents in their
operations when possible."""
def suggest_corpus(self, corpus, backend_params=None):
"""Suggest subjects for the given documents corpus in batches of documents."""
corpus = annif.corpus.BatchingDocumentCorpus(corpus)
suggestions = (
self.suggest_batch([doc.text for doc in doc_batch], backend_params)
for doc_batch in corpus.doc_batches(self.DOC_BATCH_SIZE)
)
return itertools.chain.from_iterable(suggestions)

def suggest_batch(self, texts, backend_params=None):
"""Suggest subjects for the given documents batch."""
if not self.is_trained:
if self.is_trained is None:
logger.warning("Could not get train state information.")
else:
raise NotInitializedException("Project is not trained.")
corpus = self.transform.transform_corpus(corpus)
return itertools.chain.from_iterable(
self._suggest_batch_with_backend(corpus, backend_params)
)
texts = [self.transform.transform_text(text) for text in texts]
return self._suggest_batch_with_backend(texts, backend_params)

def train(self, corpus, backend_params=None, jobs=0):
"""train the project using documents from a metadata source"""
Expand Down
8 changes: 4 additions & 4 deletions tests/test_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,9 @@ def test_project_suggest_transform_limit(registry):
assert len(result) == 0


def test_project_suggest_batch(registry, fulltext_corpus):
def test_project_suggest_corpus(registry, fulltext_corpus):
project = registry.get_project("dummy-en")
result = list(project.suggest_batch(fulltext_corpus))
result = list(project.suggest_corpus(fulltext_corpus))
assert len(result) == 28 # Number of documents
first_doc_hits = result[0].as_list()
assert len(first_doc_hits) == 1
Expand All @@ -247,9 +247,9 @@ def test_project_suggest_batch(registry, fulltext_corpus):
assert first_doc_hits[0].score == 1.0


def test_project_suggest_batch_transform_limit(registry, fulltext_corpus):
def test_project_suggest_corpus_transform_limit(registry, fulltext_corpus):
project = registry.get_project("limit-transform")
result = list(project.suggest_batch(fulltext_corpus))
result = list(project.suggest_corpus(fulltext_corpus))
assert len(result) == 28 # Number of documents
assert len(result[0]) == 0

Expand Down