Skip to content

RxJS 7 #774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ rxSession
.records()
.pipe(
map(record => record.get('name')),
concat(rxSession.close())
concatWith(rxSession.close())
)
.subscribe({
next: data => console.log(data),
Expand Down Expand Up @@ -373,7 +373,7 @@ try {
rxSession
.beginTransaction()
.pipe(
flatMap(txc =>
mergeMap(txc =>
concat(
txc
.run(
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,6 @@
"@babel/runtime": "^7.5.5",
"neo4j-driver-bolt-connection": "file:./bolt-connection",
"neo4j-driver-core": "file:./core",
"rxjs": "^6.6.3"
"rxjs": "^7.3.0"
}
}
6 changes: 3 additions & 3 deletions src/internal/retry-logic-rx.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { newError, error, internal } from 'neo4j-driver-core'
import { Observable, throwError, of } from 'rxjs'
import { retryWhen, flatMap, delay } from 'rxjs/operators'
import { retryWhen, mergeMap as flatMap, delay } from 'rxjs/operators'

const {
logger: {
Expand Down Expand Up @@ -82,7 +82,7 @@ export default class RxRetryLogic {
return failedWork.pipe(
flatMap(err => {
if (!canRetryOn(err)) {
return throwError(err)
return throwError(() => err)
}

handledExceptions.push(err)
Expand All @@ -98,7 +98,7 @@ export default class RxRetryLogic {

error.seenErrors = handledExceptions

return throwError(error)
return throwError(() => error)
}

const nextDelayDuration = this._computeNextDelay(delayDuration)
Expand Down
7 changes: 3 additions & 4 deletions src/result-rx.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
import { newError, Record, ResultSummary } from 'neo4j-driver-core'
import { Observable, Subject, ReplaySubject, from } from 'rxjs'
import { flatMap, publishReplay, refCount, shareReplay } from 'rxjs/operators'
import { mergeMap as flatMap, shareReplay } from 'rxjs/operators'

const States = {
READY: 0,
Expand All @@ -36,13 +36,12 @@ export default class RxResult {
* @param {Observable<Result>} result - An observable of single Result instance to relay requests.
*/
constructor (result) {
const replayedResult = result.pipe(publishReplay(1), refCount())
const replayedResult = result.pipe(shareReplay(1))

this._result = replayedResult
this._keys = replayedResult.pipe(
flatMap(r => from(r.keys())),
publishReplay(1),
refCount()
shareReplay(1)
)
this._records = new Subject()
this._summary = new ReplaySubject()
Expand Down
67 changes: 20 additions & 47 deletions src/session-rx.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { defer, Observable, throwError } from 'rxjs'
import { flatMap, catchError, concat } from 'rxjs/operators'
import { defer, from, Observable, throwError } from 'rxjs'
import {
mergeMap as flatMap,
catchError,
concatWith as concat
} from 'rxjs/operators'
import RxResult from './result-rx'
import { Session, internal } from 'neo4j-driver-core'
import RxTransaction from './transaction-rx'
Expand Down Expand Up @@ -56,24 +60,15 @@ export default class RxSession {
*/
run (query, parameters, transactionConfig) {
return new RxResult(
new Observable(observer => {
try {
observer.next(this._session.run(query, parameters, transactionConfig))
observer.complete()
} catch (err) {
observer.error(err)
}

return () => {}
})
defer(() => [this._session.run(query, parameters, transactionConfig)])
)
}

/**
* Starts a new explicit transaction with the provided transaction configuration.
*
* @public
* @param {TransactionConfig} transactionConfig - Configuration for the new transaction.
* @param {TransactionConfig} [transactionConfig] - Configuration for the new transaction.
* @returns {Observable<RxTransaction>} - A reactive stream that will generate at most **one** RxTransaction instance.
*/
beginTransaction (transactionConfig) {
Expand All @@ -85,7 +80,7 @@ export default class RxSession {
* transaction configuration.
* @public
* @param {function(txc: RxTransaction): Observable} work - A unit of work to be executed.
* @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver.
* @param {TransactionConfig} [transactionConfig] - Configuration for the enclosing transaction created by the driver.
* @returns {Observable} - A reactive stream returned by the unit of work.
*/
readTransaction (work, transactionConfig) {
Expand All @@ -111,14 +106,7 @@ export default class RxSession {
* @returns {Observable} - An empty reactive stream
*/
close () {
return new Observable(observer => {
this._session
.close()
.then(() => {
observer.complete()
})
.catch(err => observer.error(err))
})
return from(this._session.close())
}

/**
Expand All @@ -140,47 +128,32 @@ export default class RxSession {
* @private
*/
_beginTransaction (accessMode, transactionConfig) {
let txConfig = TxConfig.empty()
if (transactionConfig) {
txConfig = new TxConfig(transactionConfig)
}

return new Observable(observer => {
try {
observer.next(
new RxTransaction(
this._session._beginTransaction(accessMode, txConfig)
)
)
observer.complete()
} catch (err) {
observer.error(err)
}
const txConfig = transactionConfig
? new TxConfig(transactionConfig)
: TxConfig.empty()

return () => {}
})
return defer(() => [
new RxTransaction(this._session._beginTransaction(accessMode, txConfig))
])
}

/**
* @private
*/
_runTransaction (accessMode, work, transactionConfig) {
let txConfig = TxConfig.empty()
if (transactionConfig) {
txConfig = new TxConfig(transactionConfig)
}

return this._retryLogic.retry(
this._beginTransaction(accessMode, transactionConfig).pipe(
flatMap(txc =>
defer(() => {
try {
return work(txc)
} catch (err) {
return throwError(err)
return throwError(() => err)
}
}).pipe(
catchError(err => txc.rollback().pipe(concat(throwError(err)))),
catchError(err =>
txc.rollback().pipe(concat(throwError(() => err)))
),
concat(txc.commit())
)
)
Expand Down
38 changes: 6 additions & 32 deletions src/transaction-rx.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Observable } from 'rxjs'
import { defer, from, Observable } from 'rxjs'
import RxResult from './result-rx'
import Transaction from 'neo4j-driver-core'

Expand All @@ -41,53 +41,27 @@ export default class RxTransaction {
* @param {Object} parameters - Parameter values to use in query execution.
* @returns {RxResult} - A reactive result
*/

run (query, parameters) {
return new RxResult(
new Observable(observer => {
try {
observer.next(this._txc.run(query, parameters))
observer.complete()
} catch (err) {
observer.error(err)
}

return () => {}
})
)
return new RxResult(defer(() => [this._txc.run(query, parameters)]))
}

/**
* Commits the transaction.
*
* @public
* @returns {Observable} - An empty observable
* @returns {Observable<void>} - An empty observable
*/
commit () {
return new Observable(observer => {
this._txc
.commit()
.then(() => {
observer.complete()
})
.catch(err => observer.error(err))
})
return from(this._txc.commit())
}

/**
* Rolls back the transaction.
*
* @public
* @returns {Observable} - An empty observable
* @returns {Observable<void>} - An empty observable
*/
rollback () {
return new Observable(observer => {
this._txc
.rollback()
.then(() => {
observer.complete()
})
.catch(err => observer.error(err))
})
return from(this._txc.rollback())
}
}
25 changes: 11 additions & 14 deletions test/examples.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import neo4j from '../src'
import sharedNeo4j from './internal/shared-neo4j'
import { ServerVersion, VERSION_4_0_0 } from '../src/internal/server-version'
import { map, materialize, toArray } from 'rxjs/operators'
import { Notification } from 'rxjs'
import { lastValueFrom } from 'rxjs'

/**
* The tests below are examples that get pulled into the Driver Manual using the tags inside the tests.
Expand Down Expand Up @@ -143,12 +143,9 @@ describe('#integration examples', () => {
}

// end::rx-autocommit-transaction[]
const result = await readProductTitles().toPromise()
const result = await lastValueFrom(readProductTitles())

expect(result).toEqual([
Notification.createNext('Product-0'),
Notification.createComplete()
])
expect(result).toEqual([{ kind: 'N', value: 'Product-0' }, { kind: 'C' }])
}, 60000)

it('basic auth example', async () => {
Expand Down Expand Up @@ -623,11 +620,11 @@ describe('#integration examples', () => {
)
// end::rx-result-consume[]

const people = await result.toPromise()
const people = await lastValueFrom(result)
expect(people).toEqual([
Notification.createNext('Alice'),
Notification.createNext('Bob'),
Notification.createComplete()
{ kind: 'N', value: 'Alice' },
{ kind: 'N', value: 'Bob' },
{ kind: 'C' }
])
}, 60000)

Expand Down Expand Up @@ -802,11 +799,11 @@ describe('#integration examples', () => {
)
// end::rx-transaction-function[]

const people = await result.toPromise()
const people = await lastValueFrom(result)
expect(people).toEqual([
Notification.createNext('Infinity Gauntlet'),
Notification.createNext('Mjölnir'),
Notification.createComplete()
{ kind: 'N', value: 'Infinity Gauntlet' },
{ kind: 'N', value: 'Mjölnir' },
{ kind: 'C' }
])
}, 60000)

Expand Down
2 changes: 1 addition & 1 deletion test/internal/retry-logic-rx.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ describe('#unit-rx retrylogic', () => {
clock.tick(delayBy)
}
if (index < errors.length) {
return throwError(errors[index++])
return throwError(() => errors[index++])
} else {
return of(value)
}
Expand Down
Loading