Merged
120 changes: 108 additions & 12 deletions src/backend/base/langflow/components/models/document_intelligence.py
@@ -2,6 +2,7 @@

import asyncio
import concurrent.futures
import hashlib
import mimetypes
import os
import tempfile
@@ -83,6 +84,13 @@ class AzureDocumentIntelligenceComponent(BaseFileComponent):
advanced=True,
info="Include confidence scores in the extracted text",
),
BoolInput(
name="generate_hash",
display_name="Generate File Hash",
value=True,
advanced=True,
info="Generate MD5 hash of file content for deduplication",
),
BoolInput(
name="use_multithreading",
display_name="Use Concurrent Processing",
@@ -114,6 +122,32 @@ def get_text_content(self) -> str:
"""Return the concatenated text content from all processed pages."""
return self._text_content

def _generate_file_hash(self, file_path: str) -> str | None:
"""
Generate MD5 hash of file content.

Args:
file_path: Path to the file

Returns:
MD5 hash as hex string, or None if error
"""
try:
md5_hash = hashlib.md5()

            with open(file_path, "rb") as f:
                # Read in 8KB chunks for memory efficiency
                for chunk in iter(lambda: f.read(8192), b""):
md5_hash.update(chunk)

hash_value = md5_hash.hexdigest()
logger.info(f"Generated hash for {Path(file_path).name}: {hash_value}")
return hash_value

except Exception as e:
logger.error(f"Error generating hash for {file_path}: {e}")
return None

def _extract_filename_from_url(self, url: str) -> str:
"""Extract filename from URL or generate a default one."""
try:
@@ -340,8 +374,32 @@ def _validate_and_resolve_paths(self) -> list[BaseFileComponent.BaseFile]:
logger.debug(f"Resolved {len(resolved_files)} files")
return resolved_files

def _get_original_url_from_basefile(self, base_file) -> str | None:
"""Extract original_url from BaseFile object."""
        try:
            # BaseFile implementations vary; try the likely attribute names in turn
            if hasattr(base_file, "data_object"):
                return base_file.data_object.data.get("original_url")
            elif hasattr(base_file, "data"):
                if isinstance(base_file.data, Data):
                    return base_file.data.data.get("original_url")
                elif isinstance(base_file.data, dict):
                    return base_file.data.get("original_url")
            # Namedtuple-like BaseFile: the first element is usually the Data object
            elif hasattr(base_file, "__getitem__"):
                try:
                    data_obj = base_file[0]
                    if isinstance(data_obj, Data):
                        return data_obj.data.get("original_url")
                except (IndexError, TypeError):
                    pass
        except Exception as e:
            logger.debug(f"Could not extract original_url: {e}")
        return None

async def process_file(
self, file_path: str, *, silent_errors: bool = False
        self, file_path: str, original_url: str | None = None, *, silent_errors: bool = False
) -> tuple[Data, str]:
"""Process a single file using the OCR service."""
try:
@@ -353,20 +411,52 @@ async def process_file(
with open(file_path, "rb") as file:
file_content = file.read()

extracted_content,plain_text,document_uuid = await ocr_service.process_document(
# Generate hash if enabled
            file_hash = None
            if self.generate_hash:
                file_hash = self._generate_file_hash(file_path)

extracted_content, plain_text, document_uuid = await ocr_service.process_document(
file_content=file_content,
model_type=self.model_type,
include_confidence=self.include_confidence,
extract_tables=self.extract_tables,
                file_hash=file_hash,
)

# Build data dictionary
data_dict = {
self.SERVER_FILE_PATH_FIELDNAME: str(file_path),
"result": extracted_content,
"document_uuid": document_uuid,
}

            # Metadata is always attached so source info is recorded even when hashing is off;
            # building it only inside the hash branch would raise KeyError below.
            data_dict["metadata"] = {"file_name": Path(file_path).name}

            if file_hash:
                data_dict["file_hash"] = file_hash
                data_dict["hash_type"] = "file_content"
                data_dict["metadata"]["file_hash"] = file_hash
                data_dict["metadata"]["hash_type"] = "file_content"
                logger.info(f"Added hash to OCR output: {file_hash}")

            # Add original URL if available
            if original_url:
                data_dict["original_url"] = original_url
                data_dict["metadata"]["original_url"] = original_url
                data_dict["metadata"]["source_type"] = "url"
            else:
                data_dict["metadata"]["source_type"] = "local"

structured_data = Data(
text=plain_text,
data={
self.SERVER_FILE_PATH_FIELDNAME: str(file_path),
"result": extracted_content,
"document_uuid":document_uuid
},
data=data_dict,
)

return structured_data, plain_text
@@ -406,12 +496,14 @@ def process_files(
) as executor:
future_to_file = {
executor.submit(
lambda path: loop.run_until_complete(
lambda f: loop.run_until_complete(
self.process_file(
str(path), silent_errors=self.silent_errors
str(f.path),
original_url=self._get_original_url_from_basefile(f),
silent_errors=self.silent_errors
)
),
file.path,
file,
): file
for file in file_list
}
@@ -434,9 +526,13 @@
try:
for file in file_list:
try:
original_url = self._get_original_url_from_basefile(file)

structured_data, plain_text = loop.run_until_complete(
self.process_file(
str(file.path), silent_errors=self.silent_errors
str(file.path),
original_url=original_url,
silent_errors=self.silent_errors
)
)
processed_data.append(structured_data)
@@ -466,4 +562,4 @@ def __del__(self):
# Remove the temporary directory
os.rmdir(self.temp_dir)
except Exception as e:
logger.error(f"Error cleaning up temporary files: {e!s}")
logger.error(f"Error cleaning up temporary files: {e!s}")