Skip to content

Add supportsMultiDb function #491

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,19 @@ class Driver {
return connectivityVerifier.verify({ database })
}

/**
* Returns whether the server supports multi database capabilities based on the handshaked protocol
* version.
*
* Note that this function call _always_ causes a round-trip to the server.
*
* @returns {Promise<boolean>} promise resolved with a boolean or rejected with error.
*/
supportsMultiDb () {
const connectionProvider = this._getOrCreateConnectionProvider()
return connectionProvider.supportsMultiDb()
}

/**
* Acquire a session to communicate with the database. The session will
* borrow connections from the underlying connection pool as required and
Expand Down
25 changes: 16 additions & 9 deletions src/internal/bolt-protocol-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import RequestMessage from './request-message'
import * as v1 from './packstream-v1'
import Bookmark from './bookmark'
import TxConfig from './tx-config'
import { ACCESS_MODE_WRITE } from './constants'
import Connection from './connection'
import { Chunker } from './chunking'
import { Packer } from './packstream-v1'
import {
assertDatabaseIsEmpty,
assertTxConfigIsEmpty
} from './bolt-protocol-util'
import Bookmark from './bookmark'
import { Chunker } from './chunking'
import Connection from './connection'
import { ACCESS_MODE_WRITE, BOLT_PROTOCOL_V1 } from './constants'
import * as v1 from './packstream-v1'
import { Packer } from './packstream-v1'
import RequestMessage from './request-message'
import {
ResultStreamObserver,
LoginObserver,
ResetObserver,
ResultStreamObserver,
StreamObserver
} from './stream-observers'
import TxConfig from './tx-config'

export default class BoltProtocol {
/**
Expand All @@ -48,6 +48,13 @@ export default class BoltProtocol {
this._unpacker = this._createUnpacker(disableLosslessIntegers)
}

/**
* Returns the numerical version identifier for this protocol
*/
get version () {
return BOLT_PROTOCOL_V1
}

/**
* Get the packer.
* @return {Packer} the protocol's packer.
Expand Down
5 changes: 5 additions & 0 deletions src/internal/bolt-protocol-v2.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
import BoltProtocolV1 from './bolt-protocol-v1'
import * as v2 from './packstream-v2'
import { BOLT_PROTOCOL_V2 } from './constants'

export default class BoltProtocol extends BoltProtocolV1 {
_createPacker (chunker) {
Expand All @@ -27,4 +28,8 @@ export default class BoltProtocol extends BoltProtocolV1 {
_createUnpacker (disableLosslessIntegers) {
return new v2.Unpacker(disableLosslessIntegers)
}

get version () {
return BOLT_PROTOCOL_V2
}
}
5 changes: 5 additions & 0 deletions src/internal/bolt-protocol-v3.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ import {
LoginObserver,
ResultStreamObserver
} from './stream-observers'
import { BOLT_PROTOCOL_V3 } from './constants'

const noOpObserver = new StreamObserver()

export default class BoltProtocol extends BoltProtocolV2 {
get version () {
return BOLT_PROTOCOL_V3
}

transformMetadata (metadata) {
if ('t_first' in metadata) {
// Bolt V3 uses shorter key 't_first' to represent 'result_available_after'
Expand Down
5 changes: 5 additions & 0 deletions src/internal/bolt-protocol-v4.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@
import BoltProtocolV3 from './bolt-protocol-v3'
import RequestMessage from './request-message'
import { ResultStreamObserver } from './stream-observers'
import { BOLT_PROTOCOL_V4 } from './constants'

export default class BoltProtocol extends BoltProtocolV3 {
get version () {
return BOLT_PROTOCOL_V4
}

beginTransaction ({
bookmark,
txConfig,
Expand Down
24 changes: 24 additions & 0 deletions src/internal/connection-provider-direct.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import PooledConnectionProvider from './connection-provider-pooled'
import DelegateConnection from './connection-delegate'
import ChannelConnection from './connection-channel'
import { BOLT_PROTOCOL_V4 } from './constants'

export default class DirectConnectionProvider extends PooledConnectionProvider {
constructor ({ id, config, log, address, userAgent, authToken }) {
Expand All @@ -36,4 +38,26 @@ export default class DirectConnectionProvider extends PooledConnectionProvider {
.acquire(this._address)
.then(connection => new DelegateConnection(connection, null))
}

async supportsMultiDb () {
const connection = ChannelConnection.create(
this._address,
this._config,
this._createConnectionErrorHandler(),
this._log
)

try {
await connection._negotiateProtocol()

const protocol = connection.protocol()
if (protocol) {
return protocol.version >= BOLT_PROTOCOL_V4
}

return false
} finally {
await connection.close()
}
}
}
64 changes: 53 additions & 11 deletions src/internal/connection-provider-routing.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ import ConnectionErrorHandler from './connection-error-handler'
import DelegateConnection from './connection-delegate'
import LeastConnectedLoadBalancingStrategy from './least-connected-load-balancing-strategy'
import Bookmark from './bookmark'
import ChannelConnection from './connection-channel'
import { int } from '../integer'
import { BOLT_PROTOCOL_V4 } from './constants'

const UNAUTHORIZED_ERROR_CODE = 'Neo.ClientError.Security.Unauthorized'
const DATABASE_NOT_FOUND_ERROR_CODE =
'Neo.ClientError.Database.DatabaseNotFound'
const SYSTEM_DB_NAME = 'system'
const DEFAULT_DB_NAME = ''
const DEFAULT_ROUTING_TABLE_PURGE_DELAY = int(30000)

export default class RoutingConnectionProvider extends PooledConnectionProvider {
constructor ({
Expand All @@ -47,7 +51,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
config,
log,
userAgent,
authToken
authToken,
routingTablePurgeDelay
}) {
super({ id, config, log, userAgent, authToken })

Expand All @@ -61,6 +66,9 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._dnsResolver = new HostNameResolver()
this._log = log
this._useSeedRouter = true
this._routingTablePurgeDelay = routingTablePurgeDelay
? int(routingTablePurgeDelay)
: DEFAULT_ROUTING_TABLE_PURGE_DELAY
}

_createConnectionErrorHandler () {
Expand All @@ -71,23 +79,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider

_handleUnavailability (error, address, database) {
this._log.warn(
`Routing driver ${
this._id
} will forget ${address} for database '${database}' because of an error ${
error.code
} '${error.message}'`
`Routing driver ${this._id} will forget ${address} for database '${database}' because of an error ${error.code} '${error.message}'`
)
this.forget(address, database || '')
return error
}

_handleWriteFailure (error, address, database) {
this._log.warn(
`Routing driver ${
this._id
} will forget writer ${address} for database '${database}' because of an error ${
error.code
} '${error.message}'`
`Routing driver ${this._id} will forget writer ${address} for database '${database}' because of an error ${error.code} '${error.message}'`
)
this.forgetWriter(address, database || '')
return newError(
Expand Down Expand Up @@ -152,6 +152,41 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
}
}

async supportsMultiDb () {
const addresses = await this._resolveSeedRouter(this._seedRouter)

let lastError
for (let i = 0; i < addresses.length; i++) {
const connection = ChannelConnection.create(
addresses[i],
this._config,
this._createConnectionErrorHandler(),
this._log
)

try {
await connection._negotiateProtocol()

const protocol = connection.protocol()
if (protocol) {
return protocol.version >= BOLT_PROTOCOL_V4
}

return false
} catch (error) {
lastError = error
} finally {
await connection.close()
}
}

if (lastError) {
throw lastError
}

return false
}

forget (address, database) {
if (database || database === '') {
this._routingTables[database].forget(address)
Expand Down Expand Up @@ -427,6 +462,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
// close old connections to servers not present in the new routing table
await this._connectionPool.keepAll(newRoutingTable.allServers())

// filter out expired to purge (expired for a pre-configured amount of time) routing table entries
Object.values(this._routingTables).forEach(value => {
if (value.isExpiredFor(this._routingTablePurgeDelay)) {
delete this._routingTables[value.database]
}
})

// make this driver instance aware of the new table
this._routingTables[newRoutingTable.database] = newRoutingTable
this._log.info(`Updated routing table ${newRoutingTable}`)
Expand Down
10 changes: 10 additions & 0 deletions src/internal/connection-provider.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ export default class ConnectionProvider {
throw new Error('not implemented')
}

/**
* This method checks whether the backend database supports multi database functionality
* by checking protocol handshake result.
*
* @returns {Promise<boolean>}
*/
supportsMultiDb () {
throw new Error('not implemented')
}

/**
* Closes this connection provider along with its internals (connections, pools, etc.)
*
Expand Down
14 changes: 13 additions & 1 deletion src/internal/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,16 @@
const ACCESS_MODE_READ = 'READ'
const ACCESS_MODE_WRITE = 'WRITE'

export { ACCESS_MODE_READ, ACCESS_MODE_WRITE }
const BOLT_PROTOCOL_V1 = 1
const BOLT_PROTOCOL_V2 = 2
const BOLT_PROTOCOL_V3 = 3
const BOLT_PROTOCOL_V4 = 4

export {
ACCESS_MODE_READ,
ACCESS_MODE_WRITE,
BOLT_PROTOCOL_V1,
BOLT_PROTOCOL_V2,
BOLT_PROTOCOL_V3,
BOLT_PROTOCOL_V4
}
15 changes: 13 additions & 2 deletions src/internal/routing-table.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const MIN_ROUTERS = 1
export default class RoutingTable {
constructor ({ database, routers, readers, writers, expirationTime } = {}) {
this.database = database
this.databaseName = database || 'default database'
this.routers = routers || []
this.readers = readers || []
this.writers = writers || []
Expand Down Expand Up @@ -61,14 +62,24 @@ export default class RoutingTable {
)
}

/**
* Check if this routing table is expired for specified amount of duration
*
* @param {Integer} duration amount of duration in milliseconds to check for expiration
* @returns {boolean}
*/
isExpiredFor (duration) {
return this.expirationTime.add(duration).lessThan(Date.now())
}

allServers () {
return [...this.routers, ...this.readers, ...this.writers]
}

toString () {
return (
`RoutingTable[` +
`database=${this.database}, ` +
'RoutingTable[' +
`database=${this.databaseName}, ` +
`expirationTime=${this.expirationTime}, ` +
`currentTime=${Date.now()}, ` +
`routers=[${this.routers}], ` +
Expand Down
10 changes: 8 additions & 2 deletions test/internal/bolt-protocol-v1.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ describe('#unit BoltProtocolV1', () => {
const recorder = new utils.MessageRecordingConnection()
const protocol = new BoltProtocolV1(recorder, null, false)

const onError = error => {}
const onError = _error => {}
const onComplete = () => {}
const clientName = 'js-driver/1.2.3'
const authToken = { username: 'neo4j', password: 'secret' }
Expand Down Expand Up @@ -166,6 +166,12 @@ describe('#unit BoltProtocolV1', () => {
expect(recorder.flushes).toEqual([false, true])
})

it('should return correct bolt version number', () => {
const protocol = new BoltProtocolV1(null, null, false)

expect(protocol.version).toBe(1)
})

describe('Bolt V3', () => {
/**
* @param {function(protocol: BoltProtocolV1)} fn
Expand Down Expand Up @@ -202,7 +208,7 @@ describe('#unit BoltProtocolV1', () => {

describe('run', () => {
function verifyRun (txConfig) {
verifyError((protocol, observer) =>
verifyError((protocol, _observer) =>
protocol.run('statement', {}, { txConfig })
)
}
Expand Down
Loading