Skip to content

Commit 5897991

Browse files
committed
waitForPendingWrites + allow-unlisten-after-shutdown.
1 parent e4eea3f commit 5897991

File tree

11 files changed

+282
-12
lines changed

11 files changed

+282
-12
lines changed

packages/firestore/src/api/database.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,24 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
478478
return this._firestoreClient!.clientShutdown;
479479
}
480480

481+
/**
482+
* Waits until all currently pending writes for the active user have been acknowledged by the
483+
* backend.
484+
*
485+
* The returned Promise completes immediately if there are no outstanding writes. Otherwise, the
486+
* Task waits for all previously issued writes (including those written in a previous app
487+
* session), but it does not wait for writes that were added after the method is called. If you
488+
* wish to wait for additional writes, you have to call `waitForPendingWrites()` again.
489+
*
490+
* Any outstanding `waitForPendingWrites()` Promises are rejected during user changes.
491+
*
492+
* @return A `Promise` which resolves when all currently pending writes have been
493+
* acknowledged by the backend.
494+
*/
495+
_waitForPendingWrites(): Promise<void> {
496+
return this._firestoreClient!.waitForPendingWrites();
497+
}
498+
481499
ensureClientConfigured(): FirestoreClient {
482500
if (!this._firestoreClient) {
483501
// Kick off starting the client but don't actually wait for it.

packages/firestore/src/core/firestore_client.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,20 @@ export class FirestoreClient {
522522
});
523523
}
524524

525+
/**
526+
* Returns a `Promise` resolves when all the pending writes at the time when this method is called
527+
* received server acknowledgement. An acknowledgement can be either acceptance or rejections.
528+
*/
529+
waitForPendingWrites(): Promise<void> {
530+
this.verifyNotShutdown();
531+
532+
const deferred = new Deferred<void>();
533+
this.asyncQueue.enqueueAndForget(() => {
534+
return this.syncEngine.registerPendingWritesCallback(deferred);
535+
});
536+
return deferred.promise;
537+
}
538+
525539
listen(
526540
query: Query,
527541
observer: Observer<ViewSnapshot>,
@@ -536,7 +550,6 @@ export class FirestoreClient {
536550
}
537551

538552
unlisten(listener: QueryListener): void {
539-
this.verifyNotShutdown();
540553
this.asyncQueue.enqueueAndForget(() => {
541554
return this.eventMgr.unlisten(listener);
542555
});

packages/firestore/src/core/sync_engine.ts

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ import {
2828
import { MaybeDocument, NoDocument } from '../model/document';
2929
import { DocumentKey } from '../model/document_key';
3030
import { Mutation } from '../model/mutation';
31-
import { MutationBatchResult } from '../model/mutation_batch';
31+
import { MutationBatchResult, BATCHID_UNKNOWN } from '../model/mutation_batch';
3232
import { RemoteEvent, TargetChange } from '../remote/remote_event';
3333
import { RemoteStore } from '../remote/remote_store';
3434
import { RemoteSyncer } from '../remote/remote_syncer';
3535
import { assert, fail } from '../util/assert';
36-
import { FirestoreError } from '../util/error';
36+
import { Code, FirestoreError } from '../util/error';
3737
import * as log from '../util/log';
3838
import { primitiveComparator } from '../util/misc';
3939
import { ObjectMap } from '../util/obj_map';
@@ -160,6 +160,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
160160
private mutationUserCallbacks = {} as {
161161
[uidKey: string]: SortedMap<BatchId, Deferred<void>>;
162162
};
163+
/** Stores user callbacks waiting for all pending writes to be acknowledged. */
164+
private pendingWritesCallbacks = new Map<BatchId, Array<Deferred<void>>>();
163165
private limboTargetIdGenerator = TargetIdGenerator.forSyncEngine();
164166

165167
// The primary state is set to `true` or `false` immediately after Firestore
@@ -450,6 +452,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
450452
(this.isPrimary && source === OnlineStateSource.RemoteStore) ||
451453
(!this.isPrimary && source === OnlineStateSource.SharedClientState)
452454
) {
455+
this.assertSubscribed('applyOnlineStateChange()');
453456
const newViewSnapshots = [] as ViewSnapshot[];
454457
this.queryViewsByQuery.forEach((query, queryView) => {
455458
const viewChange = queryView.view.applyOnlineStateChange(onlineState);
@@ -570,6 +573,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
570573
// before listen events.
571574
this.processUserCallback(batchId, /*error=*/ null);
572575

576+
this.triggerPendingWritesCallbacks(batchId);
577+
573578
try {
574579
const changes = await this.localStore.acknowledgeBatch(
575580
mutationBatchResult
@@ -593,6 +598,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
593598
// listen events.
594599
this.processUserCallback(batchId, error);
595600

601+
this.triggerPendingWritesCallbacks(batchId);
602+
596603
try {
597604
const changes = await this.localStore.rejectBatch(batchId);
598605
this.sharedClientState.updateMutationState(batchId, 'rejected', error);
@@ -602,6 +609,62 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
602609
}
603610
}
604611

612+
/**
613+
* Takes a snapshot of current mutation queue, and register a user callback
614+
* which will be called when all those mutations in the snapshot are either
615+
* accepted or rejected by the server.
616+
*/
617+
async registerPendingWritesCallback(callback: Deferred<void>): Promise<void> {
618+
if (!this.remoteStore.canUseNetwork()) {
619+
log.debug(
620+
LOG_TAG,
621+
'The network is disabled. The task returned by ' +
622+
"'awaitPendingWrites()' will not complete until the network is enabled."
623+
);
624+
}
625+
626+
const largestBatchId = await this.localStore.getHighestUnacknowledgedBatchId();
627+
628+
if (largestBatchId === BATCHID_UNKNOWN) {
629+
// Trigger the callback right away if there is no pending writes at the moment.
630+
callback.resolve();
631+
return Promise.resolve();
632+
}
633+
634+
const callbacks = this.pendingWritesCallbacks.get(largestBatchId) || [];
635+
callbacks.push(callback);
636+
this.pendingWritesCallbacks.set(largestBatchId, callbacks);
637+
return Promise.resolve();
638+
}
639+
640+
/**
641+
* Triggers callbacks waiting for this batch id to get acknowledged by server,
642+
* if there are any.
643+
*/
644+
private triggerPendingWritesCallbacks(batchId: BatchId): void {
645+
(this.pendingWritesCallbacks.get(batchId) || []).forEach(callback => {
646+
callback.resolve();
647+
});
648+
649+
this.pendingWritesCallbacks.delete(batchId);
650+
}
651+
652+
/** Reject all outstanding callbacks waiting for pending writes to complete. */
653+
private rejectOutstandingPendingWritesCallbacks(): void {
654+
this.pendingWritesCallbacks.forEach(callbacks => {
655+
callbacks.forEach(callback => {
656+
callback.reject(
657+
new FirestoreError(
658+
Code.CANCELLED,
659+
"'waitForPendingWrites' task is cancelled due to User change."
660+
)
661+
);
662+
});
663+
});
664+
665+
this.pendingWritesCallbacks.clear();
666+
}
667+
605668
private addMutationCallback(
606669
batchId: BatchId,
607670
callback: Deferred<void>
@@ -799,6 +862,9 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
799862
this.currentUser = user;
800863

801864
if (userChanged) {
865+
// Fails tasks waiting for pending writes requested by previous user.
866+
this.rejectOutstandingPendingWritesCallbacks();
867+
802868
const result = await this.localStore.handleUserChange(user);
803869
// TODO(b/114226417): Consider calling this only in the primary tab.
804870
this.sharedClientState.handleUserChange(

packages/firestore/src/local/indexeddb_mutation_queue.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,26 @@ export class IndexedDbMutationQueue implements MutationQueue {
265265
.next(() => foundBatch);
266266
}
267267

268+
getHighestUnacknowledgedBatchId(
269+
transaction: PersistenceTransaction
270+
): PersistencePromise<BatchId> {
271+
const range = IDBKeyRange.bound(
272+
[this.userId, Number.NEGATIVE_INFINITY],
273+
[this.userId, Number.POSITIVE_INFINITY]
274+
);
275+
276+
let batchId = BATCHID_UNKNOWN;
277+
return mutationsStore(transaction)
278+
.iterate(
279+
{ index: DbMutationBatch.userMutationsIndex, range, reverse: true },
280+
(key, dbBatch, control) => {
281+
batchId = dbBatch.batchId;
282+
control.done();
283+
}
284+
)
285+
.next(() => batchId);
286+
}
287+
268288
getAllMutationBatches(
269289
transaction: PersistenceTransaction
270290
): PersistencePromise<MutationBatch[]> {

packages/firestore/src/local/local_store.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,20 @@ export class LocalStore {
387387
);
388388
}
389389

390+
/**
391+
* Returns the largest (latest) batch id in mutation queue that is pending server response.
392+
* Returns `BATCHID_UNKNOWN` if the queue is empty.
393+
*/
394+
getHighestUnacknowledgedBatchId(): Promise<BatchId> {
395+
return this.persistence.runTransaction(
396+
'Get highest unacknowledged batch id',
397+
'readonly',
398+
txn => {
399+
return this.mutationQueue.getHighestUnacknowledgedBatchId(txn);
400+
}
401+
);
402+
}
403+
390404
/** Returns the last recorded stream token for the current user. */
391405
getLastStreamToken(): Promise<ProtoByteString> {
392406
return this.persistence.runTransaction(

packages/firestore/src/local/memory_mutation_queue.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { BatchId, ProtoByteString } from '../core/types';
2121
import { DocumentKeySet } from '../model/collections';
2222
import { DocumentKey } from '../model/document_key';
2323
import { Mutation } from '../model/mutation';
24-
import { MutationBatch } from '../model/mutation_batch';
24+
import { MutationBatch, BATCHID_UNKNOWN } from '../model/mutation_batch';
2525
import { emptyByteString } from '../platform/platform';
2626
import { assert } from '../util/assert';
2727
import { primitiveComparator } from '../util/misc';
@@ -177,6 +177,12 @@ export class MemoryMutationQueue implements MutationQueue {
177177
);
178178
}
179179

180+
getHighestUnacknowledgedBatchId(): PersistencePromise<BatchId> {
181+
return PersistencePromise.resolve(
182+
this.mutationQueue.length === 0 ? BATCHID_UNKNOWN : this.nextBatchId - 1
183+
);
184+
}
185+
180186
getAllMutationBatches(
181187
transaction: PersistenceTransaction
182188
): PersistencePromise<MutationBatch[]> {

packages/firestore/src/local/mutation_queue.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,16 @@ export interface MutationQueue {
103103
batchId: BatchId
104104
): PersistencePromise<MutationBatch | null>;
105105

106+
/**
107+
* Gets the largest (latest) batch id in mutation queue for the current user that is pending
108+
* server response, returns `BATCHID_UNKNOWN` if the queue is empty.
109+
*
110+
* @return the largest batch id in the mutation queue that is not acknowledged.
111+
*/
112+
getHighestUnacknowledgedBatchId(
113+
transaction: PersistenceTransaction
114+
): PersistencePromise<BatchId>;
115+
106116
/** Gets all mutation batches in the mutation queue. */
107117
// TODO(mikelehen): PERF: Current consumer only needs mutated keys; if we can
108118
// provide that cheaply, we should replace this.

packages/firestore/src/remote/remote_store.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ export class RemoteStore implements TargetMetadataProvider {
327327
);
328328
}
329329

330-
private canUseNetwork(): boolean {
330+
canUseNetwork(): boolean {
331331
return this.isPrimary && this.networkEnabled;
332332
}
333333

packages/firestore/test/integration/api/database.test.ts

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@ import {
3737
withTestDbs,
3838
withTestDoc,
3939
withTestDocAndInitialData,
40-
DEFAULT_SETTINGS
40+
DEFAULT_SETTINGS,
41+
waitForPendingWrites,
42+
withMockCredentialProviderTestDb
4143
} from '../util/helpers';
44+
import { User } from '../../../src/auth/user';
4245

4346
// tslint:disable:no-floating-promises
4447

@@ -743,7 +746,7 @@ apiDescribe('Database', (persistence: boolean) => {
743746
});
744747
});
745748
});
746-
return Promise.all([deferred1.promise, deferred2.promise]).then(() => {});
749+
return Promise.all([deferred1.promise, deferred2.promise]).then(() => { });
747750
});
748751
});
749752

@@ -782,7 +785,7 @@ apiDescribe('Database', (persistence: boolean) => {
782785
it('will reject listens', () => {
783786
const deferred = new Deferred();
784787
queryForRejection.onSnapshot(
785-
() => {},
788+
() => { },
786789
(err: Error) => {
787790
expect(err.name).to.exist;
788791
expect(err.message).to.exist;
@@ -795,12 +798,12 @@ apiDescribe('Database', (persistence: boolean) => {
795798
it('will reject same listens twice in a row', () => {
796799
const deferred = new Deferred();
797800
queryForRejection.onSnapshot(
798-
() => {},
801+
() => { },
799802
(err: Error) => {
800803
expect(err.name).to.exist;
801804
expect(err.message).to.exist;
802805
queryForRejection.onSnapshot(
803-
() => {},
806+
() => { },
804807
(err2: Error) => {
805808
expect(err2.name).to.exist;
806809
expect(err2.message).to.exist;
@@ -1098,6 +1101,21 @@ apiDescribe('Database', (persistence: boolean) => {
10981101
});
10991102
});
11001103

1104+
it('can unlisten queries after shutdown', async () => {
1105+
return withTestDoc(persistence, async docRef => {
1106+
const firestore = docRef.firestore;
1107+
const accumulator = new EventsAccumulator<firestore.DocumentSnapshot>();
1108+
const unsubscribe = docRef.onSnapshot(accumulator.storeEvent);
1109+
await accumulator.awaitEvent();
1110+
await shutdownDb(firestore);
1111+
1112+
// This should proceed without error.
1113+
unsubscribe();
1114+
// Multiple calls should proceed as well.
1115+
unsubscribe();
1116+
});
1117+
});
1118+
11011119
it('new operation after shutdown should throw', async () => {
11021120
await withTestDoc(persistence, async docRef => {
11031121
const firestore = docRef.firestore;
@@ -1120,4 +1138,50 @@ apiDescribe('Database', (persistence: boolean) => {
11201138
}).to.throw();
11211139
});
11221140
});
1141+
1142+
it('can wait for pending writes as expected', async () => {
1143+
await withTestDoc(persistence, async docRef => {
1144+
const firestore = docRef.firestore;
1145+
// Prevent pending writes receiving acknowledgement.
1146+
await firestore.disableNetwork();
1147+
1148+
const awaitPendingWrites1 = waitForPendingWrites(firestore);
1149+
const pendingWrites = docRef.set({ foo: 'bar' });
1150+
const awaitPendingWrites2 = waitForPendingWrites(firestore);
1151+
1152+
// `awaitsPendingWrites1` resolves immediately because there are no pending writes at
1153+
// the time it is created.
1154+
await expect(awaitPendingWrites1).to.be.eventually.fulfilled;
1155+
1156+
// pending writes can receive acknowledgements now.
1157+
await firestore.enableNetwork();
1158+
await expect(pendingWrites).to.be.eventually.fulfilled;
1159+
await expect(awaitPendingWrites2).to.be.eventually.fulfilled;
1160+
});
1161+
});
1162+
1163+
it('waiting for pending writes should fail when user changes', async () => {
1164+
await withMockCredentialProviderTestDb(persistence, async (db, mockCredentialsProvider) => {
1165+
// Prevent pending writes receiving acknowledgement.
1166+
await db.disableNetwork();
1167+
db.doc('abc/123').set({ foo: 'bar' });
1168+
const awaitPendingWrite = waitForPendingWrites(db);
1169+
1170+
mockCredentialsProvider.changeUserTo(new User('user_1'));
1171+
1172+
await expect(awaitPendingWrite).to.be.eventually.rejected;
1173+
});
1174+
});
1175+
1176+
it('waiting for pending writes resolves immediately when offline and no pending writes',
1177+
async () => {
1178+
await withTestDoc(persistence, async docRef => {
1179+
const firestore = docRef.firestore;
1180+
// Prevent pending writes receiving acknowledgement.
1181+
await firestore.disableNetwork();
1182+
1183+
const awaitPendingWrites = waitForPendingWrites(firestore);
1184+
await expect(awaitPendingWrites).to.be.eventually.fulfilled;
1185+
});
1186+
});
11231187
});

0 commit comments

Comments
 (0)