Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Cleanup and comments
  • Loading branch information
jamesdaniels committed Feb 7, 2020
commit 5cdd030edf9a07603618b470eaeb4124a2ef7dda
54 changes: 29 additions & 25 deletions reactfire/useObservable/behaviorReplaySubject.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,56 @@
import {
Observable,
Subject,
Subscription,
Subscriber,
throwError
} from 'rxjs';
import { Observable, Subject, Subscription, Subscriber } from 'rxjs';
import { tap, share, first } from 'rxjs/operators';

export class BehaviorReplaySubject<T> extends Subject<T> {
private _value: T | undefined;
private _hasValue = false;
private _timeoutHandler: NodeJS.Timeout | undefined;
private _innerSubscriber: any;
private _first: Promise<void>;
private _resolveFirst: () => void;
private _firstEmission: Promise<void>;
private _resolveFirstEmission: () => void;
private _error: any = undefined;
private _innerObservable: Observable<T>;

constructor(
innerObservable: Observable<T>,
private _timeoutWindow: number,
private _id?: string,
private _defaultValue?: T
) {
constructor(innerObservable: Observable<T>, private _timeoutWindow: number) {
super();
this._first = new Promise<void>(resolve => (this._resolveFirst = resolve));
this._firstEmission = new Promise<void>(
resolve => (this._resolveFirstEmission = resolve)
);
this._innerObservable = innerObservable.pipe(
tap(
v => {
this._next(v);
},
e => {
// save the error, so that we can raise on subscription or .value
// resolve the promise, so suspense tries again
this._error = e;
this._resolveFirst();
this._resolveFirstEmission();
}
),
share()
);
// warm up the observable
this._innerObservable.pipe(first()).subscribe();
}

get hasValue(): boolean {
// hasValue returns true if there's an error too
// so that after we resolve the promise & useObservable is called again
// we won't throw again
return this._hasValue || !!this._error;
}

get value(): T {
// throw on .value since the first().subscribe would otherwise
// absorb it, clear the error for retry
if (this._error) {
const error = this._error;
this._error = undefined;
if (!this._hasValue) {
this._first = new Promise<void>(
resolve => (this._resolveFirst = resolve)
// if we cheated around hasValue let's reset the suspense promise too
this._firstEmission = new Promise<void>(
resolve => (this._resolveFirstEmission = resolve)
);
}
throw error;
Expand All @@ -59,28 +59,32 @@ export class BehaviorReplaySubject<T> extends Subject<T> {
}

get firstEmission(): Promise<void> {
return this._first;
return this._firstEmission;
}

_next(value: T) {
private _next(value: T) {
this._hasValue = true;
this._value = value;
this._resolveFirst();
this._resolveFirstEmission();
}

_reset() {
private _reset() {
// set a timeout for reseting the cache, subscriptions will cancel the timeout
// and reschedule again on unsubscribe
this._timeoutHandler = setTimeout(() => {
this._hasValue = false;
this._value = undefined;
this._error = undefined;
this._first = new Promise<void>(
resolve => (this._resolveFirst = resolve)
this._firstEmission = new Promise<void>(
resolve => (this._resolveFirstEmission = resolve)
);
}, this._timeoutWindow);
}

_subscribe(subscriber: Subscriber<T>): Subscription {
// throw the error if there is one
if (this._error) {
// reset, so they can retry
const error = this._error;
this._error = undefined;
throw error;
Expand Down
20 changes: 4 additions & 16 deletions reactfire/useObservable/index.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,18 @@
import * as React from 'react';
import { Observable } from 'rxjs';
import { first, tap, startWith } from 'rxjs/operators';
import { BehaviorReplaySubject } from './behaviorReplaySubject';

// TODO why can't I use underscore? 30_000
const DEFAULT_TIMEOUT = 30000;
const DEFAULT_TIMEOUT = 30_000;
const preloadedObservables = new Map<string, BehaviorReplaySubject<unknown>>();

// Starts listening to an Observable.
// Call this once you know you're going to render a
// child that will consume the observable
export function preloadObservable<T>(
source: Observable<T>,
id: string,
defaultValue?: T
) {
// TODO wrap in our own Subject implementation
export function preloadObservable<T>(source: Observable<T>, id: string) {
if (preloadedObservables.has(id)) {
return preloadedObservables.get(id) as BehaviorReplaySubject<T>;
} else {
const observable = new BehaviorReplaySubject(
source,
DEFAULT_TIMEOUT,
id,
defaultValue
);
const observable = new BehaviorReplaySubject(source, DEFAULT_TIMEOUT);
preloadedObservables.set(id, observable);
return observable;
}
Expand All @@ -38,7 +26,7 @@ export function useObservable<T>(
if (!observableId) {
throw new Error('cannot call useObservable without an observableId');
}
const observable = preloadObservable(source, observableId, startWithValue);
const observable = preloadObservable(source, observableId);
if (!observable.hasValue && !startWithValue) {
throw observable.firstEmission;
}
Expand Down