Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix(sync): process steps one version at a time
Drop the `FlatStep` type.
Process each version iterating through its data.
Increment the version counter afterwards.

Signed-off-by: Max <[email protected]>
  • Loading branch information
max-nextcloud authored and backportbot[bot] committed Oct 9, 2025
commit 161d20db999a7d3e4465e8e12a034005416dd607
18 changes: 7 additions & 11 deletions src/helpers/steps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,22 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

import type { FlatStep, Session, Step } from '../services/SyncService'
import type { Session, Step } from '../services/SyncService'
import { COLLABORATOR_DISCONNECT_TIME } from '../services/SyncService'

/**
* Get the recent awareness messages as steps
* @param sessions to process.
*/
export function awarenessSteps(sessions: Session[]): FlatStep[] {
export function awarenessSteps(sessions: Session[]): Step[] {
const lastContactThreshold =
Math.floor(Date.now() / 1000) - COLLABORATOR_DISCONNECT_TIME
return sessions
.filter((s) => s.lastContact > lastContactThreshold)
.filter((s) => Boolean(s.lastAwarenessMessage))
.map((s) => ({ step: s.lastAwarenessMessage, version: 0 }))
}

/**
* Flatten the provided steps and assign the version to each of them.
* @param steps to process.
*/
export function flatSteps(steps: Step[]): FlatStep[] {
return steps.flatMap((s) => s.data.map((step) => ({ step, version: s.version })))
.map((s) => ({
data: [s.lastAwarenessMessage],
sessionId: s.id,
version: 0,
}))
}
30 changes: 15 additions & 15 deletions src/helpers/yjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as decoding from 'lib0/decoding.js'
import * as encoding from 'lib0/encoding.js'
import * as syncProtocol from 'y-protocols/sync'
import * as Y from 'yjs'
import type { Step } from '../services/SyncService.js'
import { messageSync } from '../services/y-websocket.js'
import { decodeArrayBuffer, encodeArrayBuffer } from './base64'

Expand Down Expand Up @@ -44,12 +45,9 @@ export function applyDocumentState(
* @param documentState - base64 encoded doc state
* @return base64 encoded yjs sync protocol update message and version
*/
export function documentStateToStep(documentState: string): {
step: string,
version: number,
} {
export function documentStateToStep(documentState: string): Step {
const message = documentStateToUpdateMessage(documentState)
return { step: encodeArrayBuffer(message), version: -1 }
return { data: [encodeArrayBuffer(message)], sessionId: 0, version: -1 }
}

/**
Expand All @@ -76,17 +74,19 @@ function documentStateToUpdateMessage(documentState: string): Uint8Array {
* @param step.step - base64 encoded yjs sync update message
* @param origin - initiator object e.g. WebsocketProvider
*/
export function applyStep(ydoc: Y.Doc, step: { step: string }, origin = 'origin') {
const updateMessage = decodeArrayBuffer(step.step)
const decoder = decoding.createDecoder(updateMessage)
const messageType = decoding.readVarUint(decoder)
if (messageType !== messageSync) {
console.error('y.js update message with invalid type', messageType)
return
export function applyStep(ydoc: Y.Doc, step: Step, origin = 'origin') {
for (const encoded of step.data) {
const updateMessage = decodeArrayBuffer(encoded)
const decoder = decoding.createDecoder(updateMessage)
const messageType = decoding.readVarUint(decoder)
if (messageType !== messageSync) {
console.error('y.js update message with invalid type', messageType)
return
}
// There are no responses to updates - so this is a dummy.
const encoder = encoding.createEncoder()
syncProtocol.readSyncMessage(decoder, encoder, ydoc, origin)
}
// There are no responses to updates - so this is a dummy.
const encoder = encoding.createEncoder()
syncProtocol.readSyncMessage(decoder, encoder, ydoc, origin)
}

/**
Expand Down
14 changes: 3 additions & 11 deletions src/services/SyncService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { close, type OpenData } from '../apis/connect'
import { push } from '../apis/sync'
import type { Connection } from '../composables/useConnection.js'
import { logger } from '../helpers/logger.js'
import { awarenessSteps, flatSteps } from '../helpers/steps'
import { awarenessSteps } from '../helpers/steps'
import { documentStateToStep } from '../helpers/yjs.js'
import Outbox from './Outbox.js'
import PollingBackend from './PollingBackend.js'
Expand Down Expand Up @@ -60,14 +60,6 @@ export interface Step {
sessionId: number
}

/*
* Step as what we process it in the WebsocketPolyfill
*/
export interface FlatStep {
step: string
version: number
}

export interface UserSession {
id: number
userId: string
Expand Down Expand Up @@ -118,7 +110,7 @@ export declare type EventTypes = {
opened: OpenData

/* received new steps */
sync: { document?: object; steps: { step: string; version: number }[] }
sync: { document?: object; steps: Step[] }

/* state changed (dirty) */
stateChange: { initialLoading?: boolean; dirty?: boolean }
Expand Down Expand Up @@ -300,7 +292,7 @@ class SyncService {
}) {
const versionAfter = Math.max(this.version, ...steps.map((s) => s.version))
this.bus.emit('sync', {
steps: [...awarenessSteps(sessions), ...flatSteps(steps)],
steps: [...awarenessSteps(sessions), ...steps],
document,
})
if (this.version < versionAfter) {
Expand Down
59 changes: 33 additions & 26 deletions src/services/WebSocketPolyfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import { decodeArrayBuffer } from '../helpers/base64.js'
import { logger } from '../helpers/logger.js'
import getNotifyBus from './NotifyService'
import type { FlatStep, SyncService } from './SyncService.js'
import type { Step, SyncService } from './SyncService.js'

/**
*
Expand All @@ -25,27 +25,22 @@ export default function initWebSocketPolyfill(
onclose?: (event: CloseEvent) => void
onopen?: () => void
#notifyPushBus
#processingVersion = 0
#onSync
#processingVersion = 0

constructor(url: string) {
this.#notifyPushBus = getNotifyBus()
this.#notifyPushBus?.on('notify_push', this.#onNotifyPush.bind(this))
this.#url = url
logger.debug('WebSocketPolyfill#constructor', { url, fileId })

this.#onSync = ({ steps }: { steps: FlatStep[] }) => {
this.#onSync = ({ steps }: { steps: Step[] }) => {
if (steps) {
steps.forEach((s) => this.#processStep(s))
syncService.version = Math.max(
syncService.version,
this.#processingVersion,
)
this.#processSteps(steps)
logger.debug('synced ', {
version: this.#processingVersion,
version: syncService.version,
steps,
})
this.#processingVersion = 0
}
}

Expand All @@ -58,14 +53,39 @@ export default function initWebSocketPolyfill(
})
}

send(step: ArrayBuffer) {
/**
* Process the given steps, handing them to the onmessage handler
*
* Set this.#processingVersion for detecting and logging immediate responses in `send()`.
*
* @param steps steps to process
*/
#processSteps(steps: Step[]): void {
steps.forEach((s) => {
this.#processingVersion = s.version
s.data.forEach((singleStep) => {
const data = decodeArrayBuffer(singleStep)
this.onmessage?.(new MessageEvent('processing step', { data }))
})
syncService.version = Math.max(
syncService.version,
this.#processingVersion,
)
})
this.#processingVersion = 0
}

send(step: Uint8Array<ArrayBufferLike>) {
// Useful for debugging what steps are sent and how they were initiated
// logStep(step)
if (this.#processingVersion) {
// this is a direct response while processing the step
console.warn(`Failed to process step ${this.#processingVersion}.`, {
step,
console.error(`Failed to process step ${this.#processingVersion}.`, {
lastSuccessfullyProcessed: syncService.version,
sendingSyncStep1: step,
})
// Do not increase the syncService.version for the current steps
// as we failed to process them.
this.#processingVersion = 0
}
syncService.sendStep(step)
Expand All @@ -91,18 +111,5 @@ export default function initWebSocketPolyfill(
this.onmessage?.(new MessageEvent('notify pushed', { data }))
})
}

#processStep({ step, version }: { step: string; version: number }) {
// done processing the previous version
if ((version ?? 0) > this.#processingVersion) {
syncService.version = Math.max(
syncService.version,
this.#processingVersion,
)
}
this.#processingVersion = version
const data = decodeArrayBuffer(step)
this.onmessage?.(new MessageEvent('processing step', { data }))
}
}
}