From b45d91b0d60e587c1edbb071c21e49d767a51a1b Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 17 Feb 2022 17:25:15 +0100 Subject: [PATCH 1/8] Introduce Testkit Reactive Backend This implementation of the Testkit Backend handles requests with using the `driver.rxSession`, so this is not compatible with the `neo4j-driver-lite`. For introducing the new backend handlers, the previous ones had to be refactoty for decoupling part of the code for the protocol and enable it to be used by the two set of handlers. --- .../testkit-backend/src/controller/local.js | 29 +++--- .../testkit-backend/src/request-handlers.js | 90 +++++++---------- packages/testkit-backend/src/responses.js | 98 +++++++++++++++++++ 3 files changed, 149 insertions(+), 68 deletions(-) create mode 100644 packages/testkit-backend/src/responses.js diff --git a/packages/testkit-backend/src/controller/local.js b/packages/testkit-backend/src/controller/local.js index a4faddaee..968662f7f 100644 --- a/packages/testkit-backend/src/controller/local.js +++ b/packages/testkit-backend/src/controller/local.js @@ -35,38 +35,45 @@ export default class LocalController extends Controller { } return await this._requestHandlers[name](this._contexts.get(contextId), data, { - writeResponse: (name, data) => this._writeResponse(contextId, name, data), + writeResponse: (response) => this._writeResponse(contextId, response), writeError: (e) => this._writeError(contextId, e), writeBackendError: (msg) => this._writeBackendError(contextId, msg) }) } - _writeResponse (contextId, name, data) { - console.log('> writing response', name, data) - let response = { - name: name, - data: data - } - + _writeResponse (contextId, response) { + console.log('> writing response', response.name, response.data) this.emit('response', { contextId, response }) } _writeBackendError (contextId, msg) { - this._writeResponse(contextId, 'BackendError', { msg: msg }) + this._writeResponse(contextId, newResponse('BackendError', { msg: msg })) } _writeError (contextId, e) { if (e.name) { const id = this._contexts.get(contextId).addError(e) - this._writeResponse(contextId, 'DriverError', { + this._writeResponse(contextId, newResponse('DriverError', { id, msg: e.message + ' (' + e.code + ')', code: e.code - }) + })) return } this._writeBackendError(contextId, e) } + + _msg (name, data) { + return { + name, data + } + } } + +function newResponse (name, data) { + return { + name, data + } +} diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 2611312d7..ab7a358a0 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -1,11 +1,8 @@ import neo4j from './neo4j' import { - cypherToNative, - nativeToCypher, + cypherToNative } from './cypher-native-binders.js' -import { - nativeToTestkitSummary, -} from './summary-binder.js' +import * as responses from './responses.js' import tls from 'tls' const SUPPORTED_TLS = (() => { @@ -55,7 +52,7 @@ export function NewDriver (context, data, wire) { ? address => new Promise((resolve, reject) => { const id = context.addResolverRequest(resolve, reject) - wire.writeResponse('ResolverResolutionRequired', { id, address }) + wire.writeResponse(responses.ResolverResolutionRequired({ id, address })) }) : undefined const config = { @@ -102,7 +99,7 @@ export function NewDriver (context, data, wire) { return } const id = context.addDriver(driver) - wire.writeResponse('Driver', { id }) + wire.writeResponse(responses.Driver({ id })) } export function DriverClose (context, data, wire) { @@ -111,7 +108,7 @@ export function DriverClose (context, data, wire) { return driver .close() .then(() => { - wire.writeResponse('Driver', { id: driverId }) + wire.writeResponse(responses.Driver({ id: driverId })) }) .catch(err => wire.writeError(err)) .finally(() => context.removeDriver(driverId)) @@ -139,7 +136,7 @@ export function NewSession (context, data, wire) { impersonatedUser }) const id = context.addSession(session) - wire.writeResponse('Session', { id }) + wire.writeResponse(responses.Session({ id })) } export function SessionClose (context, data, wire) { @@ -148,7 +145,7 @@ export function SessionClose (context, data, wire) { return session .close() .then(() => { - wire.writeResponse('Session', { id: sessionId }) + wire.writeResponse(responses.Session({ id: sessionId })) }) .catch(err => wire.writeError(err)) } @@ -173,7 +170,7 @@ export function SessionRun (context, data, wire) { let id = context.addResult(result) - wire.writeResponse('Result', { id }) + wire.writeResponse(responses.Result({ id })) } export function ResultNext (context, data, wire) { @@ -184,12 +181,9 @@ export function ResultNext (context, data, wire) { } return result.recordIt.next().then(({ value, done }) => { if (done) { - wire.writeResponse('NullRecord', null) + wire.writeResponse(responses.NullRecord()) } else { - const values = Array.from(value.values()).map(nativeToCypher) - wire.writeResponse('Record', { - values: values - }) + wire.writeResponse(responses.Record({ record: value })) } }).catch(e => { console.log('got some err: ' + JSON.stringify(e)) @@ -205,12 +199,9 @@ export function ResultPeek (context, data, wire) { } return result.recordIt.peek().then(({ value, done }) => { if (done) { - wire.writeResponse('NullRecord', null) + wire.writeResponse(responses.NullRecord()) } else { - const values = Array.from(value.values()).map(nativeToCypher) - wire.writeResponse('Record', { - values: values - }) + wire.writeResponse(responses.Record({ record: value })) } }).catch(e => { console.log('got some err: ' + JSON.stringify(e)) @@ -219,13 +210,11 @@ export function ResultPeek (context, data, wire) { } export function ResultConsume (context, data, wire) { - - const { resultId } = data const result = context.getResult(resultId) return result.summary().then(summary => { - wire.writeResponse('Summary', nativeToTestkitSummary(summary)) + wire.writeResponse(responses.Summary({ summary })) }).catch(e => wire.writeError(e)) } @@ -236,10 +225,7 @@ export function ResultList (context, data, wire) { return result .then(({ records }) => { - const cypherRecords = records.map(rec => { - return { values: Array.from(rec.values()).map(nativeToCypher) } - }) - wire.writeResponse('RecordList', { records: cypherRecords}) + wire.writeResponse(responses.RecordList({ records })) }) .catch(error => wire.writeError(error)) } @@ -252,10 +238,10 @@ export function SessionReadTransaction (context, data, wire) { tx => new Promise((resolve, reject) => { const id = context.addTx(tx, sessionId, resolve, reject) - wire.writeResponse('RetryableTry', { id }) + wire.writeResponse(responses.RetryableTry({ id })) }) , { metadata }) - .then(_ => wire.writeResponse('RetryableDone', null)) + .then(_ => wire.writeResponse(responses.RetryableDone())) .catch(error => wire.writeError(error)) } @@ -270,7 +256,7 @@ export function TransactionRun (context, data, wire) { const result = tx.tx.run(cypher, params) const id = context.addResult(result) - wire.writeResponse('Result', { id }) + wire.writeResponse(responses.Result({ id })) } export function RetryablePositive (context, data, wire) { @@ -296,7 +282,7 @@ export function SessionBeginTransaction (context, data, wire) { return session.beginTransaction({ metadata, timeout }) .then(tx => { const id = context.addTx(tx, sessionId) - wire.writeResponse('Transaction', { id }) + wire.writeResponse(responses.Transaction({ id })) }).catch(e => { console.log('got some err: ' + JSON.stringify(e)) wire.writeError(e) @@ -312,7 +298,7 @@ export function TransactionCommit (context, data, wire) { const { txId: id } = data const { tx } = context.getTx(id) return tx.commit() - .then(() => wire.writeResponse('Transaction', { id })) + .then(() => wire.writeResponse(responses.Transaction({ id }))) .catch(e => { console.log('got some err: ' + JSON.stringify(e)) wire.writeError(e) @@ -323,7 +309,7 @@ export function TransactionRollback (context, data, wire) { const { txId: id } = data const { tx } = context.getTx(id) return tx.rollback() - .then(() => wire.writeResponse('Transaction', { id })) + .then(() => wire.writeResponse(responses.Transaction({ id }))) .catch(e => wire.writeError(e)) } @@ -331,7 +317,7 @@ export function TransactionClose (context, data, wire) { const { txId: id } = data const { tx } = context.getTx(id) return tx.close() - .then(() => wire.writeResponse('Transaction', { id })) + .then(() => wire.writeResponse(responses.Transaction({ id }))) .catch(e => wire.writeError(e)) } @@ -339,7 +325,7 @@ export function SessionLastBookmarks (context, data, wire) { const { sessionId } = data const session = context.getSession(sessionId) const bookmarks = session.lastBookmarks() - wire.writeResponse('Bookmarks', { bookmarks }) + wire.writeResponse(responses.Bookmarks({ bookmarks })) } export function SessionWriteTransaction (context, data, wire) { @@ -350,23 +336,23 @@ export function SessionWriteTransaction (context, data, wire) { tx => new Promise((resolve, reject) => { const id = context.addTx(tx, sessionId, resolve, reject) - wire.writeResponse('RetryableTry', { id }) + wire.writeResponse(responses.RetryableTry({ id })) }) , { metadata }) - .then(_ => wire.writeResponse('RetryableDone', null)) + .then(_ => wire.writeResponse(responses.RetryableDone())) .catch(error => wire.writeError(error)) } export function StartTest (context, { testName }, wire) { const shouldRunTest = context.getShouldRunTestFunction() shouldRunTest(testName, { - onRun: () => wire.writeResponse('RunTest', null), - onSkip: reason => wire.writeResponse('SkipTest', { reason }) + onRun: () => wire.writeResponse(responses.RunTest()), + onSkip: reason => wire.writeResponse(responses.SkipTest({ reason })) }) } export function GetFeatures (_context, _params, wire) { - wire.writeResponse('FeatureList', { + wire.writeResponse(responses.FeatureList({ features: [ 'Feature:Auth:Custom', 'Feature:Auth:Kerberos', @@ -401,14 +387,14 @@ export function GetFeatures (_context, _params, wire) { 'Temporary:TransactionClose', ...SUPPORTED_TLS ] - }) + })) } export function VerifyConnectivity (context, { driverId }, wire) { const driver = context.getDriver(driverId) return driver .verifyConnectivity() - .then(() => wire.writeResponse('Driver', { id: driverId })) + .then(() => wire.writeResponse(responses.Driver({ id: driverId }))) .catch(error => wire.writeError(error)) } @@ -416,10 +402,7 @@ export function GetServerInfo (context, { driverId }, wire) { const driver = context.getDriver(driverId) return driver .getServerInfo() - .then(info => wire.writeResponse('ServerInfo', { - ...info, - protocolVersion: info.protocolVersion.toFixed(1) - })) + .then(serverInfo => wire.writeResponse(responses.ServerInfo({ serverInfo }))) .catch(error => wire.writeError(error)) } @@ -428,7 +411,7 @@ export function CheckMultiDBSupport (context, { driverId }, wire) { return driver .supportsMultiDb() .then(available => - wire.writeResponse('MultiDBSupport', { id: driverId, available }) + wire.writeResponse(responses.MultiDBSupport({ id: driverId, available })) ) .catch(error => wire.writeError(error)) } @@ -443,7 +426,6 @@ export function ResolverResolutionCompleted ( } export function GetRoutingTable (context, { driverId, database }, wire) { - const serverAddressToString = serverAddress => serverAddress.asHostPort() const driver = context.getDriver(driverId) const routingTable = driver && @@ -460,13 +442,7 @@ export function GetRoutingTable (context, { driverId, database }, wire) { }) if (routingTable) { - wire.writeResponse('RoutingTable', { - database: routingTable.database, - ttl: Number(routingTable.ttl), - readers: routingTable.readers.map(serverAddressToString), - writers: routingTable.writers.map(serverAddressToString), - routers: routingTable.routers.map(serverAddressToString) - }) + wire.writeResponse(responses.RoutingTable({ routingTable })) } else { wire.writeError('Driver does not support routing') } @@ -485,7 +461,7 @@ export function ForcedRoutingTableUpdate (context, { driverId, database, bookmar bookmarks: bookmarks, onDatabaseNameResolved: () => {} }) - .then(() => wire.writeResponse("Driver", { "id": driverId })) + .then(() => wire.writeResponse(responses.Driver({ "id": driverId }))) .catch(error => wire.writeError(error)) } else { wire.writeError('Driver does not support routing') diff --git a/packages/testkit-backend/src/responses.js b/packages/testkit-backend/src/responses.js new file mode 100644 index 000000000..9ae1ec5f9 --- /dev/null +++ b/packages/testkit-backend/src/responses.js @@ -0,0 +1,98 @@ +import { + nativeToCypher +} from './cypher-native-binders.js' + +import { + nativeToTestkitSummary +} from './summary-binder.js' + +export function Driver ({ id }) { + return response('Driver', { id }) +} + +export function ResolverResolutionRequired ({ id, address }) { + return response('ResolverResolutionRequired', { id, address }) +} + +export function Session ({ id }) { + return response('Session', { id }) +} + +export function Transaction ({ id }) { + return response('Transaction', { id }) +} + +export function RetryableTry ({ id }) { + return response('RetryableTry', { id }) +} + +export function RetryableDone () { + return response('RetryableDone', null) +} + +export function Result ({ id }) { + return response('Result', { id }) +} + +export function NullRecord () { + return response('NullRecord', null) +} + +export function Record ({ record }) { + const values = Array.from(record.values()).map(nativeToCypher) + return response('Record', { values }) +} + +export function RecordList ({ records }) { + const cypherRecords = records.map(rec => { + return { values: Array.from(rec.values()).map(nativeToCypher) } + }) + return response('RecordList', { records: cypherRecords }) +} + +export function Summary ({ summary }) { + return response('Summary', nativeToTestkitSummary(summary)) +} + +export function Bookmarks ({ bookmarks }) { + return response('Bookmarks', { bookmarks }) +} + +export function ServerInfo ({ serverInfo }) { + return response('ServerInfo', { + ...serverInfo, + protocolVersion: serverInfo.protocolVersion.toFixed(1) + }) +} + +export function MultiDBSupport ({ id, available }) { + return response('MultiDBSupport', { id, available }) +} + +export function RoutingTable ({ routingTable }) { + const serverAddressToString = serverAddress => serverAddress.asHostPort() + return response('RoutingTable', { + database: routingTable.database, + ttl: Number(routingTable.ttl), + readers: routingTable.readers.map(serverAddressToString), + writers: routingTable.writers.map(serverAddressToString), + routers: routingTable.routers.map(serverAddressToString) + }) +} + +// Testkit controller messages +export function RunTest () { + return response('RunTest', null) +} + +export function SkipTest ({ reason }) { + return response('SkipTest', { reason }) +} + +export function FeatureList ({ features }) { + return response('FeatureList', { features }) +} + +function response(name, data) { + return { name, data } +} From bdd6f49527348c241b6372a586781df537baa76d Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 17 Feb 2022 17:48:37 +0100 Subject: [PATCH 2/8] Introduce `request-handlers-rx` with the common methods in place --- packages/testkit-backend/src/index.js | 14 +++++++++++--- .../testkit-backend/src/request-handlers-rx.js | 15 +++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) create mode 100644 packages/testkit-backend/src/request-handlers-rx.js diff --git a/packages/testkit-backend/src/index.js b/packages/testkit-backend/src/index.js index 0f2db7c53..a8cb2d340 100644 --- a/packages/testkit-backend/src/index.js +++ b/packages/testkit-backend/src/index.js @@ -2,7 +2,8 @@ import Backend from './backend' import { SocketChannel, WebSocketChannel } from './channel' import { LocalController, RemoteController } from './controller' import { getShouldRunTest } from './skipped-tests' -import * as REQUEST_HANDLERS from './request-handlers' +import * as REQUEST_HANDLERS from './request-handlers.js' +import * as RX_REQUEST_HANDLERS from './request-handlers-rx.js' /** * Responsible for configure and run the backend server. @@ -13,6 +14,7 @@ function main( ) { const backendPort = process.env.BACKEND_PORT || 9876 const webserverPort = process.env.WEB_SERVER_PORT || 8000 const driverDescriptor = process.env.DRIVER_DESCRIPTOR || '' + const sessionType = process.env.SESSION_TYPE || 'RX' const driverDescriptorList = driverDescriptor .split(',').map(s => s.trim().toLowerCase()) @@ -21,7 +23,6 @@ function main( ) { const newChannel = () => { if ( channelType.toUpperCase() === 'WEBSOCKET' ) { return new WebSocketChannel(new URL(`ws://localhost:${backendPort}`)) - } return new SocketChannel(backendPort) } @@ -30,7 +31,7 @@ function main( ) { if ( testEnviroment.toUpperCase() === 'REMOTE' ) { return new RemoteController(webserverPort) } - return new LocalController(REQUEST_HANDLERS, shouldRunTest) + return new LocalController(getRequestHandlers(sessionType), shouldRunTest) } const backend = new Backend(newController, newChannel) @@ -52,4 +53,11 @@ function main( ) { } } +function getRequestHandlers(sessionType) { + if (sessionType.toUpperCase() === 'RX') { + return RX_REQUEST_HANDLERS + } + return REQUEST_HANDLERS +} + main() diff --git a/packages/testkit-backend/src/request-handlers-rx.js b/packages/testkit-backend/src/request-handlers-rx.js new file mode 100644 index 000000000..78c8513e0 --- /dev/null +++ b/packages/testkit-backend/src/request-handlers-rx.js @@ -0,0 +1,15 @@ +// Handlers which didn't change depending +export { + NewDriver, + DriverClose, + SessionLastBookmarks, + StartTest, + GetFeatures, + VerifyConnectivity, + GetServerInfo, + CheckMultiDBSupport, + ResolverResolutionCompleted, + GetRoutingTable, + ForcedRoutingTableUpdate, +} from './request-handlers'; + From 723fe14f5cb718131e6a89ecaebb4879d00110ad Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 17 Feb 2022 18:16:52 +0100 Subject: [PATCH 3/8] Add rx handlers for: NewSession, SessionClose, SessionRun --- .../src/request-handlers-rx.js | 62 ++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/packages/testkit-backend/src/request-handlers-rx.js b/packages/testkit-backend/src/request-handlers-rx.js index 78c8513e0..4f891fd29 100644 --- a/packages/testkit-backend/src/request-handlers-rx.js +++ b/packages/testkit-backend/src/request-handlers-rx.js @@ -1,3 +1,6 @@ +import * as responses from './responses.js'; +import neo4j from './neo4j.js'; + // Handlers which didn't change depending export { NewDriver, @@ -11,5 +14,62 @@ export { ResolverResolutionCompleted, GetRoutingTable, ForcedRoutingTableUpdate, -} from './request-handlers'; +} from './request-handlers.js'; + +export function NewSession(context, data, wire) { + let { driverId, accessMode, bookmarks, database, fetchSize, impersonatedUser } = data + switch (accessMode) { + case 'r': + accessMode = neo4j.session.READ + break + case 'w': + accessMode = neo4j.session.WRITE + break + default: + wire.writeBackendError('Unknown accessmode: ' + accessMode) + return + } + const driver = context.getDriver(driverId) + const session = driver.rxSession({ + defaultAccessMode: accessMode, + bookmarks, + database, + fetchSize, + impersonatedUser + }) + const id = context.addSession(session) + wire.writeResponse(responses.Session({ id })) +} + +export function SessionClose (context, data, wire) { + const { sessionId } = data + const session = context.getSession(sessionId) + return session + .close() + .toPromise() + .then(() => wire.writeResponse(responses.Session({ id: sessionId }))) + .catch(err => wire.writeError(err)) +} + +export function SessionRun (context, data, wire) { + const { sessionId, cypher, params, txMeta: metadata, timeout } = data + const session = context.getSession(sessionId) + if (params) { + for (const [key, value] of Object.entries(params)) { + params[key] = cypherToNative(value) + } + } + + let result + try { + result = session.run(cypher, params, { metadata, timeout }) + } catch (e) { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + return + } + + let id = context.addResult(result) + wire.writeResponse(responses.Result({ id })) +} From 5b70f95198acb0d7921db345479a8a3daf7f2351 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 17 Feb 2022 18:36:36 +0100 Subject: [PATCH 4/8] Add rx support for `ResultNext` and make tests on stub.session_run_parameters.test_session_run_parameters.TestSessionRunParameters pass --- .../src/request-handlers-rx.js | 113 ++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/packages/testkit-backend/src/request-handlers-rx.js b/packages/testkit-backend/src/request-handlers-rx.js index 4f891fd29..aa1b70186 100644 --- a/packages/testkit-backend/src/request-handlers-rx.js +++ b/packages/testkit-backend/src/request-handlers-rx.js @@ -1,5 +1,8 @@ import * as responses from './responses.js'; import neo4j from './neo4j.js'; +import { + cypherToNative +} from './cypher-native-binders.js' // Handlers which didn't change depending export { @@ -14,6 +17,7 @@ export { ResolverResolutionCompleted, GetRoutingTable, ForcedRoutingTableUpdate, + ResultNext } from './request-handlers.js'; export function NewSession(context, data, wire) { @@ -69,7 +73,116 @@ export function SessionRun (context, data, wire) { return } + result[Symbol.asyncIterator] = () => toAsyncIterator(result) + let id = context.addResult(result) wire.writeResponse(responses.Result({ id })) } + +function toAsyncIterator(result) { + function queueObserver () { + function createResolvablePromise (){ + const resolvablePromise = {} + resolvablePromise.promise = new Promise((resolve, reject) => { + resolvablePromise.resolve = resolve + resolvablePromise.reject = reject + }); + return resolvablePromise; + } + + + function isError(elementOrError){ + return elementOrError instanceof Error + } + + const buffer = [] + const promiseHolder = { resolvable: null } + + const observer = { + next: (record) => { + observer._push({ done: false, value: record }) + }, + complete: (summary) => { + observer._push({ done: true, value: summary }) + }, + error: (error) => { + observer._push(error) + }, + _push(element) { + if (promiseHolder.resolvable !== null) { + const resolvable = promiseHolder.resolvable + promiseHolder.resolvable = null + if (isError(element)) { + resolvable.reject(element) + } else { + resolvable.resolve(element) + } + } else { + buffer.push(element) + } + }, + dequeue: async () => { + if (buffer.length > 0) { + const element = buffer.shift() + if (isError(element)) { + throw element + } + return element + } + promiseHolder.resolvable = createResolvablePromise() + return await promiseHolder.resolvable.promise + }, + head: async () => { + if (buffer.length > 0) { + const element = buffer[0] + if (isError(element)) { + throw element + } + return element + } + promiseHolder.resolvable = createResolvablePromise() + try { + const element = await promiseHolder.resolvable.promise + buffer.unshift(element) + return element + } catch (error) { + buffer.unshift(error) + throw error + } + }, + get size () { + return buffer.length + } + } + + return observer + } + const records = result.records() + + const observer = queueObserver() + records.subscribe(observer); + + const state = { + finished: false + } + + return { + next: async () => { + if (state.finished) { + return { done: true } + } + const next = observer.dequeue() + if (next.done) { + state.finished = next.done + state.summary = next.value + } + return next + }, + return: async (value) => { + state.finished = true + state.summary = value + return { done: true, value: value } + } + } +} From affc1c6cb2e95e78472c5b44ab175e8627cdd3ce Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 17 Feb 2022 18:45:26 +0100 Subject: [PATCH 5/8] Add rx support for SessionBeginTransaction, TransactionRun and TransactionCommit --- .../src/request-handlers-rx.js | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/packages/testkit-backend/src/request-handlers-rx.js b/packages/testkit-backend/src/request-handlers-rx.js index aa1b70186..f85f23883 100644 --- a/packages/testkit-backend/src/request-handlers-rx.js +++ b/packages/testkit-backend/src/request-handlers-rx.js @@ -80,6 +80,54 @@ export function SessionRun (context, data, wire) { wire.writeResponse(responses.Result({ id })) } +export function SessionBeginTransaction (context, data, wire) { + const { sessionId, txMeta: metadata, timeout } = data + const session = context.getSession(sessionId) + + try { + return session.beginTransaction({ metadata, timeout }) + .toPromise() + .then(tx => { + const id = context.addTx(tx, sessionId) + wire.writeResponse(responses.Transaction({ id })) + }).catch(e => { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + }) + } catch (e) { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + return + } +} + +export function TransactionRun (context, data, wire) { + const { txId, cypher, params } = data + const tx = context.getTx(txId) + if (params) { + for (const [key, value] of Object.entries(params)) { + params[key] = cypherToNative(value) + } + } + const result = tx.tx.run(cypher, params) + result[Symbol.asyncIterator] = () => toAsyncIterator(result) + const id = context.addResult(result) + + wire.writeResponse(responses.Result({ id })) +} + +export function TransactionCommit (context, data, wire) { + const { txId: id } = data + const { tx } = context.getTx(id) + return tx.commit() + .toPromise() + .then(() => wire.writeResponse(responses.Transaction({ id }))) + .catch(e => { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + }) +} + function toAsyncIterator(result) { function queueObserver () { function createResolvablePromise (){ From 2ad97e1cc8d59ba0290636b5057c877f88e9a0ec Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 21 Feb 2022 12:53:54 +0100 Subject: [PATCH 6/8] Add TxFunctions and ResultConsume to RX. Segregate features and skipped tests --- packages/testkit-backend/src/context.js | 7 +- .../testkit-backend/src/controller/local.js | 5 +- packages/testkit-backend/src/feature/async.js | 6 ++ .../testkit-backend/src/feature/common.js | 51 ++++++++++++++ packages/testkit-backend/src/feature/index.js | 18 +++++ packages/testkit-backend/src/feature/rx.js | 6 ++ packages/testkit-backend/src/index.js | 7 +- .../src/request-handlers-rx.js | 70 ++++++++++++++++++- .../testkit-backend/src/request-handlers.js | 51 +------------- .../src/skipped-tests/index.js | 4 +- .../testkit-backend/src/skipped-tests/rx.js | 11 +++ 11 files changed, 180 insertions(+), 56 deletions(-) create mode 100644 packages/testkit-backend/src/feature/async.js create mode 100644 packages/testkit-backend/src/feature/common.js create mode 100644 packages/testkit-backend/src/feature/index.js create mode 100644 packages/testkit-backend/src/feature/rx.js create mode 100644 packages/testkit-backend/src/skipped-tests/rx.js diff --git a/packages/testkit-backend/src/context.js b/packages/testkit-backend/src/context.js index df1c80b1e..bbe8dc234 100644 --- a/packages/testkit-backend/src/context.js +++ b/packages/testkit-backend/src/context.js @@ -1,5 +1,5 @@ export default class Context { - constructor (shouldRunTest) { + constructor (shouldRunTest, getFeatures) { this._id = 0 this._drivers = {} this._sessions = {} @@ -7,6 +7,7 @@ export default class Context { this._resolverRequests = {} this._errors = {} this._shouldRunTest = shouldRunTest + this._getFeatures = getFeatures this._results = {} } @@ -101,6 +102,10 @@ export default class Context { return this._shouldRunTest } + getFeatures() { + return this._getFeatures() + } + _add (map, object) { this._id++ map[this._id] = object diff --git a/packages/testkit-backend/src/controller/local.js b/packages/testkit-backend/src/controller/local.js index 968662f7f..c5aa1aad6 100644 --- a/packages/testkit-backend/src/controller/local.js +++ b/packages/testkit-backend/src/controller/local.js @@ -10,15 +10,16 @@ import stringify from '../stringify' */ export default class LocalController extends Controller { - constructor(requestHandlers = {}, shouldRunTest = () => {}) { + constructor(requestHandlers = {}, shouldRunTest = () => {}, getFeatures = () => []) { super() this._requestHandlers = requestHandlers this._shouldRunTest = shouldRunTest + this._getFeatures = getFeatures this._contexts = new Map() } openContext (contextId) { - this._contexts.set(contextId, new Context(this._shouldRunTest)) + this._contexts.set(contextId, new Context(this._shouldRunTest, this._getFeatures)) } closeContext (contextId) { diff --git a/packages/testkit-backend/src/feature/async.js b/packages/testkit-backend/src/feature/async.js new file mode 100644 index 000000000..a5fd52593 --- /dev/null +++ b/packages/testkit-backend/src/feature/async.js @@ -0,0 +1,6 @@ + +const features = [ + +] + +export default features diff --git a/packages/testkit-backend/src/feature/common.js b/packages/testkit-backend/src/feature/common.js new file mode 100644 index 000000000..e162a6e14 --- /dev/null +++ b/packages/testkit-backend/src/feature/common.js @@ -0,0 +1,51 @@ +import tls from 'tls' + +const SUPPORTED_TLS = (() => { + if (tls.DEFAULT_MAX_VERSION) { + const min = Number(tls.DEFAULT_MIN_VERSION.split('TLSv')[1]) + const max = Number(tls.DEFAULT_MAX_VERSION.split('TLSv')[1]) + const result = []; + for (let version = min > 1 ? min : 1.1; version <= max; version = Number((version + 0.1).toFixed(1)) ) { + result.push(`Feature:TLS:${version.toFixed(1)}`) + } + return result; + } + return []; +})(); + +const features = [ + 'Feature:Auth:Custom', + 'Feature:Auth:Kerberos', + 'Feature:Auth:Bearer', + 'Feature:API:SSLConfig', + 'Feature:API:SSLSchemes', + 'AuthorizationExpiredTreatment', + 'ConfHint:connection.recv_timeout_seconds', + 'Feature:Impersonation', + 'Feature:Bolt:3.0', + 'Feature:Bolt:4.1', + 'Feature:Bolt:4.2', + 'Feature:Bolt:4.3', + 'Feature:Bolt:4.4', + 'Feature:API:Driver:GetServerInfo', + 'Feature:API:Driver.VerifyConnectivity', + 'Feature:API:Result.List', + 'Feature:API:Result.Peek', + 'Feature:Configuration:ConnectionAcquisitionTimeout', + 'Optimization:EagerTransactionBegin', + 'Optimization:ImplicitDefaultArguments', + 'Optimization:PullPipelining', + 'Temporary:ConnectionAcquisitionTimeout', + 'Temporary:CypherPathAndRelationship', + 'Temporary:DriverFetchSize', + 'Temporary:DriverMaxConnectionPoolSize', + 'Temporary:DriverMaxTxRetryTime', + 'Temporary:GetConnectionPoolMetrics', + 'Temporary:FastFailingDiscovery', + 'Temporary:FullSummary', + 'Temporary:ResultKeys', + 'Temporary:TransactionClose', + ...SUPPORTED_TLS +] + +export default features diff --git a/packages/testkit-backend/src/feature/index.js b/packages/testkit-backend/src/feature/index.js new file mode 100644 index 000000000..6f3e815bf --- /dev/null +++ b/packages/testkit-backend/src/feature/index.js @@ -0,0 +1,18 @@ +import commonFeatures from './common' +import rxFeatures from './rx' +import asyncFeatures from './async' + +const featuresByContext = new Map([ + ['async', asyncFeatures], + ['rx', rxFeatures], +]) + + +export function createGetFeatures (contexts) { + const features = contexts + .filter(context => featuresByContext.has(context)) + .map(context => featuresByContext.get(context)) + .reduce((previous, current) => [ ...previous, ...current ], commonFeatures) + + return () => features +} diff --git a/packages/testkit-backend/src/feature/rx.js b/packages/testkit-backend/src/feature/rx.js new file mode 100644 index 000000000..a5fd52593 --- /dev/null +++ b/packages/testkit-backend/src/feature/rx.js @@ -0,0 +1,6 @@ + +const features = [ + +] + +export default features diff --git a/packages/testkit-backend/src/index.js b/packages/testkit-backend/src/index.js index a8cb2d340..cbc86f1a5 100644 --- a/packages/testkit-backend/src/index.js +++ b/packages/testkit-backend/src/index.js @@ -2,6 +2,7 @@ import Backend from './backend' import { SocketChannel, WebSocketChannel } from './channel' import { LocalController, RemoteController } from './controller' import { getShouldRunTest } from './skipped-tests' +import { createGetFeatures } from './feature' import * as REQUEST_HANDLERS from './request-handlers.js' import * as RX_REQUEST_HANDLERS from './request-handlers-rx.js' @@ -15,10 +16,12 @@ function main( ) { const webserverPort = process.env.WEB_SERVER_PORT || 8000 const driverDescriptor = process.env.DRIVER_DESCRIPTOR || '' const sessionType = process.env.SESSION_TYPE || 'RX' + const sessionTypeDescriptor = sessionType === 'RX' ? 'rx' : 'async' const driverDescriptorList = driverDescriptor .split(',').map(s => s.trim().toLowerCase()) - const shouldRunTest = getShouldRunTest(driverDescriptorList) + const shouldRunTest = getShouldRunTest([...driverDescriptorList, sessionTypeDescriptor]) + const getFeatures = createGetFeatures([sessionTypeDescriptor]) const newChannel = () => { if ( channelType.toUpperCase() === 'WEBSOCKET' ) { @@ -31,7 +34,7 @@ function main( ) { if ( testEnviroment.toUpperCase() === 'REMOTE' ) { return new RemoteController(webserverPort) } - return new LocalController(getRequestHandlers(sessionType), shouldRunTest) + return new LocalController(getRequestHandlers(sessionType), shouldRunTest, getFeatures) } const backend = new Backend(newController, newChannel) diff --git a/packages/testkit-backend/src/request-handlers-rx.js b/packages/testkit-backend/src/request-handlers-rx.js index f85f23883..6ca72660c 100644 --- a/packages/testkit-backend/src/request-handlers-rx.js +++ b/packages/testkit-backend/src/request-handlers-rx.js @@ -3,6 +3,7 @@ import neo4j from './neo4j.js'; import { cypherToNative } from './cypher-native-binders.js' +import { from } from 'rxjs'; // Handlers which didn't change depending export { @@ -17,7 +18,9 @@ export { ResolverResolutionCompleted, GetRoutingTable, ForcedRoutingTableUpdate, - ResultNext + ResultNext, + RetryablePositive, + RetryableNegative, } from './request-handlers.js'; export function NewSession(context, data, wire) { @@ -80,6 +83,18 @@ export function SessionRun (context, data, wire) { wire.writeResponse(responses.Result({ id })) } +export function ResultConsume (context, data, wire) { + const { resultId } = data + const result = context.getResult(resultId) + + return result.consume() + .toPromise() + .then(summary => { + wire.writeResponse(responses.Summary({ summary })) + }).catch(e => wire.writeError(e)) +} + + export function SessionBeginTransaction (context, data, wire) { const { sessionId, txMeta: metadata, timeout } = data const session = context.getSession(sessionId) @@ -116,6 +131,18 @@ export function TransactionRun (context, data, wire) { wire.writeResponse(responses.Result({ id })) } +export function TransactionRollback (context, data, wire) { + const { txId: id } = data + const { tx } = context.getTx(id) + return tx.rollback() + .toPromise() + .then(() => wire.writeResponse(responses.Transaction({ id }))) + .catch(e => { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + }) +} + export function TransactionCommit (context, data, wire) { const { txId: id } = data const { tx } = context.getTx(id) @@ -128,6 +155,47 @@ export function TransactionCommit (context, data, wire) { }) } +export function SessionReadTransaction (context, data, wire) { + const { sessionId, txMeta: metadata } = data + const session = context.getSession(sessionId) + + try { + return session.readTransaction(tx => { + return from(new Promise((resolve, reject) => { + const id = context.addTx(tx, sessionId, resolve, reject) + wire.writeResponse(responses.RetryableTry({ id })) + })) + }, { metadata }) + .toPromise() + .then(() => wire.writeResponse(responses.RetryableDone())) + .catch(e => wire.writeError(e)) + } catch (e) { + wire.writeError(e) + return + } +} + +export function SessionWriteTransaction (context, data, wire) { + const { sessionId, txMeta: metadata } = data + const session = context.getSession(sessionId) + + try { + return session.writeTransaction(tx => { + return from(new Promise((resolve, reject) => { + const id = context.addTx(tx, sessionId, resolve, reject) + wire.writeResponse(responses.RetryableTry({ id })) + })) + }, { metadata }) + .toPromise() + .then(() => wire.writeResponse(responses.RetryableDone())) + .catch(e => wire.writeError(e)) + } catch (e) { + wire.writeError(e) + return + } +} + + function toAsyncIterator(result) { function queueObserver () { function createResolvablePromise (){ diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index ab7a358a0..01e8b420b 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -3,20 +3,6 @@ import { cypherToNative } from './cypher-native-binders.js' import * as responses from './responses.js' -import tls from 'tls' - -const SUPPORTED_TLS = (() => { - if (tls.DEFAULT_MAX_VERSION) { - const min = Number(tls.DEFAULT_MIN_VERSION.split('TLSv')[1]) - const max = Number(tls.DEFAULT_MAX_VERSION.split('TLSv')[1]) - const result = []; - for (let version = min > 1 ? min : 1.1; version <= max; version = Number((version + 0.1).toFixed(1)) ) { - result.push(`Feature:TLS:${version.toFixed(1)}`) - } - return result; - } - return []; -})(); export function NewDriver (context, data, wire) { const { @@ -351,42 +337,9 @@ export function StartTest (context, { testName }, wire) { }) } -export function GetFeatures (_context, _params, wire) { +export function GetFeatures (context, _params, wire) { wire.writeResponse(responses.FeatureList({ - features: [ - 'Feature:Auth:Custom', - 'Feature:Auth:Kerberos', - 'Feature:Auth:Bearer', - 'Feature:API:SSLConfig', - 'Feature:API:SSLSchemes', - 'AuthorizationExpiredTreatment', - 'ConfHint:connection.recv_timeout_seconds', - 'Feature:Impersonation', - 'Feature:Bolt:3.0', - 'Feature:Bolt:4.1', - 'Feature:Bolt:4.2', - 'Feature:Bolt:4.3', - 'Feature:Bolt:4.4', - 'Feature:API:Driver:GetServerInfo', - 'Feature:API:Driver.VerifyConnectivity', - 'Feature:API:Result.List', - 'Feature:API:Result.Peek', - 'Feature:Configuration:ConnectionAcquisitionTimeout', - 'Optimization:EagerTransactionBegin', - 'Optimization:ImplicitDefaultArguments', - 'Optimization:PullPipelining', - 'Temporary:ConnectionAcquisitionTimeout', - 'Temporary:CypherPathAndRelationship', - 'Temporary:DriverFetchSize', - 'Temporary:DriverMaxConnectionPoolSize', - 'Temporary:DriverMaxTxRetryTime', - 'Temporary:GetConnectionPoolMetrics', - 'Temporary:FastFailingDiscovery', - 'Temporary:FullSummary', - 'Temporary:ResultKeys', - 'Temporary:TransactionClose', - ...SUPPORTED_TLS - ] + features: context.getFeatures() })) } diff --git a/packages/testkit-backend/src/skipped-tests/index.js b/packages/testkit-backend/src/skipped-tests/index.js index 946f114ba..4f135b291 100644 --- a/packages/testkit-backend/src/skipped-tests/index.js +++ b/packages/testkit-backend/src/skipped-tests/index.js @@ -1,8 +1,10 @@ import commonSkippedTests from './common' import browserSkippedTests from './browser' +import rxSessionSkippedTests from './rx' const skippedTestsByContext = new Map([ - ['browser', browserSkippedTests] + ['browser', browserSkippedTests], + ['rx', rxSessionSkippedTests], ]) export function getShouldRunTest (contexts) { diff --git a/packages/testkit-backend/src/skipped-tests/rx.js b/packages/testkit-backend/src/skipped-tests/rx.js new file mode 100644 index 000000000..84afc0d67 --- /dev/null +++ b/packages/testkit-backend/src/skipped-tests/rx.js @@ -0,0 +1,11 @@ +import { skip, ifEndsWith } from './skip' + +const skippedTests = [ + skip( + 'Blowing the backends up', + ifEndsWith('test_should_fail_when_writing_on_unexpectedly_interrupting_writer_using_session_run'), + ifEndsWith('test_should_fail_when_writing_on_unexpectedly_interrupting_writer_using_tx_run') + ) +] + +export default skippedTests From af9aa74b2672ffa73b3e586171883f844dcba12a Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 21 Feb 2022 14:29:34 +0100 Subject: [PATCH 7/8] Implement Transaction.close and skip unrelated features --- packages/testkit-backend/src/feature/async.js | 6 +- .../testkit-backend/src/feature/common.js | 3 - .../src/request-handlers-rx.js | 93 +++++++++++-------- .../testkit-backend/src/skipped-tests/rx.js | 9 +- 4 files changed, 64 insertions(+), 47 deletions(-) diff --git a/packages/testkit-backend/src/feature/async.js b/packages/testkit-backend/src/feature/async.js index a5fd52593..510225864 100644 --- a/packages/testkit-backend/src/feature/async.js +++ b/packages/testkit-backend/src/feature/async.js @@ -1,6 +1,8 @@ - const features = [ - + 'Feature:API:Result.List', + 'Feature:API:Result.Peek', + 'Optimization:EagerTransactionBegin', + 'Optimization:PullPipelining', ] export default features diff --git a/packages/testkit-backend/src/feature/common.js b/packages/testkit-backend/src/feature/common.js index e162a6e14..167f0f9ef 100644 --- a/packages/testkit-backend/src/feature/common.js +++ b/packages/testkit-backend/src/feature/common.js @@ -29,12 +29,9 @@ const features = [ 'Feature:Bolt:4.4', 'Feature:API:Driver:GetServerInfo', 'Feature:API:Driver.VerifyConnectivity', - 'Feature:API:Result.List', 'Feature:API:Result.Peek', 'Feature:Configuration:ConnectionAcquisitionTimeout', - 'Optimization:EagerTransactionBegin', 'Optimization:ImplicitDefaultArguments', - 'Optimization:PullPipelining', 'Temporary:ConnectionAcquisitionTimeout', 'Temporary:CypherPathAndRelationship', 'Temporary:DriverFetchSize', diff --git a/packages/testkit-backend/src/request-handlers-rx.js b/packages/testkit-backend/src/request-handlers-rx.js index 6ca72660c..606554fb7 100644 --- a/packages/testkit-backend/src/request-handlers-rx.js +++ b/packages/testkit-backend/src/request-handlers-rx.js @@ -48,7 +48,7 @@ export function NewSession(context, data, wire) { wire.writeResponse(responses.Session({ id })) } -export function SessionClose (context, data, wire) { +export function SessionClose(context, data, wire) { const { sessionId } = data const session = context.getSession(sessionId) return session @@ -58,7 +58,7 @@ export function SessionClose (context, data, wire) { .catch(err => wire.writeError(err)) } -export function SessionRun (context, data, wire) { +export function SessionRun(context, data, wire) { const { sessionId, cypher, params, txMeta: metadata, timeout } = data const session = context.getSession(sessionId) if (params) { @@ -76,14 +76,15 @@ export function SessionRun (context, data, wire) { return } - result[Symbol.asyncIterator] = () => toAsyncIterator(result) + const it = toAsyncIterator(result) + result[Symbol.asyncIterator] = () => it let id = context.addResult(result) wire.writeResponse(responses.Result({ id })) } -export function ResultConsume (context, data, wire) { +export function ResultConsume(context, data, wire) { const { resultId } = data const result = context.getResult(resultId) @@ -95,20 +96,20 @@ export function ResultConsume (context, data, wire) { } -export function SessionBeginTransaction (context, data, wire) { +export function SessionBeginTransaction(context, data, wire) { const { sessionId, txMeta: metadata, timeout } = data const session = context.getSession(sessionId) - + try { return session.beginTransaction({ metadata, timeout }) - .toPromise() - .then(tx => { - const id = context.addTx(tx, sessionId) - wire.writeResponse(responses.Transaction({ id })) - }).catch(e => { - console.log('got some err: ' + JSON.stringify(e)) - wire.writeError(e) - }) + .toPromise() + .then(tx => { + const id = context.addTx(tx, sessionId) + wire.writeResponse(responses.Transaction({ id })) + }).catch(e => { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + }) } catch (e) { console.log('got some err: ' + JSON.stringify(e)) wire.writeError(e) @@ -116,7 +117,7 @@ export function SessionBeginTransaction (context, data, wire) { } } -export function TransactionRun (context, data, wire) { +export function TransactionRun(context, data, wire) { const { txId, cypher, params } = data const tx = context.getTx(txId) if (params) { @@ -125,13 +126,16 @@ export function TransactionRun (context, data, wire) { } } const result = tx.tx.run(cypher, params) - result[Symbol.asyncIterator] = () => toAsyncIterator(result) + + const it = toAsyncIterator(result) + result[Symbol.asyncIterator] = () => it + const id = context.addResult(result) wire.writeResponse(responses.Result({ id })) } -export function TransactionRollback (context, data, wire) { +export function TransactionRollback(context, data, wire) { const { txId: id } = data const { tx } = context.getTx(id) return tx.rollback() @@ -143,7 +147,7 @@ export function TransactionRollback (context, data, wire) { }) } -export function TransactionCommit (context, data, wire) { +export function TransactionCommit(context, data, wire) { const { txId: id } = data const { tx } = context.getTx(id) return tx.commit() @@ -155,17 +159,26 @@ export function TransactionCommit (context, data, wire) { }) } -export function SessionReadTransaction (context, data, wire) { +export function TransactionClose(context, data, wire) { + const { txId: id } = data + const { tx } = context.getTx(id) + return tx.close() + .toPromise() + .then(() => wire.writeResponse(responses.Transaction({ id }))) + .catch(e => wire.writeError(e)) +} + +export function SessionReadTransaction(context, data, wire) { const { sessionId, txMeta: metadata } = data const session = context.getSession(sessionId) - + try { return session.readTransaction(tx => { - return from(new Promise((resolve, reject) => { - const id = context.addTx(tx, sessionId, resolve, reject) - wire.writeResponse(responses.RetryableTry({ id })) - })) - }, { metadata }) + return from(new Promise((resolve, reject) => { + const id = context.addTx(tx, sessionId, resolve, reject) + wire.writeResponse(responses.RetryableTry({ id })) + })) + }, { metadata }) .toPromise() .then(() => wire.writeResponse(responses.RetryableDone())) .catch(e => wire.writeError(e)) @@ -175,17 +188,17 @@ export function SessionReadTransaction (context, data, wire) { } } -export function SessionWriteTransaction (context, data, wire) { +export function SessionWriteTransaction(context, data, wire) { const { sessionId, txMeta: metadata } = data const session = context.getSession(sessionId) - + try { return session.writeTransaction(tx => { - return from(new Promise((resolve, reject) => { - const id = context.addTx(tx, sessionId, resolve, reject) - wire.writeResponse(responses.RetryableTry({ id })) - })) - }, { metadata }) + return from(new Promise((resolve, reject) => { + const id = context.addTx(tx, sessionId, resolve, reject) + wire.writeResponse(responses.RetryableTry({ id })) + })) + }, { metadata }) .toPromise() .then(() => wire.writeResponse(responses.RetryableDone())) .catch(e => wire.writeError(e)) @@ -197,8 +210,8 @@ export function SessionWriteTransaction (context, data, wire) { function toAsyncIterator(result) { - function queueObserver () { - function createResolvablePromise (){ + function queueObserver() { + function createResolvablePromise() { const resolvablePromise = {} resolvablePromise.promise = new Promise((resolve, reject) => { resolvablePromise.resolve = resolve @@ -208,7 +221,7 @@ function toAsyncIterator(result) { } - function isError(elementOrError){ + function isError(elementOrError) { return elementOrError instanceof Error } @@ -242,7 +255,7 @@ function toAsyncIterator(result) { if (buffer.length > 0) { const element = buffer.shift() if (isError(element)) { - throw element + throw element } return element } @@ -253,13 +266,13 @@ function toAsyncIterator(result) { if (buffer.length > 0) { const element = buffer[0] if (isError(element)) { - throw element + throw element } return element } promiseHolder.resolvable = createResolvablePromise() try { - const element = await promiseHolder.resolvable.promise + const element = await promiseHolder.resolvable.promise buffer.unshift(element) return element } catch (error) { @@ -267,7 +280,7 @@ function toAsyncIterator(result) { throw error } }, - get size () { + get size() { return buffer.length } } @@ -278,7 +291,7 @@ function toAsyncIterator(result) { const observer = queueObserver() records.subscribe(observer); - + const state = { finished: false } diff --git a/packages/testkit-backend/src/skipped-tests/rx.js b/packages/testkit-backend/src/skipped-tests/rx.js index 84afc0d67..4c40a02aa 100644 --- a/packages/testkit-backend/src/skipped-tests/rx.js +++ b/packages/testkit-backend/src/skipped-tests/rx.js @@ -1,10 +1,15 @@ -import { skip, ifEndsWith } from './skip' +import { skip, ifEndsWith, ifEquals } from './skip' const skippedTests = [ skip( 'Blowing the backends up', ifEndsWith('test_should_fail_when_writing_on_unexpectedly_interrupting_writer_using_session_run'), - ifEndsWith('test_should_fail_when_writing_on_unexpectedly_interrupting_writer_using_tx_run') + ifEndsWith('test_should_fail_when_writing_on_unexpectedly_interrupting_writer_using_tx_run'), + ifEndsWith('test_should_fail_when_reading_from_unexpectedly_interrupting_reader_using_tx_run') + ), + skip( + 'Throws after run insted of the first next because of the backend implementation', + ifEquals('stub.disconnects.test_disconnects.TestDisconnects.test_disconnect_on_tx_begin') ) ] From 7e59f62e6cf62f607eb12dcf386abcdf82c11dae Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 21 Feb 2022 16:29:22 +0100 Subject: [PATCH 8/8] Fix asyncIt and remove skipped-tests --- packages/testkit-backend/src/index.js | 2 +- packages/testkit-backend/src/request-handlers-rx.js | 2 +- packages/testkit-backend/src/skipped-tests/rx.js | 8 +------- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/packages/testkit-backend/src/index.js b/packages/testkit-backend/src/index.js index cbc86f1a5..33657b18c 100644 --- a/packages/testkit-backend/src/index.js +++ b/packages/testkit-backend/src/index.js @@ -15,7 +15,7 @@ function main( ) { const backendPort = process.env.BACKEND_PORT || 9876 const webserverPort = process.env.WEB_SERVER_PORT || 8000 const driverDescriptor = process.env.DRIVER_DESCRIPTOR || '' - const sessionType = process.env.SESSION_TYPE || 'RX' + const sessionType = process.env.SESSION_TYPE || 'ASYNC' const sessionTypeDescriptor = sessionType === 'RX' ? 'rx' : 'async' const driverDescriptorList = driverDescriptor .split(',').map(s => s.trim().toLowerCase()) diff --git a/packages/testkit-backend/src/request-handlers-rx.js b/packages/testkit-backend/src/request-handlers-rx.js index 606554fb7..0582d8df2 100644 --- a/packages/testkit-backend/src/request-handlers-rx.js +++ b/packages/testkit-backend/src/request-handlers-rx.js @@ -301,7 +301,7 @@ function toAsyncIterator(result) { if (state.finished) { return { done: true } } - const next = observer.dequeue() + const next = await observer.dequeue() if (next.done) { state.finished = next.done state.summary = next.value diff --git a/packages/testkit-backend/src/skipped-tests/rx.js b/packages/testkit-backend/src/skipped-tests/rx.js index 4c40a02aa..0d0be9514 100644 --- a/packages/testkit-backend/src/skipped-tests/rx.js +++ b/packages/testkit-backend/src/skipped-tests/rx.js @@ -1,12 +1,6 @@ -import { skip, ifEndsWith, ifEquals } from './skip' +import { skip, ifEquals } from './skip' const skippedTests = [ - skip( - 'Blowing the backends up', - ifEndsWith('test_should_fail_when_writing_on_unexpectedly_interrupting_writer_using_session_run'), - ifEndsWith('test_should_fail_when_writing_on_unexpectedly_interrupting_writer_using_tx_run'), - ifEndsWith('test_should_fail_when_reading_from_unexpectedly_interrupting_reader_using_tx_run') - ), skip( 'Throws after run insted of the first next because of the backend implementation', ifEquals('stub.disconnects.test_disconnects.TestDisconnects.test_disconnect_on_tx_begin')