diff --git a/src/backend/base/langflow/api/v1/published_flows.py b/src/backend/base/langflow/api/v1/published_flows.py index 44a4633a1a92..12d8b9778582 100644 --- a/src/backend/base/langflow/api/v1/published_flows.py +++ b/src/backend/base/langflow/api/v1/published_flows.py @@ -721,6 +721,30 @@ async def list_all_published_flows( Public endpoint - no authentication required. Returns paginated list with denormalized flow and publisher information. """ + return await _list_all_published_flows( + session=session, + page=page, + limit=limit, + search=search, + category=category, + tags=tags, # This is now a string, not a Query object + status_filter=status_filter, + sort_by=sort_by, + order=order, + ) + + +async def _list_all_published_flows( + session: AsyncSession, + page: int = 1, + limit: int = 10, + search: str | None = None, + category: str | None = None, + tags: str | None = None, + status_filter: str | None = None, + sort_by: str = "published_at", + order: str = "desc", +): # Base query - apply status filter if status_filter == "published": query = ( @@ -740,7 +764,12 @@ async def list_all_published_flows( # Apply search filter on denormalized flow_name and description if search: search_pattern = f"%{search}%" - query = query.where(or_(PublishedFlow.flow_name.ilike(search_pattern), PublishedFlow.description.ilike(search_pattern))) + query = query.where( + or_( + PublishedFlow.flow_name.ilike(search_pattern), + PublishedFlow.description.ilike(search_pattern), + ) + ) # Apply category filter if category: @@ -756,9 +785,13 @@ async def list_all_published_flows( # Count total with same status filter if status_filter == "published": - count_query = select(func.count(PublishedFlow.id)).where(PublishedFlow.status == PublishStatusEnum.PUBLISHED) + count_query = select(func.count(PublishedFlow.id)).where( + PublishedFlow.status == PublishStatusEnum.PUBLISHED + ) elif status_filter == "unpublished": - count_query = 
select(func.count(PublishedFlow.id)).where(PublishedFlow.status == PublishStatusEnum.UNPUBLISHED) + count_query = select(func.count(PublishedFlow.id)).where( + PublishedFlow.status == PublishStatusEnum.UNPUBLISHED + ) else: # "all" or None - count all flows (default) count_query = select(func.count(PublishedFlow.id)) @@ -766,7 +799,10 @@ async def list_all_published_flows( if search: search_pattern = f"%{search}%" count_query = count_query.where( - or_(PublishedFlow.flow_name.ilike(search_pattern), PublishedFlow.description.ilike(search_pattern)) + or_( + PublishedFlow.flow_name.ilike(search_pattern), + PublishedFlow.description.ilike(search_pattern), + ) ) if category: count_query = count_query.where(PublishedFlow.category == category) @@ -774,7 +810,9 @@ async def list_all_published_flows( tag_list = [tag.strip() for tag in tags.split(",")] for tag in tag_list: # Cast JSON to text and check if tag is in the stringified array - count_query = count_query.where(cast(PublishedFlow.tags, Text).contains(f'"{tag}"')) + count_query = count_query.where( + cast(PublishedFlow.tags, Text).contains(f'"{tag}"') + ) total_result = await session.exec(count_query) total = total_result.one() @@ -804,7 +842,10 @@ async def list_all_published_flows( rows = results.all() # Format response - all fields already denormalized in PublishedFlow - items = [PublishedFlowRead.model_validate(published_flow, from_attributes=True) for published_flow in rows] + items = [ + PublishedFlowRead.model_validate(published_flow, from_attributes=True) + for published_flow in rows + ] pages = (total + limit - 1) // limit if limit > 0 else 0 diff --git a/src/backend/base/langflow/base/tools/run_flow.py b/src/backend/base/langflow/base/tools/run_flow.py index d7fdf3aba0b5..e04dcdb687f4 100644 --- a/src/backend/base/langflow/base/tools/run_flow.py +++ b/src/backend/base/langflow/base/tools/run_flow.py @@ -7,7 +7,7 @@ from langflow.field_typing import Tool from langflow.graph.graph.base import Graph from 
langflow.graph.vertex.base import Vertex
-from langflow.helpers.flow import get_flow_inputs
+from langflow.helpers.flow import get_flow_inputs, load_flow
 from langflow.inputs.inputs import DropdownInput, InputTypes, MessageInput
 from langflow.logging.logger import logger
 from langflow.schema.data import Data
@@ -110,13 +110,13 @@ async def message_output(self) -> Message:
     async def get_flow_names(self) -> list[str]:
         # TODO: get flfow ID with flow name
         flow_data = await self.alist_flows()
-        return [flow_data.data["name"] for flow_data in flow_data]
+        return [flow.flow_name for flow in flow_data["items"]]
 
     async def get_flow(self, flow_name_selected: str) -> Data | None:
         # get flow from flow id
         flow_datas = await self.alist_flows()
-        for flow_data in flow_datas:
-            if flow_data.data["name"] == flow_name_selected:
+        for flow_data in flow_datas["items"]:
+            if flow_data.flow_name == flow_name_selected:
                 return flow_data
         return None
 
@@ -124,7 +124,8 @@ async def get_graph(self, flow_name_selected: str | None = None) -> Graph:
         if flow_name_selected:
             flow_data = await self.get_flow(flow_name_selected)
             if flow_data:
-                return Graph.from_payload(flow_data.data["data"])
+                flow = await load_flow(user_id=self.user_id, flow_id=str(flow_data.flow_id))
+                return flow
         msg = "Flow not found"
         raise ValueError(msg)
         # Ensure a Graph is always returned or an exception is raised
diff --git a/src/backend/base/langflow/components/models/transcribe_model.py b/src/backend/base/langflow/components/models/transcribe_model.py
new file mode 100644
index 000000000000..1f661265babd
--- /dev/null
+++ b/src/backend/base/langflow/components/models/transcribe_model.py
@@ -0,0 +1,332 @@
+from langflow.custom.custom_component.component import Component
+from langflow.inputs.inputs import (
+    FileInput,
+    SecretStrInput,
+    MessageTextInput,
+    DropdownInput,
+    BoolInput,
+)
+from langflow.template import Output
+from langflow.schema.message import Message
+from pathlib import Path
+from typing import
Optional +import requests +import tempfile +import os + + +class AudioTranscriptionComponent(Component): + display_name = "Audio Transcription" + description = "Transcribe audio using various AI models (easily extensible)" + icon = "microphone" + name = "AudioTranscription" + + inputs = [ + # Audio Input - File or URL + FileInput( + name="audio_file", + display_name="Audio File", + file_types=[ + "mp3", + "wav", + "m4a", + "flac", + "ogg", + "webm", + "mp4", + "mpeg", + "mpga", + ], + info="Upload an audio file", + ), + MessageTextInput( + name="audio_url", + display_name="Audio URL", + info="Or provide a URL to an audio file (if file is not uploaded)", + ), + # Model Provider Selection + DropdownInput( + name="model_provider", + display_name="Model Provider", + options=[ + "azure_openai", + # Add more providers here as you extend + # "openai", + # "deepgram", + # "assemblyai", + ], + value="azure_openai", + info="Select the audio transcription provider", + ), + # Azure OpenAI Configuration + SecretStrInput( + name="azure_api_key", + display_name="Azure API Key", + required=True, + ), + MessageTextInput( + name="azure_endpoint", + display_name="Azure Endpoint", + placeholder="https://your-resource.openai.azure.com/", + required=True, + ), + MessageTextInput( + name="azure_deployment_name", + display_name="Deployment Name", + placeholder="whisper-deployment", + required=True, + ), + MessageTextInput( + name="azure_api_version", + display_name="API Version", + value="2024-02-01", + ), + # Common Transcription Options + DropdownInput( + name="language", + display_name="Language", + options=[ + "auto", + "en", + "es", + "fr", + "de", + "it", + "pt", + "nl", + "ja", + "ko", + "zh", + "ar", + "ru", + "hi", + ], + value="auto", + info="Language of the audio (auto for automatic detection)", + ), + MessageTextInput( + name="prompt", + display_name="Context Prompt", + info="Optional context to improve accuracy (technical terms, names, etc.)", + advanced=True, + ), + 
BoolInput(
+            name="include_timestamps",
+            display_name="Include Timestamps",
+            value=False,
+            info="Include segment timestamps in output",
+            advanced=True,
+        ),
+    ]
+
+    outputs = [
+        Output(
+            name="transcription",
+            display_name="Transcription",
+            method="transcribe_audio",
+        ),
+    ]
+
+    def transcribe_audio(self) -> Message:
+        """Main transcription method - routes to appropriate provider"""
+        try:
+            # Step 1: Get audio file (from upload or URL)
+            audio_file_path = self._get_audio_file()
+
+            if not audio_file_path:
+                return Message(text="No audio file or URL provided")
+
+            # Step 2: Route to appropriate provider
+            if self.model_provider == "azure_openai":
+                result = self._transcribe_azure_openai(audio_file_path)
+            # Add more providers here as you extend:
+            # elif self.model_provider == "openai":
+            #     result = self._transcribe_openai(audio_file_path)
+            # elif self.model_provider == "deepgram":
+            #     result = self._transcribe_deepgram(audio_file_path)
+            else:
+                return Message(text=f"Unsupported provider: {self.model_provider}")
+
+            # Step 3: Clean up temporary file if URL was used
+            if self.audio_url and not self.audio_file:
+                self._cleanup_temp_file(audio_file_path)
+
+            return result
+
+        except Exception as e:
+            self.status = f"✗ Error: {str(e)}"
+            return Message(
+                text=f"Error transcribing audio: {str(e)}", data={"error": str(e)}
+            )
+
+    # ==================== AUDIO FILE HANDLING ====================
+
+    def _get_audio_file(self) -> Optional[str]:
+        """Get audio file from upload or download from URL"""
+        # Priority 1: Uploaded file
+        if self.audio_file:
+            self.status = f"Using uploaded file: {Path(self.audio_file).name}"
+            return self.audio_file
+
+        # Priority 2: URL
+        if self.audio_url:
+            self.status = "Downloading audio from URL..."
+            return self._download_audio_from_url(self.audio_url)
+
+        return None
+
+    def _download_audio_from_url(self, url: str) -> str:
+        """Download audio file from URL to temporary file"""
+        try:
+            response = requests.get(url, stream=True, timeout=60)
+            response.raise_for_status()
+
+            # Determine file extension from URL or Content-Type
+            file_extension = self._get_file_extension(url, response)
+
+            # Create temporary file
+            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=file_extension)
+
+            # Download in chunks and track total size
+            total_bytes = 0
+            for chunk in response.iter_content(chunk_size=8192):
+                if chunk:  # Filter out keep-alive new chunks
+                    temp_file.write(chunk)
+                    total_bytes += len(chunk)
+
+            temp_file.close()
+
+            self.status = f"Downloaded audio ({total_bytes:,} bytes)"
+            return temp_file.name
+
+        except requests.exceptions.RequestException as e:
+            raise Exception(f"Failed to download audio from URL: {str(e)}") from e
+        except Exception as e:
+            raise Exception(f"Failed to download audio from URL: {str(e)}") from e
+
+    def _get_file_extension(self, url: str, response: requests.Response) -> str:
+        """Determine file extension from URL or Content-Type"""
+        # Try to get from URL
+        url_path = Path(url.split("?")[0])  # Remove query parameters
+        if url_path.suffix:
+            return url_path.suffix
+
+        # Try to get from Content-Type
+        content_type = response.headers.get("Content-Type", "")
+        extension_map = {
+            "audio/mpeg": ".mp3",
+            "audio/mp3": ".mp3",
+            "audio/wav": ".wav",
+            "audio/wave": ".wav",
+            "audio/x-wav": ".wav",
+            "audio/m4a": ".m4a",
+            "audio/mp4": ".m4a",
+            "audio/flac": ".flac",
+            "audio/ogg": ".ogg",
+            "audio/webm": ".webm",
+        }
+
+        return extension_map.get(content_type, ".mp3")  # Default to .mp3
+
+    def _cleanup_temp_file(self, file_path: str):
+        """Clean up temporary file"""
+        try:
+            if os.path.exists(file_path):
+                os.unlink(file_path)
+                self.status = "Cleaned up temporary file"
+        except Exception as e:
+            # Log but don't fail if cleanup fails
+            
print(f"Warning: Failed to cleanup temp file: {e}") + + # ==================== PROVIDER IMPLEMENTATIONS ==================== + + def _transcribe_azure_openai(self, audio_file_path: str) -> Message: + """Transcribe using Azure OpenAI Whisper""" + try: + from openai import AzureOpenAI + except ImportError: + raise ImportError("Install openai: pip install openai") + + # Validate Azure configuration + self._validate_azure_config() + + # Initialize Azure OpenAI client + client = AzureOpenAI( + api_key=self.azure_api_key, + api_version=self.azure_api_version, + azure_endpoint=self.azure_endpoint, + ) + + # Prepare transcription parameters + with open(audio_file_path, "rb") as audio_file: + params = { + "model": self.azure_deployment_name, + "file": audio_file, + } + + # Add optional parameters + if self.language and self.language != "auto": + params["language"] = self.language + + if self.prompt: + params["prompt"] = self.prompt + + if self.include_timestamps: + params["response_format"] = "verbose_json" + params["timestamp_granularities"] = ["segment"] + + # Transcribe + self.status = "Transcribing with Azure OpenAI..." 
+ transcript = client.audio.transcriptions.create(**params) + + # Parse response + transcription_text = self._parse_transcript_response(transcript) + + # Build response data + data = { + "transcription": transcription_text, + "provider": "azure_openai", + "deployment": self.azure_deployment_name, + "endpoint": self.azure_endpoint, + "api_version": self.azure_api_version, + } + + # Add timestamps if requested + if self.include_timestamps and hasattr(transcript, "segments"): + data["segments"] = [ + { + "start": seg.get("start"), + "end": seg.get("end"), + "text": seg.get("text"), + } + for seg in transcript.segments + ] + + # Add language if detected + if hasattr(transcript, "language"): + data["detected_language"] = transcript.language + + file_name = Path(audio_file_path).name + self.status = f"✓ Transcribed {file_name}" + + return Message(text=transcription_text, data=data) + + def _validate_azure_config(self): + """Validate Azure OpenAI configuration""" + if not self.azure_api_key: + raise ValueError("Azure API Key is required") + + if not self.azure_endpoint: + raise ValueError("Azure Endpoint is required") + + if not self.azure_deployment_name: + raise ValueError("Azure Deployment Name is required") + + def _parse_transcript_response(self, transcript) -> str: + """Parse transcript response to extract text""" + if isinstance(transcript, str): + return transcript + elif hasattr(transcript, "text"): + return transcript.text + else: + return str(transcript) diff --git a/src/backend/base/langflow/custom/custom_component/custom_component.py b/src/backend/base/langflow/custom/custom_component/custom_component.py index cf54f6f9aebc..7f5bc054bb99 100644 --- a/src/backend/base/langflow/custom/custom_component/custom_component.py +++ b/src/backend/base/langflow/custom/custom_component/custom_component.py @@ -516,6 +516,11 @@ async def alist_flows(self) -> list[Data]: msg = "Session is invalid" raise ValueError(msg) try: + from langflow.api.v1.published_flows import 
_list_all_published_flows
+            async with session_scope() as session:
+                # NOTE(review): this returns the paginated published-flows result, not
+                # the declared list[Data]; the list_flows call below is now unreachable.
+                return await _list_all_published_flows(session=session, status_filter="published")
             return await list_flows(user_id=str(self.user_id))
         except Exception as e:
             msg = f"Error listing flows: {e}"