diff --git a/src/v1/internal/bolt-protocol-v1.js b/src/v1/internal/bolt-protocol-v1.js index 001c676bd..01f7c0f46 100644 --- a/src/v1/internal/bolt-protocol-v1.js +++ b/src/v1/internal/bolt-protocol-v1.js @@ -49,6 +49,15 @@ export default class BoltProtocol { return this._unpacker; } + /** + * Transform metadata received in SUCCESS message before it is passed to the handler. + * @param {object} metadata the received metadata. + * @return {object} transformed metadata. + */ + transformMetadata(metadata) { + return metadata; + } + /** * Perform initialization and authentication of the underlying connection. * @param {string} clientName the client name. diff --git a/src/v1/internal/bolt-protocol-v3.js b/src/v1/internal/bolt-protocol-v3.js new file mode 100644 index 000000000..bea3df77f --- /dev/null +++ b/src/v1/internal/bolt-protocol-v3.js @@ -0,0 +1,82 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import BoltProtocolV2 from './bolt-protocol-v2'; +import RequestMessage from './request-message'; + +export default class BoltProtocol extends BoltProtocolV2 { + + constructor(connection, chunker, disableLosslessIntegers) { + super(connection, chunker, disableLosslessIntegers); + } + + transformMetadata(metadata) { + if (metadata.t_first) { + // Bolt V3 uses shorter key 't_first' to represent 'result_available_after' + // adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value + metadata.result_available_after = metadata.t_first; + delete metadata.t_first; + } + if (metadata.t_last) { + // Bolt V3 uses shorter key 't_last' to represent 'result_consumed_after' + // adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value + metadata.result_consumed_after = metadata.t_last; + delete metadata.t_last; + } + return metadata; + } + + initialize(userAgent, authToken, observer) { + prepareToHandleSingleResponse(observer); + const message = RequestMessage.hello(userAgent, authToken); + this._connection.write(message, observer, true); + } + + beginTransaction(bookmark, observer) { + prepareToHandleSingleResponse(observer); + const message = RequestMessage.begin(bookmark); + this._connection.write(message, observer, true); + } + + commitTransaction(observer) { + prepareToHandleSingleResponse(observer); + const message = RequestMessage.commit(); + this._connection.write(message, observer, true); + } + + rollbackTransaction(observer) { + prepareToHandleSingleResponse(observer); + const message = RequestMessage.rollback(); + this._connection.write(message, observer, true); + } + + run(statement, parameters, observer) { + const metadata = {}; + const runMessage = RequestMessage.runWithMetadata(statement, parameters, metadata); + const pullAllMessage = RequestMessage.pullAll(); + + this._connection.write(runMessage, observer, false); + this._connection.write(pullAllMessage, observer, true); + } +} + +function prepareToHandleSingleResponse(observer) { + if (observer && typeof observer.prepareToHandleSingleResponse === 'function') { + observer.prepareToHandleSingleResponse(); + } +} diff --git a/src/v1/internal/bookmark.js b/src/v1/internal/bookmark.js index c517c26f9..c059e737b 100644 --- a/src/v1/internal/bookmark.js +++ b/src/v1/internal/bookmark.js @@ -52,6 +52,14 @@ export default class Bookmark { return this._maxValue; } + /** + * Get all bookmark values as an array. + * @return {string[]} all values. + */ + values() { + return this._values; + } + /** * Get this bookmark as an object for begin transaction call. * @return {object} the value of this bookmark as object. diff --git a/src/v1/internal/connection.js b/src/v1/internal/connection.js index 69880bde3..4f297644e 100644 --- a/src/v1/internal/connection.js +++ b/src/v1/internal/connection.js @@ -267,7 +267,8 @@ export default class Connection { this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`); } try { - this._currentObserver.onCompleted( payload ); + const metadata = this._protocol.transformMetadata(payload); + this._currentObserver.onCompleted(metadata); } finally { this._updateCurrentObserver(); } diff --git a/src/v1/internal/protocol-handshaker.js b/src/v1/internal/protocol-handshaker.js index 276c9d2db..d940373a7 100644 --- a/src/v1/internal/protocol-handshaker.js +++ b/src/v1/internal/protocol-handshaker.js @@ -21,6 +21,7 @@ import {alloc} from './buf'; import {newError} from '../error'; import BoltProtocolV1 from './bolt-protocol-v1'; import BoltProtocolV2 from './bolt-protocol-v2'; +import BoltProtocolV3 from './bolt-protocol-v3'; const HTTP_MAGIC_PREAMBLE = 1213486160; // == 0x48545450 == "HTTP" const BOLT_MAGIC_PREAMBLE = 0x6060B017; @@ -69,15 +70,18 @@ export default class ProtocolHandshaker { * @private */ _createProtocolWithVersion(version) { - if (version === 1) { - return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers); - } else if (version === 2) { - return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers); - } else if (version === HTTP_MAGIC_PREAMBLE) { - throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + - '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'); - } else { - throw newError('Unknown Bolt protocol version: ' + version); + switch (version) { + case 1: + return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers); + case 2: + return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers); + case 3: + return new BoltProtocolV3(this._connection, this._chunker, this._disableLosslessIntegers); + case HTTP_MAGIC_PREAMBLE: + throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'); + default: + throw newError('Unknown Bolt protocol version: ' + version); } } } @@ -93,10 +97,10 @@ function newHandshakeBuffer() { handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE); //proposed versions + handshakeBuffer.writeInt32(3); handshakeBuffer.writeInt32(2); handshakeBuffer.writeInt32(1); handshakeBuffer.writeInt32(0); - handshakeBuffer.writeInt32(0); // reset the reader position handshakeBuffer.reset(); diff --git a/src/v1/internal/request-message.js b/src/v1/internal/request-message.js index a9255a7b2..e18adb303 100644 --- a/src/v1/internal/request-message.js +++ b/src/v1/internal/request-message.js @@ -18,12 +18,17 @@ */ // Signature bytes for each request message type -const INIT = 0x01; // 0000 0001 // INIT +const INIT = 0x01; // 0000 0001 // INIT const ACK_FAILURE = 0x0E; // 0000 1110 // ACK_FAILURE - unused const RESET = 0x0F; // 0000 1111 // RESET const RUN = 0x10; // 0001 0000 // RUN -const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD * - unused -const PULL_ALL = 0x3F; // 0011 1111 // PULL * +const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD_ALL - unused +const PULL_ALL = 0x3F; // 0011 1111 // PULL_ALL + +const HELLO = 0x01; // 0000 0001 // HELLO +const BEGIN = 0x11; // 0001 0001 // BEGIN +const COMMIT = 0x12; // 0001 0010 // COMMIT +const ROLLBACK = 0x13; // 0001 0011 // ROLLBACK export default class RequestMessage { @@ -68,8 +73,59 @@ export default class RequestMessage { static reset() { return RESET_MESSAGE; } + + /** + * Create a new HELLO message. + * @param {string} userAgent the user agent. + * @param {object} authToken the authentication token. + * @return {RequestMessage} new HELLO message. + */ + static hello(userAgent, authToken) { + const metadata = Object.assign({user_agent: userAgent}, authToken); + return new RequestMessage(HELLO, [metadata], () => `HELLO {user_agent: '${userAgent}', ...}`); + } + + /** + * Create a new BEGIN message. + * @param {Bookmark} bookmark the bookmark. + * @return {RequestMessage} new BEGIN message. + */ + static begin(bookmark) { + const metadata = {bookmarks: bookmark.values()}; + return new RequestMessage(BEGIN, [metadata], () => `BEGIN ${JSON.stringify(metadata)}`); + } + + /** + * Get a COMMIT message. + * @return {RequestMessage} the COMMIT message. + */ + static commit() { + return COMMIT_MESSAGE; + } + + /** + * Get a ROLLBACK message. + * @return {RequestMessage} the ROLLBACK message. + */ + static rollback() { + return ROLLBACK_MESSAGE; + } + + /** + * Create a new RUN message with additional metadata. + * @param {string} statement the cypher statement. + * @param {object} parameters the statement parameters. + * @param {object} metadata the additional metadata. + * @return {RequestMessage} new RUN message with additional metadata. + */ + static runWithMetadata(statement, parameters, metadata) { + return new RequestMessage(RUN, [statement, parameters, metadata], + () => `RUN ${statement} ${JSON.stringify(parameters)} ${JSON.stringify(metadata)}`); + } } // constants for messages that never change const PULL_ALL_MESSAGE = new RequestMessage(PULL_ALL, [], () => 'PULL_ALL'); const RESET_MESSAGE = new RequestMessage(RESET, [], () => 'RESET'); +const COMMIT_MESSAGE = new RequestMessage(COMMIT, [], () => 'COMMIT'); +const ROLLBACK_MESSAGE = new RequestMessage(ROLLBACK, [], () => 'ROLLBACK'); diff --git a/src/v1/internal/stream-observer.js b/src/v1/internal/stream-observer.js index 398276d56..c6794b7e2 100644 --- a/src/v1/internal/stream-observer.js +++ b/src/v1/internal/stream-observer.js @@ -99,6 +99,20 @@ class StreamObserver { this._conn = conn; } + /** + * Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL. + * Response for RUN initializes statement keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream. + * + * However, some operations can be represented as a single message which receives full metadata in a single response. + * For example, operations to begin, commit and rollback an explicit transaction use two messages in Bolt V1 but a single message in Bolt V3. + * Messages are `RUN "BEGIN" {}` + `PULL_ALL` in Bolt V1 and `BEGIN` in Bolt V3. + * + * This function prepares the observer to only handle a single response message. + */ + prepareToHandleSingleResponse() { + this._fieldKeys = []; + } + /** * Will be called on errors. * If user-provided observer is present, pass the error diff --git a/test/internal/bolt-protocol-v1.test.js b/test/internal/bolt-protocol-v1.test.js index b0bb65304..93b732dc8 100644 --- a/test/internal/bolt-protocol-v1.test.js +++ b/test/internal/bolt-protocol-v1.test.js @@ -44,6 +44,15 @@ class MessageRecorder { describe('BoltProtocolV1', () => { + it('should not change metadata', () => { + const metadata = {result_available_after: 1, result_consumed_after: 2, t_first: 3, t_last: 4}; + const protocol = new BoltProtocolV1(new MessageRecorder(), null, false); + + const transformedMetadata = protocol.transformMetadata(metadata); + + expect(transformedMetadata).toEqual({result_available_after: 1, result_consumed_after: 2, t_first: 3, t_last: 4}); + }); + it('should initialize the connection', () => { const recorder = new MessageRecorder(); const protocol = new BoltProtocolV1(recorder, null, false); diff --git a/test/internal/bolt-protocol-v3.test.js b/test/internal/bolt-protocol-v3.test.js new file mode 100644 index 000000000..837deee14 --- /dev/null +++ b/test/internal/bolt-protocol-v3.test.js @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import BoltProtocolV3 from '../../src/v1/internal/bolt-protocol-v3'; + +describe('BoltProtocolV3', () => { + + it('should update metadata', () => { + const metadata = {t_first: 1, t_last: 2, db_hits: 3, some_other_key: 4}; + const protocol = new BoltProtocolV3(null, null, false); + + const transformedMetadata = protocol.transformMetadata(metadata); + + expect(transformedMetadata).toEqual({result_available_after: 1, result_consumed_after: 2, db_hits: 3, some_other_key: 4}); + }); + +}); diff --git a/test/internal/bookmark.test.js b/test/internal/bookmark.test.js index e2b10fcb8..b910553bd 100644 --- a/test/internal/bookmark.test.js +++ b/test/internal/bookmark.test.js @@ -118,4 +118,15 @@ describe('Bookmark', () => { }); }); + it('should expose bookmark values', () => { + expect(new Bookmark(undefined).values()).toEqual([]); + expect(new Bookmark(null).values()).toEqual([]); + + const bookmarkString = 'neo4j:bookmark:v1:tx123'; + expect(new Bookmark(bookmarkString).values()).toEqual([bookmarkString]); + + const bookmarkStrings = ['neo4j:bookmark:v1:tx1', 'neo4j:bookmark:v1:tx2', 'neo4j:bookmark:v1:tx3']; + expect(new Bookmark(bookmarkStrings).values()).toEqual(bookmarkStrings); + }); + }); diff --git a/test/internal/connection.test.js b/test/internal/connection.test.js index 7b63e7b28..8191e8c68 100644 --- a/test/internal/connection.test.js +++ b/test/internal/connection.test.js @@ -28,7 +28,6 @@ import {ServerVersion} from '../../src/v1/internal/server-version'; import lolex from 'lolex'; import Logger from '../../src/v1/internal/logger'; import StreamObserver from '../../src/v1/internal/stream-observer'; -import RequestMessage from '../../src/v1/internal/request-message'; import ConnectionErrorHandler from '../../src/v1/internal/connection-error-handler'; import testUtils from '../internal/test-utils'; @@ -88,16 +87,17 @@ describe('Connection', () => { records.push(record); }, onCompleted: () => { - expect(records[0][0]).toBe(1); + expect(records[0].get(0)).toBe(1); done(); } }; + const streamObserver = new StreamObserver(); + streamObserver.subscribe(pullAllObserver); - connection._negotiateProtocol().then(() => { - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - connection.write(RequestMessage.run('RETURN 1.0', {}), {}, false); - connection.write(RequestMessage.pullAll(), pullAllObserver, true); - }); + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(() => { + connection.protocol().run('RETURN 1.0', {}, streamObserver); + }); }); it('should write protocol handshake', () => { @@ -107,10 +107,11 @@ describe('Connection', () => { connection._negotiateProtocol(); const boltMagicPreamble = '60 60 b0 17'; + const protocolVersion3 = '00 00 00 03'; const protocolVersion2 = '00 00 00 02'; const protocolVersion1 = '00 00 00 01'; const noProtocolVersion = '00 00 00 00'; - expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `); + expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion3} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} `); }); it('should provide error message when connecting to http-port', done => { diff --git a/test/internal/protocol-handshaker.test.js b/test/internal/protocol-handshaker.test.js index eb238d856..c292576e5 100644 --- a/test/internal/protocol-handshaker.test.js +++ b/test/internal/protocol-handshaker.test.js @@ -37,11 +37,12 @@ describe('ProtocolHandshaker', () => { expect(writtenBuffers.length).toEqual(1); const boltMagicPreamble = '60 60 b0 17'; + const protocolVersion3 = '00 00 00 03'; const protocolVersion2 = '00 00 00 02'; const protocolVersion1 = '00 00 00 01'; const noProtocolVersion = '00 00 00 00'; - expect(writtenBuffers[0].toHex()).toEqual(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `); + expect(writtenBuffers[0].toHex()).toEqual(`${boltMagicPreamble} ${protocolVersion3} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} `); }); it('should create protocol with valid version', () => { diff --git a/test/internal/stream-observer.test.js b/test/internal/stream-observer.test.js index 994c733c8..be322cdbc 100644 --- a/test/internal/stream-observer.test.js +++ b/test/internal/stream-observer.test.js @@ -158,6 +158,20 @@ describe('StreamObserver', () => { expect(errors).toEqual([error1]); }); + it('should be able to handle a single response', done => { + const streamObserver = new StreamObserver(); + streamObserver.prepareToHandleSingleResponse(); + + streamObserver.subscribe({ + onCompleted: metadata => { + expect(metadata.key).toEqual(42); + done(); + } + }); + + streamObserver.onCompleted({key: 42}); + }); + }); function newStreamObserver() {