Skip to content

Commit 423223a

Browse files
samwillisUziniii
authored andcommitted
New collection event system (TanStack#555)
1 parent 2f4eb6a commit 423223a

File tree

4 files changed

+407
-0
lines changed

4 files changed

+407
-0
lines changed

.changeset/heavy-parts-grow.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Added a new events system for subscribing to status changes and other internal events.
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import type { Collection } from "./collection"
2+
import type { CollectionStatus } from "./types"
3+
4+
/**
5+
* Event emitted when the collection status changes
6+
*/
7+
export interface CollectionStatusChangeEvent {
8+
type: `status:change`
9+
collection: Collection
10+
previousStatus: CollectionStatus
11+
status: CollectionStatus
12+
}
13+
14+
/**
15+
* Event emitted when the collection status changes to a specific status
16+
*/
17+
export interface CollectionStatusEvent<T extends CollectionStatus> {
18+
type: `status:${T}`
19+
collection: Collection
20+
previousStatus: CollectionStatus
21+
status: T
22+
}
23+
24+
/**
25+
* Event emitted when the number of subscribers to the collection changes
26+
*/
27+
export interface CollectionSubscribersChangeEvent {
28+
type: `subscribers:change`
29+
collection: Collection
30+
previousSubscriberCount: number
31+
subscriberCount: number
32+
}
33+
34+
export type AllCollectionEvents = {
35+
"status:change": CollectionStatusChangeEvent
36+
"subscribers:change": CollectionSubscribersChangeEvent
37+
} & {
38+
[K in CollectionStatus as `status:${K}`]: CollectionStatusEvent<K>
39+
}
40+
41+
export type CollectionEvent =
42+
| AllCollectionEvents[keyof AllCollectionEvents]
43+
| CollectionStatusChangeEvent
44+
| CollectionSubscribersChangeEvent
45+
46+
export type CollectionEventHandler<T extends keyof AllCollectionEvents> = (
47+
event: AllCollectionEvents[T]
48+
) => void
49+
50+
export class CollectionEvents {
51+
private collection: Collection<any, any, any, any, any>
52+
private listeners = new Map<
53+
keyof AllCollectionEvents,
54+
Set<CollectionEventHandler<any>>
55+
>()
56+
57+
constructor(collection: Collection<any, any, any, any, any>) {
58+
this.collection = collection
59+
}
60+
61+
on<T extends keyof AllCollectionEvents>(
62+
event: T,
63+
callback: CollectionEventHandler<T>
64+
) {
65+
if (!this.listeners.has(event)) {
66+
this.listeners.set(event, new Set())
67+
}
68+
this.listeners.get(event)!.add(callback)
69+
70+
return () => {
71+
this.listeners.get(event)!.delete(callback)
72+
}
73+
}
74+
75+
once<T extends keyof AllCollectionEvents>(
76+
event: T,
77+
callback: CollectionEventHandler<T>
78+
) {
79+
const unsubscribe = this.on(event, (eventPayload) => {
80+
callback(eventPayload)
81+
unsubscribe()
82+
})
83+
return unsubscribe
84+
}
85+
86+
off<T extends keyof AllCollectionEvents>(
87+
event: T,
88+
callback: CollectionEventHandler<T>
89+
) {
90+
this.listeners.get(event)?.delete(callback)
91+
}
92+
93+
waitFor<T extends keyof AllCollectionEvents>(
94+
event: T,
95+
timeout?: number
96+
): Promise<AllCollectionEvents[T]> {
97+
return new Promise((resolve, reject) => {
98+
let timeoutId: NodeJS.Timeout | undefined
99+
const unsubscribe = this.on(event, (eventPayload) => {
100+
if (timeoutId) {
101+
clearTimeout(timeoutId)
102+
timeoutId = undefined
103+
}
104+
resolve(eventPayload)
105+
unsubscribe()
106+
})
107+
if (timeout) {
108+
timeoutId = setTimeout(() => {
109+
timeoutId = undefined
110+
unsubscribe()
111+
reject(new Error(`Timeout waiting for event ${event}`))
112+
}, timeout)
113+
}
114+
})
115+
}
116+
117+
emit<T extends keyof AllCollectionEvents>(
118+
event: T,
119+
eventPayload: AllCollectionEvents[T]
120+
) {
121+
this.listeners.get(event)?.forEach((listener) => {
122+
try {
123+
listener(eventPayload)
124+
} catch (error) {
125+
// Re-throw in a microtask to surface the error
126+
queueMicrotask(() => {
127+
throw error
128+
})
129+
}
130+
})
131+
}
132+
133+
emitStatusChange<T extends CollectionStatus>(
134+
status: T,
135+
previousStatus: CollectionStatus
136+
) {
137+
this.emit(`status:change`, {
138+
type: `status:change`,
139+
collection: this.collection,
140+
previousStatus,
141+
status,
142+
})
143+
144+
// Emit specific status event using type assertion
145+
const eventKey: `status:${T}` = `status:${status}`
146+
this.emit(eventKey, {
147+
type: eventKey,
148+
collection: this.collection,
149+
previousStatus,
150+
status,
151+
} as AllCollectionEvents[`status:${T}`])
152+
}
153+
154+
emitSubscribersChange(
155+
subscriberCount: number,
156+
previousSubscriberCount: number
157+
) {
158+
this.emit(`subscribers:change`, {
159+
type: `subscribers:change`,
160+
collection: this.collection,
161+
previousSubscriberCount,
162+
subscriberCount,
163+
})
164+
}
165+
166+
cleanup() {
167+
this.listeners.clear()
168+
}
169+
}

packages/db/src/collection.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ import {
3838
UpdateKeyNotFoundError,
3939
} from "./errors"
4040
import { createFilteredCallback, currentStateAsChanges } from "./change-events"
41+
import { CollectionEvents } from "./collection-events.js"
42+
import type {
43+
AllCollectionEvents,
44+
CollectionEventHandler,
45+
} from "./collection-events.js"
4146
import type { Transaction } from "./transactions"
4247
import type { StandardSchemaV1 } from "@standard-schema/spec"
4348
import type { SingleRowRefProxy } from "./query/builder/ref-proxy"
@@ -267,6 +272,9 @@ export class CollectionImpl<
267272
private preloadPromise: Promise<void> | null = null
268273
private syncCleanupFn: (() => void) | null = null
269274

275+
// Event system
276+
private events: CollectionEvents
277+
270278
/**
271279
* Register a callback to be executed when the collection first becomes ready
272280
* Useful for preloading collections
@@ -345,6 +353,13 @@ export class CollectionImpl<
345353
return this._status
346354
}
347355

356+
/**
357+
* Get the number of subscribers to the collection
358+
*/
359+
public get subscriberCount(): number {
360+
return this.activeSubscribersCount
361+
}
362+
348363
/**
349364
* Validates that the collection is in a usable state for data operations
350365
* @private
@@ -395,6 +410,7 @@ export class CollectionImpl<
395410
*/
396411
private setStatus(newStatus: CollectionStatus): void {
397412
this.validateStatusTransition(this._status, newStatus)
413+
const previousStatus = this._status
398414
this._status = newStatus
399415

400416
// Resolve indexes when collection becomes ready
@@ -404,6 +420,9 @@ export class CollectionImpl<
404420
console.warn(`Failed to resolve indexes:`, error)
405421
})
406422
}
423+
424+
// Emit event
425+
this.events.emitStatusChange(newStatus, previousStatus)
407426
}
408427

409428
/**
@@ -445,6 +464,9 @@ export class CollectionImpl<
445464
this.syncedData = new Map<TKey, TOutput>()
446465
}
447466

467+
// Set up event system
468+
this.events = new CollectionEvents(this)
469+
448470
// Only start sync immediately if explicitly enabled
449471
if (config.startSync === true) {
450472
this.startSync()
@@ -663,6 +685,8 @@ export class CollectionImpl<
663685
this.batchedEvents = []
664686
this.shouldBatchEvents = false
665687

688+
this.events.cleanup()
689+
666690
// Update status
667691
this.setStatus(`cleaned-up`)
668692

@@ -707,26 +731,38 @@ export class CollectionImpl<
707731
* Increment the active subscribers count and start sync if needed
708732
*/
709733
private addSubscriber(): void {
734+
const previousSubscriberCount = this.activeSubscribersCount
710735
this.activeSubscribersCount++
711736
this.cancelGCTimer()
712737

713738
// Start sync if collection was cleaned up
714739
if (this._status === `cleaned-up` || this._status === `idle`) {
715740
this.startSync()
716741
}
742+
743+
this.events.emitSubscribersChange(
744+
this.activeSubscribersCount,
745+
previousSubscriberCount
746+
)
717747
}
718748

719749
/**
720750
* Decrement the active subscribers count and start GC timer if needed
721751
*/
722752
private removeSubscriber(): void {
753+
const previousSubscriberCount = this.activeSubscribersCount
723754
this.activeSubscribersCount--
724755

725756
if (this.activeSubscribersCount === 0) {
726757
this.startGCTimer()
727758
} else if (this.activeSubscribersCount < 0) {
728759
throw new NegativeActiveSubscribersError()
729760
}
761+
762+
this.events.emitSubscribersChange(
763+
this.activeSubscribersCount,
764+
previousSubscriberCount
765+
)
730766
}
731767

732768
/**
@@ -2485,4 +2521,44 @@ export class CollectionImpl<
24852521

24862522
this.recomputeOptimisticState(false)
24872523
}
2524+
2525+
/**
2526+
* Subscribe to a collection event
2527+
*/
2528+
public on<T extends keyof AllCollectionEvents>(
2529+
event: T,
2530+
callback: CollectionEventHandler<T>
2531+
) {
2532+
return this.events.on(event, callback)
2533+
}
2534+
2535+
/**
2536+
* Subscribe to a collection event once
2537+
*/
2538+
public once<T extends keyof AllCollectionEvents>(
2539+
event: T,
2540+
callback: CollectionEventHandler<T>
2541+
) {
2542+
return this.events.once(event, callback)
2543+
}
2544+
2545+
/**
2546+
* Unsubscribe from a collection event
2547+
*/
2548+
public off<T extends keyof AllCollectionEvents>(
2549+
event: T,
2550+
callback: CollectionEventHandler<T>
2551+
) {
2552+
this.events.off(event, callback)
2553+
}
2554+
2555+
/**
2556+
* Wait for a collection event
2557+
*/
2558+
public waitFor<T extends keyof AllCollectionEvents>(
2559+
event: T,
2560+
timeout?: number
2561+
) {
2562+
return this.events.waitFor(event, timeout)
2563+
}
24882564
}

0 commit comments

Comments
 (0)