Skip to content

Commit d7ad517

Browse files
authored
Home Database Cache : Stripped Back Solution (#1235)
Implements a cache mapping users to their last known home database. Allowing more intelligent routing decisions when queries are run without a specified database. This saves up to 33% of round trips in some scenarios.
1 parent e9c1a5a commit d7ad517

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2773
-90
lines changed

packages/bolt-connection/src/bolt/bolt-protocol-v5x6.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
import BoltProtocolV5x5 from './bolt-protocol-v5x5'
1818

19-
import transformersFactories from './bolt-protocol-v5x5.transformer'
19+
import transformersFactories from './bolt-protocol-v5x6.transformer'
2020
import Transformer from './transformer'
2121

2222
import { internal } from 'neo4j-driver-core'

packages/bolt-connection/src/bolt/bolt-protocol-v5x7.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
import BoltProtocolV5x6 from './bolt-protocol-v5x6'
1818

19-
import transformersFactories from './bolt-protocol-v5x5.transformer'
19+
import transformersFactories from './bolt-protocol-v5x7.transformer'
2020
import Transformer from './transformer'
2121

2222
import { internal } from 'neo4j-driver-core'
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
import BoltProtocolV5x7 from './bolt-protocol-v5x7'
18+
19+
import transformersFactories from './bolt-protocol-v5x8.transformer'
20+
import Transformer from './transformer'
21+
import RequestMessage from './request-message'
22+
import { ResultStreamObserver } from './stream-observers'
23+
24+
import { internal } from 'neo4j-driver-core'
25+
26+
const {
27+
constants: { BOLT_PROTOCOL_V5_8, FETCH_ALL }
28+
} = internal
29+
30+
export default class BoltProtocol extends BoltProtocolV5x7 {
31+
get version () {
32+
return BOLT_PROTOCOL_V5_8
33+
}
34+
35+
get transformer () {
36+
if (this._transformer === undefined) {
37+
this._transformer = new Transformer(Object.values(transformersFactories).map(create => create(this._config, this._log)))
38+
}
39+
return this._transformer
40+
}
41+
42+
run (
43+
query,
44+
parameters,
45+
{
46+
bookmarks,
47+
txConfig,
48+
database,
49+
mode,
50+
impersonatedUser,
51+
notificationFilter,
52+
beforeKeys,
53+
afterKeys,
54+
beforeError,
55+
afterError,
56+
beforeComplete,
57+
afterComplete,
58+
flush = true,
59+
reactive = false,
60+
fetchSize = FETCH_ALL,
61+
highRecordWatermark = Number.MAX_VALUE,
62+
lowRecordWatermark = Number.MAX_VALUE,
63+
onDb
64+
} = {}
65+
) {
66+
const observer = new ResultStreamObserver({
67+
server: this._server,
68+
reactive,
69+
fetchSize,
70+
moreFunction: this._requestMore.bind(this),
71+
discardFunction: this._requestDiscard.bind(this),
72+
beforeKeys,
73+
afterKeys,
74+
beforeError,
75+
afterError,
76+
beforeComplete,
77+
afterComplete,
78+
highRecordWatermark,
79+
lowRecordWatermark,
80+
enrichMetadata: this._enrichMetadata,
81+
onDb
82+
})
83+
84+
const flushRun = reactive
85+
this.write(
86+
RequestMessage.runWithMetadata5x5(query, parameters, {
87+
bookmarks,
88+
txConfig,
89+
database,
90+
mode,
91+
impersonatedUser,
92+
notificationFilter
93+
}),
94+
observer,
95+
flushRun && flush
96+
)
97+
98+
if (!reactive) {
99+
this.write(RequestMessage.pull({ n: fetchSize }), observer, flush)
100+
}
101+
102+
return observer
103+
}
104+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
import v5x7 from './bolt-protocol-v5x7.transformer'
19+
20+
export default {
21+
...v5x7
22+
}

packages/bolt-connection/src/bolt/create.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import BoltProtocolV5x4 from './bolt-protocol-v5x4'
3232
import BoltProtocolV5x5 from './bolt-protocol-v5x5'
3333
import BoltProtocolV5x6 from './bolt-protocol-v5x6'
3434
import BoltProtocolV5x7 from './bolt-protocol-v5x7'
35+
import BoltProtocolV5x8 from './bolt-protocol-v5x8'
3536
// eslint-disable-next-line no-unused-vars
3637
import { Chunker, Dechunker } from '../channel'
3738
import ResponseHandler from './response-handler'
@@ -257,6 +258,14 @@ function createProtocol (
257258
log,
258259
onProtocolError,
259260
serversideRouting)
261+
case 5.8:
262+
return new BoltProtocolV5x8(server,
263+
chunker,
264+
packingConfig,
265+
createResponseHandler,
266+
log,
267+
onProtocolError,
268+
serversideRouting)
260269
default:
261270
throw newError('Unknown Bolt protocol version: ' + version)
262271
}

packages/bolt-connection/src/bolt/handshake.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ function parseNegotiatedResponse (buffer, log) {
7676
*/
7777
function newHandshakeBuffer () {
7878
return createHandshakeMessage([
79-
[version(5, 7), version(5, 0)],
79+
[version(5, 8), version(5, 0)],
8080
[version(4, 4), version(4, 2)],
8181
version(4, 1),
8282
version(3, 0)

packages/bolt-connection/src/bolt/stream-observers.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ class ResultStreamObserver extends StreamObserver {
7979
server,
8080
highRecordWatermark = Number.MAX_VALUE,
8181
lowRecordWatermark = Number.MAX_VALUE,
82-
enrichMetadata
82+
enrichMetadata,
83+
onDb
8384
} = {}) {
8485
super()
8586

@@ -113,6 +114,7 @@ class ResultStreamObserver extends StreamObserver {
113114
this._paused = false
114115
this._pulled = !reactive
115116
this._haveRecordStreamed = false
117+
this._onDb = onDb
116118
}
117119

118120
/**
@@ -319,6 +321,10 @@ class ResultStreamObserver extends StreamObserver {
319321
}
320322
}
321323

324+
if (meta.db !== null && this._onDb !== undefined) {
325+
this._onDb(meta.db)
326+
}
327+
322328
if (meta.fields != null) {
323329
// remove fields key from metadata object
324330
delete meta.fields

packages/bolt-connection/src/connection-provider/connection-provider-routing.js

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
7878
this._createConnectionErrorHandler(),
7979
this._log,
8080
await this._clientCertificateHolder.getClientCertificate(),
81-
this._routingContext
81+
this._routingContext,
82+
this._channelSsrCallback.bind(this)
8283
)
8384
})
8485

@@ -99,6 +100,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
99100
)
100101

101102
this._refreshRoutingTable = functional.reuseOngoingRequest(this._refreshRoutingTable, this)
103+
this._withSSR = 0
104+
this._withoutSSR = 0
102105
}
103106

104107
_createConnectionErrorHandler () {
@@ -139,19 +142,30 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
139142
* See {@link ConnectionProvider} for more information about this method and
140143
* its arguments.
141144
*/
142-
async acquireConnection ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, auth } = {}) {
143-
let name
144-
let address
145+
async acquireConnection ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, auth, homeDb } = {}) {
145146
const context = { database: database || DEFAULT_DB_NAME }
146147

147148
const databaseSpecificErrorHandler = new ConnectionErrorHandler(
148149
SESSION_EXPIRED,
149150
(error, address) => this._handleUnavailability(error, address, context.database),
150-
(error, address) => this._handleWriteFailure(error, address, context.database),
151-
(error, address, conn) =>
152-
this._handleSecurityError(error, address, conn, context.database)
151+
(error, address) => this._handleWriteFailure(error, address, homeDb ?? context.database),
152+
(error, address, conn) => this._handleSecurityError(error, address, conn, context.database)
153153
)
154154

155+
let conn
156+
if (this.SSREnabled() && homeDb !== undefined && database === '') {
157+
const currentRoutingTable = this._routingTableRegistry.get(
158+
homeDb,
159+
() => new RoutingTable({ database: homeDb })
160+
)
161+
if (currentRoutingTable && !currentRoutingTable.isStaleFor(accessMode)) {
162+
conn = await this.getConnectionFromRoutingTable(currentRoutingTable, auth, accessMode, databaseSpecificErrorHandler)
163+
if (this.SSREnabled()) {
164+
return conn
165+
}
166+
conn.release()
167+
}
168+
}
155169
const routingTable = await this._freshRoutingTable({
156170
accessMode,
157171
database: context.database,
@@ -165,7 +179,12 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
165179
}
166180
}
167181
})
182+
return this.getConnectionFromRoutingTable(routingTable, auth, accessMode, databaseSpecificErrorHandler)
183+
}
168184

185+
async getConnectionFromRoutingTable (routingTable, auth, accessMode, databaseSpecificErrorHandler) {
186+
let name
187+
let address
169188
// select a target server based on specified access mode
170189
if (accessMode === READ) {
171190
address = this._loadBalancingStrategy.selectReader(routingTable.readers)
@@ -663,6 +682,28 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
663682
routingTable.forgetRouter(address)
664683
}
665684
}
685+
686+
_channelSsrCallback (isEnabled, action) {
687+
if (action === 'OPEN') {
688+
if (isEnabled === true) {
689+
this._withSSR = this._withSSR + 1
690+
} else {
691+
this._withoutSSR = this._withoutSSR + 1
692+
}
693+
} else if (action === 'CLOSE') {
694+
if (isEnabled === true) {
695+
this._withSSR = this._withSSR - 1
696+
} else {
697+
this._withoutSSR = this._withoutSSR - 1
698+
}
699+
} else {
700+
throw newError("Channel SSR Callback invoked with action other than 'OPEN' or 'CLOSE'")
701+
}
702+
}
703+
704+
SSREnabled () {
705+
return this._withSSR > 0 && this._withoutSSR === 0
706+
}
666707
}
667708

668709
/**

packages/bolt-connection/src/connection/connection-channel.js

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ let idGenerator = 0
3434
* @param {ConnectionErrorHandler} errorHandler - the error handler for connection errors.
3535
* @param {Logger} log - configured logger.
3636
* @param {clientCertificate} clientCertificate - configured client certificate
37+
* @param ssrCallback - callback function used to update the counts of ssr enabled and disabled connections
38+
* @param createChannel - function taking a channelConfig object and creating a channel with it
3739
* @return {Connection} - new connection.
3840
*/
3941
export function createChannelConnection (
@@ -43,6 +45,7 @@ export function createChannelConnection (
4345
log,
4446
clientCertificate,
4547
serversideRouting = null,
48+
ssrCallback,
4649
createChannel = channelConfig => new Channel(channelConfig)
4750
) {
4851
const channelConfig = new ChannelConfig(
@@ -89,7 +92,8 @@ export function createChannelConnection (
8992
chunker,
9093
config.notificationFilter,
9194
createProtocol,
92-
config.telemetryDisabled
95+
config.telemetryDisabled,
96+
ssrCallback
9397
)
9498

9599
// forward all pending bytes to the dechunker
@@ -110,9 +114,11 @@ export default class ChannelConnection extends Connection {
110114
* @param {ConnectionErrorHandler} errorHandler the error handler.
111115
* @param {ServerAddress} address - the server address to connect to.
112116
* @param {Logger} log - the configured logger.
113-
* @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers.
114-
* @param {Chunker} chunker the chunker
115-
* @param protocolSupplier Bolt protocol supplier
117+
* @param {boolean} disableLosslessIntegers - if this connection should convert all received integers to native JS numbers.
118+
* @param {Chunker} chunker - the chunker
119+
* @param protocolSupplier - Bolt protocol supplier
120+
* @param {boolean} telemetryDisabled - wether telemetry has been disabled in driver config.
121+
* @param ssrCallback - callback function used to update the counts of ssr enabled and disabled connections.
116122
*/
117123
constructor (
118124
channel,
@@ -124,7 +130,8 @@ export default class ChannelConnection extends Connection {
124130
chunker, // to be removed,
125131
notificationFilter,
126132
protocolSupplier,
127-
telemetryDisabled
133+
telemetryDisabled,
134+
ssrCallback = (_) => {}
128135
) {
129136
super(errorHandler)
130137
this._authToken = null
@@ -143,6 +150,7 @@ export default class ChannelConnection extends Connection {
143150
this._notificationFilter = notificationFilter
144151
this._telemetryDisabledDriverConfig = telemetryDisabled === true
145152
this._telemetryDisabledConnection = true
153+
this._ssrCallback = ssrCallback
146154

147155
// connection from the database, returned in response for HELLO message and might not be available
148156
this._dbConnectionId = null
@@ -331,7 +339,9 @@ export default class ChannelConnection extends Connection {
331339
if (telemetryEnabledHint === true) {
332340
this._telemetryDisabledConnection = false
333341
}
342+
this.SSREnabledHint = metadata.hints['ssr.enabled']
334343
}
344+
this._ssrCallback(this.SSREnabledHint ?? false, 'OPEN')
335345
}
336346
resolve(self)
337347
}
@@ -538,6 +548,7 @@ export default class ChannelConnection extends Connection {
538548
* @returns {Promise<void>} - A promise that will be resolved when the underlying channel is closed.
539549
*/
540550
async close () {
551+
this._ssrCallback(this.SSREnabledHint ?? false, 'CLOSE')
541552
if (this._log.isDebugEnabled()) {
542553
this._log.debug('closing')
543554
}

0 commit comments

Comments
 (0)