diff --git a/cypress/e2e/api/SyncServiceProvider.spec.js b/cypress/e2e/api/SyncServiceProvider.spec.js index 664453ae7cb..8844754aab0 100644 --- a/cypress/e2e/api/SyncServiceProvider.spec.js +++ b/cypress/e2e/api/SyncServiceProvider.spec.js @@ -53,7 +53,7 @@ describe('Sync service provider', function () { baseVersionEtag, ) const queue = [] - syncService.on('opened', () => syncService.startSync()) + syncService.bus.on('opened', () => syncService.startSync()) return createSyncServiceProvider({ ydoc, syncService, diff --git a/lib/Controller/PublicSessionController.php b/lib/Controller/PublicSessionController.php index e529523737a..c6cf2a2d8c3 100644 --- a/lib/Controller/PublicSessionController.php +++ b/lib/Controller/PublicSessionController.php @@ -78,8 +78,8 @@ public function close(int $documentId, int $sessionId, string $sessionToken): Da #[PublicPage] #[RequireDocumentBaseVersionEtag] #[RequireDocumentSession] - public function push(int $documentId, int $sessionId, string $sessionToken, int $version, array $steps, string $awareness, string $token): DataResponse { - return $this->apiService->push($this->getSession(), $this->getDocument(), $version, $steps, $awareness, $token); + public function push(int $documentId, int $sessionId, string $sessionToken, int $version, array $steps, string $awareness, string $token, ?int $recoveryAttempt = null): DataResponse { + return $this->apiService->push($this->getSession(), $this->getDocument(), $version, $steps, $awareness, $recoveryAttempt, $token); } #[NoAdminRequired] diff --git a/lib/Controller/SessionController.php b/lib/Controller/SessionController.php index c243534ade7..8028e8609f3 100644 --- a/lib/Controller/SessionController.php +++ b/lib/Controller/SessionController.php @@ -56,10 +56,10 @@ public function close(int $documentId, int $sessionId, string $sessionToken): Da #[PublicPage] #[RequireDocumentBaseVersionEtag] #[RequireDocumentSession] - public function push(int $version, array $steps, string $awareness): DataResponse { + public function push(int $version, array $steps, string $awareness, ?int $recoveryAttempt = null): DataResponse { try { $this->loginSessionUser(); - return $this->apiService->push($this->getSession(), $this->getDocument(), $version, $steps, $awareness); + return $this->apiService->push($this->getSession(), $this->getDocument(), $version, $steps, $awareness, $recoveryAttempt); } finally { $this->restoreSessionUser(); } diff --git a/lib/Service/ApiService.php b/lib/Service/ApiService.php index 2d277d3f4a9..93a546ed529 100644 --- a/lib/Service/ApiService.php +++ b/lib/Service/ApiService.php @@ -174,7 +174,7 @@ public function close(int $documentId, int $sessionId, string $sessionToken): Da /** * @throws NotFoundException */ - public function push(Session $session, Document $document, int $version, array $steps, string $awareness, ?string $token = null): DataResponse { + public function push(Session $session, Document $document, int $version, array $steps, string $awareness, ?int $recoveryAttempt, ?string $token = null): DataResponse { try { $session = $this->sessionService->updateSessionAwareness($session, $awareness); } catch (DoesNotExistException $e) { @@ -182,7 +182,7 @@ public function push(Session $session, Document $document, int $version, array $ return new DataResponse(['error' => $this->l10n->t('Editing session has expired. Please reload the page.')], Http::STATUS_PRECONDITION_FAILED); } try { - $result = $this->documentService->addStep($document, $session, $steps, $version, $token); + $result = $this->documentService->addStep($document, $session, $steps, $version, $recoveryAttempt, $token); $this->addToPushQueue($document, [$awareness, ...array_values($steps)]); } catch (InvalidArgumentException $e) { return new DataResponse(['error' => $e->getMessage()], Http::STATUS_UNPROCESSABLE_ENTITY); diff --git a/lib/Service/DocumentService.php b/lib/Service/DocumentService.php index 18b5808fbd5..ca5aa551de5 100644 --- a/lib/Service/DocumentService.php +++ b/lib/Service/DocumentService.php @@ -205,7 +205,7 @@ public function writeDocumentState(int $documentId, string $content): void { * @throws NotPermittedException * @throws DoesNotExistException */ - public function addStep(Document $document, Session $session, array $steps, int $version, ?string $shareToken): array { + public function addStep(Document $document, Session $session, array $steps, int $version, ?int $recoveryAttempt, ?string $shareToken): array { $documentId = $session->getDocumentId(); $readOnly = $this->isReadOnlyCached($session, $shareToken); $stepsToInsert = []; @@ -234,6 +234,11 @@ public function addStep(Document $document, Session $session, array $steps, int // By default, send all steps the user has not received yet. $getStepsSinceVersion = $version; if ($stepsIncludeQuery) { + if ($recoveryAttempt === 1) { + $this->logger->error('Recovery attempt #' . $recoveryAttempt . ' from ' . $session->getId() . ' for ' . $documentId); + } elseif ($recoveryAttempt > 1) { + $this->logger->debug('Recovery attempt #' . $recoveryAttempt . ' from ' . $session->getId() . ' for ' . $documentId); + } $this->logger->debug('Loading document state for ' . $documentId); try { $stateFile = $this->getStateFile($documentId); diff --git a/src/apis/sync.ts b/src/apis/sync.ts index d9f8f4cce33..00b393eb427 100644 --- a/src/apis/sync.ts +++ b/src/apis/sync.ts @@ -13,6 +13,7 @@ interface PushData { version: number steps: string[] awareness: string + recoveryAttempt?: number } interface PushResponse { @@ -44,6 +45,7 @@ export function push( version: data.version, steps: data.steps.filter((s) => s), awareness: data.awareness, + recoveryAttempt: data.recoveryAttempt, }) } diff --git a/src/components/Editor.vue b/src/components/Editor.vue index f5415cb0194..b305336c120 100644 --- a/src/components/Editor.vue +++ b/src/components/Editor.vue @@ -455,27 +455,27 @@ export default defineComponent({ }, listenSyncServiceEvents() { - this.syncService - .on('opened', this.onOpened) - .on('change', this.onChange) - .on('loaded', this.onLoaded) - .on('sync', this.onSync) - .on('error', this.onError) - .on('stateChange', this.onStateChange) - .on('idle', this.onIdle) - .on('save', this.onSave) + const bus = this.syncService.bus + bus.on('opened', this.onOpened) + bus.on('change', this.onChange) + bus.on('loaded', this.onLoaded) + bus.on('sync', this.onSync) + bus.on('error', this.onError) + bus.on('stateChange', this.onStateChange) + bus.on('idle', this.onIdle) + bus.on('save', this.onSave) }, unlistenSyncServiceEvents() { - this.syncService - .off('opened', this.onOpened) - .off('change', this.onChange) - .off('loaded', this.onLoaded) - .off('sync', this.onSync) - .off('error', this.onError) - .off('stateChange', this.onStateChange) - .off('idle', this.onIdle) - .off('save', this.onSave) + const bus = this.syncService.bus + bus.off('opened', this.onOpened) + bus.off('change', this.onChange) + bus.off('loaded', this.onLoaded) + bus.off('sync', this.onSync) + bus.off('error', this.onError) + bus.off('stateChange', this.onStateChange) + bus.off('idle', this.onIdle) + bus.off('save', this.onSave) }, reconnect() { diff --git a/src/helpers/steps.ts b/src/helpers/steps.ts new file mode 100644 index 00000000000..725466e9ce1 --- /dev/null +++ b/src/helpers/steps.ts @@ -0,0 +1,24 @@ +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import type { Session, Step } from '../services/SyncService' +import { COLLABORATOR_DISCONNECT_TIME } from '../services/SyncService' + +/** + * Get the recent awareness messages as steps + * @param sessions to process. + */ +export function awarenessSteps(sessions: Session[]): Step[] { + const lastContactThreshold = + Math.floor(Date.now() / 1000) - COLLABORATOR_DISCONNECT_TIME + return sessions + .filter((s) => s.lastContact > lastContactThreshold) + .filter((s) => Boolean(s.lastAwarenessMessage)) + .map((s) => ({ + data: [s.lastAwarenessMessage], + sessionId: s.id, + version: 0, + })) +} diff --git a/src/helpers/yjs.ts b/src/helpers/yjs.ts index 9c59a118470..aaf7eca82d0 100644 --- a/src/helpers/yjs.ts +++ b/src/helpers/yjs.ts @@ -7,6 +7,7 @@ import * as decoding from 'lib0/decoding.js' import * as encoding from 'lib0/encoding.js' import * as syncProtocol from 'y-protocols/sync' import * as Y from 'yjs' +import type { Step } from '../services/SyncService.js' import { messageSync } from '../services/y-websocket.js' import { decodeArrayBuffer, encodeArrayBuffer } from './base64' @@ -42,13 +43,11 @@ export function applyDocumentState( * and encode it and wrap it in a step data structure. * * @param documentState - base64 encoded doc state - * @return base64 encoded yjs sync protocol update message + * @return base64 encoded yjs sync protocol update message and version */ -export function documentStateToStep(documentState: string): { - step: string -} { +export function documentStateToStep(documentState: string): Step { const message = documentStateToUpdateMessage(documentState) - return { step: encodeArrayBuffer(message) } + return { data: [encodeArrayBuffer(message)], sessionId: 0, version: -1 } } /** @@ -72,20 +71,22 @@ function documentStateToUpdateMessage(documentState: string): Uint8Array { * Only used in tests right now. * @param ydoc - encode state of this doc * @param step - step data - * @param step.step - base64 encoded yjs sync update message + * @param step.data - array of base64 encoded yjs sync update messages * @param origin - initiator object e.g. WebsocketProvider */ -export function applyStep(ydoc: Y.Doc, step: { step: string }, origin = 'origin') { - const updateMessage = decodeArrayBuffer(step.step) - const decoder = decoding.createDecoder(updateMessage) - const messageType = decoding.readVarUint(decoder) - if (messageType !== messageSync) { - console.error('y.js update message with invalid type', messageType) - return +export function applyStep(ydoc: Y.Doc, step: Step, origin = 'origin') { + for (const encoded of step.data) { + const updateMessage = decodeArrayBuffer(encoded) + const decoder = decoding.createDecoder(updateMessage) + const messageType = decoding.readVarUint(decoder) + if (messageType !== messageSync) { + console.error('y.js update message with invalid type', messageType) + return + } + // There are no responses to updates - so this is a dummy. + const encoder = encoding.createEncoder() + syncProtocol.readSyncMessage(decoder, encoder, ydoc, origin) } - // There are no responses to updates - so this is a dummy. - const encoder = encoding.createEncoder() - syncProtocol.readSyncMessage(decoder, encoder, ydoc, origin) } /** diff --git a/src/services/NotifyService.ts b/src/services/NotifyService.ts index 112d2dd1b53..77938238275 100644 --- a/src/services/NotifyService.ts +++ b/src/services/NotifyService.ts @@ -8,7 +8,10 @@ import { listen } from '@nextcloud/notify_push' import mitt, { type Emitter } from 'mitt' export declare type EventTypes = { - notify_push: { messageType: unknown; messageBody: object } + notify_push: { + messageType: unknown + messageBody: { steps: string[]; documentId: number } + } } declare global { @@ -20,12 +23,18 @@ declare global { if (!window._nc_text_notify) { const isPushEnabled = loadState('text', 'notify_push', false) const useNotifyPush = isPushEnabled - ? listen('text_steps', (messageType, messageBody) => { - window._nc_text_notify?.emit('notify_push', { - messageType, - messageBody, - }) - }) + ? listen( + 'text_steps', + ( + messageType: string, + messageBody: { steps: string[]; documentId: number }, + ) => { + window._nc_text_notify?.emit('notify_push', { + messageType, + messageBody, + }) + }, + ) : undefined window._nc_text_notify = useNotifyPush ? mitt() : undefined } diff --git a/src/services/Outbox.ts b/src/services/Outbox.ts index 2c45cfb7ef4..69299400d64 100644 --- a/src/services/Outbox.ts +++ b/src/services/Outbox.ts @@ -15,6 +15,8 @@ export default class Outbox { #awarenessUpdate = '' #syncUpdate = '' #syncQuery = '' + #recoveryAttemptCounter = 0 + #isRecoveringSync = false storeStep(step: Uint8Array) { const encoded = encodeArrayBuffer(step) @@ -33,13 +35,25 @@ export default class Outbox { this.#awarenessUpdate = encoded } + setRecoveringSync() { + this.#isRecoveringSync = true + this.#recoveryAttemptCounter++ + } + getDataToSend(): Sendable { return { steps: [this.#syncUpdate, this.#syncQuery].filter((s) => s), awareness: this.#awarenessUpdate, + ...this.recoveryData, } } + get recoveryData(): { recoveryAttempt?: number } { + return this.#isRecoveringSync + ? { recoveryAttempt: this.#recoveryAttemptCounter } + : {} + } + get hasUpdate(): boolean { return !!this.#syncUpdate } @@ -56,6 +70,7 @@ export default class Outbox { } if (steps.includes(this.#syncQuery)) { this.#syncQuery = '' + this.#isRecoveringSync = false } if (this.#awarenessUpdate === awareness) { this.#awarenessUpdate = '' diff --git a/src/services/PollingBackend.ts b/src/services/PollingBackend.ts index 753a70e6796..d5c3044ed91 100644 --- a/src/services/PollingBackend.ts +++ b/src/services/PollingBackend.ts @@ -145,7 +145,7 @@ class PollingBackend { const { document, sessions } = data this.#fetchRetryCounter = 0 - this.#syncService.emit('change', { document, sessions }) + this.#syncService.bus.emit('change', { document, sessions }) this.#syncService.receiveSteps(data) if (data.steps.length === 0) { @@ -164,7 +164,7 @@ class PollingBackend { } else { this.increaseRefetchTimer() } - this.#syncService.emit('stateChange', { initialLoading: true }) + this.#syncService.bus.emit('stateChange', { initialLoading: true }) return } @@ -182,7 +182,7 @@ class PollingBackend { logger.error( '[PollingBackend:fetchSteps] Network error when fetching steps, emitting CONNECTION_FAILED', ) - this.#syncService.emit('error', { + this.#syncService.bus.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {}, }) @@ -195,27 +195,27 @@ class PollingBackend { // Still apply the steps to update our version of the document this._handleResponse(e.response) logger.error('Conflict during file save, please resolve') - this.#syncService.emit('error', { + this.#syncService.bus.emit('error', { type: ERROR_TYPE.SAVE_COLLISSION, data: { outsideChange: e.response.data.outsideChange, }, }) } else if (e.response.status === 412) { - this.#syncService.emit('error', { + this.#syncService.bus.emit('error', { type: ERROR_TYPE.LOAD_ERROR, data: e.response, }) this.disconnect() } else if ([403, 404].includes(e.response.status)) { - this.#syncService.emit('error', { + this.#syncService.bus.emit('error', { type: ERROR_TYPE.SOURCE_NOT_FOUND, data: {}, }) this.disconnect() } else if ([502, 503].includes(e.response.status)) { this.increaseRefetchTimer() - this.#syncService.emit('error', { + this.#syncService.bus.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {}, }) @@ -224,7 +224,7 @@ class PollingBackend { }) } else { this.disconnect() - this.#syncService.emit('error', { + this.#syncService.bus.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {}, }) diff --git a/src/services/SaveService.ts b/src/services/SaveService.ts index 4c639c8a0f3..77fd50dcc98 100644 --- a/src/services/SaveService.ts +++ b/src/services/SaveService.ts @@ -41,7 +41,7 @@ class SaveService { this.serialize = serialize this.getDocumentState = getDocumentState this.autosave = debounce(this._autosave.bind(this), AUTOSAVE_INTERVAL) - this.syncService.on('close', () => { + this.syncService.bus.on('close', () => { this.autosave.clear() }) } @@ -51,7 +51,7 @@ class SaveService { } get emit() { - return this.syncService.emit.bind(this.syncService) + return this.syncService.bus.emit } _getContent() { diff --git a/src/services/SyncService.ts b/src/services/SyncService.ts index 61c96af452b..f4b2ce73e1f 100644 --- a/src/services/SyncService.ts +++ b/src/services/SyncService.ts @@ -5,13 +5,14 @@ /* eslint-disable jsdoc/valid-types */ -import mitt, { type Handler } from 'mitt' +import mitt from 'mitt' import type { ShallowRef } from 'vue' import { close, type OpenData } from '../apis/connect' import { push } from '../apis/sync' import type { Connection } from '../composables/useConnection.js' import { logger } from '../helpers/logger.js' +import { awarenessSteps } from '../helpers/steps' import { documentStateToStep } from '../helpers/yjs.js' import Outbox from './Outbox.js' import PollingBackend from './PollingBackend.js' @@ -46,7 +47,9 @@ const ERROR_TYPE = { SOURCE_NOT_FOUND: 4, PUSH_FORBIDDEN: 5, -} +} as const + +type ErrorType = (typeof ERROR_TYPE)[keyof typeof ERROR_TYPE] /* * Step as what we expect to be returned from the server right now. @@ -106,26 +109,23 @@ export declare type EventTypes = { /* Document state */ opened: OpenData - /* All initial steps fetched */ - fetched: unknown - /* received new steps */ - sync: unknown + sync: { document?: object; steps: Step[] } /* state changed (dirty) */ - stateChange: unknown + stateChange: { initialLoading?: boolean; dirty?: boolean } /* error */ - error: unknown + error: { type: ErrorType; data?: object } /* Events for session and document meta data */ change: { sessions: Session[]; document: Document } /* Emitted after successful save */ - save: unknown + save: object /* Emitted once a document becomes idle */ - idle: unknown + idle: void /* Emitted if the connection has been closed */ close: void @@ -176,7 +176,7 @@ class SyncService { this.version = data.document.lastSavedVersion this.backend = new PollingBackend(this, this.connection.value, data) // Make sure to only emit this once the backend is in place. - this.emit('opened', data) + this.bus.emit('opened', data) } startSync() { @@ -189,9 +189,12 @@ class SyncService { _emitError(error: { response?: object; code?: string }) { if (!error.response || error.code === 'ECONNABORTED') { - this.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {} }) + this.bus.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {} }) } else { - this.emit('error', { type: ERROR_TYPE.LOAD_ERROR, data: error.response }) + this.bus.emit('error', { + type: ERROR_TYPE.LOAD_ERROR, + data: error.response, + }) } } @@ -200,6 +203,12 @@ class SyncService { this.sendSteps() } + sendRecoveryStep(step: Uint8Array) { + this.#outbox.setRecoveringSync() + this.#outbox.storeStep(step) + this.sendSteps() + } + sendSteps() { // If already waiting to send, do nothing. if (this.#sendIntervalId) { @@ -216,13 +225,13 @@ class SyncService { this.#sending = true clearInterval(this.#sendIntervalId) this.#sendIntervalId = undefined - const sendable = this.#outbox.getDataToSend() - if (sendable.steps.length > 0) { - this.emit('stateChange', { dirty: true }) + if (this.#outbox.hasUpdate) { + this.bus.emit('stateChange', { dirty: true }) } if (!this.hasActiveConnection()) { return } + const sendable = this.#outbox.getDataToSend() return push(this.connection, { version: this.version, ...sendable, @@ -235,8 +244,7 @@ class SyncService { } if (documentState) { const documentStateStep = documentStateToStep(documentState) - this.emit('sync', { - version: this.version, + this.bus.emit('sync', { steps: [documentStateStep], }) } @@ -252,25 +260,28 @@ class SyncService { this.pushError++ logger.error('Failed to push the steps to the server', err) if (!response || code === 'ECONNABORTED') { - this.emit('error', { + this.bus.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {}, }) } if (response?.status === 412) { - this.emit('error', { + this.bus.emit('error', { type: ERROR_TYPE.LOAD_ERROR, data: response, }) } else if (response?.status === 403) { // either the session is invalid or the document is read only. logger.error('failed to write to document - not allowed') - this.emit('error', { + this.bus.emit('error', { type: ERROR_TYPE.PUSH_FORBIDDEN, data: {}, }) } else { - this.emit('error', { type: ERROR_TYPE.PUSH_FAILURE, data: {} }) + this.bus.emit('error', { + type: ERROR_TYPE.PUSH_FAILURE, + data: {}, + }) } throw new Error('Failed to apply steps. Retry!', { cause: err }) }) @@ -281,46 +292,23 @@ class SyncService { document, sessions = [], }: { - steps: { data: string[]; version: number; sessionId: number }[] + steps: Step[] document?: object - sessions?: { - lastContact: number - lastAwarenessMessage: string - }[] + sessions?: Session[] }) { - const awareness = sessions - .filter( - (s) => - s.lastContact - > Math.floor(Date.now() / 1000) - COLLABORATOR_DISCONNECT_TIME, + const versionAfter = Math.max(this.version, ...steps.map((s) => s.version)) + this.bus.emit('sync', { + steps: [...awarenessSteps(sessions), ...steps], + document, + }) + if (this.version < versionAfter) { + // Steps up to version where emitted but it looks like they were not processed. + // Otherwise the WebsocketPolyfill would have increased the version counter. + console.warn( + `Failed to process steps leading up to version ${versionAfter}.`, ) - .filter((s) => s.lastAwarenessMessage) - .map((s) => { - return { step: s.lastAwarenessMessage } - }) - const newSteps = [...awareness] - for (let i = 0; i < steps.length; i++) { - const singleSteps = steps[i].data - if (this.version < steps[i].version) { - this.version = steps[i].version - } - if (!Array.isArray(singleSteps)) { - logger.error('Invalid step data, skipping step', { step: steps[i] }) - // TODO: recover - continue - } - singleSteps.forEach((step) => { - newSteps.push({ - step, - }) - }) } this.#lastStepPush = Date.now() - this.emit('sync', { - steps: newSteps, - document, - version: this.version, - }) } checkIdle() { @@ -329,7 +317,7 @@ class SyncService { logger.debug( `[SyncService] Document is idle for ${IDLE_TIMEOUT} minutes, suspending connection`, ) - this.emit('idle') + this.bus.emit('idle') return true } return false @@ -354,22 +342,7 @@ class SyncService { } // Clear connection so hasActiveConnection turns false and we can reconnect. this.connection.value = undefined - this.emit('close') - } - - // For better typing use the bus directly: `syncService.bus.on()`. - on(event: keyof EventTypes, callback: Handler) { - this.bus.on(event, callback) - return this - } - - off(event: keyof EventTypes, callback: Handler) { - this.bus.off(event, callback) - return this - } - - emit(event: keyof EventTypes, data?: unknown) { - this.bus.emit(event, data) + this.bus.emit('close') } } diff --git a/src/services/SyncServiceProvider.js b/src/services/SyncServiceProvider.js index ef95f4a51c4..4f1b9196f85 100644 --- a/src/services/SyncServiceProvider.js +++ b/src/services/SyncServiceProvider.js @@ -4,7 +4,7 @@ */ import { logger } from '../helpers/logger.js' -import initWebSocketPolyfill from './WebSocketPolyfill.js' +import initWebSocketPolyfill from './WebSocketPolyfill.ts' import { WebsocketProvider } from './y-websocket.js' /** diff --git a/src/services/WebSocketPolyfill.js b/src/services/WebSocketPolyfill.js deleted file mode 100644 index 7126ea96c14..00000000000 --- a/src/services/WebSocketPolyfill.js +++ /dev/null @@ -1,89 +0,0 @@ -/** - * SPDX-FileCopyrightText: 2022 Nextcloud GmbH and Nextcloud contributors - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -import { decodeArrayBuffer } from '../helpers/base64.ts' -import { logger } from '../helpers/logger.js' -import getNotifyBus from './NotifyService.ts' - -/** - * - * @param {object} syncService - the sync service to build upon - * @param {number} fileId - id of the file to open - * @param {object} initialSession - initial session to open - */ -export default function initWebSocketPolyfill(syncService, fileId, initialSession) { - return class WebSocketPolyfill { - #url - binaryType - onmessage - onerror - onclose - onopen - #handlers - #notifyPushBus - - constructor(url) { - this.#notifyPushBus = getNotifyBus() - this.#notifyPushBus?.on('notify_push', this.#onNotifyPush.bind(this)) - this.url = url - logger.debug('WebSocketPolyfill#constructor', { - url, - fileId, - initialSession, - }) - this.#registerHandlers({ - sync: ({ steps, version }) => { - if (steps) { - steps.forEach((s) => { - const data = decodeArrayBuffer(s.step) - this.onmessage({ data }) - }) - logger.debug('synced ', { version, steps }) - } - }, - }) - - syncService.open({ fileId, initialSession }).then(() => { - if (syncService.hasActiveConnection) { - this.onopen?.() - } - }) - } - - #registerHandlers(handlers) { - this.#handlers = handlers - Object.entries(this.#handlers).forEach(([key, value]) => - syncService.on(key, value), - ) - } - - send(step) { - // Useful for debugging what steps are sent and how they were initiated - // logStep(step) - syncService.sendStep(step) - } - - async close() { - Object.entries(this.#handlers).forEach(([key, value]) => - syncService.off(key, value), - ) - this.#handlers = [] - - this.#notifyPushBus?.off('notify_push', this.#onNotifyPush.bind(this)) - this.onclose() - logger.debug('Websocket closed') - } - - #onNotifyPush({ messageType, messageBody }) { - if (messageBody.documentId !== fileId) { - return - } - messageBody.steps.forEach((step) => { - const data = decodeArrayBuffer(step) - this.onmessage({ data }) - }) - } - } -} diff --git a/src/services/WebSocketPolyfill.ts b/src/services/WebSocketPolyfill.ts new file mode 100644 index 00000000000..7f2aa517bd7 --- /dev/null +++ b/src/services/WebSocketPolyfill.ts @@ -0,0 +1,115 @@ +/** + * SPDX-FileCopyrightText: 2022 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import { decodeArrayBuffer } from '../helpers/base64.js' +import { logger } from '../helpers/logger.js' +import getNotifyBus from './NotifyService' +import type { Step, SyncService } from './SyncService.js' + +/** + * + * @param syncService - the sync service to build upon + * @param fileId - id of the file to open + */ +export default function initWebSocketPolyfill( + syncService: SyncService, + fileId: number, +) { + return class WebSocketPolyfill { + #url + binaryType: 'blob' | 'arraybuffer' = 'blob' + onmessage?: (message: MessageEvent) => void + onerror?: (error: Event) => void + onclose?: (event: CloseEvent) => void + onopen?: () => void + #notifyPushBus + #onSync + #processingVersion = 0 + + constructor(url: string) { + this.#notifyPushBus = getNotifyBus() + this.#notifyPushBus?.on('notify_push', this.#onNotifyPush.bind(this)) + this.#url = url + logger.debug('WebSocketPolyfill#constructor', { url, fileId }) + + this.#onSync = ({ steps }: { steps: Step[] }) => { + if (steps) { + this.#processSteps(steps) + logger.debug('synced ', { + version: syncService.version, + steps, + }) + } + } + + syncService.bus.on('sync', this.#onSync) + + syncService.open().then(() => { + if (syncService.hasActiveConnection()) { + this.onopen?.() + } + }) + } + + /** + * Process the given steps, handing them to the onmessage handler + * + * Set this.#processingVersion for detecting and logging immediate responses in `send()`. + * + * @param steps steps to process + */ + #processSteps(steps: Step[]): void { + steps.forEach((s) => { + this.#processingVersion = s.version + s.data.forEach((singleStep) => { + const data = decodeArrayBuffer(singleStep) + this.onmessage?.(new MessageEvent('processing step', { data })) + }) + syncService.version = Math.max( + syncService.version, + this.#processingVersion, + ) + }) + this.#processingVersion = 0 + } + + send(step: Uint8Array) { + // Useful for debugging what steps are sent and how they were initiated + // logStep(step) + if (this.#processingVersion) { + // this is a direct response while processing the step + console.error(`Failed to process step ${this.#processingVersion}.`, { + lastSuccessfullyProcessed: syncService.version, + sendingSyncStep1: step, + }) + // Do not increase the syncService.version for the current steps + // as we failed to process them. + this.#processingVersion = 0 + } + syncService.sendRecoveryStep(step) + } + + async close() { + syncService.bus.off('sync', this.#onSync) + this.#notifyPushBus?.off('notify_push', this.#onNotifyPush.bind(this)) + this.onclose?.(new CloseEvent('closing')) + logger.debug('Websocket closed') + } + + #onNotifyPush({ + messageBody, + }: { + messageBody: { documentId: number; steps: string[] } + }) { + if (messageBody.documentId !== fileId) { + return + } + messageBody.steps.forEach((step) => { + const data = decodeArrayBuffer(step) + this.onmessage?.(new MessageEvent('notify pushed', { data })) + }) + } + } +} diff --git a/src/tests/services/SyncService.spec.ts b/src/tests/services/SyncService.spec.ts index e680f0111e8..412615d993c 100644 --- a/src/tests/services/SyncService.spec.ts +++ b/src/tests/services/SyncService.spec.ts @@ -51,7 +51,7 @@ describe('Sync service', () => { vi.mocked(connect.open).mockResolvedValue(openResult) const openHandler = vi.fn() const service = new SyncService({ connection, openConnection }) - service.on('opened', openHandler) + service.bus.on('opened', openHandler) await service.open() expect(openHandler).toHaveBeenCalledWith( expect.objectContaining({ session: initialData.session }), diff --git a/src/tests/services/WebsocketPolyfill.spec.js b/src/tests/services/WebsocketPolyfill.spec.ts similarity index 51% rename from src/tests/services/WebsocketPolyfill.spec.js rename to src/tests/services/WebsocketPolyfill.spec.ts index 6f8d8ff7e94..989a4df1d0b 100644 --- a/src/tests/services/WebsocketPolyfill.spec.js +++ b/src/tests/services/WebsocketPolyfill.spec.ts @@ -4,39 +4,45 @@ */ import { describe, expect, it, vi } from 'vitest' +import { shallowRef } from 'vue' +import { SyncService } from '../../services/SyncService' import initWebSocketPolyfill from '../../services/WebSocketPolyfill.js' describe('Init function', () => { - const mockSyncService = (mocked = {}) => { - return { - on: vi.fn(), - open: vi.fn().mockImplementation(async () => ({})), - ...mocked, - } - } + vi.mock(import('../../services/SyncService'), () => { + const SyncService = vi.fn() + SyncService.prototype.bus = { on: vi.fn() } + SyncService.prototype.open = vi.fn().mockImplementation(async () => ({})) + SyncService.prototype.hasActiveConnection = vi.fn() + return { SyncService } + }) + const mockSyncService = () => + new SyncService({ + connection: shallowRef(undefined), + openConnection: vi.fn(), + }) it('returns a websocket polyfill class', () => { const syncService = mockSyncService() - const Polyfill = initWebSocketPolyfill(syncService) + const Polyfill = initWebSocketPolyfill(syncService, 123) const websocket = new Polyfill('url') expect(websocket).toBeInstanceOf(Polyfill) }) it('registers handlers', () => { const syncService = mockSyncService() - const Polyfill = initWebSocketPolyfill(syncService) + const Polyfill = initWebSocketPolyfill(syncService, 123) const websocket = new Polyfill('url') expect(websocket).toBeInstanceOf(Polyfill) - expect(syncService.on).toHaveBeenCalled() + expect(syncService.bus.on).toHaveBeenCalled() }) it('opens sync service', () => { const syncService = mockSyncService() const fileId = 123 - const initialSession = {} - const Polyfill = initWebSocketPolyfill(syncService, fileId, initialSession) + const Polyfill = initWebSocketPolyfill(syncService, fileId) const websocket = new Polyfill('url') expect(websocket).toBeInstanceOf(Polyfill) - expect(syncService.open).toHaveBeenCalledWith({ fileId, initialSession }) + expect(syncService.open).toHaveBeenCalled() }) })