Skip to content

Commit f096bbe

Browse files
author
Hui-Wu
authored
awaitPendingWrites initial revision (#689)
* awaitPendingWrites initial revision * Add test to check task failure upon user change. * address comments #1 * addressing comments #2 * addressing comments #3 * addressing comments #4: better public comment. * fixing more nits
1 parent af38de9 commit f096bbe

File tree

11 files changed

+231
-2
lines changed

11 files changed

+231
-2
lines changed

firebase-firestore/src/androidTest/java/com/google/firebase/firestore/FirestoreTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static com.google.firebase.firestore.AccessHelper.getAsyncQueue;
1818
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.newTestSettings;
1919
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.provider;
20+
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testChangeUserTo;
2021
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testCollection;
2122
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testCollectionWithDocs;
2223
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testDocument;
@@ -44,6 +45,7 @@
4445
import com.google.firebase.Timestamp;
4546
import com.google.firebase.firestore.FirebaseFirestoreException.Code;
4647
import com.google.firebase.firestore.Query.Direction;
48+
import com.google.firebase.firestore.auth.User;
4749
import com.google.firebase.firestore.testutil.EventAccumulator;
4850
import com.google.firebase.firestore.testutil.IntegrationTestUtil;
4951
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
@@ -1096,4 +1098,62 @@ public void testShutdownCalledMultipleTimes() {
10961098

10971099
expectError(() -> waitFor(reference.get()), expectedMessage);
10981100
}
1101+
1102+
@Test
1103+
public void testWaitForPendingWritesResolves() {
1104+
DocumentReference documentReference = testCollection("abc").document("123");
1105+
FirebaseFirestore firestore = documentReference.getFirestore();
1106+
Map<String, Object> data = map("foo", "bar");
1107+
1108+
waitFor(firestore.disableNetwork());
1109+
Task<Void> awaitsPendingWrites1 = firestore.waitForPendingWrites();
1110+
Task<Void> pendingWrite = documentReference.set(data);
1111+
Task<Void> awaitsPendingWrites2 = firestore.waitForPendingWrites();
1112+
1113+
// `awaitsPendingWrites1` completes immediately because there are no pending writes at
1114+
// the time it is created.
1115+
waitFor(awaitsPendingWrites1);
1116+
assertTrue(awaitsPendingWrites1.isComplete() && awaitsPendingWrites1.isSuccessful());
1117+
assertTrue(!pendingWrite.isComplete());
1118+
assertTrue(!awaitsPendingWrites2.isComplete());
1119+
1120+
firestore.enableNetwork();
1121+
waitFor(awaitsPendingWrites2);
1122+
assertTrue(awaitsPendingWrites2.isComplete() && awaitsPendingWrites2.isSuccessful());
1123+
}
1124+
1125+
@Test
1126+
public void testWaitForPendingWritesFailsWhenUserChanges() {
1127+
DocumentReference documentReference = testCollection("abc").document("123");
1128+
FirebaseFirestore firestore = documentReference.getFirestore();
1129+
Map<String, Object> data = map("foo", "bar");
1130+
1131+
// Prevent pending writes receiving acknowledgement.
1132+
waitFor(firestore.disableNetwork());
1133+
Task<Void> pendingWrite = documentReference.set(data);
1134+
Task<Void> awaitsPendingWrites = firestore.waitForPendingWrites();
1135+
assertTrue(!pendingWrite.isComplete());
1136+
assertTrue(!awaitsPendingWrites.isComplete());
1137+
1138+
testChangeUserTo(new User("new user"));
1139+
1140+
assertTrue(!pendingWrite.isComplete());
1141+
assertEquals(
1142+
"'waitForPendingWrites' task is cancelled due to User change.",
1143+
waitForException(awaitsPendingWrites).getMessage());
1144+
}
1145+
1146+
@Test
1147+
public void testPendingWriteTaskResolveWhenOfflineIfThereIsNoPending() {
1148+
DocumentReference documentReference = testCollection("abc").document("123");
1149+
FirebaseFirestore firestore = documentReference.getFirestore();
1150+
Map<String, Object> data = map("foo", "bar");
1151+
1152+
// Prevent pending writes receiving acknowledgement.
1153+
waitFor(firestore.disableNetwork());
1154+
Task<Void> awaitsPendingWrites = firestore.waitForPendingWrites();
1155+
waitFor(awaitsPendingWrites);
1156+
1157+
assertTrue(awaitsPendingWrites.isComplete() && awaitsPendingWrites.isSuccessful());
1158+
}
10991159
}

firebase-firestore/src/androidTest/java/com/google/firebase/firestore/testutil/IntegrationTestUtil.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@
3636
import com.google.firebase.firestore.MetadataChanges;
3737
import com.google.firebase.firestore.QuerySnapshot;
3838
import com.google.firebase.firestore.auth.EmptyCredentialsProvider;
39+
import com.google.firebase.firestore.auth.User;
3940
import com.google.firebase.firestore.core.DatabaseInfo;
4041
import com.google.firebase.firestore.local.Persistence;
4142
import com.google.firebase.firestore.model.DatabaseId;
4243
import com.google.firebase.firestore.testutil.provider.FirestoreProvider;
4344
import com.google.firebase.firestore.util.AsyncQueue;
45+
import com.google.firebase.firestore.util.Listener;
4446
import com.google.firebase.firestore.util.Logger;
4547
import com.google.firebase.firestore.util.Logger.Level;
4648
import java.util.ArrayList;
@@ -53,6 +55,31 @@
5355
import java.util.concurrent.TimeUnit;
5456
import java.util.concurrent.TimeoutException;
5557

58+
class MockCredentialsProvider extends EmptyCredentialsProvider {
59+
60+
private static MockCredentialsProvider instance;
61+
private Listener<User> listener;
62+
63+
public static MockCredentialsProvider instance() {
64+
if (MockCredentialsProvider.instance == null) {
65+
MockCredentialsProvider.instance = new MockCredentialsProvider();
66+
}
67+
return MockCredentialsProvider.instance;
68+
}
69+
70+
private MockCredentialsProvider() {}
71+
72+
@Override
73+
public void setChangeListener(Listener<User> changeListener) {
74+
super.setChangeListener(changeListener);
75+
this.listener = changeListener;
76+
}
77+
78+
public void changeUserTo(User user) {
79+
listener.onValue(user);
80+
}
81+
}
82+
5683
/** A set of helper methods for tests */
5784
public class IntegrationTestUtil {
5885

@@ -239,7 +266,7 @@ public static FirebaseFirestore testFirestore(
239266
context,
240267
databaseId,
241268
persistenceKey,
242-
new EmptyCredentialsProvider(),
269+
MockCredentialsProvider.instance(),
243270
asyncQueue,
244271
/*firebaseApp=*/ null,
245272
/*instanceRegistry=*/ (dbId) -> {});
@@ -409,4 +436,8 @@ public static Map<String, Object> toDataMap(QuerySnapshot qrySnap) {
409436
public static boolean isRunningAgainstEmulator() {
410437
return CONNECT_TO_EMULATOR;
411438
}
439+
440+
public static void testChangeUserTo(User user) {
441+
MockCredentialsProvider.instance().changeUserTo(user);
442+
}
412443
}

firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,24 @@ Task<Void> shutdown() {
362362
return shutdownInternal();
363363
}
364364

365+
/**
366+
* Waits until all currently pending writes for the active user have been acknowledged by the
367+
* backend.
368+
*
369+
* <p>The returned Task completes immediately if there are no outstanding writes. Otherwise, the
370+
* Task waits for all previously issued writes (including those written in a previous app
371+
* session), but it does not wait for writes that were added after the method is called. If you
372+
* wish to wait for additional writes, you have to call {@code waitForPendingWrites()} again.
373+
*
374+
* <p>Any outstanding {@code waitForPendingWrites()} Tasks are cancelled during user changes.
375+
*
376+
* @return A {@code Task} which resolves when all currently pending writes have been acknowledged
377+
* by the backend.
378+
*/
379+
Task<Void> waitForPendingWrites() {
380+
return client.waitForPendingWrites();
381+
}
382+
365383
@VisibleForTesting
366384
AsyncQueue getAsyncQueue() {
367385
return asyncQueue;

firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,18 @@ public <TResult> Task<TResult> transaction(
224224
() -> syncEngine.transaction(asyncQueue, updateFunction, retries));
225225
}
226226

227+
/**
228+
* Returns a task resolves when all the pending writes at the time when this method is called
229+
* received server acknowledgement. An acknowledgement can be either acceptance or rejections.
230+
*/
231+
public Task<Void> waitForPendingWrites() {
232+
this.verifyNotShutdown();
233+
234+
final TaskCompletionSource<Void> source = new TaskCompletionSource<>();
235+
asyncQueue.enqueueAndForget(() -> syncEngine.registerPendingWritesTask(source));
236+
return source.getTask();
237+
}
238+
227239
private void initialize(Context context, User user, boolean usePersistence, long cacheSizeBytes) {
228240
// Note: The initialization work must all be synchronous (we can't dispatch more work) since
229241
// external write/listen operations could get queued to run before that subsequent work

firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.android.gms.tasks.TaskCompletionSource;
2424
import com.google.android.gms.tasks.Tasks;
2525
import com.google.common.base.Function;
26+
import com.google.common.collect.Lists;
2627
import com.google.firebase.database.collection.ImmutableSortedMap;
2728
import com.google.firebase.database.collection.ImmutableSortedSet;
2829
import com.google.firebase.firestore.FirebaseFirestoreException;
@@ -39,6 +40,7 @@
3940
import com.google.firebase.firestore.model.NoDocument;
4041
import com.google.firebase.firestore.model.SnapshotVersion;
4142
import com.google.firebase.firestore.model.mutation.Mutation;
43+
import com.google.firebase.firestore.model.mutation.MutationBatch;
4244
import com.google.firebase.firestore.model.mutation.MutationBatchResult;
4345
import com.google.firebase.firestore.remote.Datastore;
4446
import com.google.firebase.firestore.remote.RemoteEvent;
@@ -133,6 +135,9 @@ interface SyncEngineCallback {
133135
/** Stores user completion blocks, indexed by user and batch ID. */
134136
private final Map<User, Map<Integer, TaskCompletionSource<Void>>> mutationUserCallbacks;
135137

138+
/** Stores user callbacks waiting for all pending writes to be acknowledged. */
139+
private final Map<Integer, List<TaskCompletionSource<Void>>> pendingWritesCallbacks;
140+
136141
/** Used for creating the target IDs for the listens used to resolve limbo documents. */
137142
private final TargetIdGenerator targetIdGenerator;
138143

@@ -154,6 +159,8 @@ public SyncEngine(LocalStore localStore, RemoteStore remoteStore, User initialUs
154159
mutationUserCallbacks = new HashMap<>();
155160
targetIdGenerator = TargetIdGenerator.forSyncEngine();
156161
currentUser = initialUser;
162+
163+
pendingWritesCallbacks = new HashMap<>();
157164
}
158165

159166
public void setCallback(SyncEngineCallback callback) {
@@ -407,6 +414,8 @@ public void handleSuccessfulWrite(MutationBatchResult mutationBatchResult) {
407414
// they consistently happen before listen events.
408415
notifyUser(mutationBatchResult.getBatch().getBatchId(), /*status=*/ null);
409416

417+
resolvePendingWriteTasks(mutationBatchResult.getBatch().getBatchId());
418+
410419
ImmutableSortedMap<DocumentKey, MaybeDocument> changes =
411420
localStore.acknowledgeBatch(mutationBatchResult);
412421

@@ -427,9 +436,63 @@ public void handleRejectedWrite(int batchId, Status status) {
427436
// they consistently happen before listen events.
428437
notifyUser(batchId, status);
429438

439+
resolvePendingWriteTasks(batchId);
440+
430441
emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null);
431442
}
432443

444+
/**
445+
* Takes a snapshot of current mutation queue, and register a user task which will resolve when
446+
* all those mutations are either accepted or rejected by the server.
447+
*/
448+
public void registerPendingWritesTask(TaskCompletionSource<Void> userTask) {
449+
if (!remoteStore.canUseNetwork()) {
450+
Logger.debug(
451+
TAG,
452+
"The network is disabled. The task returned by 'awaitPendingWrites()' will not "
453+
+ "complete until the network is enabled.");
454+
}
455+
456+
int largestPendingBatchId = localStore.getHighestUnacknowledgedBatchId();
457+
458+
if (largestPendingBatchId == MutationBatch.UNKNOWN) {
459+
// Complete the task right away if there is no pending writes at the moment.
460+
userTask.setResult(null);
461+
return;
462+
}
463+
464+
if (pendingWritesCallbacks.containsKey(largestPendingBatchId)) {
465+
pendingWritesCallbacks.get(largestPendingBatchId).add(userTask);
466+
} else {
467+
pendingWritesCallbacks.put(largestPendingBatchId, Lists.newArrayList(userTask));
468+
}
469+
}
470+
471+
/** Resolves tasks waiting for this batch id to get acknowledged by server, if there are any. */
472+
private void resolvePendingWriteTasks(int batchId) {
473+
if (pendingWritesCallbacks.containsKey(batchId)) {
474+
for (TaskCompletionSource<Void> task : pendingWritesCallbacks.get(batchId)) {
475+
task.setResult(null);
476+
}
477+
478+
pendingWritesCallbacks.remove(batchId);
479+
}
480+
}
481+
482+
private void failOutstandingPendingWritesAwaitingTasks() {
483+
for (Map.Entry<Integer, List<TaskCompletionSource<Void>>> entry :
484+
pendingWritesCallbacks.entrySet()) {
485+
for (TaskCompletionSource<Void> task : entry.getValue()) {
486+
task.setException(
487+
new FirebaseFirestoreException(
488+
"'waitForPendingWrites' task is cancelled due to User change.",
489+
FirebaseFirestoreException.Code.CANCELLED));
490+
}
491+
}
492+
493+
pendingWritesCallbacks.clear();
494+
}
495+
433496
/** Resolves the task corresponding to this write result. */
434497
private void notifyUser(int batchId, @Nullable Status status) {
435498
Map<Integer, TaskCompletionSource<Void>> userTasks = mutationUserCallbacks.get(currentUser);
@@ -562,6 +625,8 @@ public void handleCredentialChange(User user) {
562625
currentUser = user;
563626

564627
if (userChanged) {
628+
// Fails tasks waiting for pending writes requested by previous user.
629+
failOutstandingPendingWritesAwaitingTasks();
565630
// Notify local store and emit any resulting events from swapping out the mutation queue.
566631
ImmutableSortedMap<DocumentKey, MaybeDocument> changes = localStore.handleUserChange(user);
567632
emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null);

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,14 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> rejectBatch(int batchId) {
282282
});
283283
}
284284

285+
/**
286+
* Returns the largest (latest) batch id in mutation queue that is pending server response.
287+
* Returns {@link MutationBatch#UNKNOWN} if the queue is empty.
288+
*/
289+
public int getHighestUnacknowledgedBatchId() {
290+
return mutationQueue.getHighestUnacknowledgedBatchId();
291+
}
292+
285293
/** Returns the last recorded stream token for the current user. */
286294
public ByteString getLastStreamToken() {
287295
return mutationQueue.getLastStreamToken();

firebase-firestore/src/main/java/com/google/firebase/firestore/local/MemoryMutationQueue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
187187
return queue.size() > index ? queue.get(index) : null;
188188
}
189189

190+
@Override
191+
public int getHighestUnacknowledgedBatchId() {
192+
return queue.isEmpty() ? MutationBatch.UNKNOWN : nextBatchId - 1;
193+
}
194+
190195
@Override
191196
public List<MutationBatch> getAllMutationBatches() {
192197
return Collections.unmodifiableList(queue);

firebase-firestore/src/main/java/com/google/firebase/firestore/local/MutationQueue.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ MutationBatch addMutationBatch(
7373
@Nullable
7474
MutationBatch getNextMutationBatchAfterBatchId(int batchId);
7575

76+
/**
77+
* @return The largest (latest) batch id in mutation queue for the current user that is pending
78+
* server response, {@link MutationBatch#UNKNOWN} if the queue is empty.
79+
*/
80+
int getHighestUnacknowledgedBatchId();
81+
7682
/** Returns all mutation batches in the mutation queue. */
7783
// TODO: PERF: Current consumer only needs mutated keys; if we can provide that
7884
// cheaply, we should replace this.

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,13 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
249249
.firstValue(row -> decodeInlineMutationBatch(row.getInt(0), row.getBlob(1)));
250250
}
251251

252+
@Override
253+
public int getHighestUnacknowledgedBatchId() {
254+
return db.query("SELECT IFNULL(MAX(batch_id), ?) FROM mutations WHERE uid = ?")
255+
.binding(MutationBatch.UNKNOWN, uid)
256+
.firstValue(row -> row.getInt(0));
257+
}
258+
252259
@Override
253260
public List<MutationBatch> getAllMutationBatches() {
254261
List<MutationBatch> result = new ArrayList<>();

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ private void handleWatchStreamClose(Status status) {
478478
}
479479
}
480480

481-
private boolean canUseNetwork() {
481+
public boolean canUseNetwork() {
482482
// PORTING NOTE: This method exists mostly because web also has to take into account primary
483483
// vs. secondary state.
484484
return networkEnabled;

0 commit comments

Comments
 (0)