Skip to content

Improve connection/protocol layers #659

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
} from './internal/pool-config'
import Session from './session'
import RxSession from './session-rx'
import { ALL } from './internal/request-message'
import { FETCH_ALL } from './internal/bolt'
import { ENCRYPTION_ON, ENCRYPTION_OFF } from './internal/util'

const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000 // 1 hour
Expand Down Expand Up @@ -182,7 +182,7 @@ class Driver {
* @param {string|string[]} param.bookmarks - The initial reference or references to some previous
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
* @param {number} param.fetchSize - The record fetch size of each batch of this session.
* Use {@link ALL} to always pull all records in one batch. This will override the config value set on driver config.
* Use {@link FETCH_ALL} to always pull all records in one batch. This will override the config value set on driver config.
* @param {string} param.database - The database this session will operate on.
* @return {Session} new session.
*/
Expand Down Expand Up @@ -369,12 +369,11 @@ function sanitizeIntValue (rawValue, defaultWhenAbsent) {
*/
function validateFetchSizeValue (rawValue, defaultWhenAbsent) {
const fetchSize = parseInt(rawValue, 10)
if (fetchSize > 0 || fetchSize === ALL) {
if (fetchSize > 0 || fetchSize === FETCH_ALL) {
return fetchSize
} else if (fetchSize === 0 || fetchSize < 0) {
throw new Error(
'The fetch size can only be a positive value or -1 for ALL. However fetchSize = ' +
fetchSize
`The fetch size can only be a positive value or ${FETCH_ALL} for ALL. However fetchSize = ${fetchSize}`
)
} else {
return defaultWhenAbsent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { newError } from '../error'
import { newError } from '../../error'
import { ResultStreamObserver } from './stream-observers'

/**
* @param {TxConfig} txConfig the auto-commit transaction configuration.
* @param {Connection} connection the connection.
* @param {function(error: string)} onProtocolError called when the txConfig is not empty.
* @param {ResultStreamObserver} observer the response observer.
*/
function assertTxConfigIsEmpty (txConfig, connection, observer) {
function assertTxConfigIsEmpty (txConfig, onProtocolError = () => {}, observer) {
if (txConfig && !txConfig.isEmpty()) {
const error = newError(
'Driver is connected to the database that does not support transaction configuration. ' +
'Please upgrade to neo4j 3.5.0 or later in order to use this functionality'
)

// unsupported API was used, consider this a fatal error for the current connection
connection._handleFatalError(error)
onProtocolError(error.message)
observer.onError(error)
throw error
}
Expand All @@ -41,17 +41,17 @@ function assertTxConfigIsEmpty (txConfig, connection, observer) {
/**
* Asserts that the passed-in database name is empty.
* @param {string} database
* @param {Connection} connection
* @param {fuction(err: String)} onProtocolError Called when it doesn't have database set
*/
function assertDatabaseIsEmpty (database, connection, observer) {
function assertDatabaseIsEmpty (database, onProtocolError = () => {}, observer) {
if (database) {
const error = newError(
'Driver is connected to the database that does not support multiple databases. ' +
'Please upgrade to neo4j 4.0.0 or later in order to use this functionality'
)

// unsupported API was used, consider this a fatal error for the current connection
connection._handleFatalError(error)
onProtocolError(error.message)
observer.onError(error)
throw error
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,56 @@ import {
assertDatabaseIsEmpty,
assertTxConfigIsEmpty
} from './bolt-protocol-util'
import Bookmark from './bookmark'
import { Chunker } from './chunking'
import Connection from './connection'
import { ACCESS_MODE_WRITE, BOLT_PROTOCOL_V1 } from './constants'
import * as v1 from './packstream-v1'
import { Packer } from './packstream-v1'
import Bookmark from '../bookmark'
import { Chunker } from '../chunking'
import { ACCESS_MODE_WRITE, BOLT_PROTOCOL_V1 } from '../constants'
import Logger from '../logger'
import * as v1 from '../packstream-v1'
import { Packer } from '../packstream-v1'
import RequestMessage from './request-message'
import {
LoginObserver,
ResetObserver,
ResultStreamObserver,
StreamObserver
} from './stream-observers'
import TxConfig from './tx-config'
import TxConfig from '../tx-config'

export default class BoltProtocol {
/**
* @callback CreateResponseHandler Creates the response handler
* @param {BoltProtocol} protocol The bolt protocol
* @returns {ResponseHandler} The response handler
*/
/**
* @callback OnProtocolError Handles protocol error
* @param {string} error The description
*/
/**
* @constructor
* @param {Connection} connection the connection.
* @param {Object} server the server informatio.
* @param {Chunker} chunker the chunker.
* @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers.
* @param {CreateResponseHandler} createResponseHandler Function which creates the response handler
* @param {Logger} log the logger
* @param {OnProtocolError} onProtocolError handles protocol errors
*/
constructor (connection, chunker, disableLosslessIntegers) {
this._connection = connection
constructor (
server,
chunker,
disableLosslessIntegers,
createResponseHandler = () => null,
log,
onProtocolError
) {
this._server = server || {}
this._chunker = chunker
this._packer = this._createPacker(chunker)
this._unpacker = this._createUnpacker(disableLosslessIntegers)
this._responseHandler = createResponseHandler(this)
this._log = log
this._onProtocolError = onProtocolError
this._fatalError = null
}

/**
Expand Down Expand Up @@ -91,16 +115,11 @@ export default class BoltProtocol {
*/
initialize ({ userAgent, authToken, onError, onComplete } = {}) {
const observer = new LoginObserver({
connection: this._connection,
afterError: onError,
afterComplete: onComplete
onError: error => this._onLoginError(error, onError),
onCompleted: metadata => this._onLoginCompleted(metadata, onComplete)
})

this._connection.write(
RequestMessage.init(userAgent, authToken),
observer,
true
)
this.write(RequestMessage.init(userAgent, authToken), observer, true)

return observer
}
Expand Down Expand Up @@ -252,7 +271,7 @@ export default class BoltProtocol {
} = {}
) {
const observer = new ResultStreamObserver({
connection: this._connection,
server: this._server,
beforeKeys,
afterKeys,
beforeError,
Expand All @@ -262,16 +281,12 @@ export default class BoltProtocol {
})

// bookmark and mode are ignored in this version of the protocol
assertTxConfigIsEmpty(txConfig, this._connection, observer)
assertTxConfigIsEmpty(txConfig, this._onProtocolError, observer)
// passing in a database name on this protocol version throws an error
assertDatabaseIsEmpty(database, this._connection, observer)
assertDatabaseIsEmpty(database, this._onProtocolError, observer)

this._connection.write(
RequestMessage.run(query, parameters),
observer,
false
)
this._connection.write(RequestMessage.pullAll(), observer, flush)
this.write(RequestMessage.run(query, parameters), observer, false)
this.write(RequestMessage.pullAll(), observer, flush)

return observer
}
Expand All @@ -285,12 +300,12 @@ export default class BoltProtocol {
*/
reset ({ onError, onComplete } = {}) {
const observer = new ResetObserver({
connection: this._connection,
onProtocolError: this._onProtocolError,
onError,
onComplete
})

this._connection.write(RequestMessage.reset(), observer, true)
this.write(RequestMessage.reset(), observer, true)

return observer
}
Expand All @@ -302,4 +317,116 @@ export default class BoltProtocol {
_createUnpacker (disableLosslessIntegers) {
return new v1.Unpacker(disableLosslessIntegers)
}

/**
* Write a message to the network channel.
* @param {RequestMessage} message the message to write.
* @param {StreamObserver} observer the response observer.
* @param {boolean} flush `true` if flush should happen after the message is written to the buffer.
*/
write (message, observer, flush) {
const queued = this.queueObserverIfProtocolIsNotBroken(observer)

if (queued) {
if (this._log.isDebugEnabled()) {
this._log.debug(`${this} C: ${message}`)
}

this.packer().packStruct(
message.signature,
message.fields.map(field => this.packer().packable(field))
)

this._chunker.messageBoundary()

if (flush) {
this._chunker.flush()
}
}
}

/**
* Notifies faltal erros to the observers and mark the protocol in the fatal error state.
* @param {Error} error The error
*/
notifyFatalError (error) {
this._fatalError = error
return this._responseHandler._notifyErrorToObservers(error)
}

/**
* Updates the the current observer with the next one on the queue.
*/
updateCurrentObserver () {
return this._responseHandler._updateCurrentObserver()
}

/**
* Checks if exist an ongoing observable requests
* @return {boolean}
*/
hasOngoingObservableRequests () {
return this._responseHandler.hasOngoingObservableRequests()
}

/**
* Enqueue the observer if the protocol is not broken.
* In case it's broken, the observer will be notified about the error.
*
* @param {StreamObserver} observer The observer
* @returns {boolean} if it was queued
*/
queueObserverIfProtocolIsNotBroken (observer) {
if (this.isBroken()) {
this.notifyFatalErrorToObserver(observer)
return false
}

return this._responseHandler._queueObserver(observer)
}

/**
* Veritfy the protocol is not broken.
* @returns {boolean}
*/
isBroken () {
return !!this._fatalError
}

/**
* Notifies the current fatal error to the observer
*
* @param {StreamObserver} observer The observer
*/
notifyFatalErrorToObserver (observer) {
if (observer && observer.onError) {
observer.onError(this._fatalError)
}
}

/**
* Reset current failure on the observable response handler to null.
*/
resetFailure () {
this._responseHandler._resetFailure()
}

_onLoginCompleted (metadata, onCompleted) {
if (metadata) {
const serverVersion = metadata.server
if (!this._server.version) {
this._server.version = serverVersion
}
}
if (onCompleted) {
onCompleted(metadata)
}
}

_onLoginError (error, onError) {
this._onProtocolError(error.message)
if (onError) {
onError(error)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
* limitations under the License.
*/
import BoltProtocolV1 from './bolt-protocol-v1'
import * as v2 from './packstream-v2'
import { BOLT_PROTOCOL_V2 } from './constants'
import * as v2 from '../packstream-v2'
import { BOLT_PROTOCOL_V2 } from '../constants'

export default class BoltProtocol extends BoltProtocolV1 {
_createPacker (chunker) {
Expand Down
Loading