Skip to content

Commit caec8f0

Browse files
committed
fix(sync): keep queue around during reconnects
When yjs does not receive awareness updates it will close and reopen the websocket. Keep the content of the queue, i.e. the outgoing steps so they can be send out once the connection is back. Signed-off-by: Max <[email protected]>
1 parent 045391f commit caec8f0

File tree

4 files changed

+19
-15
lines changed

4 files changed

+19
-15
lines changed

cypress/e2e/api/SyncServiceProvider.spec.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ describe('Sync service provider', function() {
6060
* @param {object} ydoc Yjs document
6161
*/
6262
function createProvider(ydoc) {
63+
const queue = []
6364
const syncService = new SyncService({
6465
serialize: () => 'Serialized',
6566
getDocumentState: () => null,
@@ -70,6 +71,7 @@ describe('Sync service provider', function() {
7071
syncService,
7172
fileId,
7273
initialSession: null,
74+
queue,
7375
disableBc: true,
7476
})
7577
}

src/components/Editor.vue

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ export default {
329329
},
330330
created() {
331331
this.$ydoc = new Doc()
332+
this.$queue = []
332333
// The following can be useful for debugging ydoc updates
333334
// this.$ydoc.on('update', function(update, origin, doc, tr) {
334335
// console.debug('ydoc update', update, origin, doc, tr)
@@ -381,6 +382,7 @@ export default {
381382
ydoc: this.$ydoc,
382383
syncService: this.$syncService,
383384
fileId: this.fileId,
385+
queue: this.$queue,
384386
initialSession: this.initialSession,
385387
})
386388
this.$providers.push(syncServiceProvider)

src/services/SyncServiceProvider.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,16 @@ import { logger } from '../helpers/logger.js'
3030
* @param {object} options.ydoc - the Ydoc
3131
* @param {object} options.syncService - sync service to build upon
3232
* @param {number} options.fileId - file id of the file to open
33+
* @param {number} options.queue - queue for outgoing steps
3334
* @param {object} options.initialSession - initialSession to start from
3435
* @param {boolean} options.disableBc - disable broadcast channel synchronization (default: disabled in debug mode, enabled otherwise)
3536
*/
36-
export default function createSyncServiceProvider({ ydoc, syncService, fileId, initialSession, disableBc }) {
37+
export default function createSyncServiceProvider({ ydoc, syncService, fileId, initialSession, queue, disableBc }) {
3738
if (!fileId) {
3839
// We need a file id as a unique identifier for y.js as otherwise state might leak between different files
3940
throw new Error('fileId is required')
4041
}
41-
const WebSocketPolyfill = initWebSocketPolyfill(syncService, fileId, initialSession)
42+
const WebSocketPolyfill = initWebSocketPolyfill(syncService, fileId, initialSession, queue)
4243
disableBc = disableBc ?? !!window?._oc_debug
4344
const websocketProvider = new WebsocketProvider(
4445
'ws://localhost:1234',

src/services/WebSocketPolyfill.js

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ import { encodeArrayBuffer, decodeArrayBuffer } from '../helpers/base64.js'
2828
* @param {object} syncService - the sync service to build upon
2929
* @param {number} fileId - id of the file to open
3030
* @param {object} initialSession - initial session to open
31+
* @param {object[]} queue - queue for the outgoing steps
3132
*/
32-
export default function initWebSocketPolyfill(syncService, fileId, initialSession) {
33+
export default function initWebSocketPolyfill(syncService, fileId, initialSession, queue) {
3334
return class WebSocketPolyfill {
3435

3536
#url
@@ -41,11 +42,9 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio
4142
onclose
4243
onopen
4344
#handlers
44-
#queue
4545

4646
constructor(url) {
4747
this.url = url
48-
this.#queue = []
4948
logger.debug('WebSocketPolyfill#constructor', { url, fileId, initialSession })
5049
this.#registerHandlers({
5150
opened: ({ version, session }) => {
@@ -83,32 +82,32 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio
8382
// Useful for debugging what steps are sent and how they were initiated
8483
// data.forEach(logStep)
8584

86-
this.#queue.push(...data)
85+
queue.push(...data)
8786
let outbox = []
8887
return syncService.sendSteps(() => {
89-
outbox = [...this.#queue]
88+
outbox = [...queue]
9089
const data = {
9190
steps: this.#steps,
9291
awareness: this.#awareness,
9392
version: this.#version,
9493
}
95-
this.#queue = []
94+
queue = []
9695
logger.debug('sending steps ', data)
9796
return data
9897
})?.catch(err => {
9998
logger.error(err)
10099
// try to send the steps again
101-
this.#queue = [...outbox, ...this.#queue]
100+
queue = [...outbox, ...queue]
102101
})
103102
}
104103

105104
get #steps() {
106-
return this.#queue.map(s => encodeArrayBuffer(s))
105+
return queue.map(s => encodeArrayBuffer(s))
107106
.filter(s => s < 'AQ')
108107
}
109108

110109
get #awareness() {
111-
return this.#queue.map(s => encodeArrayBuffer(s))
110+
return queue.map(s => encodeArrayBuffer(s))
112111
.findLast(s => s > 'AQ') || ''
113112
}
114113

@@ -124,21 +123,21 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio
124123
}
125124

126125
#sendRemainingSteps() {
127-
if (this.#queue.length) {
126+
if (queue.length) {
128127
let outbox = []
129128
return syncService.sendStepsNow(() => {
130-
outbox = [...this.#queue]
129+
outbox = [...queue]
131130
const data = {
132131
steps: this.#steps,
133132
awareness: this.#awareness,
134133
version: this.#version,
135134
}
136-
this.#queue = []
135+
queue = []
137136
logger.debug('sending final steps ', data)
138137
return data
139138
})?.catch(err => {
140139
logger.error(err)
141-
this.#queue = [...outbox, ...this.#queue]
140+
queue = [...outbox, ...queue]
142141
})
143142
}
144143
}

0 commit comments

Comments
 (0)