1 change: 1 addition & 0 deletions appinfo/info.xml
@@ -29,6 +29,7 @@ See [the nextcloud admin docs](https://docs.nextcloud.com/server/latest/admin_ma
     </docker-install>
     <scopes>
         <value>AI_PROVIDERS</value>
+        <value>TASK_PROCESSING</value>
     </scopes>
     <system>false</system>
 </external-app>
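The new TASK_PROCESSING scope is what authorizes the ex-app to call Nextcloud's task-processing endpoints, i.e. the next_task / report_result calls that the reworked worker loop in lib/main.py below depends on.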
76 changes: 36 additions & 40 deletions lib/main.py
@@ -1,20 +1,20 @@
 """Tha main module of the llm2 app
 """
 
-import queue
 import threading
-import typing
+import time
 from contextlib import asynccontextmanager
 from time import perf_counter
 
-import pydantic
-from chains import generate_chains
-from fastapi import Depends, FastAPI, responses
+from fastapi import FastAPI
 from nc_py_api import AsyncNextcloudApp, NextcloudApp
-from nc_py_api.ex_app import LogLvl, anc_app, run_app, set_handlers
+from nc_py_api.ex_app import LogLvl, run_app, set_handlers
+
+from chains import generate_chains
 
 chains = generate_chains()
 
 
 @asynccontextmanager
 async def lifespan(_app: FastAPI):
     set_handlers(
@@ -27,57 +27,54 @@ async def lifespan(_app: FastAPI):
 
 
 APP = FastAPI(lifespan=lifespan)
-TASK_LIST: queue.Queue = queue.Queue(maxsize=100)
 
 
 class BackgroundProcessTask(threading.Thread):
     def run(self, *args, **kwargs):  # pylint: disable=unused-argument
+        nc = NextcloudApp()
+
+        provider_ids = set()
+        task_type_ids = set()
+        for chain_name, _ in chains.items():
+            provider_ids.add("llm2:" + chain_name)
+            (model, task) = chain_name.split(":", 2)
+            task_type_ids.add("core:text2text:" + task)
+
         while True:
-            task = TASK_LIST.get(block=True)
+            response = nc.providers.task_processing.next_task(list(provider_ids), list(task_type_ids))
+            if not response:
+                time.sleep(5)
+                continue
+
+            task = response["task"]
+            provider = response["provider"]
+
             try:
-                chain_name = task.get("chain")
+                chain_name = provider["name"][5:]
                 print(f"chain: {chain_name}", flush=True)
                 chain_load = chains.get(chain_name)
                 if chain_load is None:
-                    NextcloudApp().providers.text_processing.report_result(
-                        task["id"], error="Requested model is not available"
+                    NextcloudApp().providers.task_processing.report_result(
+                        task["id"], error_message="Requested model is not available"
                     )
                     continue
                 chain = chain_load()
                 print("Generating reply", flush=True)
                 time_start = perf_counter()
-                print(task.get("prompt"))
-                result = chain.invoke(task.get("prompt")).get("text")
+                print(task.get("input").get("input"))
+                result = chain.invoke(task.get("input").get("input")).get("text")
                 del chain
                 print(f"reply generated: {round(float(perf_counter() - time_start), 2)}s", flush=True)
                 print(result, flush=True)
-                NextcloudApp().providers.text_processing.report_result(
+                NextcloudApp().providers.task_processing.report_result(
                     task["id"],
-                    str(result),
+                    {"output": str(result)},
                 )
             except Exception as e:  # noqa
                 print(str(e), flush=True)
                 nc = NextcloudApp()
                 nc.log(LogLvl.ERROR, str(e))
-                nc.providers.text_processing.report_result(task["id"], error=str(e))
-
-
-class Input(pydantic.BaseModel):
-    prompt: str
-    task_id: int
-
-
-@APP.post("/chain/{chain_name}")
-async def tiny_llama(
-    _nc: typing.Annotated[AsyncNextcloudApp, Depends(anc_app)],
-    req: Input,
-    chain_name=None,
-):
-    try:
-        TASK_LIST.put({"prompt": req.prompt, "id": req.task_id, "chain": chain_name}, block=False)
-    except queue.Full:
-        return responses.JSONResponse(content={"error": "task queue is full"}, status_code=429)
-    return responses.Response()
+                nc.providers.task_processing.report_result(task["id"], error_message=str(e))
 
 
 async def enabled_handler(enabled: bool, nc: AsyncNextcloudApp) -> str:
@@ -86,17 +83,16 @@ async def enabled_handler(enabled: bool, nc: AsyncNextcloudApp) -> str:
         for chain_name, _ in chains.items():
             (model, task) = chain_name.split(":", 2)
             try:
-                await nc.providers.text_processing.register(
-                    "llm2:"+chain_name, "Local Large language Model: " + model, "/chain/" + chain_name, task
+                await nc.providers.task_processing.register(
+                    "llm2:" + chain_name, "Local Large language Model: " + model, "core:text2text:" + task
                 )
-                print(f"Registering {model} - {task}", flush=True)
+                print(f"Registering {chain_name}", flush=True)
             except Exception as e:
                 print(f"Failed to register", f"{model} - {task}", f"Error:", f"{e}\n", flush=True)
     else:
         for chain_name, chain in chains.items():
-            (model, task) = chain_name.split(":", 2)
-            await nc.providers.text_processing.unregister(model)
-            print(f"Unregistering {model} - {task}", flush=True)
+            await nc.providers.task_processing.unregister("llm2:" + chain_name)
+            print(f"Unregistering {chain_name}", flush=True)
     return ""
 
 
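Taken together, the main.py changes replace the old push model (Nextcloud POSTs each task to the /chain/{chain_name} endpoint, which queues it locally) with a pull model: the background thread now polls the server for work. Below is a minimal sketch of that polling pattern in isolation. The chain name llama:summary and the run_chain() helper are invented for the example; the next_task / report_result calls and the {"task": ..., "provider": ...} response shape are the ones the diff itself uses.

import time

from nc_py_api import NextcloudApp


def run_chain(chain_name: str, prompt: str) -> str:
    # Hypothetical stand-in for loading and invoking the langchain chain.
    return f"({chain_name}) processed: {prompt}"


nc = NextcloudApp()
provider_ids = ["llm2:llama:summary"]       # "llm2:" + chain_name
task_type_ids = ["core:text2text:summary"]  # "core:text2text:" + the task part of chain_name

while True:
    # Ask the server for the next queued task this provider can handle.
    response = nc.providers.task_processing.next_task(provider_ids, task_type_ids)
    if not response:
        time.sleep(5)  # nothing queued; back off before polling again
        continue
    task = response["task"]
    provider = response["provider"]
    chain_name = provider["name"][5:]  # drop the "llm2:" prefix (len("llm2:") == 5)
    try:
        result = run_chain(chain_name, task["input"]["input"])
        nc.providers.task_processing.report_result(task["id"], {"output": str(result)})
    except Exception as e:  # noqa
        nc.providers.task_processing.report_result(task["id"], error_message=str(e))

The [5:] slice works because enabled_handler registers every provider as "llm2:" + chain_name, so dropping the first five characters recovers the chain key used to look up the model.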
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -11,11 +11,11 @@ package-mode = false
 python = "^3.10"
 pydantic = "^2.8.2"
 fastapi = "^0.111.0"
-nc-py-api = {extras = ["app"], version = "^0.13.0"}
+nc-py-api = {extras = ["app"], version = "^0.14.0"}
 langchain = "^0.1.0"
 llama-cpp-python = "0.2.76"
 
 
 [build-system]
 requires = ["poetry-core"]
-build-backend = "poetry.core.masonry.api"
\ No newline at end of file
+build-backend = "poetry.core.masonry.api"
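The nc-py-api bump to ^0.14.0 goes hand in hand with the API switch: the providers.task_processing client used throughout lib/main.py is presumably only available from that release onward. The second pair of changes merely adds a trailing newline to the file.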