Skip to content
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'origin/main' into feat/COG-341-log-inte…
…ractions
  • Loading branch information
borisarzentar committed Nov 13, 2024
commit 420eb6416e3520d5845b86c45f48ac3975c9d402
281 changes: 5 additions & 276 deletions cognee/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,6 @@
from fastapi import FastAPI
from fastapi.responses import JSONResponse, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from cognee.api.DTO import InDTO, OutDTO
from cognee.api.v1.search import SearchType
from cognee.modules.search.operations import get_history
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
from cognee.modules.pipelines.models import PipelineRunStatus


# Set up logging
logging.basicConfig(
Expand Down Expand Up @@ -154,273 +145,11 @@ def health_check():
tags=["cognify"]
)

# Response model used by `GET /api/v1/datasets` (one entry per dataset).
class DatasetDTO(OutDTO):
    # Identifier and display name of the dataset.
    id: UUID
    name: str

    # Timestamps; `updated_at` is declared Optional.
    created_at: datetime
    updated_at: Optional[datetime]

    # Presumably the id of the owning user -- confirm against the data model.
    owner_id: UUID

@app.get("/api/v1/datasets", response_model=list[DatasetDTO])
async def get_datasets(user: User = Depends(get_authenticated_user)):
    # Returns whatever the data-layer `get_datasets` yields for the
    # authenticated user's id. Any failure is logged and re-raised as an
    # HTTP 500 whose detail carries the original error text.
    try:
        # Aliased so the helper does not share a name with this endpoint.
        from cognee.modules.data.methods import get_datasets as fetch_datasets

        return await fetch_datasets(user.id)
    except Exception as error:
        failure = f"Error retrieving datasets: {str(error)}"
        logger.error(failure)
        raise HTTPException(status_code=500, detail=failure) from error


@app.delete(
    "/api/v1/datasets/{dataset_id}",
    response_model=None,
    responses={404: {"model": ErrorResponseDTO}},
)
async def delete_dataset(dataset_id: str, user: User = Depends(get_authenticated_user)):
    # Looks the dataset up by (user id, dataset id) and deletes it.
    # Raises HTTP 404 when the lookup returns None.
    from cognee.modules.data.methods import (
        delete_dataset as remove_dataset,
        get_dataset as find_dataset,
    )

    target = await find_dataset(user.id, dataset_id)

    if target is not None:
        await remove_dataset(target)
        return

    raise HTTPException(
        status_code=404,
        detail=f"Dataset ({dataset_id}) not found.",
    )


@app.get("/api/v1/datasets/{dataset_id}/graph", response_model=str)
async def get_dataset_graph(dataset_id: str, user: User = Depends(get_authenticated_user)):
    """Render the graph held by the graph engine and return the rendering URL.

    Responds with 200 and the stringified URL on success, or 409 with a hint
    about Graphistry credentials when rendering fails.
    """
    # NOTE: `dataset_id` is not used below; the graph rendered is whatever
    # `graph_client.graph` exposes, not a per-dataset graph.
    from cognee.shared.utils import render_graph
    from cognee.infrastructure.databases.graph import get_graph_engine

    try:
        graph_client = await get_graph_engine()
        graph_url = await render_graph(graph_client.graph)

        return JSONResponse(
            status_code=200,
            content=str(graph_url),
        )
    except Exception as error:
        # `except Exception` (not a bare `except:`) lets task cancellation,
        # KeyboardInterrupt and SystemExit propagate instead of being turned
        # into a 409. The real cause is logged because the response text
        # always blames credentials.
        logger.error(f"Error rendering graph: {str(error)}")
        return JSONResponse(
            status_code=409,
            content="Graphistry credentials are not set. Please set them in your .env file.",
        )


# Response model used by `GET /api/v1/datasets/{dataset_id}/data`
# (one entry per data item).
class DataDTO(OutDTO):
    # Identifier and display name of the data item.
    id: UUID
    name: str

    # Timestamps; `updated_at` is declared Optional.
    created_at: datetime
    updated_at: Optional[datetime]

    # File metadata and where the raw file is stored.
    extension: str
    mime_type: str
    raw_data_location: str

@app.get(
    "/api/v1/datasets/{dataset_id}/data",
    response_model=list[DataDTO],
    responses={404: {"model": ErrorResponseDTO}},
)
async def get_dataset_data(dataset_id: str, user: User = Depends(get_authenticated_user)):
    """List the data items of one of the authenticated user's datasets.

    Raises HTTP 404 when the dataset lookup returns None; returns an empty
    list when the dataset has no data.
    """
    # Aliased so the helpers do not share a name with this endpoint function.
    from cognee.modules.data.methods import (
        get_dataset as find_dataset,
        get_dataset_data as fetch_dataset_data,
    )

    dataset = await find_dataset(user.id, dataset_id)

    if dataset is None:
        # Raise like `delete_dataset` does. Handing an ErrorResponseDTO
        # instance (built from a positional argument) to JSONResponse is not
        # JSON-serializable, so the 404 path would fail with a 500 instead.
        raise HTTPException(
            status_code=404,
            detail=f"Dataset ({dataset_id}) not found.",
        )

    dataset_data = await fetch_dataset_data(dataset_id=dataset.id)

    return [] if dataset_data is None else dataset_data


@app.get("/api/v1/datasets/status", response_model=dict[str, PipelineRunStatus])
async def get_dataset_status(datasets: Annotated[List[str], Query(alias="dataset")] = None, user: User = Depends(get_authenticated_user)):
    # Returns the result of `cognee_datasets.get_status` for the names passed
    # as repeated `dataset` query parameters. Any failure becomes a 409 with
    # the error text in the body.
    from cognee.api.v1.datasets.datasets import datasets as cognee_datasets

    try:
        return await cognee_datasets.get_status(datasets)
    except Exception as error:
        error_body = {"error": str(error)}
        return JSONResponse(status_code=409, content=error_body)


@app.get("/api/v1/datasets/{dataset_id}/data/{data_id}/raw", response_class=FileResponse)
async def get_raw_data(dataset_id: str, data_id: str, user: User = Depends(get_authenticated_user)):
    """Return the raw-data location of one data item in a dataset.

    Responds with 404 when the dataset, its data listing, or the requested
    data item cannot be found.
    """
    from cognee.modules.data.methods import get_dataset, get_dataset_data

    dataset = await get_dataset(user.id, dataset_id)

    if dataset is None:
        return JSONResponse(
            status_code=404,
            content={
                "detail": f"Dataset ({dataset_id}) not found."
            },
        )

    dataset_data = await get_dataset_data(dataset.id)

    if dataset_data is None:
        raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.")

    # `next(..., None)` yields None when nothing matches. Indexing `[0]` on
    # the filtered list would raise IndexError and make the 404 branch below
    # unreachable.
    data = next((item for item in dataset_data if str(item.id) == data_id), None)

    if data is None:
        return JSONResponse(
            status_code=404,
            content={
                "detail": f"Data ({data_id}) not found in dataset ({dataset_id})."
            },
        )

    return data.raw_data_location


@app.post("/api/v1/add", response_model=None)
async def add(
    data: List[UploadFile],
    datasetId: str = Form(...),
    user: User = Depends(get_authenticated_user),
):
    """This endpoint is responsible for adding data to the graph.

    Uploaded files are added to the dataset named by `datasetId`. Any failure
    is reported as a 409 response carrying the error text.
    """
    import subprocess

    from cognee.api.v1.add import add as cognee_add
    try:
        # NOTE(review): `data` is declared as List[UploadFile], so the string/URL
        # branch below looks unreachable through this signature -- confirm
        # before relying on it.
        if isinstance(data, str) and data.startswith("http"):
            if "github" in data:
                # Perform git clone if the URL is from GitHub
                repo_name = data.split("/")[-1].replace(".git", "")
                # Argument list, no shell: the URL comes from the request and
                # must not be interpolated into a shell command line. `--`
                # stops git treating the URL as an option. `check=True`
                # reports a failed clone through the 409 handler below
                # instead of ignoring it.
                subprocess.run(
                    ["git", "clone", "--", data, f".data/{repo_name}"],
                    check=True,
                )
                await cognee_add(
                    "data://.data/",
                    f"{repo_name}",
                )
            else:
                # Fetch and store the data from other types of URL
                async with aiohttp.ClientSession() as session:
                    async with session.get(data) as resp:
                        if resp.status == 200:
                            file_data = await resp.read()
                            with open(f".data/{data.split('/')[-1]}", "wb") as f:
                                f.write(file_data)
                await cognee_add(
                    "data://.data/",
                    f"{data.split('/')[-1]}",
                )
        else:
            await cognee_add(
                data,
                datasetId,
                user=user,
            )
    except Exception as error:
        return JSONResponse(
            status_code=409,
            content={"error": str(error)},
        )


# Request body of `POST /api/v1/cognify`.
class CognifyPayloadDTO(BaseModel):
    # Passed as-is to the cognify pipeline; presumably dataset names -- confirm.
    datasets: List[str]

@app.post("/api/v1/cognify", response_model=None)
async def cognify(payload: CognifyPayloadDTO, user: User = Depends(get_authenticated_user)):
    """ This endpoint is responsible for the cognitive processing of the content."""
    from cognee.api.v1.cognify.cognify_v2 import cognify as run_cognify

    # Run the pipeline for the requested datasets; a failure becomes a 409
    # response with the error text, success yields no body.
    try:
        await run_cognify(payload.datasets, user)
    except Exception as error:
        error_body = {"error": str(error)}
        return JSONResponse(status_code=409, content=error_body)

    return None


# Request body of `POST /api/v1/search`.
class SearchPayloadDTO(InDTO):
    # Which kind of search to run.
    search_type: SearchType
    # The search text handed to the search function.
    query: str

@app.post("/api/v1/search", response_model=list)
async def search(payload: SearchPayloadDTO, user: User = Depends(get_authenticated_user)):
    """ This endpoint is responsible for searching for nodes in the graph."""
    from cognee.api.v1.search import search as cognee_search

    # Delegate to the search API; a failure becomes a 409 response with the
    # error text.
    try:
        return await cognee_search(payload.search_type, payload.query, user)
    except Exception as error:
        error_body = {"error": str(error)}
        return JSONResponse(status_code=409, content=error_body)

# Response model used by `GET /api/v1/search` (one entry per history item).
class SearchHistoryItem(OutDTO):
    # Identifier of the history entry.
    id: UUID
    # Text of the entry.
    text: str
    # Declared as a plain string; its exact meaning is not shown here.
    user: str
    # Creation timestamp of the entry.
    created_at: datetime

@app.get("/api/v1/search", response_model=list[SearchHistoryItem])
async def get_search_history(user: User = Depends(get_authenticated_user)):
    # Returns `get_history` for the authenticated user's id; a failure
    # becomes a 500 response with the error text.
    try:
        return await get_history(user.id)
    except Exception as error:
        error_body = {"error": str(error)}
        return JSONResponse(status_code=500, content=error_body)

from cognee.modules.settings.get_settings import LLMConfig, VectorDBConfig

# Output-side view of LLMConfig: combines OutDTO with the settings model and
# adds no fields of its own.
class LLMConfigDTO(OutDTO, LLMConfig):
    ...

# Output-side view of VectorDBConfig: combines OutDTO with the settings model
# and adds no fields of its own.
class VectorDBConfigDTO(OutDTO, VectorDBConfig):
    ...

# Response model of `GET /api/v1/settings`.
class SettingsDTO(OutDTO):
    # LLM section of the settings.
    llm: LLMConfigDTO
    # Vector database section of the settings.
    vector_db: VectorDBConfigDTO

@app.get("/api/v1/settings", response_model=SettingsDTO)
async def get_settings(user: User = Depends(get_authenticated_user)):
    # Returns the current settings as produced by the settings module.
    # Imported under an alias because this endpoint has the same name.
    from cognee.modules.settings import get_settings as get_cognee_settings

    current_settings = get_cognee_settings()
    return current_settings


# Input-side LLM configuration accepted by `POST /api/v1/settings`.
# NOTE: this rebinds the module-level name `LLMConfigDTO`, which is also
# defined earlier in this file as an OutDTO subclass.
class LLMConfigDTO(InDTO):
    # Accepted provider identifiers.
    provider: Union[Literal["openai"], Literal["ollama"], Literal["anthropic"]]
    # Model name for the chosen provider.
    model: str
    # API key for the chosen provider.
    api_key: str

# Input-side vector database configuration accepted by `POST /api/v1/settings`.
# NOTE: this rebinds the module-level name `VectorDBConfigDTO`, which is also
# defined earlier in this file as an OutDTO subclass.
class VectorDBConfigDTO(InDTO):
    # Accepted provider identifiers.
    provider: Union[Literal["lancedb"], Literal["qdrant"], Literal["weaviate"], Literal["pgvector"]]
    # Connection URL of the vector database.
    url: str
    # API key for the vector database.
    api_key: str

# Request body of `POST /api/v1/settings`; both sections are optional and
# default to None.
class SettingsPayloadDTO(InDTO):
    # New LLM configuration, if provided.
    llm: Optional[LLMConfigDTO] = None
    # New vector database configuration, if provided.
    vector_db: Optional[VectorDBConfigDTO] = None

@app.post("/api/v1/settings", response_model=None)
async def save_settings(new_settings: SettingsPayloadDTO, user: User = Depends(get_authenticated_user)):
    # Persists whichever sections of the payload are present; a section left
    # as None is not touched.
    from cognee.modules.settings import save_llm_config, save_vector_db_config

    llm_update = new_settings.llm
    if llm_update is not None:
        await save_llm_config(llm_update)

    vector_db_update = new_settings.vector_db
    if vector_db_update is not None:
        await save_vector_db_config(vector_db_update)
# Mount the routes built by get_search_router() under /api/v1/search and tag
# them "search".
app.include_router(
get_search_router(),
prefix="/api/v1/search",
tags=["search"]
)

app.include_router(
get_settings_router(),
Expand Down
29 changes: 25 additions & 4 deletions cognee/api/v1/search/routers/get_search_router.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from cognee.api.v1.search import SearchType
from uuid import UUID
from datetime import datetime
from fastapi import Depends, APIRouter
from fastapi.responses import JSONResponse
from cognee.api.v1.search import SearchType
from cognee.api.DTO import InDTO, OutDTO
from cognee.modules.users.models import User
from fastapi import Depends, APIRouter
from cognee.api.DTO import InDTO
from cognee.modules.search.operations import get_history
from cognee.modules.users.methods import get_authenticated_user


Expand All @@ -13,6 +16,24 @@ class SearchPayloadDTO(InDTO):
def get_search_router() -> APIRouter:
router = APIRouter()

# Response model of the router's `GET /` history endpoint (one entry per
# history item).
class SearchHistoryItem(OutDTO):
    # Identifier of the history entry.
    id: UUID
    # Text of the entry.
    text: str
    # Declared as a plain string; its exact meaning is not shown here.
    user: str
    # Creation timestamp of the entry.
    created_at: datetime

@router.get("/", response_model=list[SearchHistoryItem])
async def get_search_history(user: User = Depends(get_authenticated_user)):
    # Returns `get_history` for the authenticated user's id; a failure
    # becomes a 500 response with the error text.
    try:
        return await get_history(user.id)
    except Exception as error:
        error_body = {"error": str(error)}
        return JSONResponse(status_code=500, content=error_body)
Comment on lines +25 to +35
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling and add pagination

  1. The generic exception handling could mask specific errors. Consider handling specific exceptions:
     try:
         history = await get_history(user.id)
         return history
-    except Exception as error:
+    except ValueError as error:
         return JSONResponse(
             status_code = 500,
             content = {"error": str(error)}
         )
+    except Exception as error:
+        # Log the unexpected error
+        return JSONResponse(
+            status_code = 500,
+            content = {"error": "Internal server error"}
+        )
  2. Consider adding pagination to handle large result sets:
@router.get("/", response_model=Page[SearchHistoryItem])
async def get_search_history(
    user: User = Depends(get_authenticated_user),
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100)
):


@router.post("/", response_model = list)
async def search(payload: SearchPayloadDTO, user: User = Depends(get_authenticated_user)):
""" This endpoint is responsible for searching for nodes in the graph."""
Expand All @@ -28,4 +49,4 @@ async def search(payload: SearchPayloadDTO, user: User = Depends(get_authenticat
content = {"error": str(error)}
)

return router
return router
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.