@codeflash-ai codeflash-ai bot commented Feb 6, 2025

⚡️ This pull request contains optimizations for PR #6028

If you approve this dependent PR, these changes will be merged into the original PR branch PlaygroundPage.

This PR will be automatically closed if the original PR is merged.


📄 11% (0.11x) speedup for build_output_logs in src/backend/base/langflow/schema/schema.py

⏱️ Runtime : 2.08 milliseconds → 1.87 milliseconds (best of 50 runs)

📝 Explanation and details

To improve the performance of the given Python program, we reduce redundant operations and use more efficient type dispatch and serialization. The modifications below account for the runtime improvement.

  1. Rewriting the get_type method using a dictionary to eliminate multiple match cases.
  2. Enhancing the get_message function with early returns to reduce checks.
  3. Rewriting the build_output_logs function to eliminate redundant attribute lookups and unnecessary type checking.
  4. Optimizing the serialize function to avoid repeated calls and checks, and streamlining the exception handling to be more efficient.
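As an illustration of change 1, a `match` statement over payload types can be replaced with an ordered lookup table. This is a hypothetical sketch built around the `LogType` enum that appears in the generated tests below, not the actual diff from the PR:

```python
from enum import Enum


class LogType(str, Enum):
    MESSAGE = "message"
    OBJECT = "object"
    ARRAY = "array"
    TEXT = "text"
    UNKNOWN = "unknown"


# Ordered (types, log_type) pairs replace a chain of match/isinstance cases;
# the first matching entry wins, so more specific types come first.
_TYPE_DISPATCH = (
    (dict, LogType.OBJECT),
    ((list, tuple), LogType.ARRAY),
    (str, LogType.TEXT),
)


def get_type(payload) -> LogType:
    for types, log_type in _TYPE_DISPATCH:
        if isinstance(payload, types):
            return log_type
    return LogType.UNKNOWN
```

A plain dict keyed on `type(payload)` would be faster still, but it would miss subclasses such as the `DataFrame` used in the tests, which is why the sketch keeps `isinstance` checks.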

Summary of Changes

  1. get_type function: Used explicit type-checking and consolidated match cases.
  2. get_message function: Early returns to streamline checks.
  3. build_output_logs function: Optimized loops and conditions, combined checks to remove redundancy.
  4. serialize function: Simplified the logic to avoid repeated processes and added inline handling for common cases. Streamlined error handling to improve performance.
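The streamlined error handling in change 4 can be sketched as a `serialize` with an inline fast path and a single sentinel fallback. The fast-path types and the "[Unserializable Object]" sentinel are taken from the test expectations below; the real `serialize` in `langflow.serialization` also applies `MAX_TEXT_LENGTH`/`MAX_ITEMS_LENGTH` truncation, which this sketch omits:

```python
import json


def serialize(obj):
    """Serialize a payload, returning a sentinel when it cannot be serialized."""
    # Inline fast path: JSON-native scalars need no further checks.
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    try:
        # One round-trip check instead of repeated per-field probing;
        # circular references raise ValueError, unknown types TypeError.
        json.dumps(obj)
    except (TypeError, ValueError):
        return "[Unserializable Object]"
    return obj
```

Catching both exception types in one clause is what lets a circular dict and an arbitrary custom object fall through to the same sentinel, matching `test_circular_reference` and `test_unknown_type_output` below.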

Correctness verification report:

Test                             Status
⚙️ Existing Unit Tests           🔘 None Found
🌀 Generated Regression Tests    ✅ 23 Passed
⏪ Replay Tests                  🔘 None Found
🔎 Concolic Coverage Tests       🔘 None Found
📊 Tests Coverage                undefined
🌀 Generated Regression Tests Details
from collections.abc import Generator
from enum import Enum
from typing import Any, cast

import pandas as pd
# imports
import pytest  # used for our unit tests
from langflow.schema.schema import build_output_logs
from pydantic import BaseModel
from typing_extensions import TypedDict


# function to test
class OutputValue(BaseModel):
    message: Any
    type: str

class LogType(str, Enum):
    MESSAGE = "message"
    DATA = "data"
    STREAM = "stream"
    OBJECT = "object"
    ARRAY = "array"
    TEXT = "text"
    UNKNOWN = "unknown"

class StreamURL(TypedDict):
    location: str

class Data:
    def __init__(self, data):
        self.data = data

class Message:
    def __init__(self, text):
        self.text = text

class DataFrame(pd.DataFrame):
    def to_data_list(self):
        list_of_dicts = self.to_dict(orient="records")
        return [Data(data=row) for row in list_of_dicts]

    def add_row(self, data: dict | Data) -> "DataFrame":
        if isinstance(data, Data):
            data = data.data
        new_df = self._constructor([data])
        return cast("DataFrame", pd.concat([self, new_df], ignore_index=True))

    def add_rows(self, data: list[dict | Data]) -> "DataFrame":
        processed_data = []
        for item in data:
            if isinstance(item, Data):
                processed_data.append(item.data)
            else:
                processed_data.append(item)
        new_df = self._constructor(processed_data)
        return cast("DataFrame", pd.concat([self, new_df], ignore_index=True))

    @property
    def _constructor(self):
        def _c(*args, **kwargs):
            return DataFrame(*args, **kwargs).__finalize__(self)

        return _c

    def __bool__(self):
        return not self.empty
from langflow.schema.schema import build_output_logs


# Mock Classes for Testing
class MockVertex:
    def __init__(self, outputs):
        self.outputs = outputs

class MockComponentInstance:
    def __init__(self, status, _results=None, _artifacts=None):
        self.status = status
        self._results = _results or {}
        self._artifacts = _artifacts or {}

# unit tests

# Basic Functionality
def test_single_output_simple_string():
    vertex = MockVertex(outputs=[{"name": "output1"}])
    result = [MockComponentInstance(status=None, _results={"output1": "simple string"})]
    expected = {"output1": {"message": "simple string", "type": "text"}}
    codeflash_output = build_output_logs(vertex, result)

def test_multiple_outputs_simple_dicts():
    vertex = MockVertex(outputs=[{"name": "output1"}, {"name": "output2"}])
    result = [MockComponentInstance(status=None, _results={"output1": {"key": "value"}, "output2": {"key2": "value2"}})]
    expected = {
        "output1": {"message": {"key": "value"}, "type": "object"},
        "output2": {"message": {"key2": "value2"}, "type": "object"}
    }
    codeflash_output = build_output_logs(vertex, result)

# Different Payload Types
def test_message_payload():
    message = Message(text="This is a message")
    vertex = MockVertex(outputs=[{"name": "output1"}])
    result = [MockComponentInstance(status=None, _results={"output1": message})]
    expected = {"output1": {"message": "This is a message", "type": "message"}}
    codeflash_output = build_output_logs(vertex, result)

def test_dataframe_payload():
    df = DataFrame([{"name": "John"}, {"name": "Jane"}])
    vertex = MockVertex(outputs=[{"name": "output1"}])
    result = [MockComponentInstance(status=None, _results={"output1": df})]
    expected = {"output1": {"message": [{"name": "John"}, {"name": "Jane"}], "type": "array"}}
    codeflash_output = build_output_logs(vertex, result)

# Edge Cases
def test_empty_payload():
    vertex = MockVertex(outputs=[{"name": "output1"}])
    result = [MockComponentInstance(status=None, _results={"output1": None})]
    expected = {"output1": {"message": None, "type": "unknown"}}
    codeflash_output = build_output_logs(vertex, result)

def test_unknown_type_payload():
    class CustomType:
        pass
    payload = CustomType()
    vertex = MockVertex(outputs=[{"name": "output1"}])
    result = [MockComponentInstance(status=None, _results={"output1": payload})]
    expected = {"output1": {"message": "", "type": "unknown"}}
    codeflash_output = build_output_logs(vertex, result)

# Performance and Scalability
def test_large_dataframe_payload():
    df = DataFrame([{"name": f"User{i}"} for i in range(1000)])
    vertex = MockVertex(outputs=[{"name": "output1"}])
    result = [MockComponentInstance(status=None, _results={"output1": df})]
    expected = {"output1": {"message": df.to_dict(orient="records"), "type": "array"}}
    codeflash_output = build_output_logs(vertex, result)

def test_high_complexity_payload():
    nested_data = {"level1": {"level2": {"level3": "deep value"}}}
    vertex = MockVertex(outputs=[{"name": "output1"}])
    result = [MockComponentInstance(status=None, _results={"output1": nested_data})]
    expected = {"output1": {"message": nested_data, "type": "object"}}
    codeflash_output = build_output_logs(vertex, result)

# Error Handling
def test_invalid_payload():
    vertex = MockVertex(outputs=[{"name": "output1"}])
    result = [MockComponentInstance(status=None, _results={"output1": 12345})]
    expected = {"output1": {"message": 12345, "type": "unknown"}}
    codeflash_output = build_output_logs(vertex, result)

def test_serialization_failure():
    class Unserializable:
        pass
    payload = Unserializable()
    vertex = MockVertex(outputs=[{"name": "output1"}])
    result = [MockComponentInstance(status=None, _results={"output1": payload})]
    expected = {"output1": {"message": "[Unserializable Object]", "type": "unknown"}}
    codeflash_output = build_output_logs(vertex, result)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

from collections.abc import Generator
from enum import Enum
from typing import Any, cast

import pandas as pd
# imports
import pytest  # used for our unit tests
# function to test
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
from langflow.schema.schema import build_output_logs
from langflow.serialization.constants import MAX_ITEMS_LENGTH, MAX_TEXT_LENGTH
from langflow.serialization.serialization import serialize
from loguru import logger
from pydantic import BaseModel
from typing_extensions import TypedDict


class OutputValue(BaseModel):
    message: Any
    type: str

class LogType(str, Enum):
    MESSAGE = "message"
    DATA = "data"
    STREAM = "stream"
    OBJECT = "object"
    ARRAY = "array"
    TEXT = "text"
    UNKNOWN = "unknown"

class StreamURL(TypedDict):
    location: str

class DataFrame(pd.DataFrame):
    def __init__(self, data: list[dict] | list[Data] | pd.DataFrame | None = None, **kwargs):
        if data is None:
            super().__init__(**kwargs)
            return

        if isinstance(data, list):
            if all(isinstance(x, Data) for x in data):
                data = [d.data for d in data if hasattr(d, "data")]
            elif not all(isinstance(x, dict) for x in data):
                msg = "List items must be either all Data objects or all dictionaries"
                raise ValueError(msg)
            kwargs["data"] = data
        elif isinstance(data, (dict, pd.DataFrame)):
            kwargs["data"] = data

        super().__init__(**kwargs)

    def to_data_list(self) -> list[Data]:
        list_of_dicts = self.to_dict(orient="records")
        return [Data(data=row) for row in list_of_dicts]

    def add_row(self, data: dict | Data) -> "DataFrame":
        if isinstance(data, Data):
            data = data.data
        new_df = self._constructor([data])
        return cast("DataFrame", pd.concat([self, new_df], ignore_index=True))

    def add_rows(self, data: list[dict | Data]) -> "DataFrame":
        processed_data = []
        for item in data:
            if isinstance(item, Data):
                processed_data.append(item.data)
            else:
                processed_data.append(item)
        new_df = self._constructor(processed_data)
        return cast("DataFrame", pd.concat([self, new_df], ignore_index=True))

    @property
    def _constructor(self):
        def _c(*args, **kwargs):
            return DataFrame(*args, **kwargs).__finalize__(self)
        return _c

    def __bool__(self):
        return not self.empty
from langflow.schema.schema import build_output_logs

# unit tests

class MockComponentInstance:
    def __init__(self, status, results=None, artifacts=None):
        self.status = status
        self._results = results or {}
        self._artifacts = artifacts or {}

class MockVertex:
    def __init__(self, outputs):
        self.outputs = outputs

def test_single_output_string():
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": "test_string"})]
    expected_output = {"output_1": {"message": "test_string", "type": "text"}}
    codeflash_output = build_output_logs(vertex, result)

def test_multiple_outputs_mixed_types():
    vertex = MockVertex(outputs=[{"name": "output_1"}, {"name": "output_2"}])
    result = [MockComponentInstance(status=None, results={"output_1": "test_string", "output_2": 123})]
    expected_output = {
        "output_1": {"message": "test_string", "type": "text"},
        "output_2": {"message": 123, "type": "unknown"},
    }
    codeflash_output = build_output_logs(vertex, result)

def test_output_as_dictionary():
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": {"key": "value"}})]
    expected_output = {"output_1": {"message": {"key": "value"}, "type": "object"}}
    codeflash_output = build_output_logs(vertex, result)

def test_output_as_list():
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": ["item1", "item2"]})]
    expected_output = {"output_1": {"message": ["item1", "item2"], "type": "array"}}
    codeflash_output = build_output_logs(vertex, result)

def test_output_as_dataframe():
    df = DataFrame([{"col1": "value1"}, {"col1": "value2"}])
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": df})]
    expected_output = {"output_1": {"message": [{"col1": "value1"}, {"col1": "value2"}], "type": "array"}}
    codeflash_output = build_output_logs(vertex, result)

def test_output_with_stream_url():
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": {"stream_url": "http://example.com"}})]
    expected_output = {"output_1": {"message": {"location": "http://example.com"}, "type": "stream"}}
    codeflash_output = build_output_logs(vertex, result)

def test_output_with_message_object():
    class CustomMessage:
        def __init__(self, message):
            self.message = message

    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": CustomMessage("test_message")})]
    expected_output = {"output_1": {"message": "test_message", "type": "message"}}
    codeflash_output = build_output_logs(vertex, result)

def test_empty_output():
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": ""})]
    expected_output = {"output_1": {"message": "", "type": "text"}}
    codeflash_output = build_output_logs(vertex, result)

def test_unknown_type_output():
    class CustomObject:
        pass

    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": CustomObject()})]
    expected_output = {"output_1": {"message": "[Unserializable Object]", "type": "unknown"}}
    codeflash_output = build_output_logs(vertex, result)

def test_large_scale_data():
    large_dict = {f"key_{i}": i for i in range(1000)}
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": large_dict})]
    expected_output = {"output_1": {"message": large_dict, "type": "object"}}
    codeflash_output = build_output_logs(vertex, result)

def test_circular_reference():
    circular_dict = {}
    circular_dict["self"] = circular_dict
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": circular_dict})]
    expected_output = {"output_1": {"message": "[Unserializable Object]", "type": "unknown"}}
    codeflash_output = build_output_logs(vertex, result)

def test_deeply_nested_structure():
    nested_dict = {"level1": {"level2": {"level3": "value"}}}
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": nested_dict})]
    expected_output = {"output_1": {"message": nested_dict, "type": "object"}}
    codeflash_output = build_output_logs(vertex, result)

def test_special_characters_in_strings():
    special_string = "Line1\nLine2\tTabbed"
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": special_string})]
    expected_output = {"output_1": {"message": special_string, "type": "text"}}
    codeflash_output = build_output_logs(vertex, result)

def test_large_string_value():
    large_string = "a" * (MAX_TEXT_LENGTH + 1)
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": large_string})]
    expected_output = {"output_1": {"message": large_string[:MAX_TEXT_LENGTH], "type": "text"}}
    codeflash_output = build_output_logs(vertex, result)

def test_large_collection():
    large_list = ["item"] * (MAX_ITEMS_LENGTH + 1)
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": large_list})]
    expected_output = {"output_1": {"message": large_list[:MAX_ITEMS_LENGTH], "type": "array"}}
    codeflash_output = build_output_logs(vertex, result)

def test_immutable_data_structure():
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": (1, 2, 3)})]
    expected_output = {"output_1": {"message": [1, 2, 3], "type": "array"}}
    codeflash_output = build_output_logs(vertex, result)

def test_custom_object_with_special_methods():
    class CustomObject:
        def __str__(self):
            raise Exception("Serialization error")

    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": CustomObject()})]
    expected_output = {"output_1": {"message": "[Unserializable Object]", "type": "unknown"}}
    codeflash_output = build_output_logs(vertex, result)


def test_concurrency_issues():
    import threading

    def modify_shared_resource():
        shared_resource.append("new_item")

    shared_resource = []
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": shared_resource})]

    thread = threading.Thread(target=modify_shared_resource)
    thread.start()
    thread.join()

    expected_output = {"output_1": {"message": shared_resource, "type": "array"}}
    codeflash_output = build_output_logs(vertex, result)

def test_invalid_stream_url():
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": {"stream_url": "invalid_url"}})]
    expected_output = {"output_1": {"message": {"location": "invalid_url"}, "type": "stream"}}
    codeflash_output = build_output_logs(vertex, result)

def test_data_with_special_types():
    from datetime import datetime

    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": datetime(2023, 10, 1)})]
    expected_output = {"output_1": {"message": "2023-10-01 00:00:00", "type": "unknown"}}
    codeflash_output = build_output_logs(vertex, result)

def test_mixed_type_collections():
    mixed_dict = {"str_key": "value", 1: "int_key"}
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": mixed_dict})]
    expected_output = {"output_1": {"message": mixed_dict, "type": "object"}}
    codeflash_output = build_output_logs(vertex, result)

def test_non_standard_dataframe_indexes():
    df = DataFrame({"col1": ["value1", "value2"]}, index=["row1", "row2"])
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": df})]
    expected_output = {"output_1": {"message": [{"col1": "value1"}, {"col1": "value2"}], "type": "array"}}
    codeflash_output = build_output_logs(vertex, result)

def test_corrupted_data():
    corrupted_data = Data()
    vertex = MockVertex(outputs=[{"name": "output_1"}])
    result = [MockComponentInstance(status=None, results={"output_1": corrupted_data})]
    expected_output = {"output_1": {"message": "[Unserializable Object]", "type": "unknown"}}
    codeflash_output = build_output_logs(vertex, result)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
