Skip to content

Commit 09a5c7a

Browse files
committed
AsynIterator API for Result consumption
1 parent 2c1a02b commit 09a5c7a

File tree

3 files changed

+124
-24
lines changed

3 files changed

+124
-24
lines changed

packages/core/src/result.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,46 @@ class Result implements Promise<QueryResult> {
192192
return this._p
193193
}
194194

195+
async* ayncInterator(): any {
196+
function createResolvablePromise (): any {
197+
const resolvablePromise: any = {}
198+
resolvablePromise.promise = new Promise((resolve, reject) => {
199+
resolvablePromise.resolve = resolve
200+
resolvablePromise.reject = reject
201+
});
202+
return resolvablePromise;
203+
}
204+
205+
const observer = {
206+
_buffer: [createResolvablePromise()],
207+
onNext: (record: Record) => {
208+
observer._buffer[observer._buffer.length - 1].resolve({ record, done: false });
209+
observer._buffer.push(createResolvablePromise());
210+
},
211+
onCompleted: (summary: ResultSummary) => {
212+
observer._buffer[observer._buffer.length - 1].resolve({ summary, done: true });
213+
},
214+
onError: (error: Error) => {
215+
observer._buffer[observer._buffer.length - 1].reject(error);
216+
},
217+
consume: async () => {
218+
const value = await observer._buffer[0].promise
219+
observer._buffer.shift();
220+
return value
221+
}
222+
}
223+
224+
this.subscribe(observer)
225+
226+
while(true) {
227+
const value = await observer.consume()
228+
if (value.done) {
229+
return value.summary;
230+
}
231+
yield value.record
232+
}
233+
}
234+
195235
/**
196236
* Waits for all results and calls the passed in function with the results.
197237
*

packages/testkit-backend/src/context.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export default class Context {
77
this._resolverRequests = {}
88
this._resultObservers = {}
99
this._errors = {}
10+
this._results = {}
1011
}
1112

1213
addDriver (driver) {
@@ -44,6 +45,18 @@ export default class Context {
4445
return id
4546
}
4647

48+
addResult (result) {
49+
return this._add(this._results, result)
50+
}
51+
52+
removeResult (id) {
53+
delete this._results[id]
54+
}
55+
56+
getResult (id) {
57+
return this._results[id]
58+
}
59+
4760
getDriver (id) {
4861
return this._drivers[id]
4962
}

packages/testkit-backend/src/request-handlers.js

Lines changed: 71 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import neo4j from 'neo4j'
22
import ResultObserver from './result-observer.js'
33
import { cypherToNative, nativeToCypher } from './cypher-native-binders.js'
44
import { shouldRunTest } from './skipped-tests'
5+
const USE_ASYNC = true
56

67
export function NewDriver (context, data, { writeResponse }) {
78
const {
@@ -89,49 +90,89 @@ export function SessionRun (context, data, wire) {
8990
.catch(_ => null)
9091
.then(_ => {
9192
const result = session.run(cypher, params, { metadata, timeout })
92-
const resultObserver = new ResultObserver({ sessionId, result })
93-
result.subscribe(resultObserver)
94-
const id = context.addResultObserver(resultObserver)
93+
let id
94+
if (USE_ASYNC) {
95+
id = context.addResult(result)
96+
} else {
97+
const resultObserver = new ResultObserver({ sessionId, result })
98+
result.subscribe(resultObserver)
99+
id = context.addResultObserver(resultObserver)
100+
}
95101
wire.writeResponse('Result', { id })
96102
})
97103
}
98104

99105
export function ResultNext (context, data, wire) {
100106
const { resultId } = data
101-
const resultObserver = context.getResultObserver(resultId)
102-
const nextPromise = resultObserver.next()
103-
nextPromise
104-
.then(rec => {
105-
if (rec) {
106-
const values = Array.from(rec.values()).map(nativeToCypher)
107+
if (USE_ASYNC) {
108+
const result = context.getResult(resultId)
109+
if (!("recordIt" in result)) {
110+
result.recordIt = result.ayncInterator()
111+
}
112+
result.recordIt.next().then(({ value, done }) => {
113+
if (done) {
114+
wire.writeResponse('NullRecord', null)
115+
} else {
116+
const values = Array.from(value.values()).map(nativeToCypher)
107117
wire.writeResponse('Record', {
108118
values: values
109119
})
110-
} else {
111-
wire.writeResponse('NullRecord', null)
112120
}
113-
})
114-
.catch(e => {
121+
}).catch(e => {
115122
console.log('got some err: ' + JSON.stringify(e))
116123
wire.writeError(e)
117-
})
124+
});
125+
} else {
126+
const resultObserver = context.getResultObserver(resultId)
127+
const nextPromise = resultObserver.next()
128+
nextPromise
129+
.then(rec => {
130+
if (rec) {
131+
const values = Array.from(rec.values()).map(nativeToCypher)
132+
wire.writeResponse('Record', {
133+
values: values
134+
})
135+
} else {
136+
wire.writeResponse('NullRecord', null)
137+
}
138+
})
139+
.catch(e => {
140+
console.log('got some err: ' + JSON.stringify(e))
141+
wire.writeError(e)
142+
})
143+
}
144+
118145
}
119146

120147
export function ResultConsume (context, data, wire) {
121148
const { resultId } = data
122-
const resultObserver = context.getResultObserver(resultId)
123-
resultObserver
124-
.completitionPromise()
125-
.then(summary => {
149+
if (USE_ASYNC) {
150+
const result = context.getResult(resultId)
151+
result.summary().then(summary => {
152+
console.log(summary);
126153
wire.writeResponse('Summary', {
127154
...summary,
128155
serverInfo: {
129156
agent: summary.server.agent,
130-
protocolVersion: summary.server.protocolVersion.toFixed(1)
157+
protocolVersion: summary.server.protocolVersion? summary.server.protocolVersion.toFixed(1) : 0
131158
}
132159
})
133-
})
134-
.catch(e => wire.writeError(e))
160+
}).catch(e => wire.writeError(e))
161+
} else {
162+
const resultObserver = context.getResultObserver(resultId)
163+
resultObserver
164+
.completitionPromise()
165+
.then(summary => {
166+
wire.writeResponse('Summary', {
167+
...summary,
168+
serverInfo: {
169+
agent: summary.server.agent,
170+
protocolVersion: summary.server.protocolVersion.toFixed(1)
171+
}
172+
})
173+
})
174+
.catch(e => wire.writeError(e))
175+
}
135176
}
136177

137178
export function SessionReadTransaction (context, data, wire) {
@@ -158,9 +199,15 @@ export function TransactionRun (context, data, wire) {
158199
}
159200
}
160201
const result = tx.tx.run(cypher, params)
161-
const resultObserver = new ResultObserver({ result })
162-
result.subscribe(resultObserver)
163-
const id = context.addResultObserver(resultObserver)
202+
203+
let id
204+
if (USE_ASYNC) {
205+
id = context.addResult(result)
206+
} else {
207+
const resultObserver = new ResultObserver({ result })
208+
result.subscribe(resultObserver)
209+
id = context.addResultObserver(resultObserver)
210+
}
164211
wire.writeResponse('Result', { id })
165212
}
166213

0 commit comments

Comments
 (0)