Skip to content

Commit d79b7e8

Browse files
committed
Add notificationFilter to the bolt-messages in Bolt 5.2
1 parent a729785 commit d79b7e8

20 files changed

+2014
-29
lines changed
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import BoltProtocolV5x1 from './bolt-protocol-v5x1'
20+
21+
import transformersFactories from './bolt-protocol-v5x2.transformer'
22+
import Transformer from './transformer'
23+
import RequestMessage from './request-message'
24+
import { LoginObserver, ResultStreamObserver } from './stream-observers'
25+
26+
import { internal } from 'neo4j-driver-core'
27+
28+
const {
29+
constants: { BOLT_PROTOCOL_V5_2, FETCH_ALL }
30+
} = internal
31+
32+
export default class BoltProtocol extends BoltProtocolV5x1 {
33+
get version () {
34+
return BOLT_PROTOCOL_V5_2
35+
}
36+
37+
get transformer () {
38+
if (this._transformer === undefined) {
39+
this._transformer = new Transformer(Object.values(transformersFactories).map(create => create(this._config, this._log)))
40+
}
41+
return this._transformer
42+
}
43+
44+
get supportsReAuth () {
45+
return true
46+
}
47+
48+
/**
49+
* Initialize a connection with the server
50+
*
51+
* @param {Object} args The params
52+
* @param {string} args.userAgent The user agent
53+
* @param {any} args.authToken The auth token
54+
* @param {NotificationFilter} args.notificationFilter The notification filters.
55+
* @param {function(error)} args.onError On error callback
56+
* @param {function(onComplete)} args.onComplete On complete callback
57+
* @returns {LoginObserver} The Login observer
58+
*/
59+
initialize ({ userAgent, authToken, notificationFilter, onError, onComplete } = {}) {
60+
const state = {}
61+
const observer = new LoginObserver({
62+
onError: error => this._onLoginError(error, onError),
63+
onCompleted: metadata => {
64+
state.metadata = metadata
65+
return this._onLoginCompleted(metadata)
66+
}
67+
})
68+
69+
this.write(
70+
RequestMessage.hello5x2(userAgent, notificationFilter, this._serversideRouting),
71+
observer,
72+
false
73+
)
74+
75+
return this.logon({
76+
authToken,
77+
onComplete: metadata => onComplete({ ...metadata, ...state.metadata }),
78+
onError,
79+
flush: true
80+
})
81+
}
82+
83+
beginTransaction ({
84+
bookmarks,
85+
txConfig,
86+
database,
87+
mode,
88+
impersonatedUser,
89+
notificationFilter,
90+
beforeError,
91+
afterError,
92+
beforeComplete,
93+
afterComplete
94+
} = {}) {
95+
const observer = new ResultStreamObserver({
96+
server: this._server,
97+
beforeError,
98+
afterError,
99+
beforeComplete,
100+
afterComplete
101+
})
102+
observer.prepareToHandleSingleResponse()
103+
104+
this.write(
105+
RequestMessage.begin({ bookmarks, txConfig, database, mode, impersonatedUser, notificationFilter }),
106+
observer,
107+
true
108+
)
109+
110+
return observer
111+
}
112+
113+
run (
114+
query,
115+
parameters,
116+
{
117+
bookmarks,
118+
txConfig,
119+
database,
120+
mode,
121+
impersonatedUser,
122+
notificationFilter,
123+
beforeKeys,
124+
afterKeys,
125+
beforeError,
126+
afterError,
127+
beforeComplete,
128+
afterComplete,
129+
flush = true,
130+
reactive = false,
131+
fetchSize = FETCH_ALL,
132+
highRecordWatermark = Number.MAX_VALUE,
133+
lowRecordWatermark = Number.MAX_VALUE
134+
} = {}
135+
) {
136+
const observer = new ResultStreamObserver({
137+
server: this._server,
138+
reactive: reactive,
139+
fetchSize: fetchSize,
140+
moreFunction: this._requestMore.bind(this),
141+
discardFunction: this._requestDiscard.bind(this),
142+
beforeKeys,
143+
afterKeys,
144+
beforeError,
145+
afterError,
146+
beforeComplete,
147+
afterComplete,
148+
highRecordWatermark,
149+
lowRecordWatermark
150+
})
151+
152+
const flushRun = reactive
153+
this.write(
154+
RequestMessage.runWithMetadata(query, parameters, {
155+
bookmarks,
156+
txConfig,
157+
database,
158+
mode,
159+
impersonatedUser,
160+
notificationFilter
161+
}),
162+
observer,
163+
flushRun && flush
164+
)
165+
166+
if (!reactive) {
167+
this.write(RequestMessage.pull({ n: fetchSize }), observer, flush)
168+
}
169+
170+
return observer
171+
}
172+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import v5x0 from './bolt-protocol-v5x0.transformer'
21+
22+
export default {
23+
...v5x0
24+
}

packages/bolt-connection/src/bolt/create.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import BoltProtocolV4x3 from './bolt-protocol-v4x3'
2828
import BoltProtocolV4x4 from './bolt-protocol-v4x4'
2929
import BoltProtocolV5x0 from './bolt-protocol-v5x0'
3030
import BoltProtocolV5x1 from './bolt-protocol-v5x1'
31+
import BoltProtocolV5x2 from './bolt-protocol-v5x2'
3132
// eslint-disable-next-line no-unused-vars
3233
import { Chunker, Dechunker } from '../channel'
3334
import ResponseHandler from './response-handler'
@@ -202,6 +203,16 @@ function createProtocol (
202203
onProtocolError,
203204
serversideRouting
204205
)
206+
case 5.2:
207+
return new BoltProtocolV5x2(
208+
server,
209+
chunker,
210+
packingConfig,
211+
createResponseHandler,
212+
log,
213+
onProtocolError,
214+
serversideRouting
215+
)
205216
default:
206217
throw newError('Unknown Bolt protocol version: ' + version)
207218
}

packages/bolt-connection/src/bolt/handshake.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ function parseNegotiatedResponse (buffer) {
7676
*/
7777
function newHandshakeBuffer () {
7878
return createHandshakeMessage([
79-
[version(5, 1), version(5, 0)],
79+
[version(5, 2), version(5, 0)],
8080
[version(4, 4), version(4, 2)],
8181
version(4, 1),
8282
version(3, 0)

packages/bolt-connection/src/bolt/request-message.js

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,37 @@ export default class RequestMessage {
160160
)
161161
}
162162

163+
/**
164+
* Create a new HELLO message.
165+
* @param {string} userAgent the user agent.
166+
* @param {NotificationFilter} notificationFilter the notification filter configured
167+
* @param {Object} routing server side routing, set to routing context to turn on server side routing (> 4.1)
168+
* @return {RequestMessage} new HELLO message.
169+
*/
170+
static hello5x2 (userAgent, notificationFilter = null, routing = null) {
171+
const metadata = { user_agent: userAgent }
172+
173+
if (notificationFilter) {
174+
if (notificationFilter.minimumSeverityLevel) {
175+
metadata.notifications_minimum_severity = notificationFilter.minimumSeverityLevel
176+
}
177+
178+
if (notificationFilter.disabledCategories) {
179+
metadata.notifications_disabled_categories = notificationFilter.disabledCategories
180+
}
181+
}
182+
183+
if (routing) {
184+
metadata.routing = routing
185+
}
186+
187+
return new RequestMessage(
188+
HELLO,
189+
[metadata],
190+
() => `HELLO ${json.stringify(metadata)}`
191+
)
192+
}
193+
163194
/**
164195
* Create a new LOGON message.
165196
*
@@ -194,10 +225,11 @@ export default class RequestMessage {
194225
* @param {string} database the database name.
195226
* @param {string} mode the access mode.
196227
* @param {string} impersonatedUser the impersonated user.
228+
* @param {NotificationFilter} notificationFilter the notification filter
197229
* @return {RequestMessage} new BEGIN message.
198230
*/
199-
static begin ({ bookmarks, txConfig, database, mode, impersonatedUser } = {}) {
200-
const metadata = buildTxMetadata(bookmarks, txConfig, database, mode, impersonatedUser)
231+
static begin ({ bookmarks, txConfig, database, mode, impersonatedUser, notificationFilter } = {}) {
232+
const metadata = buildTxMetadata(bookmarks, txConfig, database, mode, impersonatedUser, notificationFilter)
201233
return new RequestMessage(
202234
BEGIN,
203235
[metadata],
@@ -235,9 +267,9 @@ export default class RequestMessage {
235267
static runWithMetadata (
236268
query,
237269
parameters,
238-
{ bookmarks, txConfig, database, mode, impersonatedUser } = {}
270+
{ bookmarks, txConfig, database, mode, impersonatedUser, notificationFilter } = {}
239271
) {
240-
const metadata = buildTxMetadata(bookmarks, txConfig, database, mode, impersonatedUser)
272+
const metadata = buildTxMetadata(bookmarks, txConfig, database, mode, impersonatedUser, notificationFilter)
241273
return new RequestMessage(
242274
RUN,
243275
[query, parameters, metadata],
@@ -348,9 +380,10 @@ export default class RequestMessage {
348380
* @param {string} database the database name.
349381
* @param {string} mode the access mode.
350382
* @param {string} impersonatedUser the impersonated user mode.
383+
* @param {notificationFilter} notificationFilter the notification filter
351384
* @return {Object} a metadata object.
352385
*/
353-
function buildTxMetadata (bookmarks, txConfig, database, mode, impersonatedUser) {
386+
function buildTxMetadata (bookmarks, txConfig, database, mode, impersonatedUser, notificationFilter) {
354387
const metadata = {}
355388
if (!bookmarks.isEmpty()) {
356389
metadata.bookmarks = bookmarks.values()
@@ -370,6 +403,15 @@ function buildTxMetadata (bookmarks, txConfig, database, mode, impersonatedUser)
370403
if (mode === ACCESS_MODE_READ) {
371404
metadata.mode = READ_MODE
372405
}
406+
if (notificationFilter) {
407+
if (notificationFilter.minimumSeverityLevel) {
408+
metadata.notifications_minimum_severity = notificationFilter.minimumSeverityLevel
409+
}
410+
411+
if (notificationFilter.disabledCategories) {
412+
metadata.notifications_disabled_categories = notificationFilter.disabledCategories
413+
}
414+
}
373415
return metadata
374416
}
375417

packages/bolt-connection/src/connection/connection-channel.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ export function createChannelConnection (
8686
config.disableLosslessIntegers,
8787
serversideRouting,
8888
chunker,
89+
config.notificationFilter,
8990
createProtocol
9091
)
9192

@@ -119,6 +120,7 @@ export default class ChannelConnection extends Connection {
119120
disableLosslessIntegers = false,
120121
serversideRouting = null,
121122
chunker, // to be removed,
123+
notificationFilter,
122124
protocolSupplier
123125
) {
124126
super(errorHandler)
@@ -134,6 +136,7 @@ export default class ChannelConnection extends Connection {
134136
this._chunker = chunker
135137
this._log = createConnectionLogger(this, log)
136138
this._serversideRouting = serversideRouting
139+
this._notificationFilter = notificationFilter
137140

138141
// connection from the database, returned in response for HELLO message and might not be available
139142
this._dbConnectionId = null
@@ -187,6 +190,7 @@ export default class ChannelConnection extends Connection {
187190
this._protocol.initialize({
188191
userAgent,
189192
authToken,
193+
notificationFilter: this._notificationFilter,
190194
onError: err => reject(err),
191195
onComplete: metadata => {
192196
if (metadata) {

0 commit comments

Comments
 (0)