Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions core/Controller/TextProcessingApiController.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
use OCP\AppFramework\Http\Attribute\UserRateLimit;
use OCP\AppFramework\Http\DataResponse;
use OCP\Common\Exception\NotFoundException;
use OCP\DB\Exception;
use OCP\IL10N;
use OCP\IRequest;
use OCP\TextProcessing\Exception\TaskFailureException;
use OCP\TextProcessing\ITaskType;
use OCP\TextProcessing\Task;
use OCP\TextProcessing\IManager;
Expand Down Expand Up @@ -102,7 +104,7 @@ public function taskTypes(): DataResponse {
* @param string $appId ID of the app that will execute the task
* @param string $identifier An arbitrary identifier for the task
*
* @return DataResponse<Http::STATUS_OK, array{task: CoreTextProcessingTask}, array{}>|DataResponse<Http::STATUS_BAD_REQUEST|Http::STATUS_PRECONDITION_FAILED, array{message: string}, array{}>
* @return DataResponse<Http::STATUS_OK, array{task: CoreTextProcessingTask}, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR|Http::STATUS_BAD_REQUEST|Http::STATUS_PRECONDITION_FAILED, array{message: string}, array{}>
*
* 200: Task scheduled successfully
* 400: Scheduling task is not possible
Expand All @@ -118,7 +120,11 @@ public function schedule(string $input, string $type, string $appId, string $ide
return new DataResponse(['message' => $this->l->t('Requested task type does not exist')], Http::STATUS_BAD_REQUEST);
}
try {
$this->textProcessingManager->scheduleTask($task);
try {
$this->textProcessingManager->runOrScheduleTask($task);
} catch(TaskFailureException) {
// noop, because the task object has the failure status set already, we just return the task json
}

$json = $task->jsonSerialize();

Expand All @@ -127,6 +133,8 @@ public function schedule(string $input, string $type, string $appId, string $ide
]);
} catch (PreConditionNotMetException) {
return new DataResponse(['message' => $this->l->t('Necessary language model provider is not available')], Http::STATUS_PRECONDITION_FAILED);
} catch (Exception) {
return new DataResponse(['message' => 'Internal server error'], Http::STATUS_INTERNAL_SERVER_ERROR);
}
}

Expand Down
61 changes: 61 additions & 0 deletions core/Migrations/Version28000Date20231103104802.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

declare(strict_types=1);

/**
* @copyright Copyright (c) 2023 Marcel Klehr <mklehr@gmx.net>
*
* @author Marcel Klehr <mklehr@gmx.net>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

namespace OC\Core\Migrations;

use Closure;
use OCP\DB\ISchemaWrapper;
use OCP\DB\Types;
use OCP\Migration\IOutput;
use OCP\Migration\SimpleMigrationStep;

/**
* Introduce completion_expected_at column in textprocessing_tasks table
*/
class Version28000Date20231103104802 extends SimpleMigrationStep {
/**
* @param IOutput $output
* @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper`
* @param array $options
* @return null|ISchemaWrapper
*/
public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper {
/** @var ISchemaWrapper $schema */
$schema = $schemaClosure();
if ($schema->hasTable('textprocessing_tasks')) {
$table = $schema->getTable('textprocessing_tasks');

if (!$table->hasColumn('completion_expected_at')) {
$table->addColumn('completion_expected_at', Types::DATETIME, [
'notnull' => false,
]);
return $schema;
}
}

return null;
}
}
1 change: 1 addition & 0 deletions core/ResponseDefinitions.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
* input: string,
* output: ?string,
* identifier: string,
* completionExpectedAt: ?int
* }
*
* @psalm-type CoreTextToImageTask = array{
Expand Down
46 changes: 45 additions & 1 deletion core/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,8 @@
"appId",
"input",
"output",
"identifier"
"identifier",
"completionExpectedAt"
],
"properties": {
"id": {
Expand Down Expand Up @@ -447,6 +448,11 @@
},
"identifier": {
"type": "string"
},
"completionExpectedAt": {
"type": "integer",
"format": "int64",
"nullable": true
}
}
},
Expand Down Expand Up @@ -4661,6 +4667,44 @@
}
}
},
"500": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"400": {
"description": "Scheduling task is not possible",
"content": {
Expand Down
10 changes: 8 additions & 2 deletions lib/private/TextProcessing/Db/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
* @method string getAppId()
* @method setIdentifier(string $identifier)
* @method string getIdentifier()
* @method setCompletionExpectedAt(null|\DateTime $completionExpectedAt)
* @method null|\DateTime getCompletionExpectedAt()
*/
class Task extends Entity {
protected $lastUpdated;
Expand All @@ -55,16 +57,17 @@ class Task extends Entity {
protected $userId;
protected $appId;
protected $identifier;
protected $completionExpectedAt;

/**
* @var string[]
*/
public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'identifier'];
public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'identifier', 'completion_expected_at'];

/**
* @var string[]
*/
public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'identifier'];
public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'identifier', 'completionExpectedAt'];


public function __construct() {
Expand All @@ -78,6 +81,7 @@ public function __construct() {
$this->addType('userId', 'string');
$this->addType('appId', 'string');
$this->addType('identifier', 'string');
$this->addType('completionExpectedAt', 'datetime');
}

public function toRow(): array {
Expand All @@ -98,6 +102,7 @@ public static function fromPublicTask(OCPTask $task): Task {
'userId' => $task->getUserId(),
'appId' => $task->getAppId(),
'identifier' => $task->getIdentifier(),
'completionExpectedAt' => $task->getCompletionExpectedAt(),
]);
return $task;
}
Expand All @@ -107,6 +112,7 @@ public function toPublicTask(): OCPTask {
$task->setId($this->getId());
$task->setStatus($this->getStatus());
$task->setOutput($this->getOutput());
$task->setCompletionExpectedAt($this->getCompletionExpectedAt());
return $task;
}
}
83 changes: 59 additions & 24 deletions lib/private/TextProcessing/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
use OC\AppFramework\Bootstrap\Coordinator;
use OC\TextProcessing\Db\Task as DbTask;
use OCP\IConfig;
use OCP\TextProcessing\Exception\TaskFailureException;
use OCP\TextProcessing\IProvider2;
use OCP\TextProcessing\Task;
use OCP\TextProcessing\Task as OCPTask;
use OC\TextProcessing\Db\TaskMapper;
Expand Down Expand Up @@ -114,26 +116,16 @@ public function runTask(OCPTask $task): string {
if (!$this->canHandleTask($task)) {
throw new PreConditionNotMetException('No text processing provider is installed that can handle this task');
}
$providers = $this->getProviders();
$json = $this->config->getAppValue('core', 'ai.textprocessing_provider_preferences', '');
if ($json !== '') {
$preferences = json_decode($json, true);
if (isset($preferences[$task->getType()])) {
// If a preference for this task type is set, move the preferred provider to the start
$provider = current(array_filter($providers, fn ($provider) => $provider::class === $preferences[$task->getType()]));
if ($provider !== false) {
$providers = array_filter($providers, fn ($p) => $p !== $provider);
array_unshift($providers, $provider);
}
}
}
$providers = $this->getPreferredProviders($task);

foreach ($providers as $provider) {
if (!$task->canUseProvider($provider)) {
continue;
}
try {
$task->setStatus(OCPTask::STATUS_RUNNING);
if ($provider instanceof IProvider2) {
$completionExpectedAt = new \DateTime('now');
$completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
$task->setCompletionExpectedAt($completionExpectedAt);
}
if ($task->getId() === null) {
$taskEntity = $this->taskMapper->insert(DbTask::fromPublicTask($task));
$task->setId($taskEntity->getId());
Expand All @@ -145,31 +137,33 @@ public function runTask(OCPTask $task): string {
$task->setStatus(OCPTask::STATUS_SUCCESSFUL);
$this->taskMapper->update(DbTask::fromPublicTask($task));
return $output;
} catch (\RuntimeException $e) {
$this->logger->info('LanguageModel call using provider ' . $provider->getName() . ' failed', ['exception' => $e]);
$task->setStatus(OCPTask::STATUS_FAILED);
$this->taskMapper->update(DbTask::fromPublicTask($task));
throw $e;
} catch (\Throwable $e) {
$this->logger->info('LanguageModel call using provider ' . $provider->getName() . ' failed', ['exception' => $e]);
$task->setStatus(OCPTask::STATUS_FAILED);
$this->taskMapper->update(DbTask::fromPublicTask($task));
throw new RuntimeException('LanguageModel call using provider ' . $provider->getName() . ' failed: ' . $e->getMessage(), 0, $e);
throw new TaskFailureException('LanguageModel call using provider ' . $provider->getName() . ' failed: ' . $e->getMessage(), 0, $e);
}
}

throw new RuntimeException('Could not run task');
$task->setStatus(OCPTask::STATUS_FAILED);
$this->taskMapper->update(DbTask::fromPublicTask($task));
throw new TaskFailureException('Could not run task');
}

/**
* @inheritDoc
* @throws Exception
*/
public function scheduleTask(OCPTask $task): void {
if (!$this->canHandleTask($task)) {
throw new PreConditionNotMetException('No LanguageModel provider is installed that can handle this task');
}
$task->setStatus(OCPTask::STATUS_SCHEDULED);
[$provider, ] = $this->getPreferredProviders($task);
if ($provider instanceof IProvider2) {
$completionExpectedAt = new \DateTime('now');
$completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
$task->setCompletionExpectedAt($completionExpectedAt);
}
$taskEntity = DbTask::fromPublicTask($task);
$this->taskMapper->insert($taskEntity);
$task->setId($taskEntity->getId());
Expand All @@ -178,6 +172,25 @@ public function scheduleTask(OCPTask $task): void {
]);
}

/**
* @inheritDoc
*/
public function runOrScheduleTask(OCPTask $task) : bool {
if (!$this->canHandleTask($task)) {
throw new PreConditionNotMetException('No LanguageModel provider is installed that can handle this task');
}
[$provider,] = $this->getPreferredProviders($task);
$maxExecutionTime = (int) ini_get('max_execution_time');
// Offload the task to a background job if the expected runtime of the likely provider is longer than 80% of our max execution time
// or if the provider doesn't provide a getExpectedRuntime() method
if (!$provider instanceof IProvider2 || $provider->getExpectedRuntime() > $maxExecutionTime * 0.8) {
$this->scheduleTask($task);
return false;
}
$this->runTask($task);
return true;
}

/**
* @inheritDoc
*/
Expand Down Expand Up @@ -253,4 +266,26 @@ public function getUserTasksByApp(string $userId, string $appId, ?string $identi
throw new RuntimeException('Failure while trying to find tasks by appId and identifier: ' . $e->getMessage(), 0, $e);
}
}

/**
* @param OCPTask $task
* @return IProvider[]
*/
public function getPreferredProviders(OCPTask $task): array {
$providers = $this->getProviders();
$json = $this->config->getAppValue('core', 'ai.textprocessing_provider_preferences', '');
if ($json !== '') {
$preferences = json_decode($json, true);
if (isset($preferences[$task->getType()])) {
// If a preference for this task type is set, move the preferred provider to the start
$provider = current(array_filter($providers, fn ($provider) => $provider::class === $preferences[$task->getType()]));
if ($provider !== false) {
$providers = array_filter($providers, fn ($p) => $p !== $provider);
array_unshift($providers, $provider);
}
}
}
$providers = array_filter($providers, fn (IProvider $provider) => $task->canUseProvider($provider));
return $providers;
}
}
10 changes: 10 additions & 0 deletions lib/public/TextProcessing/Exception/TaskFailureException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace OCP\TextProcessing\Exception;

/**
* Exception thrown when a task failed
* @since 28.0.0
*/
class TaskFailureException extends \RuntimeException {
}
Loading