184 changes: 129 additions & 55 deletions src/lfx/src/lfx/base/data/docling_utils.py
@@ -3,7 +3,7 @@
import sys
import traceback
from contextlib import suppress
from typing import TYPE_CHECKING
from functools import lru_cache

from docling_core.types.doc import DoclingDocument
from pydantic import BaseModel, SecretStr, TypeAdapter
@@ -12,9 +12,6 @@
from lfx.schema.data import Data
from lfx.schema.dataframe import DataFrame

if TYPE_CHECKING:
from langchain_core.language_models.chat_models import BaseChatModel


class DoclingDependencyError(Exception):
"""Custom exception for missing Docling dependencies."""
@@ -152,6 +149,80 @@ def _deserialize_pydantic_model(data: dict):
return adapter.validate_python(data["config"])


# Global cache for DocumentConverter instances
# This cache persists across multiple runs and thread invocations
@lru_cache(maxsize=4)
def _get_cached_converter(
pipeline: str,
ocr_engine: str,
*,
do_picture_classification: bool,
pic_desc_config_hash: str | None,
):
"""Create and cache a DocumentConverter instance based on configuration.

This function uses LRU caching to maintain DocumentConverter instances in memory,
eliminating the 15-20 minute model loading time on subsequent runs.

Args:
pipeline: The pipeline type ("standard" or "vlm")
ocr_engine: The OCR engine to use
do_picture_classification: Whether to enable picture classification
pic_desc_config_hash: Hash of the picture description config (for cache key)

Returns:
A cached or newly created DocumentConverter instance
"""
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import OcrOptions, PdfPipelineOptions, VlmPipelineOptions
from docling.document_converter import DocumentConverter, FormatOption, PdfFormatOption
from docling.models.factories import get_ocr_factory
from docling.pipeline.vlm_pipeline import VlmPipeline

logger.info(f"Creating DocumentConverter for pipeline={pipeline}, ocr_engine={ocr_engine}")

# Configure the standard PDF pipeline
def _get_standard_opts() -> PdfPipelineOptions:
pipeline_options = PdfPipelineOptions()
pipeline_options.do_ocr = ocr_engine not in {"", "None"}
if pipeline_options.do_ocr:
ocr_factory = get_ocr_factory(
allow_external_plugins=False,
)
ocr_options: OcrOptions = ocr_factory.create_options(
kind=ocr_engine,
)
pipeline_options.ocr_options = ocr_options

pipeline_options.do_picture_classification = do_picture_classification

# Note: pic_desc_config_hash is for cache key only
# Actual picture description is handled separately (non-cached path)
_ = pic_desc_config_hash # Mark as intentionally unused

return pipeline_options

# Configure the VLM pipeline
def _get_vlm_opts() -> VlmPipelineOptions:
return VlmPipelineOptions()

if pipeline == "standard":
pdf_format_option = PdfFormatOption(
pipeline_options=_get_standard_opts(),
)
elif pipeline == "vlm":
pdf_format_option = PdfFormatOption(pipeline_cls=VlmPipeline, pipeline_options=_get_vlm_opts())
else:
msg = f"Unknown pipeline: {pipeline!r}"
raise ValueError(msg)

format_options: dict[InputFormat, FormatOption] = {
InputFormat.PDF: pdf_format_option,
InputFormat.IMAGE: pdf_format_option,
}

return DocumentConverter(format_options=format_options)

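As an aside on the caching strategy: `functools.lru_cache` keys on the hashable argument values, which is why the converter settings are passed as plain strings and booleans while the unhashable picture-description config is reduced to a hash string. A minimal, runnable sketch of that behavior (names illustrative, not from this PR):

```python
from functools import lru_cache

@lru_cache(maxsize=4)
def get_converter(pipeline: str, ocr_engine: str) -> object:
    # Runs once per unique (pipeline, ocr_engine) pair; later calls are cache hits.
    print(f"building converter for {pipeline}/{ocr_engine}")
    return object()  # stand-in for the expensive DocumentConverter

a = get_converter("standard", "easyocr")
b = get_converter("standard", "easyocr")  # cache hit: no rebuild, same object
assert a is b
get_converter.cache_clear()  # evicts all cached converters if settings must be reloaded
```

Note that `maxsize=4` means a fifth distinct configuration evicts the least recently used converter.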
def docling_worker(
*,
file_paths: list[str],
@@ -162,7 +233,12 @@ def docling_worker(
pic_desc_config: dict | None,
pic_desc_prompt: str,
):
"""Worker function for processing files with Docling in a separate process."""
"""Worker function for processing files with Docling on a worker thread.

Uses a globally cached DocumentConverter instance, reducing processing time
on subsequent runs from 15-20 minutes to seconds.
"""
# Signal handling for graceful shutdown
shutdown_requested = False

@@ -205,12 +281,12 @@ def check_shutdown() -> None:
check_shutdown()

try:
from docling.datamodel.base_models import ConversionStatus, InputFormat
from docling.datamodel.pipeline_options import OcrOptions, PdfPipelineOptions, VlmPipelineOptions
from docling.document_converter import DocumentConverter, FormatOption, PdfFormatOption
from docling.models.factories import get_ocr_factory
from docling.pipeline.vlm_pipeline import VlmPipeline
from langchain_docling.picture_description import PictureDescriptionLangChainOptions
from docling.datamodel.base_models import ConversionStatus, InputFormat # noqa: F401
from docling.datamodel.pipeline_options import OcrOptions, PdfPipelineOptions, VlmPipelineOptions # noqa: F401
from docling.document_converter import DocumentConverter, FormatOption, PdfFormatOption # noqa: F401
from docling.models.factories import get_ocr_factory # noqa: F401
from docling.pipeline.vlm_pipeline import VlmPipeline # noqa: F401
from langchain_docling.picture_description import PictureDescriptionLangChainOptions # noqa: F401

# Check for shutdown after imports
check_shutdown()
@@ -233,61 +309,59 @@ def check_shutdown() -> None:
queue.put({"error": "Worker interrupted during imports", "shutdown": True})
return

# Configure the standard PDF pipeline
def _get_standard_opts() -> PdfPipelineOptions:
# Use the cached converter instead of creating a new one each time.
# This is the key optimization that eliminates 15-20 minute model load times.
def _get_converter() -> DocumentConverter:
check_shutdown() # Check before heavy operations

pipeline_options = PdfPipelineOptions()
pipeline_options.do_ocr = ocr_engine not in {"", "None"}
if pipeline_options.do_ocr:
ocr_factory = get_ocr_factory(
allow_external_plugins=False,
)

ocr_options: OcrOptions = ocr_factory.create_options(
kind=ocr_engine,
)
pipeline_options.ocr_options = ocr_options

pipeline_options.do_picture_classification = do_picture_classification

# For now, we don't support pic_desc_config caching due to serialization complexity
# This is a known limitation that can be addressed in a future enhancement
if pic_desc_config:
pic_desc_llm: BaseChatModel = _deserialize_pydantic_model(pic_desc_config)

logger.warning(
"Picture description with LLM is not yet supported with cached converters. "
"Using non-cached converter for this request."
)
# Fall back to creating a new converter (old behavior)
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, FormatOption, PdfFormatOption
from docling.models.factories import get_ocr_factory
from langchain_docling.picture_description import PictureDescriptionLangChainOptions

pipeline_options = PdfPipelineOptions()
pipeline_options.do_ocr = ocr_engine not in {"", "None"}
if pipeline_options.do_ocr:
ocr_factory = get_ocr_factory(allow_external_plugins=False)
ocr_options = ocr_factory.create_options(kind=ocr_engine)
pipeline_options.ocr_options = ocr_options

pipeline_options.do_picture_classification = do_picture_classification
pic_desc_llm = _deserialize_pydantic_model(pic_desc_config)
logger.info("Docling enabling the picture description stage.")
pipeline_options.do_picture_description = True
pipeline_options.allow_external_plugins = True
pipeline_options.picture_description_options = PictureDescriptionLangChainOptions(
llm=pic_desc_llm,
prompt=pic_desc_prompt,
)
return pipeline_options

# Configure the VLM pipeline
def _get_vlm_opts() -> VlmPipelineOptions:
check_shutdown() # Check before heavy operations
return VlmPipelineOptions()

# Configure the main format options and create the DocumentConverter()
def _get_converter() -> DocumentConverter:
check_shutdown() # Check before heavy operations

if pipeline == "standard":
pdf_format_option = PdfFormatOption(
pipeline_options=_get_standard_opts(),
)
elif pipeline == "vlm":
pdf_format_option = PdfFormatOption(pipeline_cls=VlmPipeline, pipeline_options=_get_vlm_opts())
else:
msg = f"Unknown pipeline: {pipeline!r}"
raise ValueError(msg)

format_options: dict[InputFormat, FormatOption] = {
InputFormat.PDF: pdf_format_option,
InputFormat.IMAGE: pdf_format_option,
}

return DocumentConverter(format_options=format_options)
pdf_format_option = PdfFormatOption(pipeline_options=pipeline_options)
format_options: dict[InputFormat, FormatOption] = {
InputFormat.PDF: pdf_format_option,
InputFormat.IMAGE: pdf_format_option,
}
return DocumentConverter(format_options=format_options)

# Use the cached converter: the first run builds and caches it (15-20 min);
# subsequent runs reuse the cached instance (seconds).
pic_desc_config_hash = None  # Always None here; the pic_desc_config branch returned above
return _get_cached_converter(
pipeline=pipeline,
ocr_engine=ocr_engine,
do_picture_classification=do_picture_classification,
pic_desc_config_hash=pic_desc_config_hash,
)

try:
# Check for shutdown before creating converter (can be slow)
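The `pic_desc_config_hash` parameter above is reserved for the cache key but is currently always `None`. If picture-description caching is added later, one plausible way to derive it is a stable digest of the serialized config; this helper is hypothetical, not part of the PR:

```python
import hashlib
import json

def config_hash(config: dict | None) -> str | None:
    """Stable digest of a JSON-serializable config, usable as part of an lru_cache key."""
    if config is None:
        return None
    canonical = json.dumps(config, sort_keys=True, default=str)  # deterministic key order
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```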
88 changes: 39 additions & 49 deletions src/lfx/src/lfx/components/docling/docling_inline.py
@@ -1,6 +1,6 @@
import queue
import threading
import time
from multiprocessing import Queue, get_context
from queue import Empty

from lfx.base.data import BaseFileComponent
from lfx.base.data.docling_utils import _serialize_pydantic_model, docling_worker
@@ -92,60 +92,57 @@ class DoclingInlineComponent(BaseFileComponent):
*BaseFileComponent.get_base_outputs(),
]

def _wait_for_result_with_process_monitoring(self, queue: Queue, proc, timeout: int = 300):
"""Wait for result from queue while monitoring process health.
def _wait_for_result_with_thread_monitoring(
self, result_queue: queue.Queue, thread: threading.Thread, timeout: int = 300
):
"""Wait for result from queue while monitoring thread health.

Handles cases where process crashes without sending result.
Handles cases where the thread crashes without sending a result.
"""
start_time = time.time()

while time.time() - start_time < timeout:
# Check if process is still alive
if not proc.is_alive():
# Process died, try to get any result it might have sent
# Check if thread is still alive
if not thread.is_alive():
# Thread finished, try to get any result it might have sent
try:
result = queue.get_nowait()
except Empty:
# Process died without sending result
msg = f"Worker process crashed unexpectedly without producing result. Exit code: {proc.exitcode}"
result = result_queue.get_nowait()
except queue.Empty:
# Thread finished without sending a result
msg = "Worker thread crashed unexpectedly without producing a result."
raise RuntimeError(msg) from None
else:
self.log("Process completed and result retrieved")
self.log("Thread completed and result retrieved")
return result

# Poll the queue instead of blocking
try:
result = queue.get(timeout=1)
except Empty:
result = result_queue.get(timeout=1)
except queue.Empty:
# No result yet, continue monitoring
continue
else:
self.log("Result received from worker process")
self.log("Result received from worker thread")
return result

# Overall timeout reached
msg = f"Process timed out after {timeout} seconds"
msg = f"Thread timed out after {timeout} seconds"
raise TimeoutError(msg)

def _terminate_process_gracefully(self, proc, timeout_terminate: int = 10, timeout_kill: int = 5):
"""Terminate process gracefully with escalating signals.
def _stop_thread_gracefully(self, thread: threading.Thread, timeout: int = 10):
"""Wait for thread to complete gracefully.

First tries SIGTERM, then SIGKILL if needed.
Note: Python threads cannot be forcefully killed, so we just wait.
The thread should respond to shutdown signals via the queue.
"""
if not proc.is_alive():
if not thread.is_alive():
return

self.log("Attempting graceful process termination with SIGTERM")
proc.terminate() # Send SIGTERM
proc.join(timeout=timeout_terminate)
self.log("Waiting for thread to complete gracefully")
thread.join(timeout=timeout)

if proc.is_alive():
self.log("Process didn't respond to SIGTERM, using SIGKILL")
proc.kill() # Send SIGKILL
proc.join(timeout=timeout_kill)

if proc.is_alive():
self.log("Warning: Process still alive after SIGKILL")
if thread.is_alive():
self.log("Warning: Thread still alive after timeout")
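Because Python provides no way to force-kill a thread, shutdown has to be cooperative: the worker must periodically check a flag and exit on its own. A generic sketch of that pattern with `threading.Event` (this PR instead relies on its queue and signal checks):

```python
import threading
import time

stop_event = threading.Event()

def worker() -> None:
    while not stop_event.is_set():  # check the flag between units of work
        time.sleep(0.1)  # stand-in for one chunk of real work

t = threading.Thread(target=worker)
t.start()
stop_event.set()    # request shutdown
t.join(timeout=10)  # returns once the thread honors the flag (or timeout elapses)
```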

def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
try:
@@ -167,44 +164,37 @@ def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
if self.pic_desc_llm is not None:
pic_desc_config = _serialize_pydantic_model(self.pic_desc_llm)

ctx = get_context("spawn")
queue: Queue = ctx.Queue()
proc = ctx.Process(
# Use threading instead of multiprocessing for memory sharing
# This enables the global DocumentConverter cache to work across runs
result_queue: queue.Queue = queue.Queue()
thread = threading.Thread(
target=docling_worker,
kwargs={
"file_paths": file_paths,
"queue": queue,
"queue": result_queue,
"pipeline": self.pipeline,
"ocr_engine": self.ocr_engine,
"do_picture_classification": self.do_picture_classification,
"pic_desc_config": pic_desc_config,
"pic_desc_prompt": self.pic_desc_prompt,
},
daemon=False,  # Non-daemon: the interpreter waits for this thread to finish before exiting
)

result = None
proc.start()
thread.start()

try:
result = self._wait_for_result_with_process_monitoring(queue, proc, timeout=300)
result = self._wait_for_result_with_thread_monitoring(result_queue, thread, timeout=300)
except KeyboardInterrupt:
self.log("Docling process cancelled by user")
self.log("Docling thread cancelled by user")
result = []
except Exception as e:
self.log(f"Error during processing: {e}")
raise
finally:
# Improved cleanup with graceful termination
try:
self._terminate_process_gracefully(proc)
finally:
# Always close and cleanup queue resources
try:
queue.close()
queue.join_thread()
except Exception as e: # noqa: BLE001
# Ignore cleanup errors, but log them
self.log(f"Warning: Error during queue cleanup - {e}")
# Wait for thread to complete gracefully
self._stop_thread_gracefully(thread)

# Enhanced error checking with dependency-specific handling
if isinstance(result, dict) and "error" in result:
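Stripped of component plumbing, the wait-and-monitor logic in `_wait_for_result_with_thread_monitoring` reduces to this standalone pattern (illustrative names, assuming the worker puts at most one result on the queue):

```python
import queue
import threading
import time

def wait_for_result(q: queue.Queue, worker: threading.Thread, timeout: float = 300.0):
    # Poll once per second so thread health can be checked between queue reads.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if not worker.is_alive():
            try:
                return q.get_nowait()  # drain a result sent just before the thread exited
            except queue.Empty:
                msg = "worker thread exited without producing a result"
                raise RuntimeError(msg) from None
        try:
            return q.get(timeout=1)  # short block keeps the monitor loop responsive
        except queue.Empty:
            continue  # no result yet; re-check thread health
    raise TimeoutError(f"no result within {timeout} seconds")
```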