From 2241933553df2d36e5579c755186fa9a98b2d65a Mon Sep 17 00:00:00 2001 From: Carson Full Date: Sat, 18 Sep 2021 11:19:05 -0500 Subject: [PATCH 01/11] Bump rxjs to v7 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index d050829bd..3cea5676a 100644 --- a/package.json +++ b/package.json @@ -108,6 +108,6 @@ "@babel/runtime": "^7.5.5", "neo4j-driver-bolt-connection": "file:./bolt-connection", "neo4j-driver-core": "file:./core", - "rxjs": "^6.6.3" + "rxjs": "^7.3.0" } } From 5870497ff1ca96647fcb7c572ad672584ddbc078 Mon Sep 17 00:00:00 2001 From: Carson Full Date: Sat, 18 Sep 2021 11:20:45 -0500 Subject: [PATCH 02/11] Change throwError usage to pass error creator --- src/internal/retry-logic-rx.js | 4 ++-- src/session-rx.js | 6 ++++-- test/internal/retry-logic-rx.test.js | 2 +- test/rx/nested-statements.test.js | 4 +++- test/rx/session.test.js | 4 +++- test/types/driver.test.ts | 2 +- test/types/session-rx.test.ts | 6 +++--- 7 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/internal/retry-logic-rx.js b/src/internal/retry-logic-rx.js index 08ede599e..a689f6844 100644 --- a/src/internal/retry-logic-rx.js +++ b/src/internal/retry-logic-rx.js @@ -82,7 +82,7 @@ export default class RxRetryLogic { return failedWork.pipe( flatMap(err => { if (!canRetryOn(err)) { - return throwError(err) + return throwError(() => err) } handledExceptions.push(err) @@ -98,7 +98,7 @@ export default class RxRetryLogic { error.seenErrors = handledExceptions - return throwError(error) + return throwError(() => error) } const nextDelayDuration = this._computeNextDelay(delayDuration) diff --git a/src/session-rx.js b/src/session-rx.js index ffbadeb64..b0a8ae466 100644 --- a/src/session-rx.js +++ b/src/session-rx.js @@ -177,10 +177,12 @@ export default class RxSession { try { return work(txc) } catch (err) { - return throwError(err) + return throwError(() => err) } }).pipe( - catchError(err => txc.rollback().pipe(concat(throwError(err)))), + catchError(err => + txc.rollback().pipe(concat(throwError(() => err))) + ), concat(txc.commit()) ) ) diff --git a/test/internal/retry-logic-rx.test.js b/test/internal/retry-logic-rx.test.js index 8752a2619..70103e1bc 100644 --- a/test/internal/retry-logic-rx.test.js +++ b/test/internal/retry-logic-rx.test.js @@ -245,7 +245,7 @@ describe('#unit-rx retrylogic', () => { clock.tick(delayBy) } if (index < errors.length) { - return throwError(errors[index++]) + return throwError(() => errors[index++]) } else { return of(value) } diff --git a/test/rx/nested-statements.test.js b/test/rx/nested-statements.test.js index 79d31a1bd..278b210cc 100644 --- a/test/rx/nested-statements.test.js +++ b/test/rx/nested-statements.test.js @@ -80,7 +80,9 @@ describe('#integration-rx transaction', () => { ), map(r => r.get(0)), concat(txc.commit()), - catchError(err => txc.rollback().pipe(concat(throwError(err)))), + catchError(err => + txc.rollback().pipe(concat(throwError(() => err))) + ), materialize(), toArray() ) diff --git a/test/rx/session.test.js b/test/rx/session.test.js index d4d82ddfd..634cdbb31 100644 --- a/test/rx/session.test.js +++ b/test/rx/session.test.js @@ -264,7 +264,9 @@ describe('#integration rx-session', () => { } if (this._reactiveFailuresIndex < this._reactiveFailures.length) { - return throwError(this._reactiveFailures[this._reactiveFailuresIndex++]) + return throwError( + () => this._reactiveFailures[this._reactiveFailuresIndex++] + ) } return txc diff --git a/test/types/driver.test.ts b/test/types/driver.test.ts index c9e2d9ad5..150f4f8b0 100644 --- a/test/types/driver.test.ts +++ b/test/types/driver.test.ts @@ -141,7 +141,7 @@ rxSession1 .pipe( map(r => r.get(0)), concat(rxSession1.close()), - catchError(err => rxSession1.close().pipe(concat(throwError(err)))) + catchError(err => rxSession1.close().pipe(concat(throwError(() => err)))) ) .subscribe({ next: data => console.log(data), diff --git a/test/types/session-rx.test.ts b/test/types/session-rx.test.ts index 53675c5a0..cbb44aeeb 100644 --- a/test/types/session-rx.test.ts +++ b/test/types/session-rx.test.ts @@ -112,7 +112,7 @@ result1 .consume() .pipe( concat(close1), - catchError(err => close1.pipe(concat(throwError(err)))) + catchError(err => close1.pipe(concat(throwError(() => err)))) ) .subscribe(summaryObserver) @@ -123,7 +123,7 @@ result2 .consume() .pipe( concat(close1), - catchError(err => close1.pipe(concat(throwError(err)))) + catchError(err => close1.pipe(concat(throwError(() => err)))) ) .subscribe(summaryObserver) @@ -138,7 +138,7 @@ result3 .consume() .pipe( concat(close1), - catchError(err => close1.pipe(concat(throwError(err)))) + catchError(err => close1.pipe(concat(throwError(() => err)))) ) .subscribe(summaryObserver) From dcba164fd214505212d2ab7e5cc7bb1b025f25ae Mon Sep 17 00:00:00 2001 From: Carson Full Date: Sat, 18 Sep 2021 11:22:06 -0500 Subject: [PATCH 03/11] Replace flatMap import with mergeMap --- README.md | 2 +- src/internal/retry-logic-rx.js | 2 +- src/result-rx.js | 2 +- src/session-rx.js | 2 +- test/rx/nested-statements.test.js | 2 +- test/rx/transaction.test.js | 9 +++------ 6 files changed, 8 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 95ef46651..9dd9540fa 100644 --- a/README.md +++ b/README.md @@ -373,7 +373,7 @@ try { rxSession .beginTransaction() .pipe( - flatMap(txc => + mergeMap(txc => concat( txc .run( diff --git a/src/internal/retry-logic-rx.js b/src/internal/retry-logic-rx.js index a689f6844..1868d28b9 100644 --- a/src/internal/retry-logic-rx.js +++ b/src/internal/retry-logic-rx.js @@ -19,7 +19,7 @@ import { newError, error, internal } from 'neo4j-driver-core' import { Observable, throwError, of } from 'rxjs' -import { retryWhen, flatMap, delay } from 'rxjs/operators' +import { retryWhen, mergeMap as flatMap, delay } from 'rxjs/operators' const { logger: { diff --git a/src/result-rx.js b/src/result-rx.js index dfb5cc01b..7b54b9550 100644 --- a/src/result-rx.js +++ b/src/result-rx.js @@ -18,7 +18,7 @@ */ import { newError, Record, ResultSummary } from 'neo4j-driver-core' import { Observable, Subject, ReplaySubject, from } from 'rxjs' -import { flatMap, publishReplay, refCount, shareReplay } from 'rxjs/operators' +import { mergeMap as flatMap, publishReplay, refCount } from 'rxjs/operators' const States = { READY: 0, diff --git a/src/session-rx.js b/src/session-rx.js index b0a8ae466..9f297140e 100644 --- a/src/session-rx.js +++ b/src/session-rx.js @@ -17,7 +17,7 @@ * limitations under the License. */ import { defer, Observable, throwError } from 'rxjs' -import { flatMap, catchError, concat } from 'rxjs/operators' +import { mergeMap as flatMap, catchError, concat } from 'rxjs/operators' import RxResult from './result-rx' import { Session, internal } from 'neo4j-driver-core' import RxTransaction from './transaction-rx' diff --git a/test/rx/nested-statements.test.js b/test/rx/nested-statements.test.js index 278b210cc..fd672cd0f 100644 --- a/test/rx/nested-statements.test.js +++ b/test/rx/nested-statements.test.js @@ -19,7 +19,7 @@ import { Notification, throwError } from 'rxjs' import { - flatMap, + mergeMap as flatMap, materialize, toArray, concat, diff --git a/test/rx/transaction.test.js b/test/rx/transaction.test.js index 783a0620b..125d41bf9 100644 --- a/test/rx/transaction.test.js +++ b/test/rx/transaction.test.js @@ -17,18 +17,15 @@ * limitations under the License. */ -import { Notification, throwError } from 'rxjs' +import { Notification } from 'rxjs' import { - flatMap, + mergeMap as flatMap, materialize, toArray, concat, - map, - bufferCount, - catchError + map } from 'rxjs/operators' import neo4j from '../../src' -import RxSession from '../../src/session-rx' import sharedNeo4j from '../internal/shared-neo4j' import { newError } from 'neo4j-driver-core' From 1f76f0a4f9b7fca2625c323c9b97e6a2b4682da2 Mon Sep 17 00:00:00 2001 From: Carson Full Date: Sat, 18 Sep 2021 11:22:59 -0500 Subject: [PATCH 04/11] Replace concat import with concatWith --- README.md | 2 +- src/session-rx.js | 6 +++++- test/rx/nested-statements.test.js | 2 +- test/rx/session.test.js | 2 +- test/rx/transaction.test.js | 2 +- test/types/driver.test.ts | 2 +- test/types/session-rx.test.ts | 2 +- test/types/transaction-rx.test.ts | 4 ++-- 8 files changed, 13 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 9dd9540fa..5ea0c0873 100644 --- a/README.md +++ b/README.md @@ -222,7 +222,7 @@ rxSession .records() .pipe( map(record => record.get('name')), - concat(rxSession.close()) + concatWith(rxSession.close()) ) .subscribe({ next: data => console.log(data), diff --git a/src/session-rx.js b/src/session-rx.js index 9f297140e..e925dae25 100644 --- a/src/session-rx.js +++ b/src/session-rx.js @@ -17,7 +17,11 @@ * limitations under the License. */ import { defer, Observable, throwError } from 'rxjs' -import { mergeMap as flatMap, catchError, concat } from 'rxjs/operators' +import { + mergeMap as flatMap, + catchError, + concatWith as concat +} from 'rxjs/operators' import RxResult from './result-rx' import { Session, internal } from 'neo4j-driver-core' import RxTransaction from './transaction-rx' diff --git a/test/rx/nested-statements.test.js b/test/rx/nested-statements.test.js index fd672cd0f..1eea7c29d 100644 --- a/test/rx/nested-statements.test.js +++ b/test/rx/nested-statements.test.js @@ -22,7 +22,7 @@ import { mergeMap as flatMap, materialize, toArray, - concat, + concatWith as concat, map, bufferCount, catchError diff --git a/test/rx/session.test.js b/test/rx/session.test.js index 634cdbb31..db1fdc4a7 100644 --- a/test/rx/session.test.js +++ b/test/rx/session.test.js @@ -18,7 +18,7 @@ */ import { Notification, throwError } from 'rxjs' -import { map, materialize, toArray, concat } from 'rxjs/operators' +import { map, materialize, toArray, concatWith as concat } from 'rxjs/operators' import neo4j from '../../src' import sharedNeo4j from '../internal/shared-neo4j' import { newError, error } from 'neo4j-driver-core' diff --git a/test/rx/transaction.test.js b/test/rx/transaction.test.js index 125d41bf9..5c3645477 100644 --- a/test/rx/transaction.test.js +++ b/test/rx/transaction.test.js @@ -22,7 +22,7 @@ import { mergeMap as flatMap, materialize, toArray, - concat, + concatWith as concat, map } from 'rxjs/operators' import neo4j from '../../src' diff --git a/test/types/driver.test.ts b/test/types/driver.test.ts index 150f4f8b0..507db091f 100644 --- a/test/types/driver.test.ts +++ b/test/types/driver.test.ts @@ -29,7 +29,7 @@ import Driver, { import { Parameters } from '../../types/query-runner' import { ServerInfo, Session } from 'neo4j-driver-core' import RxSession from '../../types/session-rx' -import { concat, map, catchError } from 'rxjs/operators' +import { concatWith as concat, map, catchError } from 'rxjs/operators' import { throwError } from 'rxjs' const dummy: any = null diff --git a/test/types/session-rx.test.ts b/test/types/session-rx.test.ts index cbb44aeeb..35006ad51 100644 --- a/test/types/session-rx.test.ts +++ b/test/types/session-rx.test.ts @@ -27,7 +27,7 @@ import { TransactionConfig } from 'neo4j-driver-core' import { Observable, of, Observer, throwError } from 'rxjs' -import { concat, finalize, catchError } from 'rxjs/operators' +import { concatWith as concat, finalize, catchError } from 'rxjs/operators' const dummy: any = null const intValue: Integer = Integer.fromInt(42) diff --git a/test/types/transaction-rx.test.ts b/test/types/transaction-rx.test.ts index 6f9275d8b..5ab197246 100644 --- a/test/types/transaction-rx.test.ts +++ b/test/types/transaction-rx.test.ts @@ -20,8 +20,8 @@ import RxTransaction from '../../types/transaction-rx' import { Record, ResultSummary } from 'neo4j-driver-core' import RxResult from '../../types/result-rx' -import { Observable, of, Observer, throwError } from 'rxjs' -import { concat, finalize, catchError } from 'rxjs/operators' +import { of, Observer } from 'rxjs' +import { concatWith as concat } from 'rxjs/operators' const dummy: any = null From 3cb871cc4bf8c105475eae4744cbffd02732b260 Mon Sep 17 00:00:00 2001 From: Carson Full Date: Sat, 18 Sep 2021 11:54:57 -0500 Subject: [PATCH 05/11] Replace toPromise with lastValueFrom --- test/examples.test.js | 8 +- test/rx/navigation.test.js | 59 +++--- test/rx/nested-statements.test.js | 48 ++--- test/rx/session.test.js | 115 +++++------ test/rx/summary.test.js | 85 ++++----- test/rx/transaction.test.js | 306 ++++++++++++++---------------- 6 files changed, 299 insertions(+), 322 deletions(-) diff --git a/test/examples.test.js b/test/examples.test.js index 536422969..0d26ba9f5 100644 --- a/test/examples.test.js +++ b/test/examples.test.js @@ -21,7 +21,7 @@ import neo4j from '../src' import sharedNeo4j from './internal/shared-neo4j' import { ServerVersion, VERSION_4_0_0 } from '../src/internal/server-version' import { map, materialize, toArray } from 'rxjs/operators' -import { Notification } from 'rxjs' +import { Notification, lastValueFrom } from 'rxjs' /** * The tests below are examples that get pulled into the Driver Manual using the tags inside the tests. @@ -143,7 +143,7 @@ describe('#integration examples', () => { } // end::rx-autocommit-transaction[] - const result = await readProductTitles().toPromise() + const result = await lastValueFrom(readProductTitles()) expect(result).toEqual([ Notification.createNext('Product-0'), @@ -623,7 +623,7 @@ describe('#integration examples', () => { ) // end::rx-result-consume[] - const people = await result.toPromise() + const people = await lastValueFrom(result) expect(people).toEqual([ Notification.createNext('Alice'), Notification.createNext('Bob'), @@ -802,7 +802,7 @@ describe('#integration examples', () => { ) // end::rx-transaction-function[] - const people = await result.toPromise() + const people = await lastValueFrom(result) expect(people).toEqual([ Notification.createNext('Infinity Gauntlet'), Notification.createNext('Mjölnir'), diff --git a/test/rx/navigation.test.js b/test/rx/navigation.test.js index 9c74d4d1a..f1870adc9 100644 --- a/test/rx/navigation.test.js +++ b/test/rx/navigation.test.js @@ -18,10 +18,8 @@ */ import neo4j from '../../src' import sharedNeo4j from '../internal/shared-neo4j' -import RxSession from '../../src/session-rx' -import { Notification, Observable } from 'rxjs' +import { lastValueFrom, Notification } from 'rxjs' import { materialize, toArray, map } from 'rxjs/operators' -import RxTransaction from '../../src/transaction-rx' describe('#integration-rx navigation', () => { describe('session', () => { @@ -43,7 +41,7 @@ describe('#integration-rx navigation', () => { afterEach(async () => { if (session) { - await session.close().toPromise() + await lastValueFrom(session.close(), { defaultValue: undefined }) } await driver.close() }) @@ -195,7 +193,7 @@ describe('#integration-rx navigation', () => { sharedNeo4j.authToken ) session = driver.rxSession() - txc = await session.beginTransaction().toPromise() + txc = await lastValueFrom(session.beginTransaction()) const normalSession = driver.session() try { @@ -209,13 +207,13 @@ describe('#integration-rx navigation', () => { afterEach(async () => { if (txc) { try { - await txc.commit().toPromise() + await lastValueFrom(txc.commit(), { defaultValue: undefined }) } catch (err) { // ignore } } if (session) { - await session.close().toPromise() + await lastValueFrom(session.close(), { defaultValue: undefined }) } await driver.close() }) @@ -366,11 +364,12 @@ describe('#integration-rx navigation', () => { return } - const result = await runnable - .run("RETURN 1 as f1, true as f2, 'string' as f3") - .keys() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + runnable + .run("RETURN 1 as f1, true as f2, 'string' as f3") + .keys() + .pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createNext(['f1', 'f2', 'f3']), @@ -582,11 +581,12 @@ describe('#integration-rx navigation', () => { return } - const keys = await runnable - .run('CREATE ({id : $id})', { id: 5 }) - .keys() - .pipe(materialize(), toArray()) - .toPromise() + const keys = await lastValueFrom( + runnable + .run('CREATE ({id : $id})', { id: 5 }) + .keys() + .pipe(materialize(), toArray()) + ) expect(keys).toEqual([ Notification.createNext([]), Notification.createComplete() @@ -788,10 +788,9 @@ describe('#integration-rx navigation', () => { } async function collectAndAssertKeys (result) { - const keys = await result - .keys() - .pipe(materialize(), toArray()) - .toPromise() + const keys = await lastValueFrom( + result.keys().pipe(materialize(), toArray()) + ) expect(keys).toEqual([ Notification.createNext(['number', 'text']), Notification.createComplete() @@ -799,14 +798,13 @@ describe('#integration-rx navigation', () => { } async function collectAndAssertRecords (result) { - const records = await result - .records() - .pipe( + const records = await lastValueFrom( + result.records().pipe( map(r => [r.get(0), r.get(1)]), materialize(), toArray() ) - .toPromise() + ) expect(records).toEqual([ Notification.createNext([neo4j.int(1), 't1']), Notification.createNext([neo4j.int(2), 't2']), @@ -818,14 +816,13 @@ describe('#integration-rx navigation', () => { } async function collectAndAssertSummary (result, expectedQueryType = 'r') { - const summary = await result - .consume() - .pipe( + const summary = await lastValueFrom( + result.consume().pipe( map(s => s.queryType), materialize(), toArray() ) - .toPromise() + ) expect(summary).toEqual([ Notification.createNext(expectedQueryType), Notification.createComplete() @@ -833,7 +830,7 @@ describe('#integration-rx navigation', () => { } async function collectAndAssertEmpty (stream) { - const result = await stream.pipe(materialize(), toArray()).toPromise() + const result = await lastValueFrom(stream.pipe(materialize(), toArray())) expect(result).toEqual([Notification.createComplete()]) } @@ -843,7 +840,7 @@ describe('#integration-rx navigation', () => { * @param {function(err: Error): void} expectationFunc */ async function collectAndAssertError (stream, expectedError) { - const result = await stream.pipe(materialize(), toArray()).toPromise() + const result = await lastValueFrom(stream.pipe(materialize(), toArray())) expect(result).toEqual([Notification.createError(expectedError)]) } diff --git a/test/rx/nested-statements.test.js b/test/rx/nested-statements.test.js index 1eea7c29d..2e10f061e 100644 --- a/test/rx/nested-statements.test.js +++ b/test/rx/nested-statements.test.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import { Notification, throwError } from 'rxjs' +import { lastValueFrom, Notification, throwError } from 'rxjs' import { mergeMap as flatMap, materialize, @@ -50,7 +50,7 @@ describe('#integration-rx transaction', () => { afterEach(async () => { if (session) { - await session.close().toPromise() + await lastValueFrom(session.close(), { defaultValue: undefined }) } await driver.close() }) @@ -61,9 +61,8 @@ describe('#integration-rx transaction', () => { return } - const messages = await session - .beginTransaction() - .pipe( + const messages = await lastValueFrom( + session.beginTransaction().pipe( flatMap(txc => txc .run('UNWIND RANGE(1, $size) AS x RETURN x', { size }) @@ -88,7 +87,7 @@ describe('#integration-rx transaction', () => { ) ) ) - .toPromise() + ) expect(messages.length).toBe(size + 1) expect(messages[size]).toEqual(Notification.createComplete()) @@ -100,24 +99,25 @@ describe('#integration-rx transaction', () => { return } - const result = await session - .run('UNWIND RANGE(1, $size) AS x RETURN x', { size }) - .records() - .pipe( - map(r => r.get(0)), - bufferCount(50), - flatMap(x => - session - .run('UNWIND $x AS id CREATE (n:Node {id: id}) RETURN n.id', { - x - }) - .records() - ), - map(r => r.get(0)), - materialize(), - toArray() - ) - .toPromise() + const result = await lastValueFrom( + session + .run('UNWIND RANGE(1, $size) AS x RETURN x', { size }) + .records() + .pipe( + map(r => r.get(0)), + bufferCount(50), + flatMap(x => + session + .run('UNWIND $x AS id CREATE (n:Node {id: id}) RETURN n.id', { + x + }) + .records() + ), + map(r => r.get(0)), + materialize(), + toArray() + ) + ) expect(result).toEqual([ Notification.createError( diff --git a/test/rx/session.test.js b/test/rx/session.test.js index db1fdc4a7..28884dfef 100644 --- a/test/rx/session.test.js +++ b/test/rx/session.test.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import { Notification, throwError } from 'rxjs' +import { lastValueFrom, Notification, throwError } from 'rxjs' import { map, materialize, toArray, concatWith as concat } from 'rxjs/operators' import neo4j from '../../src' import sharedNeo4j from '../internal/shared-neo4j' @@ -44,7 +44,7 @@ describe('#integration rx-session', () => { afterEach(async () => { if (session) { - await session.close().toPromise() + await lastValueFrom(session.close(), { defaultValue: undefined }) } await driver.close() }) @@ -54,15 +54,16 @@ describe('#integration rx-session', () => { return } - const result = await session - .run('UNWIND [1,2,3,4] AS n RETURN n') - .records() - .pipe( - map(r => r.get('n').toInt()), - materialize(), - toArray() - ) - .toPromise() + const result = await lastValueFrom( + session + .run('UNWIND [1,2,3,4] AS n RETURN n') + .records() + .pipe( + map(r => r.get('n').toInt()), + materialize(), + toArray() + ) + ) expect(result).toEqual([ Notification.createNext(1), @@ -78,24 +79,26 @@ describe('#integration rx-session', () => { return } - const result1 = await session - .run('INVALID STATEMENT') - .records() - .pipe(materialize(), toArray()) - .toPromise() + const result1 = await lastValueFrom( + session + .run('INVALID STATEMENT') + .records() + .pipe(materialize(), toArray()) + ) expect(result1).toEqual([ Notification.createError(jasmine.stringMatching(/Invalid input/)) ]) - const result2 = await session - .run('RETURN 1') - .records() - .pipe( - map(r => r.get(0).toInt()), - materialize(), - toArray() - ) - .toPromise() + const result2 = await lastValueFrom( + session + .run('RETURN 1') + .records() + .pipe( + map(r => r.get(0).toInt()), + materialize(), + toArray() + ) + ) expect(result2).toEqual([ Notification.createNext(1), Notification.createComplete() @@ -111,10 +114,11 @@ describe('#integration rx-session', () => { query: 'CREATE (:WithoutRetry) RETURN 5' }) - const result = await session - .writeTransaction(txc => txcWork.work(txc)) - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + session + .writeTransaction(txc => txcWork.work(txc)) + .pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createNext(5), Notification.createComplete() @@ -138,10 +142,11 @@ describe('#integration rx-session', () => { ] }) - const result = await session - .writeTransaction(txc => txcWork.work(txc)) - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + session + .writeTransaction(txc => txcWork.work(txc)) + .pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createNext(7), Notification.createComplete() @@ -165,10 +170,11 @@ describe('#integration rx-session', () => { ] }) - const result = await session - .writeTransaction(txc => txcWork.work(txc)) - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + session + .writeTransaction(txc => txcWork.work(txc)) + .pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createNext(9), Notification.createComplete() @@ -187,10 +193,11 @@ describe('#integration rx-session', () => { query: 'UNWIND [10, 5, 0] AS x CREATE (:Hi) RETURN 10/x' }) - const result = await session - .writeTransaction(txc => txcWork.work(txc)) - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + session + .writeTransaction(txc => txcWork.work(txc)) + .pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createNext(1), Notification.createNext(2), @@ -219,10 +226,11 @@ describe('#integration rx-session', () => { ] }) - const result = await session - .writeTransaction(txc => txcWork.work(txc)) - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + session + .writeTransaction(txc => txcWork.work(txc)) + .pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createError(jasmine.stringMatching(/a database error/)) ]) @@ -233,14 +241,15 @@ describe('#integration rx-session', () => { async function countNodes (label) { const session = driver.rxSession() - return await session - .run(`MATCH (n:${label}) RETURN count(n)`) - .records() - .pipe( - map(r => r.get(0).toInt()), - concat(session.close()) - ) - .toPromise() + return await lastValueFrom( + session + .run(`MATCH (n:${label}) RETURN count(n)`) + .records() + .pipe( + map(r => r.get(0).toInt()), + concat(session.close()) + ) + ) } class ConfigurableTransactionWork { constructor ({ query, syncFailures = [], reactiveFailures = [] } = {}) { diff --git a/test/rx/summary.test.js b/test/rx/summary.test.js index e39e32c48..d28c95a09 100644 --- a/test/rx/summary.test.js +++ b/test/rx/summary.test.js @@ -16,9 +16,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import { lastValueFrom } from 'rxjs' import neo4j from '../../src' -import RxSession from '../../src/session-rx' -import RxTransaction from '../../src/transaction-rx' import sharedNeo4j from '../internal/shared-neo4j' describe('#integration-rx summary', () => { @@ -42,7 +41,7 @@ describe('#integration-rx summary', () => { afterEach(async () => { if (session) { - await session.close().toPromise() + await lastValueFrom(session.close(), { defaultValue: undefined }) } await driver.close() }) @@ -128,7 +127,7 @@ describe('#integration-rx summary', () => { sharedNeo4j.authToken ) session = driver.rxSession() - txc = await session.beginTransaction().toPromise() + txc = await lastValueFrom(session.beginTransaction()) const normalSession = driver.session() try { @@ -144,13 +143,13 @@ describe('#integration-rx summary', () => { afterEach(async () => { if (txc) { try { - await txc.commit().toPromise() + await lastValueFrom(txc.commit(), { defaultValue: undefined }) } catch (err) { // ignore } } if (session) { - await session.close().toPromise() + await lastValueFrom(session.close(), { defaultValue: undefined }) } await driver.close() }) @@ -243,7 +242,7 @@ describe('#integration-rx summary', () => { afterEach(async () => { if (session) { - await session.close().toPromise() + await lastValueFrom(session.close(), { defaultValue: undefined }) } await driver.close() }) @@ -264,10 +263,9 @@ describe('#integration-rx summary', () => { return } - const summary = await runnable - .run('UNWIND RANGE(1,10) AS n RETURN n') - .consume() - .toPromise() + const summary = await lastValueFrom( + runnable.run('UNWIND RANGE(1,10) AS n RETURN n').consume() + ) expect(summary).toBeDefined() } @@ -339,7 +337,7 @@ describe('#integration-rx summary', () => { let runnable = session if (useTransaction) { - runnable = await session.beginTransaction().toPromise() + runnable = await lastValueFrom(session.beginTransaction()) } try { @@ -571,10 +569,9 @@ describe('#integration-rx summary', () => { return } - const summary = await runnable - .run('CREATE (n) RETURN n') - .consume() - .toPromise() + const summary = await lastValueFrom( + runnable.run('CREATE (n) RETURN n').consume() + ) expect(summary).toBeDefined() expect(summary.hasPlan()).toBeFalsy() expect(summary.plan).toBeFalsy() @@ -591,10 +588,9 @@ describe('#integration-rx summary', () => { return } - const summary = await runnable - .run('EXPLAIN CREATE (n) RETURN n') - .consume() - .toPromise() + const summary = await lastValueFrom( + runnable.run('EXPLAIN CREATE (n) RETURN n').consume() + ) expect(summary).toBeDefined() expect(summary.hasPlan()).toBeTruthy() expect(summary.plan.operatorType).toContain('ProduceResults') @@ -612,10 +608,9 @@ describe('#integration-rx summary', () => { return } - const summary = await runnable - .run('PROFILE CREATE (n) RETURN n') - .consume() - .toPromise() + const summary = await lastValueFrom( + runnable.run('PROFILE CREATE (n) RETURN n').consume() + ) expect(summary).toBeDefined() expect(summary.hasPlan()).toBeTruthy() expect(summary.plan.operatorType).toContain('ProduceResults') @@ -634,10 +629,9 @@ describe('#integration-rx summary', () => { return } - const summary = await runnable - .run('CREATE (n) RETURN n') - .consume() - .toPromise() + const summary = await lastValueFrom( + runnable.run('CREATE (n) RETURN n').consume() + ) expect(summary).toBeDefined() expect(summary.notifications).toBeTruthy() expect(summary.notifications.length).toBe(0) @@ -652,10 +646,11 @@ describe('#integration-rx summary', () => { return } - const summary = await runnable - .run('EXPLAIN MATCH (n:ThisLabelDoesNotExistRx) RETURN n') - .consume() - .toPromise() + const summary = await lastValueFrom( + runnable + .run('EXPLAIN MATCH (n:ThisLabelDoesNotExistRx) RETURN n') + .consume() + ) expect(summary).toBeDefined() expect(summary.notifications).toBeTruthy() expect(summary.notifications.length).toBeGreaterThan(0) @@ -678,10 +673,9 @@ describe('#integration-rx summary', () => { query, parameters = null ) { - const summary = await runnable - .run(query, parameters) - .consume() - .toPromise() + const summary = await lastValueFrom( + runnable.run(query, parameters).consume() + ) expect(summary).toBeDefined() expect(summary.query).toBeDefined() expect(summary.query.text).toBe(query) @@ -695,10 +689,7 @@ describe('#integration-rx summary', () => { * @param {string} expectedQueryType */ async function verifyQueryType (runnable, query, expectedQueryType) { - const summary = await runnable - .run(query) - .consume() - .toPromise() + const summary = await lastValueFrom(runnable.run(query).consume()) expect(summary).toBeDefined() expect(summary.queryType).toBe(expectedQueryType) } @@ -711,10 +702,9 @@ describe('#integration-rx summary', () => { * @param {*} stats */ async function verifyUpdates (runnable, query, parameters, stats) { - const summary = await runnable - .run(query, parameters) - .consume() - .toPromise() + const summary = await lastValueFrom( + runnable.run(query, parameters).consume() + ) expect(summary).toBeDefined() expect(summary.counters.containsUpdates()).toBeTruthy() expect(summary.counters.updates()).toEqual(stats) @@ -735,10 +725,9 @@ describe('#integration-rx summary', () => { parameters, systemUpdates ) { - const summary = await runnable - .run(query, parameters) - .consume() - .toPromise() + const summary = await lastValueFrom( + runnable.run(query, parameters).consume() + ) expect(summary).toBeDefined() expect(summary.counters.containsSystemUpdates()).toBeTruthy() diff --git a/test/rx/transaction.test.js b/test/rx/transaction.test.js index 5c3645477..c08e718ed 100644 --- a/test/rx/transaction.test.js +++ b/test/rx/transaction.test.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import { Notification } from 'rxjs' +import { lastValueFrom, Notification } from 'rxjs' import { mergeMap as flatMap, materialize, @@ -48,7 +48,7 @@ describe('#integration-rx transaction', () => { afterEach(async () => { if (session) { - await session.close().toPromise() + await lastValueFrom(session.close(), { defaultValue: undefined }) } await driver.close() }) @@ -58,14 +58,13 @@ describe('#integration-rx transaction', () => { return } - const result = await session - .beginTransaction() - .pipe( + const result = await lastValueFrom( + session.beginTransaction().pipe( flatMap(txc => txc.commit()), materialize(), toArray() ) - .toPromise() + ) expect(result).toEqual([Notification.createComplete()]) }) @@ -75,14 +74,13 @@ describe('#integration-rx transaction', () => { return } - const result = await session - .beginTransaction() - .pipe( + const result = await lastValueFrom( + session.beginTransaction().pipe( flatMap(txc => txc.rollback()), materialize(), toArray() ) - .toPromise() + ) expect(result).toEqual([Notification.createComplete()]) }) @@ -92,9 +90,8 @@ describe('#integration-rx transaction', () => { return } - const result = await session - .beginTransaction() - .pipe( + const result = await lastValueFrom( + session.beginTransaction().pipe( flatMap(txc => txc .run('CREATE (n:Node {id: 42}) RETURN n') @@ -107,7 +104,7 @@ describe('#integration-rx transaction', () => { materialize(), toArray() ) - .toPromise() + ) expect(result).toEqual([ Notification.createNext(neo4j.int(42)), Notification.createComplete() @@ -121,9 +118,8 @@ describe('#integration-rx transaction', () => { return } - const result = await session - .beginTransaction() - .pipe( + const result = await lastValueFrom( + session.beginTransaction().pipe( flatMap(txc => txc .run('CREATE (n:Node {id: 42}) RETURN n') @@ -136,7 +132,7 @@ describe('#integration-rx transaction', () => { materialize(), toArray() ) - .toPromise() + ) expect(result).toEqual([ Notification.createNext(neo4j.int(42)), Notification.createComplete() @@ -174,14 +170,13 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) await verifyFailsWithWrongQuery(txc) - const result = await txc - .commit() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc.commit().pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createError( jasmine.objectContaining({ @@ -198,14 +193,13 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) await verifyFailsWithWrongQuery(txc) - const result = await txc - .rollback() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc.rollback().pipe(materialize(), toArray()) + ) expect(result).toEqual([Notification.createComplete()]) }) @@ -214,16 +208,15 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) await verifyCanCreateNode(txc, 5) await verifyCanReturnOne(txc) await verifyFailsWithWrongQuery(txc) - const result = await txc - .commit() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc.commit().pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createError( jasmine.objectContaining({ @@ -240,16 +233,15 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) await verifyCanCreateNode(txc, 5) await verifyCanReturnOne(txc) await verifyFailsWithWrongQuery(txc) - const result = await txc - .rollback() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc.rollback().pipe(materialize(), toArray()) + ) expect(result).toEqual([Notification.createComplete()]) }) @@ -258,15 +250,16 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) await verifyFailsWithWrongQuery(txc) - const result = await txc - .run('CREATE ()') - .records() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc + .run('CREATE ()') + .records() + .pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createError( jasmine.objectContaining({ @@ -283,15 +276,14 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) await verifyCanCreateNode(txc, 6) await verifyCanCommit(txc) - const result = await txc - .commit() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc.commit().pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createError( jasmine.objectContaining({ @@ -308,15 +300,14 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) await verifyCanCreateNode(txc, 6) await verifyCanRollback(txc) - const result = await txc - .rollback() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc.rollback().pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createError( jasmine.objectContaining({ @@ -333,15 +324,14 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) await verifyCanCreateNode(txc, 6) await verifyCanCommit(txc) - const result = await txc - .rollback() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc.rollback().pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createError( jasmine.objectContaining({ @@ -358,15 +348,14 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) await verifyCanCreateNode(txc, 6) await verifyCanRollback(txc) - const result = await txc - .commit() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc.commit().pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createError( jasmine.objectContaining({ @@ -393,12 +382,12 @@ describe('#integration-rx transaction', () => { const bookmark0 = session.lastBookmark() - const txc1 = await session.beginTransaction().toPromise() + const txc1 = await lastValueFrom(session.beginTransaction()) await verifyCanCreateNode(txc1, 20) await verifyCanCommit(txc1) const bookmark1 = session.lastBookmark() - const txc2 = await session.beginTransaction().toPromise() + const txc2 = await lastValueFrom(session.beginTransaction()) await verifyCanCreateNode(txc2, 21) await verifyCanCommit(txc2) const bookmark2 = session.lastBookmark() @@ -415,16 +404,15 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) const result1 = txc.run('CREATE (:TestNode) RETURN 1 AS n') const result2 = txc.run('CREATE (:TestNode) RETURN 2 AS n') const result3 = txc.run('RETURN 10 / 0 AS n') const result4 = txc.run('CREATE (:TestNode) RETURN 3 AS n') - const result = await result1 - .records() - .pipe( + const result = await lastValueFrom( + result1.records().pipe( concat(result2.records()), concat(result3.records()), concat(result4.records()), @@ -432,7 +420,7 @@ describe('#integration-rx transaction', () => { materialize(), toArray() ) - .toPromise() + ) expect(result).toEqual([ Notification.createNext(1), Notification.createNext(2), @@ -447,16 +435,15 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) const result1 = txc.run('RETURN 1') const result2 = txc.run('RETURN 2') const result3 = txc.run('RETURN 3') const result4 = txc.run('RETURN 4') - const result = await result4 - .records() - .pipe( + const result = await lastValueFrom( + result4.records().pipe( concat(result3.records()), concat(result2.records()), concat(result1.records()), @@ -464,7 +451,7 @@ describe('#integration-rx transaction', () => { materialize(), toArray() ) - .toPromise() + ) expect(result).toEqual([ Notification.createNext(4), Notification.createNext(3), @@ -491,20 +478,19 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) const result = txc.run('RETURN Wrong') - const messages = await result - .records() - .pipe(materialize(), toArray()) - .toPromise() + const messages = await lastValueFrom( + result.records().pipe(materialize(), toArray()) + ) expect(messages).toEqual([ Notification.createError( jasmine.stringMatching(/Variable `Wrong` not defined/) ) ]) - const summary = await result.consume().toPromise() + const summary = await lastValueFrom(result.consume()) expect(summary).toBeTruthy() }) @@ -513,7 +499,7 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) txc.run('RETURN ILLEGAL') @@ -525,15 +511,16 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) await verifyCanCreateNode(txc, 15) await verifyCanCommitOrRollback(txc, commit) - const result = await txc - .run('CREATE ()') - .records() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc + .run('CREATE ()') + .records() + .pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createError( jasmine.objectContaining({ @@ -550,20 +537,11 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() - - await txc - .run('CREATE (n:Node {id: 1})') - .consume() - .toPromise() - await txc - .run('CREATE (n:Node {id: 2})') - .consume() - .toPromise() - await txc - .run('CREATE (n:Node {id: 1})') - .consume() - .toPromise() + const txc = await lastValueFrom(session.beginTransaction()) + + await lastValueFrom(txc.run('CREATE (n:Node {id: 1})').consume()) + await lastValueFrom(txc.run('CREATE (n:Node {id: 2})').consume()) + await lastValueFrom(txc.run('CREATE (n:Node {id: 1})').consume()) await verifyCanCommitOrRollback(txc, commit) await verifyCommittedOrRollbacked(commit) @@ -574,21 +552,22 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) const result1 = txc.run('CREATE (n:Node {id: 1})') const result2 = txc.run('CREATE (n:Node {id: 2})') const result3 = txc.run('CREATE (n:Node {id: 1})') - const results = await result1 - .records() - .pipe( - concat(result2.records()), - concat(result3.records()), - materialize(), - toArray() - ) - .toPromise() + const results = await lastValueFrom( + result1 + .records() + .pipe( + concat(result2.records()), + concat(result3.records()), + materialize(), + toArray() + ) + ) expect(results).toEqual([Notification.createComplete()]) await verifyCanCommitOrRollback(txc, commit) @@ -600,21 +579,22 @@ describe('#integration-rx transaction', () => { return } - const txc = await session.beginTransaction().toPromise() + const txc = await lastValueFrom(session.beginTransaction()) const result1 = txc.run('CREATE (n:Node {id: 1})') const result2 = txc.run('CREATE (n:Node {id: 2})') const result3 = txc.run('CREATE (n:Node {id: 1})') - const results = await result1 - .keys() - .pipe( - concat(result2.keys()), - concat(result3.keys()), - materialize(), - toArray() - ) - .toPromise() + const results = await lastValueFrom( + result1 + .keys() + .pipe( + concat(result2.keys()), + concat(result3.keys()), + materialize(), + toArray() + ) + ) expect(results).toEqual([ Notification.createNext([]), Notification.createNext([]), @@ -627,18 +607,16 @@ describe('#integration-rx transaction', () => { } async function verifyCanCommit (txc) { - const result = await txc - .commit() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc.commit().pipe(materialize(), toArray()) + ) expect(result).toEqual([Notification.createComplete()]) } async function verifyCanRollback (txc) { - const result = await txc - .rollback() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc.rollback().pipe(materialize(), toArray()) + ) expect(result).toEqual([Notification.createComplete()]) } @@ -651,15 +629,16 @@ describe('#integration-rx transaction', () => { } async function verifyCanCreateNode (txc, id) { - const result = await txc - .run('CREATE (n:Node {id: $id}) RETURN n', { id: neo4j.int(id) }) - .records() - .pipe( - map(r => r.get('n').properties.id), - materialize(), - toArray() - ) - .toPromise() + const result = await lastValueFrom( + txc + .run('CREATE (n:Node {id: $id}) RETURN n', { id: neo4j.int(id) }) + .records() + .pipe( + map(r => r.get('n').properties.id), + materialize(), + toArray() + ) + ) expect(result).toEqual([ Notification.createNext(neo4j.int(id)), Notification.createComplete() @@ -667,15 +646,16 @@ describe('#integration-rx transaction', () => { } async function verifyCanReturnOne (txc) { - const result = await txc - .run('RETURN 1') - .records() - .pipe( - map(r => r.get(0)), - materialize(), - toArray() - ) - .toPromise() + const result = await lastValueFrom( + txc + .run('RETURN 1') + .records() + .pipe( + map(r => r.get(0)), + materialize(), + toArray() + ) + ) expect(result).toEqual([ Notification.createNext(neo4j.int(1)), Notification.createComplete() @@ -683,11 +663,12 @@ describe('#integration-rx transaction', () => { } async function verifyFailsWithWrongQuery (txc) { - const result = await txc - .run('RETURN') - .records() - .pipe(materialize(), toArray()) - .toPromise() + const result = await lastValueFrom( + txc + .run('RETURN') + .records() + .pipe(materialize(), toArray()) + ) expect(result).toEqual([ Notification.createError( jasmine.stringMatching(/Unexpected end of input|Invalid input/) @@ -707,13 +688,14 @@ describe('#integration-rx transaction', () => { async function countNodes (id) { const session = driver.rxSession() - return await session - .run('MATCH (n:Node {id: $id}) RETURN count(n)', { id: id }) - .records() - .pipe( - map(r => r.get(0).toInt()), - concat(session.close()) - ) - .toPromise() + return await lastValueFrom( + session + .run('MATCH (n:Node {id: $id}) RETURN count(n)', { id: id }) + .records() + .pipe( + map(r => r.get(0).toInt()), + concat(session.close()) + ) + ) } }) From f547997bd039950d71fd96834118c1218487158b Mon Sep 17 00:00:00 2001 From: Carson Full Date: Sat, 18 Sep 2021 12:24:23 -0500 Subject: [PATCH 06/11] Replace Notification instances with plain objects --- test/examples.test.js | 19 ++--- test/rx/navigation.test.js | 35 ++++---- test/rx/nested-statements.test.js | 12 +-- test/rx/session.test.js | 42 ++++------ test/rx/transaction.test.js | 130 +++++++++++++++--------------- 5 files changed, 109 insertions(+), 129 deletions(-) diff --git a/test/examples.test.js b/test/examples.test.js index 0d26ba9f5..ac953260b 100644 --- a/test/examples.test.js +++ b/test/examples.test.js @@ -21,7 +21,7 @@ import neo4j from '../src' import sharedNeo4j from './internal/shared-neo4j' import { ServerVersion, VERSION_4_0_0 } from '../src/internal/server-version' import { map, materialize, toArray } from 'rxjs/operators' -import { Notification, lastValueFrom } from 'rxjs' +import { lastValueFrom } from 'rxjs' /** * The tests below are examples that get pulled into the Driver Manual using the tags inside the tests. @@ -145,10 +145,7 @@ describe('#integration examples', () => { // end::rx-autocommit-transaction[] const result = await lastValueFrom(readProductTitles()) - expect(result).toEqual([ - Notification.createNext('Product-0'), - Notification.createComplete() - ]) + expect(result).toEqual([{ kind: 'N', value: 'Product-0' }, { kind: 'C' }]) }, 60000) it('basic auth example', async () => { @@ -625,9 +622,9 @@ describe('#integration examples', () => { const people = await lastValueFrom(result) expect(people).toEqual([ - Notification.createNext('Alice'), - Notification.createNext('Bob'), - Notification.createComplete() + { kind: 'N', value: 'Alice' }, + { kind: 'N', value: 'Bob' }, + { kind: 'C' } ]) }, 60000) @@ -804,9 +801,9 @@ describe('#integration examples', () => { const people = await lastValueFrom(result) expect(people).toEqual([ - Notification.createNext('Infinity Gauntlet'), - Notification.createNext('Mjölnir'), - Notification.createComplete() + { kind: 'N', value: 'Infinity Gauntlet' }, + { kind: 'N', value: 'Mjölnir' }, + { kind: 'C' } ]) }, 60000) diff --git a/test/rx/navigation.test.js b/test/rx/navigation.test.js index f1870adc9..6ae42d03b 100644 --- a/test/rx/navigation.test.js +++ b/test/rx/navigation.test.js @@ -18,7 +18,7 @@ */ import neo4j from '../../src' import sharedNeo4j from '../internal/shared-neo4j' -import { lastValueFrom, Notification } from 'rxjs' +import { lastValueFrom } from 'rxjs' import { materialize, toArray, map } from 'rxjs/operators' describe('#integration-rx navigation', () => { @@ -372,8 +372,8 @@ describe('#integration-rx navigation', () => { ) expect(result).toEqual([ - Notification.createNext(['f1', 'f2', 'f3']), - Notification.createComplete() + { kind: 'N', value: ['f1', 'f2', 'f3'] }, + { kind: 'C' } ]) } @@ -587,10 +587,7 @@ describe('#integration-rx navigation', () => { .keys() .pipe(materialize(), toArray()) ) - expect(keys).toEqual([ - Notification.createNext([]), - Notification.createComplete() - ]) + expect(keys).toEqual([{ kind: 'N', value: [] }, { kind: 'C' }]) } /** @@ -792,8 +789,8 @@ describe('#integration-rx navigation', () => { result.keys().pipe(materialize(), toArray()) ) expect(keys).toEqual([ - Notification.createNext(['number', 'text']), - Notification.createComplete() + { kind: 'N', value: ['number', 'text'] }, + { kind: 'C' } ]) } @@ -806,12 +803,12 @@ describe('#integration-rx navigation', () => { ) ) expect(records).toEqual([ - Notification.createNext([neo4j.int(1), 't1']), - Notification.createNext([neo4j.int(2), 't2']), - Notification.createNext([neo4j.int(3), 't3']), - Notification.createNext([neo4j.int(4), 't4']), - Notification.createNext([neo4j.int(5), 't5']), - Notification.createComplete() + { kind: 'N', value: [neo4j.int(1), 't1'] }, + { kind: 'N', value: [neo4j.int(2), 't2'] }, + { kind: 'N', value: [neo4j.int(3), 't3'] }, + { kind: 'N', value: [neo4j.int(4), 't4'] }, + { kind: 'N', value: [neo4j.int(5), 't5'] }, + { kind: 'C' } ]) } @@ -824,14 +821,14 @@ describe('#integration-rx navigation', () => { ) ) expect(summary).toEqual([ - Notification.createNext(expectedQueryType), - Notification.createComplete() + { kind: 'N', value: expectedQueryType }, + { kind: 'C' } ]) } async function collectAndAssertEmpty (stream) { const result = await lastValueFrom(stream.pipe(materialize(), toArray())) - expect(result).toEqual([Notification.createComplete()]) + expect(result).toEqual([{ kind: 'C' }]) } /** @@ -842,6 +839,6 @@ describe('#integration-rx navigation', () => { async function collectAndAssertError (stream, expectedError) { const result = await lastValueFrom(stream.pipe(materialize(), toArray())) - expect(result).toEqual([Notification.createError(expectedError)]) + expect(result).toEqual([{ kind: 'E', error: expectedError }]) } }) diff --git a/test/rx/nested-statements.test.js b/test/rx/nested-statements.test.js index 2e10f061e..6f9c284de 100644 --- a/test/rx/nested-statements.test.js +++ b/test/rx/nested-statements.test.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import { lastValueFrom, Notification, throwError } from 'rxjs' +import { lastValueFrom, throwError } from 'rxjs' import { mergeMap as flatMap, materialize, @@ -28,7 +28,6 @@ import { catchError } from 'rxjs/operators' import neo4j from '../../src' -import RxSession from '../../src/session-rx' import sharedNeo4j from '../internal/shared-neo4j' describe('#integration-rx transaction', () => { @@ -90,7 +89,7 @@ describe('#integration-rx transaction', () => { ) expect(messages.length).toBe(size + 1) - expect(messages[size]).toEqual(Notification.createComplete()) + expect(messages[size]).toEqual({ kind: 'C' }) }) it('should give proper error when nesting queries within one session', async () => { @@ -120,11 +119,12 @@ describe('#integration-rx transaction', () => { ) expect(result).toEqual([ - Notification.createError( - jasmine.stringMatching( + { + kind: 'E', + error: jasmine.stringMatching( /Queries cannot be run directly on a session with an open transaction/ ) - ) + } ]) }) }) diff --git a/test/rx/session.test.js b/test/rx/session.test.js index 28884dfef..783dd200f 100644 --- a/test/rx/session.test.js +++ b/test/rx/session.test.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import { lastValueFrom, Notification, throwError } from 'rxjs' +import { lastValueFrom, throwError } from 'rxjs' import { map, materialize, toArray, concatWith as concat } from 'rxjs/operators' import neo4j from '../../src' import sharedNeo4j from '../internal/shared-neo4j' @@ -66,11 +66,11 @@ describe('#integration rx-session', () => { ) expect(result).toEqual([ - Notification.createNext(1), - Notification.createNext(2), - Notification.createNext(3), - Notification.createNext(4), - Notification.createComplete() + { kind: 'N', value: 1 }, + { kind: 'N', value: 2 }, + { kind: 'N', value: 3 }, + { kind: 'N', value: 4 }, + { kind: 'C' } ]) }, 60000) @@ -86,7 +86,7 @@ describe('#integration rx-session', () => { .pipe(materialize(), toArray()) ) expect(result1).toEqual([ - Notification.createError(jasmine.stringMatching(/Invalid input/)) + { kind: 'E', error: jasmine.stringMatching(/Invalid input/) } ]) const result2 = await lastValueFrom( @@ -99,10 +99,7 @@ describe('#integration rx-session', () => { toArray() ) ) - expect(result2).toEqual([ - Notification.createNext(1), - Notification.createComplete() - ]) + expect(result2).toEqual([{ kind: 'N', value: 1 }, { kind: 'C' }]) }, 60000) it('should run transactions without retries', async () => { @@ -119,10 +116,7 @@ describe('#integration rx-session', () => { .writeTransaction(txc => txcWork.work(txc)) .pipe(materialize(), toArray()) ) - expect(result).toEqual([ - Notification.createNext(5), - Notification.createComplete() - ]) + expect(result).toEqual([{ kind: 'N', value: 5 }, { kind: 'C' }]) expect(txcWork.invocations).toBe(1) expect(await countNodes('WithoutRetry')).toBe(1) @@ -147,10 +141,7 @@ describe('#integration rx-session', () => { .writeTransaction(txc => txcWork.work(txc)) .pipe(materialize(), toArray()) ) - expect(result).toEqual([ - Notification.createNext(7), - Notification.createComplete() - ]) + expect(result).toEqual([{ kind: 'N', value: 7 }, { kind: 'C' }]) expect(txcWork.invocations).toBe(4) expect(await countNodes('WithReactiveFailure')).toBe(1) @@ -175,10 +166,7 @@ describe('#integration rx-session', () => { .writeTransaction(txc => txcWork.work(txc)) .pipe(materialize(), toArray()) ) - expect(result).toEqual([ - Notification.createNext(9), - Notification.createComplete() - ]) + expect(result).toEqual([{ kind: 'N', value: 9 }, { kind: 'C' }]) expect(txcWork.invocations).toBe(4) expect(await countNodes('WithSyncFailure')).toBe(1) @@ -199,9 +187,9 @@ describe('#integration rx-session', () => { .pipe(materialize(), toArray()) ) expect(result).toEqual([ - Notification.createNext(1), - Notification.createNext(2), - Notification.createError(jasmine.stringMatching(/\/ by zero/)) + { kind: 'N', value: 1 }, + { kind: 'N', value: 2 }, + { kind: 'E', error: jasmine.stringMatching(/\/ by zero/) } ]) expect(txcWork.invocations).toBe(1) @@ -232,7 +220,7 @@ describe('#integration rx-session', () => { .pipe(materialize(), toArray()) ) expect(result).toEqual([ - Notification.createError(jasmine.stringMatching(/a database error/)) + { kind: 'E', error: jasmine.stringMatching(/a database error/) } ]) expect(txcWork.invocations).toBe(2) diff --git a/test/rx/transaction.test.js b/test/rx/transaction.test.js index c08e718ed..82f6fb86f 100644 --- a/test/rx/transaction.test.js +++ b/test/rx/transaction.test.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import { lastValueFrom, Notification } from 'rxjs' +import { lastValueFrom } from 'rxjs' import { mergeMap as flatMap, materialize, @@ -66,7 +66,7 @@ describe('#integration-rx transaction', () => { ) ) - expect(result).toEqual([Notification.createComplete()]) + expect(result).toEqual([{ kind: 'C' }]) }) it('should rollback an empty transaction', async () => { @@ -82,7 +82,7 @@ describe('#integration-rx transaction', () => { ) ) - expect(result).toEqual([Notification.createComplete()]) + expect(result).toEqual([{ kind: 'C' }]) }) it('should run query and commit', async () => { @@ -105,10 +105,7 @@ describe('#integration-rx transaction', () => { toArray() ) ) - expect(result).toEqual([ - Notification.createNext(neo4j.int(42)), - Notification.createComplete() - ]) + expect(result).toEqual([{ kind: 'N', value: neo4j.int(42) }, { kind: 'C' }]) expect(await countNodes(42)).toBe(1) }) @@ -133,10 +130,7 @@ describe('#integration-rx transaction', () => { toArray() ) ) - expect(result).toEqual([ - Notification.createNext(neo4j.int(42)), - Notification.createComplete() - ]) + expect(result).toEqual([{ kind: 'N', value: neo4j.int(42) }, { kind: 'C' }]) expect(await countNodes(42)).toBe(0) }) @@ -178,13 +172,14 @@ describe('#integration-rx transaction', () => { txc.commit().pipe(materialize(), toArray()) ) expect(result).toEqual([ - Notification.createError( - jasmine.objectContaining({ + { + kind: 'E', + error: jasmine.objectContaining({ message: jasmine.stringMatching( /Cannot commit this transaction, because .* of an error/ ) }) - ) + } ]) }) @@ -200,7 +195,7 @@ describe('#integration-rx transaction', () => { const result = await lastValueFrom( txc.rollback().pipe(materialize(), toArray()) ) - expect(result).toEqual([Notification.createComplete()]) + expect(result).toEqual([{ kind: 'C' }]) }) it('should fail to commit after successful and failed query', async () => { @@ -218,13 +213,14 @@ describe('#integration-rx transaction', () => { txc.commit().pipe(materialize(), toArray()) ) expect(result).toEqual([ - Notification.createError( - jasmine.objectContaining({ + { + kind: 'E', + error: jasmine.objectContaining({ message: jasmine.stringMatching( /Cannot commit this transaction, because .* of an error/ ) }) - ) + } ]) }) @@ -242,7 +238,7 @@ describe('#integration-rx transaction', () => { const result = await lastValueFrom( txc.rollback().pipe(materialize(), toArray()) ) - expect(result).toEqual([Notification.createComplete()]) + expect(result).toEqual([{ kind: 'C' }]) }) it('should fail to run another query after a failed one', async () => { @@ -261,13 +257,14 @@ describe('#integration-rx transaction', () => { .pipe(materialize(), toArray()) ) expect(result).toEqual([ - Notification.createError( - jasmine.objectContaining({ + { + kind: 'E', + error: jasmine.objectContaining({ message: jasmine.stringMatching( /Cannot run query in this transaction, because .* of an error/ ) }) - ) + } ]) }) @@ -285,13 +282,14 @@ describe('#integration-rx transaction', () => { txc.commit().pipe(materialize(), toArray()) ) expect(result).toEqual([ - Notification.createError( - jasmine.objectContaining({ + { + kind: 'E', + error: jasmine.objectContaining({ message: jasmine.stringMatching( /Cannot commit this transaction, because .* committed/ ) }) - ) + } ]) }) @@ -309,13 +307,14 @@ describe('#integration-rx transaction', () => { txc.rollback().pipe(materialize(), toArray()) ) expect(result).toEqual([ - Notification.createError( - jasmine.objectContaining({ + { + kind: 'E', + error: jasmine.objectContaining({ message: jasmine.stringMatching( /Cannot rollback this transaction, because .* rolled back/ ) }) - ) + } ]) }) @@ -333,13 +332,14 @@ describe('#integration-rx transaction', () => { txc.rollback().pipe(materialize(), toArray()) ) expect(result).toEqual([ - Notification.createError( - jasmine.objectContaining({ + { + kind: 'E', + error: jasmine.objectContaining({ message: jasmine.stringMatching( /Cannot rollback this transaction, because .* committed/ ) }) - ) + } ]) }) @@ -357,13 +357,14 @@ describe('#integration-rx transaction', () => { txc.commit().pipe(materialize(), toArray()) ) expect(result).toEqual([ - Notification.createError( - jasmine.objectContaining({ + { + kind: 'E', + error: jasmine.objectContaining({ message: jasmine.stringMatching( /Cannot commit this transaction, because .* rolled back/ ) }) - ) + } ]) }) @@ -422,9 +423,9 @@ describe('#integration-rx transaction', () => { ) ) expect(result).toEqual([ - Notification.createNext(1), - Notification.createNext(2), - Notification.createError(newError('/ by zero')) + { kind: 'N', value: 1 }, + { kind: 'N', value: 2 }, + { kind: 'E', error: newError('/ by zero') } ]) await verifyCanRollback(txc) @@ -453,11 +454,11 @@ describe('#integration-rx transaction', () => { ) ) expect(result).toEqual([ - Notification.createNext(4), - Notification.createNext(3), - Notification.createNext(2), - Notification.createNext(1), - Notification.createComplete() + { kind: 'N', value: 4 }, + { kind: 'N', value: 3 }, + { kind: 'N', value: 2 }, + { kind: 'N', value: 1 }, + { kind: 'C' } ]) await verifyCanCommit(txc) @@ -485,9 +486,10 @@ describe('#integration-rx transaction', () => { result.records().pipe(materialize(), toArray()) ) expect(messages).toEqual([ - Notification.createError( - jasmine.stringMatching(/Variable `Wrong` not defined/) - ) + { + kind: 'E', + error: jasmine.stringMatching(/Variable `Wrong` not defined/) + } ]) const summary = await lastValueFrom(result.consume()) @@ -522,13 +524,14 @@ describe('#integration-rx transaction', () => { .pipe(materialize(), toArray()) ) expect(result).toEqual([ - Notification.createError( - jasmine.objectContaining({ + { + kind: 'E', + error: jasmine.objectContaining({ message: jasmine.stringMatching( /Cannot run query in this transaction, because/ ) }) - ) + } ]) } @@ -568,7 +571,7 @@ describe('#integration-rx transaction', () => { toArray() ) ) - expect(results).toEqual([Notification.createComplete()]) + expect(results).toEqual([{ kind: 'C' }]) await verifyCanCommitOrRollback(txc, commit) await verifyCommittedOrRollbacked(commit) @@ -596,10 +599,10 @@ describe('#integration-rx transaction', () => { ) ) expect(results).toEqual([ - Notification.createNext([]), - Notification.createNext([]), - Notification.createNext([]), - Notification.createComplete() + { kind: 'N', value: [] }, + { kind: 'N', value: [] }, + { kind: 'N', value: [] }, + { kind: 'C' } ]) await verifyCanCommitOrRollback(txc, commit) @@ -610,14 +613,14 @@ describe('#integration-rx transaction', () => { const result = await lastValueFrom( txc.commit().pipe(materialize(), toArray()) ) - expect(result).toEqual([Notification.createComplete()]) + expect(result).toEqual([{ kind: 'C' }]) } async function verifyCanRollback (txc) { const result = await lastValueFrom( txc.rollback().pipe(materialize(), toArray()) ) - expect(result).toEqual([Notification.createComplete()]) + expect(result).toEqual([{ kind: 'C' }]) } async function verifyCanCommitOrRollback (txc, commit) { @@ -639,10 +642,7 @@ describe('#integration-rx transaction', () => { toArray() ) ) - expect(result).toEqual([ - Notification.createNext(neo4j.int(id)), - Notification.createComplete() - ]) + expect(result).toEqual([{ kind: 'N', value: neo4j.int(id) }, { kind: 'C' }]) } async function verifyCanReturnOne (txc) { @@ -656,10 +656,7 @@ describe('#integration-rx transaction', () => { toArray() ) ) - expect(result).toEqual([ - Notification.createNext(neo4j.int(1)), - Notification.createComplete() - ]) + expect(result).toEqual([{ kind: 'N', value: neo4j.int(1) }, { kind: 'C' }]) } async function verifyFailsWithWrongQuery (txc) { @@ -670,9 +667,10 @@ describe('#integration-rx transaction', () => { .pipe(materialize(), toArray()) ) expect(result).toEqual([ - Notification.createError( - jasmine.stringMatching(/Unexpected end of input|Invalid input/) - ) + { + kind: 'E', + error: jasmine.stringMatching(/Unexpected end of input|Invalid input/) + } ]) } From 2b1791d4eb354da4588b8beb9b299b3f38d26240 Mon Sep 17 00:00:00 2001 From: Carson Full Date: Sat, 18 Sep 2021 12:35:02 -0500 Subject: [PATCH 07/11] Replace publishReplay + refCount with shareReplay --- src/result-rx.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/result-rx.js b/src/result-rx.js index 7b54b9550..065587df5 100644 --- a/src/result-rx.js +++ b/src/result-rx.js @@ -18,7 +18,7 @@ */ import { newError, Record, ResultSummary } from 'neo4j-driver-core' import { Observable, Subject, ReplaySubject, from } from 'rxjs' -import { mergeMap as flatMap, publishReplay, refCount } from 'rxjs/operators' +import { mergeMap as flatMap, shareReplay } from 'rxjs/operators' const States = { READY: 0, @@ -36,13 +36,12 @@ export default class RxResult { * @param {Observable} result - An observable of single Result instance to relay requests. */ constructor (result) { - const replayedResult = result.pipe(publishReplay(1), refCount()) + const replayedResult = result.pipe(shareReplay(1)) this._result = replayedResult this._keys = replayedResult.pipe( flatMap(r => from(r.keys())), - publishReplay(1), - refCount() + shareReplay(1) ) this._records = new Subject() this._summary = new ReplaySubject() From f4598969bce077524ea46156074de5d90f6bcde2 Mon Sep 17 00:00:00 2001 From: Carson Full Date: Sat, 18 Sep 2021 12:35:58 -0500 Subject: [PATCH 08/11] Correct jsdoc to make txconfig param optional --- src/session-rx.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/session-rx.js b/src/session-rx.js index e925dae25..8abfbea81 100644 --- a/src/session-rx.js +++ b/src/session-rx.js @@ -77,7 +77,7 @@ export default class RxSession { * Starts a new explicit transaction with the provided transaction configuration. * * @public - * @param {TransactionConfig} transactionConfig - Configuration for the new transaction. + * @param {TransactionConfig} [transactionConfig] - Configuration for the new transaction. * @returns {Observable} - A reactive stream that will generate at most **one** RxTransaction instance. */ beginTransaction (transactionConfig) { @@ -89,7 +89,7 @@ export default class RxSession { * transaction configuration. * @public * @param {function(txc: RxTransaction): Observable} work - A unit of work to be executed. - * @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver. + * @param {TransactionConfig} [transactionConfig] - Configuration for the enclosing transaction created by the driver. * @returns {Observable} - A reactive stream returned by the unit of work. */ readTransaction (work, transactionConfig) { From caf659da993f78ec0b1af186cc417ca905a85cf6 Mon Sep 17 00:00:00 2001 From: Carson Full Date: Sat, 18 Sep 2021 12:48:01 -0500 Subject: [PATCH 09/11] Simplify RxTransaction using from & defer --- src/transaction-rx.js | 38 ++++++-------------------------------- types/transaction-rx.d.ts | 4 ++-- 2 files changed, 8 insertions(+), 34 deletions(-) diff --git a/src/transaction-rx.js b/src/transaction-rx.js index 5fa713c5c..c7bcc5861 100644 --- a/src/transaction-rx.js +++ b/src/transaction-rx.js @@ -16,7 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { Observable } from 'rxjs' +import { defer, from, Observable } from 'rxjs' import RxResult from './result-rx' import Transaction from 'neo4j-driver-core' @@ -41,53 +41,27 @@ export default class RxTransaction { * @param {Object} parameters - Parameter values to use in query execution. * @returns {RxResult} - A reactive result */ - run (query, parameters) { - return new RxResult( - new Observable(observer => { - try { - observer.next(this._txc.run(query, parameters)) - observer.complete() - } catch (err) { - observer.error(err) - } - - return () => {} - }) - ) + return new RxResult(defer(() => [this._txc.run(query, parameters)])) } /** * Commits the transaction. * * @public - * @returns {Observable} - An empty observable + * @returns {Observable} - An empty observable */ commit () { - return new Observable(observer => { - this._txc - .commit() - .then(() => { - observer.complete() - }) - .catch(err => observer.error(err)) - }) + return from(this._txc.commit()) } /** * Rolls back the transaction. * * @public - * @returns {Observable} - An empty observable + * @returns {Observable} - An empty observable */ rollback () { - return new Observable(observer => { - this._txc - .rollback() - .then(() => { - observer.complete() - }) - .catch(err => observer.error(err)) - }) + return from(this._txc.rollback()) } } diff --git a/types/transaction-rx.d.ts b/types/transaction-rx.d.ts index ddf69708c..ffe48e5e3 100644 --- a/types/transaction-rx.d.ts +++ b/types/transaction-rx.d.ts @@ -23,9 +23,9 @@ import RxResult from './result-rx' declare interface RxTransaction { run(query: string, parameters?: Parameters): RxResult - commit(): Observable + commit(): Observable - rollback(): Observable + rollback(): Observable } export default RxTransaction From 904564e7c191701c867b1f0af2a660793bc413cb Mon Sep 17 00:00:00 2001 From: Carson Full Date: Sat, 18 Sep 2021 12:54:00 -0500 Subject: [PATCH 10/11] Simplify RxSession using from & defer --- src/session-rx.js | 39 ++++++--------------------------------- 1 file changed, 6 insertions(+), 33 deletions(-) diff --git a/src/session-rx.js b/src/session-rx.js index 8abfbea81..17ca457cc 100644 --- a/src/session-rx.js +++ b/src/session-rx.js @@ -16,7 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { defer, Observable, throwError } from 'rxjs' +import { defer, from, Observable, throwError } from 'rxjs' import { mergeMap as flatMap, catchError, @@ -60,16 +60,7 @@ export default class RxSession { */ run (query, parameters, transactionConfig) { return new RxResult( - new Observable(observer => { - try { - observer.next(this._session.run(query, parameters, transactionConfig)) - observer.complete() - } catch (err) { - observer.error(err) - } - - return () => {} - }) + defer(() => [this._session.run(query, parameters, transactionConfig)]) ) } @@ -115,14 +106,7 @@ export default class RxSession { * @returns {Observable} - An empty reactive stream */ close () { - return new Observable(observer => { - this._session - .close() - .then(() => { - observer.complete() - }) - .catch(err => observer.error(err)) - }) + return from(this._session.close()) } /** @@ -149,20 +133,9 @@ export default class RxSession { txConfig = new TxConfig(transactionConfig) } - return new Observable(observer => { - try { - observer.next( - new RxTransaction( - this._session._beginTransaction(accessMode, txConfig) - ) - ) - observer.complete() - } catch (err) { - observer.error(err) - } - - return () => {} - }) + return defer(() => [ + new RxTransaction(this._session._beginTransaction(accessMode, txConfig)) + ]) } /** From 6cf9f874d93051e94222cd2156c83c3a93d553a9 Mon Sep 17 00:00:00 2001 From: Carson Full Date: Sat, 18 Sep 2021 12:55:21 -0500 Subject: [PATCH 11/11] Simplify tx config in RxSession --- src/session-rx.js | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/session-rx.js b/src/session-rx.js index 17ca457cc..26d2dc4ca 100644 --- a/src/session-rx.js +++ b/src/session-rx.js @@ -128,10 +128,9 @@ export default class RxSession { * @private */ _beginTransaction (accessMode, transactionConfig) { - let txConfig = TxConfig.empty() - if (transactionConfig) { - txConfig = new TxConfig(transactionConfig) - } + const txConfig = transactionConfig + ? new TxConfig(transactionConfig) + : TxConfig.empty() return defer(() => [ new RxTransaction(this._session._beginTransaction(accessMode, txConfig)) @@ -142,11 +141,6 @@ export default class RxSession { * @private */ _runTransaction (accessMode, work, transactionConfig) { - let txConfig = TxConfig.empty() - if (transactionConfig) { - txConfig = new TxConfig(transactionConfig) - } - return this._retryLogic.retry( this._beginTransaction(accessMode, transactionConfig).pipe( flatMap(txc =>