Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cypress/e2e/api/SyncServiceProvider.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ describe('Sync service provider', function () {
baseVersionEtag,
)
const queue = []
syncService.on('opened', () => syncService.startSync())
syncService.bus.on('opened', () => syncService.startSync())
return createSyncServiceProvider({
ydoc,
syncService,
Expand Down
4 changes: 2 additions & 2 deletions lib/Controller/PublicSessionController.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public function close(int $documentId, int $sessionId, string $sessionToken): Da
#[PublicPage]
#[RequireDocumentBaseVersionEtag]
#[RequireDocumentSession]
public function push(int $documentId, int $sessionId, string $sessionToken, int $version, array $steps, string $awareness, string $token): DataResponse {
return $this->apiService->push($this->getSession(), $this->getDocument(), $version, $steps, $awareness, $token);
public function push(int $documentId, int $sessionId, string $sessionToken, int $version, array $steps, string $awareness, string $token, ?int $recoveryAttempt = null): DataResponse {
return $this->apiService->push($this->getSession(), $this->getDocument(), $version, $steps, $awareness, $recoveryAttempt, $token);
}

#[NoAdminRequired]
Expand Down
4 changes: 2 additions & 2 deletions lib/Controller/SessionController.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public function close(int $documentId, int $sessionId, string $sessionToken): Da
#[PublicPage]
#[RequireDocumentBaseVersionEtag]
#[RequireDocumentSession]
public function push(int $version, array $steps, string $awareness): DataResponse {
public function push(int $version, array $steps, string $awareness, ?int $recoveryAttempt = null): DataResponse {
try {
$this->loginSessionUser();
return $this->apiService->push($this->getSession(), $this->getDocument(), $version, $steps, $awareness);
return $this->apiService->push($this->getSession(), $this->getDocument(), $version, $steps, $awareness, $recoveryAttempt);
} finally {
$this->restoreSessionUser();
}
Expand Down
4 changes: 2 additions & 2 deletions lib/Service/ApiService.php
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ public function close(int $documentId, int $sessionId, string $sessionToken): Da
/**
* @throws NotFoundException
*/
public function push(Session $session, Document $document, int $version, array $steps, string $awareness, ?string $token = null): DataResponse {
public function push(Session $session, Document $document, int $version, array $steps, string $awareness, ?int $recoveryAttempt, ?string $token = null): DataResponse {
try {
$session = $this->sessionService->updateSessionAwareness($session, $awareness);
} catch (DoesNotExistException $e) {
// Session was removed in the meantime. #3875
return new DataResponse(['error' => $this->l10n->t('Editing session has expired. Please reload the page.')], Http::STATUS_PRECONDITION_FAILED);
}
try {
$result = $this->documentService->addStep($document, $session, $steps, $version, $token);
$result = $this->documentService->addStep($document, $session, $steps, $version, $recoveryAttempt, $token);
$this->addToPushQueue($document, [$awareness, ...array_values($steps)]);
} catch (InvalidArgumentException $e) {
return new DataResponse(['error' => $e->getMessage()], Http::STATUS_UNPROCESSABLE_ENTITY);
Expand Down
7 changes: 6 additions & 1 deletion lib/Service/DocumentService.php
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public function writeDocumentState(int $documentId, string $content): void {
* @throws NotPermittedException
* @throws DoesNotExistException
*/
public function addStep(Document $document, Session $session, array $steps, int $version, ?string $shareToken): array {
public function addStep(Document $document, Session $session, array $steps, int $version, ?int $recoveryAttempt, ?string $shareToken): array {
$documentId = $session->getDocumentId();
$readOnly = $this->isReadOnlyCached($session, $shareToken);
$stepsToInsert = [];
Expand Down Expand Up @@ -234,6 +234,11 @@ public function addStep(Document $document, Session $session, array $steps, int
// By default, send all steps the user has not received yet.
$getStepsSinceVersion = $version;
if ($stepsIncludeQuery) {
if ($recoveryAttempt === 1) {
$this->logger->error('Recovery attempt #' . $recoveryAttempt . ' from ' . $session->getId() . ' for ' . $documentId);
} elseif ($recoveryAttempt > 1) {
$this->logger->debug('Recovery attempt #' . $recoveryAttempt . ' from ' . $session->getId() . ' for ' . $documentId);
}
$this->logger->debug('Loading document state for ' . $documentId);
try {
$stateFile = $this->getStateFile($documentId);
Expand Down
2 changes: 2 additions & 0 deletions src/apis/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ interface PushData {
version: number
steps: string[]
awareness: string
recoveryAttempt?: number
}

interface PushResponse {
Expand Down Expand Up @@ -44,6 +45,7 @@ export function push(
version: data.version,
steps: data.steps.filter((s) => s),
awareness: data.awareness,
recoveryAttempt: data.recoveryAttempt,
})
}

Expand Down
36 changes: 18 additions & 18 deletions src/components/Editor.vue
Original file line number Diff line number Diff line change
Expand Up @@ -455,27 +455,27 @@ export default defineComponent({
},

listenSyncServiceEvents() {
this.syncService
.on('opened', this.onOpened)
.on('change', this.onChange)
.on('loaded', this.onLoaded)
.on('sync', this.onSync)
.on('error', this.onError)
.on('stateChange', this.onStateChange)
.on('idle', this.onIdle)
.on('save', this.onSave)
const bus = this.syncService.bus
bus.on('opened', this.onOpened)
bus.on('change', this.onChange)
bus.on('loaded', this.onLoaded)
bus.on('sync', this.onSync)
bus.on('error', this.onError)
bus.on('stateChange', this.onStateChange)
bus.on('idle', this.onIdle)
bus.on('save', this.onSave)
},

unlistenSyncServiceEvents() {
this.syncService
.off('opened', this.onOpened)
.off('change', this.onChange)
.off('loaded', this.onLoaded)
.off('sync', this.onSync)
.off('error', this.onError)
.off('stateChange', this.onStateChange)
.off('idle', this.onIdle)
.off('save', this.onSave)
const bus = this.syncService.bus
bus.off('opened', this.onOpened)
bus.off('change', this.onChange)
bus.off('loaded', this.onLoaded)
bus.off('sync', this.onSync)
bus.off('error', this.onError)
bus.off('stateChange', this.onStateChange)
bus.off('idle', this.onIdle)
bus.off('save', this.onSave)
},

reconnect() {
Expand Down
24 changes: 24 additions & 0 deletions src/helpers/steps.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

import type { Session, Step } from '../services/SyncService'
import { COLLABORATOR_DISCONNECT_TIME } from '../services/SyncService'

/**
* Get the recent awareness messages as steps
* @param sessions to process.
*/
export function awarenessSteps(sessions: Session[]): Step[] {
const lastContactThreshold =
Math.floor(Date.now() / 1000) - COLLABORATOR_DISCONNECT_TIME
return sessions
.filter((s) => s.lastContact > lastContactThreshold)
.filter((s) => Boolean(s.lastAwarenessMessage))
.map((s) => ({
data: [s.lastAwarenessMessage],
sessionId: s.id,
version: 0,
}))
}
33 changes: 17 additions & 16 deletions src/helpers/yjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as decoding from 'lib0/decoding.js'
import * as encoding from 'lib0/encoding.js'
import * as syncProtocol from 'y-protocols/sync'
import * as Y from 'yjs'
import type { Step } from '../services/SyncService.js'
import { messageSync } from '../services/y-websocket.js'
import { decodeArrayBuffer, encodeArrayBuffer } from './base64'

Expand Down Expand Up @@ -42,13 +43,11 @@ export function applyDocumentState(
* and encode it and wrap it in a step data structure.
*
* @param documentState - base64 encoded doc state
* @return base64 encoded yjs sync protocol update message
* @return base64 encoded yjs sync protocol update message and version
*/
export function documentStateToStep(documentState: string): {
step: string
} {
export function documentStateToStep(documentState: string): Step {
const message = documentStateToUpdateMessage(documentState)
return { step: encodeArrayBuffer(message) }
return { data: [encodeArrayBuffer(message)], sessionId: 0, version: -1 }
}

/**
Expand All @@ -72,20 +71,22 @@ function documentStateToUpdateMessage(documentState: string): Uint8Array {
* Only used in tests right now.
* @param ydoc - encode state of this doc
* @param step - step data
* @param step.step - base64 encoded yjs sync update message
* @param step.data - array of base64 encoded yjs sync update messages
* @param origin - initiator object e.g. WebsocketProvider
*/
export function applyStep(ydoc: Y.Doc, step: { step: string }, origin = 'origin') {
const updateMessage = decodeArrayBuffer(step.step)
const decoder = decoding.createDecoder(updateMessage)
const messageType = decoding.readVarUint(decoder)
if (messageType !== messageSync) {
console.error('y.js update message with invalid type', messageType)
return
export function applyStep(ydoc: Y.Doc, step: Step, origin = 'origin') {
for (const encoded of step.data) {
const updateMessage = decodeArrayBuffer(encoded)
const decoder = decoding.createDecoder(updateMessage)
const messageType = decoding.readVarUint(decoder)
if (messageType !== messageSync) {
console.error('y.js update message with invalid type', messageType)
return
}
// There are no responses to updates - so this is a dummy.
const encoder = encoding.createEncoder()
syncProtocol.readSyncMessage(decoder, encoder, ydoc, origin)
}
// There are no responses to updates - so this is a dummy.
const encoder = encoding.createEncoder()
syncProtocol.readSyncMessage(decoder, encoder, ydoc, origin)
}

/**
Expand Down
23 changes: 16 additions & 7 deletions src/services/NotifyService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import { listen } from '@nextcloud/notify_push'
import mitt, { type Emitter } from 'mitt'

export declare type EventTypes = {
notify_push: { messageType: unknown; messageBody: object }
notify_push: {
messageType: unknown
messageBody: { steps: string[]; documentId: number }
}
}

declare global {
Expand All @@ -20,12 +23,18 @@ declare global {
if (!window._nc_text_notify) {
const isPushEnabled = loadState('text', 'notify_push', false)
const useNotifyPush = isPushEnabled
? listen('text_steps', (messageType, messageBody) => {
window._nc_text_notify?.emit('notify_push', {
messageType,
messageBody,
})
})
? listen(
'text_steps',
(
messageType: string,
messageBody: { steps: string[]; documentId: number },
) => {
window._nc_text_notify?.emit('notify_push', {
messageType,
messageBody,
})
},
)
: undefined
window._nc_text_notify = useNotifyPush ? mitt() : undefined
}
Expand Down
15 changes: 15 additions & 0 deletions src/services/Outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export default class Outbox {
#awarenessUpdate = ''
#syncUpdate = ''
#syncQuery = ''
#recoveryAttemptCounter = 0
#isRecoveringSync = false

storeStep(step: Uint8Array<ArrayBufferLike>) {
const encoded = encodeArrayBuffer(step)
Expand All @@ -33,13 +35,25 @@ export default class Outbox {
this.#awarenessUpdate = encoded
}

setRecoveringSync() {
this.#isRecoveringSync = true
this.#recoveryAttemptCounter++
}

getDataToSend(): Sendable {
return {
steps: [this.#syncUpdate, this.#syncQuery].filter((s) => s),
awareness: this.#awarenessUpdate,
...this.recoveryData,
}
}

get recoveryData(): { recoveryAttempt?: number } {
return this.#isRecoveringSync
? { recoveryAttempt: this.#recoveryAttemptCounter }
: {}
}

get hasUpdate(): boolean {
return !!this.#syncUpdate
}
Expand All @@ -56,6 +70,7 @@ export default class Outbox {
}
if (steps.includes(this.#syncQuery)) {
this.#syncQuery = ''
this.#isRecoveringSync = false
}
if (this.#awarenessUpdate === awareness) {
this.#awarenessUpdate = ''
Expand Down
16 changes: 8 additions & 8 deletions src/services/PollingBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class PollingBackend {
const { document, sessions } = data
this.#fetchRetryCounter = 0

this.#syncService.emit('change', { document, sessions })
this.#syncService.bus.emit('change', { document, sessions })
this.#syncService.receiveSteps(data)

if (data.steps.length === 0) {
Expand All @@ -164,7 +164,7 @@ class PollingBackend {
} else {
this.increaseRefetchTimer()
}
this.#syncService.emit('stateChange', { initialLoading: true })
this.#syncService.bus.emit('stateChange', { initialLoading: true })
return
}

Expand All @@ -182,7 +182,7 @@ class PollingBackend {
logger.error(
'[PollingBackend:fetchSteps] Network error when fetching steps, emitting CONNECTION_FAILED',
)
this.#syncService.emit('error', {
this.#syncService.bus.emit('error', {
type: ERROR_TYPE.CONNECTION_FAILED,
data: {},
})
Expand All @@ -195,27 +195,27 @@ class PollingBackend {
// Still apply the steps to update our version of the document
this._handleResponse(e.response)
logger.error('Conflict during file save, please resolve')
this.#syncService.emit('error', {
this.#syncService.bus.emit('error', {
type: ERROR_TYPE.SAVE_COLLISSION,
data: {
outsideChange: e.response.data.outsideChange,
},
})
} else if (e.response.status === 412) {
this.#syncService.emit('error', {
this.#syncService.bus.emit('error', {
type: ERROR_TYPE.LOAD_ERROR,
data: e.response,
})
this.disconnect()
} else if ([403, 404].includes(e.response.status)) {
this.#syncService.emit('error', {
this.#syncService.bus.emit('error', {
type: ERROR_TYPE.SOURCE_NOT_FOUND,
data: {},
})
this.disconnect()
} else if ([502, 503].includes(e.response.status)) {
this.increaseRefetchTimer()
this.#syncService.emit('error', {
this.#syncService.bus.emit('error', {
type: ERROR_TYPE.CONNECTION_FAILED,
data: {},
})
Expand All @@ -224,7 +224,7 @@ class PollingBackend {
})
} else {
this.disconnect()
this.#syncService.emit('error', {
this.#syncService.bus.emit('error', {
type: ERROR_TYPE.CONNECTION_FAILED,
data: {},
})
Expand Down
4 changes: 2 additions & 2 deletions src/services/SaveService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class SaveService {
this.serialize = serialize
this.getDocumentState = getDocumentState
this.autosave = debounce(this._autosave.bind(this), AUTOSAVE_INTERVAL)
this.syncService.on('close', () => {
this.syncService.bus.on('close', () => {
this.autosave.clear()
})
}
Expand All @@ -51,7 +51,7 @@ class SaveService {
}

get emit() {
return this.syncService.emit.bind(this.syncService)
return this.syncService.bus.emit
}

_getContent() {
Expand Down
Loading
Loading