From 686639eacefe1d9417f4294a32bc9f02d0551887 Mon Sep 17 00:00:00 2001
From: ygoel
Date: Tue, 18 Nov 2025 00:35:34 +0530
Subject: [PATCH] Add file hash generation and metadata handling in Document Intelligence service

---
 .../models/document_intelligence.py           | 120 ++++++++-
 .../processing/file_hash_generator.py         | 233 ++++++++++++++++++
 .../base/langflow/components/qdrant/qdrant.py | 151 ++++++++----
 .../services/document_intelligence/service.py |   9 +-
 4 files changed, 455 insertions(+), 58 deletions(-)
 create mode 100644 src/backend/base/langflow/components/processing/file_hash_generator.py

diff --git a/src/backend/base/langflow/components/models/document_intelligence.py b/src/backend/base/langflow/components/models/document_intelligence.py
index 98b51a370d33..faa68aeb3afb 100644
--- a/src/backend/base/langflow/components/models/document_intelligence.py
+++ b/src/backend/base/langflow/components/models/document_intelligence.py
@@ -2,6 +2,7 @@
 
 import asyncio
 import concurrent.futures
+import hashlib
 import mimetypes
 import os
 import tempfile
@@ -83,6 +84,13 @@ class AzureDocumentIntelligenceComponent(BaseFileComponent):
             advanced=True,
             info="Include confidence scores in the extracted text",
         ),
+        BoolInput(
+            name="generate_hash",
+            display_name="Generate File Hash",
+            value=True,
+            advanced=True,
+            info="Generate MD5 hash of file content for deduplication",
+        ),
         BoolInput(
             name="use_multithreading",
             display_name="Use Concurrent Processing",
@@ -114,6 +122,32 @@ def get_text_content(self) -> str:
         """Return the concatenated text content from all processed pages."""
         return self._text_content
 
+    def _generate_file_hash(self, file_path: str) -> str | None:
+        """
+        Generate MD5 hash of file content.
+
+        Args:
+            file_path: Path to the file
+
+        Returns:
+            MD5 hash as hex string, or None if error
+        """
+        try:
+            md5_hash = hashlib.md5()
+
+            with open(file_path, 'rb') as f:
+                # Read in 8KB chunks for memory efficiency
+                for chunk in iter(lambda: f.read(8192), b''):
+                    md5_hash.update(chunk)
+
+            hash_value = md5_hash.hexdigest()
+            logger.info(f"Generated hash for {Path(file_path).name}: {hash_value}")
+            return hash_value
+
+        except Exception as e:
+            logger.error(f"Error generating hash for {file_path}: {e}")
+            return None
+
     def _extract_filename_from_url(self, url: str) -> str:
         """Extract filename from URL or generate a default one."""
         try:
@@ -340,8 +374,32 @@ def _validate_and_resolve_paths(self) -> list[BaseFileComponent.BaseFile]:
         logger.debug(f"Resolved {len(resolved_files)} files")
         return resolved_files
 
+    def _get_original_url_from_basefile(self, base_file) -> str | None:
+        """Extract original_url from BaseFile object."""
+        try:
+            # BaseFile typically has attributes like: data_object or the Data object directly
+            # Try different possible attribute names
+            if hasattr(base_file, 'data_object'):
+                return base_file.data_object.data.get("original_url")
+            elif hasattr(base_file, 'data'):
+                if isinstance(base_file.data, Data):
+                    return base_file.data.data.get("original_url")
+                elif isinstance(base_file.data, dict):
+                    return base_file.data.get("original_url")
+            # If BaseFile is a namedtuple or has indexed access
+            elif hasattr(base_file, '__getitem__'):
+                try:
+                    data_obj = base_file[0]  # First element is usually the Data object
+                    if isinstance(data_obj, Data):
+                        return data_obj.data.get("original_url")
+                except (IndexError, TypeError):
+                    pass
+        except Exception as e:
+            logger.debug(f"Could not extract original_url: {e}")
+        return None
+
     async def process_file(
-        self, file_path: str, *, silent_errors: bool = False
+        self, file_path: str, original_url: str = None, *, silent_errors: bool = False
     ) -> tuple[Data, str]:
         """Process a single file using the OCR service."""
         try:
@@ -353,20 +411,52 @@ async def process_file(
             with open(file_path, "rb") as file:
                 file_content = file.read()
 
-            extracted_content,plain_text,document_uuid = await ocr_service.process_document(
+            # Generate hash if enabled
+            file_hash=None
+            if self.generate_hash:
+                file_hash = self._generate_file_hash(file_path)
+
+
+            extracted_content, plain_text, document_uuid = await ocr_service.process_document(
                 file_content=file_content,
                 model_type=self.model_type,
                 include_confidence=self.include_confidence,
                 extract_tables=self.extract_tables,
+                file_hash=file_hash
             )
 
+            # Build data dictionary
+            data_dict = {
+                self.SERVER_FILE_PATH_FIELDNAME: str(file_path),
+                "result": extracted_content,
+                "document_uuid": document_uuid,
+            }
+
+            if file_hash:
+                data_dict["file_hash"] = file_hash
+                data_dict["hash_type"] = "file_content"
+
+                # Add to metadata field
+                data_dict["metadata"] = {
+                    "file_hash": file_hash,
+                    "hash_type": "file_content",
+                    "file_name": Path(file_path).name,
+                }
+
+                # Add original URL if available
+                if original_url:
+                    data_dict["original_url"] = original_url
+                    data_dict["metadata"]["original_url"] = original_url
+                    data_dict["metadata"]["source_type"] = "url"
+                else:
+                    data_dict["metadata"]["source_type"] = "local"
+
+                logger.info(f"Added hash to OCR output: {file_hash}")
+
+
             structured_data = Data(
                 text=plain_text,
-                data={
-                    self.SERVER_FILE_PATH_FIELDNAME: str(file_path),
-                    "result": extracted_content,
-                    "document_uuid":document_uuid
-                },
+                data=data_dict,
             )
 
             return structured_data, plain_text
@@ -406,12 +496,14 @@ def process_files(
             ) as executor:
                 future_to_file = {
                     executor.submit(
-                        lambda path: loop.run_until_complete(
+                        lambda f: loop.run_until_complete(
                             self.process_file(
-                                str(path), silent_errors=self.silent_errors
+                                str(f.path),
+                                original_url=self._get_original_url_from_basefile(f),
+                                silent_errors=self.silent_errors
                             )
                         ),
-                        file.path,
+                        file,
                     ): file
                     for file in file_list
                 }
@@ -434,9 +526,13 @@ def process_files(
         try:
             for file in file_list:
                 try:
+                    original_url = self._get_original_url_from_basefile(file)
+
                     structured_data, plain_text = loop.run_until_complete(
                         self.process_file(
-                            str(file.path), silent_errors=self.silent_errors
+                            str(file.path),
+                            original_url=original_url,
+                            silent_errors=self.silent_errors
                         )
                     )
                     processed_data.append(structured_data)
@@ -466,4 +562,4 @@ def __del__(self):
             # Remove the temporary directory
             os.rmdir(self.temp_dir)
         except Exception as e:
-            logger.error(f"Error cleaning up temporary files: {e!s}")
+            logger.error(f"Error cleaning up temporary files: {e!s}")
\ No newline at end of file
diff --git a/src/backend/base/langflow/components/processing/file_hash_generator.py b/src/backend/base/langflow/components/processing/file_hash_generator.py
new file mode 100644
index 000000000000..37d96e698c98
--- /dev/null
+++ b/src/backend/base/langflow/components/processing/file_hash_generator.py
@@ -0,0 +1,233 @@
+import hashlib
+from pathlib import Path
+from typing import Any
+from urllib.parse import urlparse
+
+import requests
+from langflow.custom.custom_component.component import Component
+from langflow.io import Output, MultilineInput, DropdownInput
+from langflow.schema.data import Data
+from loguru import logger
+
+
+class FileHashGeneratorComponent(Component):
+    """Component for generating MD5 hashes of files for deduplication.
+
+    This component takes a file path or URL and generates MD5 hash.
+    - For URLs: Downloads the file temporarily and hashes the content
+    - For local paths: Hashes the file content directly
+    """
+
+    display_name = "File Hash Generator"
+    description = "Generate MD5 hash for a file (local or URL) to enable deduplication"
+    documentation = "https://docs.langflow.org/components-processing"
+    icon = "hash"
+    name = "FileHashGenerator"
+    category = "processing"
+
+    inputs = [
+        MultilineInput(
+            name="file_data",
+            display_name="File Path",
+            info="File path or URL to be hashed.",
+        ),
+        DropdownInput(
+            name="hash_mode",
+            display_name="Hash Mode",
+            info="'content' to hash file content (downloads URLs), 'url' to hash URL string only (faster)",
+            value="content",
+            options=["content", "url"],
+            advanced=True,
+        ),
+    ]
+
+    outputs = [
+        Output(
+            display_name="Hash Data",
+            name="hash_output",
+            method="generate_hash",
+        ),
+    ]
+
+    def _is_url(self, path: str) -> bool:
+        """Check if path is a URL."""
+        return path.startswith(('http://', 'https://'))
+
+    def _get_filename_from_url(self, url: str) -> str:
+        """Extract filename from URL."""
+        parsed = urlparse(url)
+        path = parsed.path
+
+        # Get the last part of the path
+        filename = path.split('/')[-1]
+
+        # Remove query parameters from filename if present
+        if '?' in filename:
+            filename = filename.split('?')[0]
+
+        # If no valid filename, create one from URL hash
+        if not filename or '.' not in filename:
+            url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
+            filename = f"file_{url_hash}"
+
+        return filename
+
+    def hash_url_string(self, url: str) -> str:
+        """
+        Generate MD5 hash of URL string (not content).
+        Fast method for URL-based deduplication.
+
+        Args:
+            url: The URL string to hash
+
+        Returns:
+            MD5 hash as hex string
+        """
+        return hashlib.md5(url.encode()).hexdigest()
+
+    def hash_file_content(self, file_path: str) -> str:
+        """
+        Generate MD5 hash of file content.
+        For URLs: downloads temporarily and hashes content.
+        For local files: hashes content directly.
+
+        Args:
+            file_path: Path to file or URL
+
+        Returns:
+            MD5 hash as hex string
+
+        Raises:
+            FileNotFoundError: If local file doesn't exist
+            requests.RequestException: If URL download fails
+        """
+        if self._is_url(file_path):
+            # Download URL content and hash it
+            return self._hash_url_content(file_path)
+        else:
+            # Hash local file content
+            return self._hash_local_file(file_path)
+
+    def _hash_local_file(self, file_path: str) -> str:
+        """Hash local file content."""
+        path_obj = Path(file_path)
+
+        if not path_obj.exists():
+            raise FileNotFoundError(f"File not found: {file_path}")
+
+        if not path_obj.is_file():
+            raise ValueError(f"Not a file: {file_path}")
+
+        md5_hash = hashlib.md5()
+
+        with open(path_obj, 'rb') as f:
+            for chunk in iter(lambda: f.read(8192), b''):
+                md5_hash.update(chunk)
+
+        return md5_hash.hexdigest()
+
+    def _hash_url_content(self, url: str) -> str:
+        """Download URL content and hash it."""
+        md5_hash = hashlib.md5()
+
+        logger.info(f"Downloading file from URL to generate content hash...")
+
+        # Stream download to avoid loading entire file into memory
+        with requests.get(url, stream=True, timeout=30) as response:
+            response.raise_for_status()
+
+            for chunk in response.iter_content(chunk_size=8192):
+                if chunk:
+                    md5_hash.update(chunk)
+
+        return md5_hash.hexdigest()
+
+    def generate_hash(self) -> Data:
+        """
+        Generate MD5 hash for the provided file path/URL.
+
+        Returns:
+            Data object containing hash information
+
+        Raises:
+            ValueError: If no valid file path is provided
+        """
+        file_path = self.file_data
+
+        if not file_path or not file_path.strip():
+            msg = "No file path or URL provided. Please provide a file via 'File Path' input."
+            raise ValueError(msg)
+
+        file_path = file_path.strip()
+        logger.info(f"Processing file: {file_path} with hash mode: {self.hash_mode}")
+
+        try:
+            is_url = self._is_url(file_path)
+
+            # Generate hash based on mode
+            if self.hash_mode == "url" and is_url:
+                # Fast mode: hash URL string only
+                file_hash = self.hash_url_string(file_path)
+                hash_type = "url_string"
+            else:
+                # Content mode: hash actual file content
+                file_hash = self.hash_file_content(file_path)
+                hash_type = "file_content"
+
+            # Extract filename
+            if is_url:
+                file_name = self._get_filename_from_url(file_path)
+                source_type = "url"
+            else:
+                file_name = Path(file_path).name
+                source_type = "local"
+
+            result = {
+                'file_path': file_path,
+                'file_name': file_name,
+                'hash': file_hash,
+                'hash_type': hash_type,
+                'source_type': source_type,
+                'status': 'success'
+            }
+
+            logger.info(f"Generated {hash_type} hash for {file_name}: {file_hash}")
+            self.status = f"Hash generated: {file_hash[:16]}..."
+
+            return Data(data=result)
+
+        except FileNotFoundError as e:
+            logger.error(f"File not found: {file_path}")
+            error_result = {
+                'file_path': file_path,
+                'file_name': self._get_filename_from_url(file_path) if self._is_url(file_path) else Path(file_path).name,
+                'hash': None,
+                'status': 'error',
+                'error': f"File not found: {e}"
+            }
+            self.status = "Error: File not found"
+            return Data(data=error_result)
+
+        except requests.RequestException as e:
+            logger.error(f"Error downloading URL {file_path}: {e}")
+            error_result = {
+                'file_path': file_path,
+                'file_name': self._get_filename_from_url(file_path),
+                'hash': None,
+                'status': 'error',
+                'error': f"Download failed: {e}"
+            }
+            self.status = "Error: Download failed"
+            return Data(data=error_result)
+
+        except Exception as e:
+            logger.error(f"Error processing {file_path}: {e}")
+            error_result = {
+                'file_path': file_path,
+                'file_name': self._get_filename_from_url(file_path) if self._is_url(file_path) else Path(file_path).name,
+                'hash': None,
+                'status': 'error',
+                'error': str(e)
+            }
+            self.status = f"Error: {str(e)}"
+            return Data(data=error_result)
diff --git a/src/backend/base/langflow/components/qdrant/qdrant.py b/src/backend/base/langflow/components/qdrant/qdrant.py
index f7b8bf54f771..40e0856dc368 100644
--- a/src/backend/base/langflow/components/qdrant/qdrant.py
+++ b/src/backend/base/langflow/components/qdrant/qdrant.py
@@ -12,11 +12,14 @@
     IntInput,
     SecretStrInput,
     StrInput,
-    BoolInput
+    BoolInput,
+    DictInput
 )
 from langflow.schema.data import Data
 import uuid
-
+from loguru import logger
+from qdrant_client import QdrantClient
+from qdrant_client.models import Filter, FieldCondition, MatchValue
 
 class QdrantVectorStoreComponent(LCVectorStoreComponent):
     display_name = "Qdrant"
@@ -25,7 +28,7 @@ class QdrantVectorStoreComponent(LCVectorStoreComponent):
 
     inputs = [
         StrInput(name="collection_name", display_name="Collection Name", required=True),
-        MessageTextInput(name="document_uuid", display_name="Document UUID", required=False,tool_mode=True),
+        MessageTextInput(name="document_hash", display_name="Document Hash", required=False, tool_mode=True),
         StrInput(name="host", display_name="Host", value="localhost", advanced=True),
         IntInput(name="port", display_name="Port", value=6333, advanced=True),
         IntInput(name="grpc_port", display_name="gRPC Port", value=6334, advanced=True),
@@ -52,15 +55,63 @@ class QdrantVectorStoreComponent(LCVectorStoreComponent):
             value=4,
             advanced=True,
         ),
-        BoolInput(
-            name="return_doc_uuids",
-            display_name="Return Document UUIDs",
-            value=True,
-            advanced=True,
-            info="will return list of unique document ids",
-        ),
+        DictInput(
+            name="filters",
+            display_name="Filters ",
+            info=(
+                "Filters on Metadata"
+            ),
+            input_types=["Data"],
+            show=True,
+            required=False,
+            is_list=True,
+            tool_mode=True
+        )
     ]
 
+    def _check_document_exists(self, client: QdrantClient, document_hash: str) -> bool:
+        """Check if a document with the given hash already exists in the collection."""
+        if not document_hash:
+            return False
+
+        try:
+            # Check if collection exists first
+            collections = client.get_collections().collections
+            collection_exists = any(col.name == self.collection_name for col in collections)
+
+            if not collection_exists:
+                logger.info(f"Collection {self.collection_name} does not exist yet")
+                return False
+
+            # Search for documents with matching hash
+            search_result = client.scroll(
+                collection_name=self.collection_name,
+                scroll_filter=Filter(
+                    must=[
+                        FieldCondition(
+                            key="metadata.file_hash",
+                            match=MatchValue(value=document_hash)
+                        )
+                    ]
+                ),
+                limit=1
+            )
+
+            # If we found any points, the document exists
+            points, _ = search_result
+            exists = len(points) > 0
+
+            if exists:
+                logger.info(f"Document with hash {document_hash} already exists in collection {self.collection_name}")
+            else:
+                logger.info(f"Document with hash {document_hash} not found, will proceed with ingestion")
+
+            return exists
+
+        except Exception as e:
+            logger.error(f"Error checking document existence: {e}")
+            return False
+
     @check_cached_vector_store
     def build_vector_store(self) -> Qdrant:
         qdrant_kwargs = {
@@ -73,11 +124,10 @@ def build_vector_store(self) -> Qdrant:
         server_kwargs = {
             "host": self.host or None,
-            "port": int(self.port),  # Ensure port is an integer
-            "grpc_port": int(self.grpc_port),  # Ensure grpc_port is an integer
+            "port": int(self.port),
+            "grpc_port": int(self.grpc_port),
             "api_key": api_key,
             "prefix": self.prefix,
-            # Ensure timeout is an integer
             "timeout": int(self.timeout) if self.timeout else None,
             "path": self.path or None,
             "url": self.url or None,
         }
@@ -85,23 +135,52 @@ def build_vector_store(self) -> Qdrant:
 
         server_kwargs = {k: v for k, v in server_kwargs.items() if v is not None}
 
+        # Create Qdrant client
+        client = QdrantClient(**server_kwargs)
+
+        # Check if document already exists before processing
+        logger.warning(f"ingestion - document with hash {self.document_hash} checking")
+        if self.document_hash:
+            if self._check_document_exists(client, self.document_hash):
+                logger.info(f"Skipping ingestion - document with hash {self.document_hash} already exists")
+                # Return existing vector store without ingesting
+                return Qdrant(embeddings=self.embedding, client=client, **qdrant_kwargs)
+
         # Convert DataFrame to Data if needed using parent's method
         self.ingest_data = self._prepare_ingest_data()
 
         documents = []
        for _input in self.ingest_data or []:
-            # document_uuid = str(uuid.uuid4())
             if isinstance(_input, Data):
-                if _input.model_dump().get('data',{}).get('result',[]):
-                    for page in _input.model_dump().get('data',{}).get('result',[]):
-                        # page['document_uuid'] =document_uuid
-                        page = Data(data=page)
-                        documents.append(page.to_lc_document())
+                if _input.model_dump().get('data', {}).get('result', []):
+                    for page in _input.model_dump().get('data', {}).get('result', []):
+                        page_data = Data(data=page)
+                        doc = page_data.to_lc_document()
+
+                        # Add hash to metadata if provided
+                        if self.document_hash:
+                            if not hasattr(doc, 'metadata') or doc.metadata is None:
+                                doc.metadata = {}
+                            doc.metadata['file_hash'] = self.document_hash
+
+                        documents.append(doc)
                 else:
-                    # _input.document_uuid = document_uuid
-                    documents.append(_input.to_lc_document())
+                    doc = _input.to_lc_document()
+
+                    # Add hash to metadata if provided
+                    if self.document_hash:
+                        if not hasattr(doc, 'metadata') or doc.metadata is None:
+                            doc.metadata = {}
+                        doc.metadata['file_hash'] = self.document_hash
+
+                    documents.append(doc)
             else:
+                # Add hash to metadata if provided
+                if self.document_hash:
+                    if not hasattr(_input, 'metadata') or _input.metadata is None:
+                        _input.metadata = {}
+                    _input.metadata['file_hash'] = self.document_hash
+
                 documents.append(_input)
 
         if not isinstance(self.embedding, Embeddings):
@@ -109,40 +188,26 @@ def build_vector_store(self) -> Qdrant:
             raise TypeError(msg)
 
         if documents:
+            logger.info(f"Ingesting {len(documents)} documents with hash {self.document_hash}")
             qdrant = Qdrant.from_documents(documents, embedding=self.embedding, **qdrant_kwargs, **server_kwargs)
         else:
-            from qdrant_client import QdrantClient
-
-            client = QdrantClient(**server_kwargs)
             qdrant = Qdrant(embeddings=self.embedding, client=client, **qdrant_kwargs)
 
         return qdrant
 
     def search_documents(self) -> list[Data]:
         vector_store = self.build_vector_store()
-        base_filter: Dict = {"document_uuid": self.document_uuid}
-        print(f'base filter {base_filter}')
+        cleaned_filters = [] if not self.filters else self.filters
+        cleaned_filters = {k: v for k, v in self.filters.items() if k != "" and v != ""}
+        logger.warning(f'base filter {cleaned_filters}')
+
         if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
             docs = vector_store.similarity_search(
                 query=self.search_query,
                 k=self.number_of_results,
-                filter = base_filter
+                filter=cleaned_filters
             )
             data = docs_to_data(docs)
-            # logger.warning(f'from qdrant db : {data[0].model_dump()['data']}')
             self.status = data
-            if self.return_doc_uuids:
-
-                new_data=[]
-                for page in data:
-                    x=page.model_dump()['data']
-                    x = x['document_uuid']
-                    page=Data(text=x)
-                    new_data.append(page)
-                return new_data
-            # x= data[0].model_dump()['data']
-            # logger.warning(f'qdrant logs : {(x['document_uuid'])}')
-            # data = [item['document_uuid'] for item in x]
-            # data =x['document_uuid']
             return data
-        return []
+        return []
\ No newline at end of file
diff --git a/src/backend/base/langflow/services/document_intelligence/service.py b/src/backend/base/langflow/services/document_intelligence/service.py
index 95c489bf72a4..c00b53d3b739 100644
--- a/src/backend/base/langflow/services/document_intelligence/service.py
+++ b/src/backend/base/langflow/services/document_intelligence/service.py
@@ -2,7 +2,7 @@
 
 from __future__ import annotations
 
-from typing import Any
+from typing import Any, Optional
 import uuid
 
 try:
@@ -84,6 +84,7 @@ async def process_document(
         include_confidence: bool = False,
         extract_tables: bool = True,
         extract_key_value_pairs: bool = True,
+        file_hash: Optional[str]=None
     ) -> tuple[list[dict[str, Any]], str]:
         """Process a document using Azure Document Intelligence.
@@ -146,7 +147,8 @@ async def process_document(
                 "height": page.height if hasattr(page, 'height') else None,
                 "unit": page.unit if hasattr(page, 'unit') else None,
                 "chunks_metadata":chunks_metadata,
-                "document_uuid":document_uuid
+                "document_uuid":document_uuid,
+                "file_hash":file_hash
             }
             # Extract text content for each page
             for page in result.pages:
@@ -231,7 +233,8 @@ async def process_document(
                     "height": page_data.get("height"),
                     "unit": page_data.get("unit"),
                     "chunks_metadata": page_data.get("chunks_metadata"),
-                    "document_uuid": page_data.get("document_uuid")
+                    "document_uuid": page_data.get("document_uuid"),
+                    "file_hash": page_data.get("file_hash")
                 }
                 for page_num, page_data in sorted(pages_data.items())
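
Below is a minimal, self-contained sketch (not part of the patch itself) of the deduplication flow the changes above wire together: hash a file in 8 KB chunks the way _generate_file_hash does, then ask Qdrant whether a point with the same metadata.file_hash already exists, mirroring _check_document_exists. The collection name, host, and file path are placeholders, not values taken from the patch.

import hashlib

from qdrant_client import QdrantClient
from qdrant_client.models import FieldCondition, Filter, MatchValue


def md5_of_file(path: str) -> str:
    # Chunked read, same pattern as _generate_file_hash / _hash_local_file.
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            digest.update(chunk)
    return digest.hexdigest()


def already_ingested(client: QdrantClient, collection: str, file_hash: str) -> bool:
    # Mirrors _check_document_exists: fetch at most one point whose metadata.file_hash matches.
    points, _ = client.scroll(
        collection_name=collection,
        scroll_filter=Filter(
            must=[FieldCondition(key="metadata.file_hash", match=MatchValue(value=file_hash))]
        ),
        limit=1,
    )
    return len(points) > 0


if __name__ == "__main__":
    client = QdrantClient(host="localhost", port=6333)  # placeholder connection
    file_hash = md5_of_file("sample.pdf")  # placeholder file
    if already_ingested(client, "documents", file_hash):
        print(f"skip ingestion, file_hash={file_hash} already stored")
    else:
        print(f"new document, ingest with metadata.file_hash={file_hash}")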