diff --git a/packages/bolt-connection/src/bolt/handshake.js b/packages/bolt-connection/src/bolt/handshake.js index 58250e2af..47c26def7 100644 --- a/packages/bolt-connection/src/bolt/handshake.js +++ b/packages/bolt-connection/src/bolt/handshake.js @@ -19,6 +19,8 @@ import { alloc } from '../channel' import { newError } from 'neo4j-driver-core' const BOLT_MAGIC_PREAMBLE = 0x6060b017 +const AVAILABLE_BOLT_PROTOCOLS = [5.8, 5.7, 5.6, 5.4, 5.3, 5.2, 5.1, 5.0, 4.4, 4.3, 4.2, 3.0] // bolt protocols the client will accept, ordered by preference +const DESIRED_CAPABILITES = 0 function version (major, minor) { return { @@ -70,15 +72,69 @@ function parseNegotiatedResponse (buffer, log) { return Number(h[3] + '.' + h[2]) } +function handshakeNegotiationV2 (channel, buffer, log) { + const numVersions = buffer.readVarInt() + let versions = [] + for (let i = 0; i < numVersions; i++) { + const versionRange = [ + buffer.readUInt8(), + buffer.readUInt8(), + buffer.readUInt8(), + buffer.readUInt8() + ] + versions = versions.concat(getVersions(versionRange)) + } + const capabilityBitMask = buffer.readVarInt() + const capabilites = selectCapabilites(capabilityBitMask) + + let major = 0 + let minor = 0 + versions.sort((a, b) => { + if (Number(a.major) !== Number(b.major)) { + return Number(b.major) - Number(a.major) + } else { + return Number(b.minor) - Number(a.minor) + } + }) + for (let i = 0; i < versions.length; i++) { + const version = versions[i] + if (AVAILABLE_BOLT_PROTOCOLS.includes(Number(version.major + '.' + version.minor))) { + major = version.major + minor = version.minor + break + } + } + + return new Promise((resolve, reject) => { + try { + const selectionBuffer = alloc(5) + selectionBuffer.writeInt32((minor << 8) | major) + selectionBuffer.writeVarInt(capabilites) + channel.write(selectionBuffer) + resolve({ + protocolVersion: Number(major + '.' + minor), + capabilites, + consumeRemainingBuffer: consumer => { + if (buffer.hasRemaining()) { + consumer(buffer.readSlice(buffer.remaining())) + } + } + }) + } catch (e) { + reject(e) + } + }) +} + /** * @return {BaseBuffer} * @private */ function newHandshakeBuffer () { return createHandshakeMessage([ + version(255, 1), [version(5, 8), version(5, 0)], [version(4, 4), version(4, 2)], - version(4, 1), version(3, 0) ]) } @@ -91,8 +147,10 @@ function newHandshakeBuffer () { /** * @typedef HandshakeResult * @property {number} protocolVersion The protocol version negotiated in the handshake + * @property {number} capabilites A bitmask representing the capabilities negotiated in the handshake * @property {function(BufferConsumerCallback)} consumeRemainingBuffer A function to consume the remaining buffer if it exists */ + /** * Shake hands using the channel and return the protocol version * @@ -101,6 +159,23 @@ function newHandshakeBuffer () { * @returns {Promise} Promise of protocol version and consumeRemainingBuffer */ export default function handshake (channel, log) { + return initialHandshake(channel, log).then((result) => { + if (result.protocolVersion === 255.1) { + return handshakeNegotiationV2(channel, result.buffer, log) + } else { + return result + } + }) +} + +/** + * Shake hands using the channel and return the protocol version, or the improved handshake protocol if communicating with a newer server. + * + * @param {Channel} channel the channel use to shake hands + * @param {Logger} log the log object + * @returns {Promise} Promise of protocol version and consumeRemainingBuffer + */ +function initialHandshake (channel, log) { return new Promise((resolve, reject) => { const handshakeErrorHandler = error => { reject(error) @@ -115,9 +190,10 @@ export default function handshake (channel, log) { try { // read the response buffer and initialize the protocol const protocolVersion = parseNegotiatedResponse(buffer, log) - resolve({ protocolVersion, + capabilites: 0, + buffer, consumeRemainingBuffer: consumer => { if (buffer.hasRemaining()) { consumer(buffer.readSlice(buffer.remaining())) @@ -132,3 +208,17 @@ export default function handshake (channel, log) { channel.write(newHandshakeBuffer()) }) } + +function getVersions (versionArray) { + const resultArr = [] + const major = versionArray[3] + const minor = versionArray[2] + for (let i = 0; i <= versionArray[1]; i++) { + resultArr.push({ major, minor: minor - i }) + } + return resultArr +} + +function selectCapabilites (capabilityBitMask) { + return DESIRED_CAPABILITES // capabilites are currently unused and will always be 0. +} diff --git a/packages/bolt-connection/src/buf/base-buf.js b/packages/bolt-connection/src/buf/base-buf.js index 93d5f231d..871d8c996 100644 --- a/packages/bolt-connection/src/buf/base-buf.js +++ b/packages/bolt-connection/src/buf/base-buf.js @@ -47,6 +47,10 @@ export default class BaseBuffer { throw new Error('Not implemented') } + getVarInt (position) { + throw new Error('Not implemented') + } + putUInt8 (position, val) { throw new Error('Not implemented') } @@ -178,6 +182,20 @@ export default class BaseBuffer { this.putUInt8(p + 7, val & 0xff) } + putVarInt (p, val) { + let length = 0 + while (val > 1) { + let int = val % 128 + if (val >= 128) { + int += 128 + } + val = val / 128 + this.putUInt8(p + length, int) + length += 1 + } + return length + } + /** * @param position * @param other @@ -244,6 +262,15 @@ export default class BaseBuffer { return this.getFloat64(this._updatePos(8)) } + /** + * Read from state position + */ + readVarInt () { + const int = this.getVarInt(this.position) + this._updatePos(int.length) + return int.value + } + /** * Write to state position. * @param val @@ -300,6 +327,11 @@ export default class BaseBuffer { this.putFloat64(this._updatePos(8), val) } + writeVarInt (val) { + const length = this.putVarInt(this.position, val) + this._updatePos(length) + } + /** * Write to state position. * @param val diff --git a/packages/bolt-connection/src/channel/channel-buf.js b/packages/bolt-connection/src/channel/channel-buf.js index 8fabb2114..a5763aa44 100644 --- a/packages/bolt-connection/src/channel/channel-buf.js +++ b/packages/bolt-connection/src/channel/channel-buf.js @@ -37,6 +37,18 @@ export default class ChannelBuffer extends BaseBuffer { return this._buffer.readDoubleBE(position) } + getVarInt (position) { + let i = 0 + let currentValue = this._buffer.readInt8(position + i) + let total = currentValue % 128 + while (currentValue / 128 >= 1) { + i += 1 + currentValue = this._buffer.readInt8(position + i) + total += currentValue % 128 + } + return { length: i + 1, value: total } + } + putUInt8 (position, val) { this._buffer.writeUInt8(val, position) } diff --git a/packages/bolt-connection/test/bolt/index.test.js b/packages/bolt-connection/test/bolt/index.test.js index 7e3c1303b..6fb17e146 100644 --- a/packages/bolt-connection/test/bolt/index.test.js +++ b/packages/bolt-connection/test/bolt/index.test.js @@ -48,13 +48,13 @@ describe('#unit Bolt', () => { const writtenBuffer = channel.written[0] const boltMagicPreamble = '60 60 b0 17' - const protocolVersion5x7to5x0 = '00 08 08 05' + const handshakev2 = '00 00 01 ff' + const protocolVersion5x8to5x0 = '00 08 08 05' const protocolVersion4x4to4x2 = '00 02 04 04' - const protocolVersion4x1 = '00 00 01 04' const protocolVersion3 = '00 00 00 03' expect(writtenBuffer.toHex()).toEqual( - `${boltMagicPreamble} ${protocolVersion5x7to5x0} ${protocolVersion4x4to4x2} ${protocolVersion4x1} ${protocolVersion3}` + `${boltMagicPreamble} ${handshakev2} ${protocolVersion5x8to5x0} ${protocolVersion4x4to4x2} ${protocolVersion3}` ) }) diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/handshake.js b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/handshake.js index caff6d596..9ad9846c1 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/handshake.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/handshake.js @@ -19,6 +19,8 @@ import { alloc } from '../channel/index.js' import { newError } from '../../core/index.ts' const BOLT_MAGIC_PREAMBLE = 0x6060b017 +const AVAILABLE_BOLT_PROTOCOLS = [5.8, 5.7, 5.6, 5.4, 5.3, 5.2, 5.1, 5.0, 4.4, 4.3, 4.2, 3.0] // bolt protocols the client will accept, ordered by preference +const DESIRED_CAPABILITES = 0 function version (major, minor) { return { @@ -70,15 +72,69 @@ function parseNegotiatedResponse (buffer, log) { return Number(h[3] + '.' + h[2]) } +function handshakeNegotiationV2 (channel, buffer, log) { + const numVersions = buffer.readVarInt() + let versions = [] + for (let i = 0; i < numVersions; i++) { + const versionRange = [ + buffer.readUInt8(), + buffer.readUInt8(), + buffer.readUInt8(), + buffer.readUInt8() + ] + versions = versions.concat(getVersions(versionRange)) + } + const capabilityBitMask = buffer.readVarInt() + const capabilites = selectCapabilites(capabilityBitMask) + + let major = 0 + let minor = 0 + versions.sort((a, b) => { + if (Number(a.major) !== Number(b.major)) { + return Number(b.major) - Number(a.major) + } else { + return Number(b.minor) - Number(a.minor) + } + }) + for (let i = 0; i < versions.length; i++) { + const version = versions[i] + if (AVAILABLE_BOLT_PROTOCOLS.includes(Number(version.major + '.' + version.minor))) { + major = version.major + minor = version.minor + break + } + } + + return new Promise((resolve, reject) => { + try { + const selectionBuffer = alloc(5) + selectionBuffer.writeInt32((minor << 8) | major) + selectionBuffer.writeVarInt(capabilites) + channel.write(selectionBuffer) + resolve({ + protocolVersion: Number(major + '.' + minor), + capabilites, + consumeRemainingBuffer: consumer => { + if (buffer.hasRemaining()) { + consumer(buffer.readSlice(buffer.remaining())) + } + } + }) + } catch (e) { + reject(e) + } + }) +} + /** * @return {BaseBuffer} * @private */ function newHandshakeBuffer () { return createHandshakeMessage([ + version(255, 1), [version(5, 8), version(5, 0)], [version(4, 4), version(4, 2)], - version(4, 1), version(3, 0) ]) } @@ -91,8 +147,10 @@ function newHandshakeBuffer () { /** * @typedef HandshakeResult * @property {number} protocolVersion The protocol version negotiated in the handshake + * @property {number} capabilites A bitmask representing the capabilities negotiated in the handshake * @property {function(BufferConsumerCallback)} consumeRemainingBuffer A function to consume the remaining buffer if it exists */ + /** * Shake hands using the channel and return the protocol version * @@ -101,6 +159,23 @@ function newHandshakeBuffer () { * @returns {Promise} Promise of protocol version and consumeRemainingBuffer */ export default function handshake (channel, log) { + return initialHandshake(channel, log).then((result) => { + if (result.protocolVersion === 255.1) { + return handshakeNegotiationV2(channel, result.buffer, log) + } else { + return result + } + }) +} + +/** + * Shake hands using the channel and return the protocol version, or the improved handshake protocol if communicating with a newer server. + * + * @param {Channel} channel the channel use to shake hands + * @param {Logger} log the log object + * @returns {Promise} Promise of protocol version and consumeRemainingBuffer + */ +function initialHandshake (channel, log) { return new Promise((resolve, reject) => { const handshakeErrorHandler = error => { reject(error) @@ -115,9 +190,10 @@ export default function handshake (channel, log) { try { // read the response buffer and initialize the protocol const protocolVersion = parseNegotiatedResponse(buffer, log) - resolve({ protocolVersion, + capabilites: 0, + buffer, consumeRemainingBuffer: consumer => { if (buffer.hasRemaining()) { consumer(buffer.readSlice(buffer.remaining())) @@ -132,3 +208,17 @@ export default function handshake (channel, log) { channel.write(newHandshakeBuffer()) }) } + +function getVersions (versionArray) { + const resultArr = [] + const major = versionArray[3] + const minor = versionArray[2] + for (let i = 0; i <= versionArray[1]; i++) { + resultArr.push({ major, minor: minor - i }) + } + return resultArr +} + +function selectCapabilites (capabilityBitMask) { + return DESIRED_CAPABILITES // capabilites are currently unused and will always be 0. +} diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/buf/base-buf.js b/packages/neo4j-driver-deno/lib/bolt-connection/buf/base-buf.js index 93d5f231d..871d8c996 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/buf/base-buf.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/buf/base-buf.js @@ -47,6 +47,10 @@ export default class BaseBuffer { throw new Error('Not implemented') } + getVarInt (position) { + throw new Error('Not implemented') + } + putUInt8 (position, val) { throw new Error('Not implemented') } @@ -178,6 +182,20 @@ export default class BaseBuffer { this.putUInt8(p + 7, val & 0xff) } + putVarInt (p, val) { + let length = 0 + while (val > 1) { + let int = val % 128 + if (val >= 128) { + int += 128 + } + val = val / 128 + this.putUInt8(p + length, int) + length += 1 + } + return length + } + /** * @param position * @param other @@ -244,6 +262,15 @@ export default class BaseBuffer { return this.getFloat64(this._updatePos(8)) } + /** + * Read from state position + */ + readVarInt () { + const int = this.getVarInt(this.position) + this._updatePos(int.length) + return int.value + } + /** * Write to state position. * @param val @@ -300,6 +327,11 @@ export default class BaseBuffer { this.putFloat64(this._updatePos(8), val) } + writeVarInt (val) { + const length = this.putVarInt(this.position, val) + this._updatePos(length) + } + /** * Write to state position. * @param val diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/channel/channel-buf.js b/packages/neo4j-driver-deno/lib/bolt-connection/channel/channel-buf.js index c433c59b3..c0674f5d7 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/channel/channel-buf.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/channel/channel-buf.js @@ -37,6 +37,18 @@ export default class ChannelBuffer extends BaseBuffer { return this._buffer.readDoubleBE(position) } + getVarInt (position) { + let i = 0 + let currentValue = this._buffer.readInt8(position + i) + let total = currentValue % 128 + while (currentValue / 128 >= 1) { + i += 1 + currentValue = this._buffer.readInt8(position + i) + total += currentValue % 128 + } + return { length: i + 1, value: total } + } + putUInt8 (position, val) { this._buffer.writeUInt8(val, position) } diff --git a/packages/testkit-backend/src/feature/common.js b/packages/testkit-backend/src/feature/common.js index 19253b8e5..2ceed6de5 100644 --- a/packages/testkit-backend/src/feature/common.js +++ b/packages/testkit-backend/src/feature/common.js @@ -15,7 +15,6 @@ const features = [ 'ConfHint:connection.recv_timeout_seconds', 'Feature:Impersonation', 'Feature:Bolt:3.0', - 'Feature:Bolt:4.1', 'Feature:Bolt:4.2', 'Feature:Bolt:4.3', 'Feature:Bolt:4.4', @@ -28,6 +27,7 @@ const features = [ 'Feature:Bolt:5.6', 'Feature:Bolt:5.7', 'Feature:Bolt:5.8', + 'Feature:Bolt:HandshakeManifestV1', 'Feature:Bolt:Patch:UTC', 'Feature:API:ConnectionAcquisitionTimeout', 'Feature:API:Driver.ExecuteQuery',