diff --git a/README.md b/README.md
index f57542d..ccab1e2 100755
--- a/README.md
+++ b/README.md
@@ -26,7 +26,7 @@ Code architecture and some features in version 3 borrowed from the [ioredis](htt
## Installation
-```
+```Bash
npm install --save tarantool-driver
```
## Configuration
@@ -38,18 +38,26 @@ Creates a Tarantool instance, extends [EventEmitter](http://nodejs.org/api/event
Connection related custom events:
* "reconnecting" - emitted when the client try to reconnect, first argument is retry delay in ms.
* "connect" - emitted when the client connected and auth passed (if username and password provided), first argument is an object with host and port of the Taranool server.
+* "change_host" - emitted when `nonWritableHostPolicy` option is set and write error occurs, first argument is the text of error which provoked the host to be changed.
| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [port] | number
\| string
\| Object
| 3301
| Port of the Tarantool server, or a URI string (see the examples in [tarantool configuration doc](https://tarantool.org/en/doc/reference/configuration/index.html#uri)), or the `options` object(see the third argument). |
| [host] | string
\| Object
| "localhost"
| Host of the Tarantool server, when the first argument is a URL string, this argument is an object represents the options. |
-| [options] | Object
| | Other options. |
+| [path] | string
\| Object
| null
| Unix socket path of the Tarantool server. |
+| [options] | Object
| | Other options, including all from [net.createConnection](https://nodejs.org/api/net.html#netcreateconnection). |
| [options.port] | number
| 6379
| Port of the Tarantool server. |
| [options.host] | string
| "localhost"
| Host of the Tarantool server. |
| [options.username] | string
| null
| If set, client will authenticate with the value of this option when connected. |
| [options.password] | string
| null
| If set, client will authenticate with the value of this option when connected. |
| [options.timeout] | number
| 0
| The milliseconds before a timeout occurs during the initial connection to the Tarantool server. |
+| [options.tls] | Object
| null
| If specified, forces to use `tls` module instead of the default `net`. In object properties you can specify any TLS-related options, e.g. from the [tls.createSecureContext()](https://nodejs.org/api/tls.html#tlscreatesecurecontextoptions) |
+| [options.keepAlive] | boolean
| true
| Enables keep-alive functionality (recommended). |
+| [options.noDelay] | boolean
| true
| Disables the use of Nagle's algorithm (recommended). |
| [options.lazyConnect] | boolean
| false
| By default, When a new `Tarantool` instance is created, it will connect to Tarantool server automatically. If you want to keep disconnected util a command is called, you can pass the `lazyConnect` option to the constructor. |
+| [options.nonWritableHostPolicy] | string
| null
| What to do when Tarantool server rejects write operation, e.g. because of `box.cfg.read_only` set to `true` or during snapshot fetching.
Possible values are:
- `null`: just rejects Promise with an error
- `changeHost`: disconnect from the current host and connect to the next from `reserveHosts`. Pending Promise will be rejected.
- `changeAndRetry`: same as `changeHost`, but after reconnecting tries to run the command again in order to fullfil the Promise |
+| [options.maxRetriesPerRequest] | number
| 5
| Number of attempts to find the alive host if `nonWritableHostPolicy` is not null. |
+| [options.enableOfflineQueue] | boolean
| true
| By default, if there is no active connection to the Tarantool server, commands are added to a queue and are executed once the connection is "ready", meaning the connection to the Tarantool server has been established and auth passed (`connect` event is also executed at this moment). If this option is false, when execute the command when the connection isn't ready, an error will be returned. |
| [options.reserveHosts] | array
| [] | Array of [strings](https://tarantool.org/en/doc/reference/configuration/index.html?highlight=uri#uri) - reserve hosts. Client will try to connect to hosts from this array after loosing connection with current host and will do it cyclically. See example below.|
| [options.beforeReserve] | number
| 2
| Number of attempts to reconnect before connect to next host from the reserveHosts
|
| [options.retryStrategy] | function
| | See below |
@@ -107,7 +115,7 @@ will be lost forever if the user doesn't call `tarantool.connect()` manually.
## Usage example
We use TarantoolConnection instance and connect before other operations. Methods call return promise(https://developer.mozilla.org/ru/docs/Web/JavaScript/Reference/Global_Objects/Promise). Available methods with some testing: select, update, replace, insert, delete, auth, destroy.
-```
+```javascript
var TarantoolConnection = require('tarantool-driver');
var conn = new TarantoolConnection('notguest:sesame@mail.ru:3301');
@@ -123,8 +131,7 @@ conn.select(512, 0, 1, 0, 'eq', [50])
You can use any implementation that can be duck typing with next interface:
-```
-
+```Javascript
//msgpack implementation example
/*
@interface
@@ -155,6 +162,24 @@ Resolve if connected. Or reject if not.
Auth with using [chap-sha1](http://tarantool.org/doc/book/box/box_space.html). About authenthication more here: [authentication](http://tarantool.org/doc/book/box/authentication.html)
+### tarantool.packUuid(uuid: String)
+
+**Method for converting [UUID values](https://www.tarantool.io/ru/doc/latest/concepts/data_model/value_store/#uuid) to Tarantool-compatible format.**
+
+If passing UUID without converion via this method, server will accept it as simple String.
+
+### tarantool.packDecimal(numberToConvert: Number)
+
+**Method for converting Numbers (Float or Integer) to Tarantool [Decimal](https://www.tarantool.io/ru/doc/latest/concepts/data_model/value_store/#decimal) type.**
+
+If passing number without converion via this method, server will accept it as Integer or Double (for JS Float type).
+
+### tarantool.packInteger(numberToConvert: Number)
+
+**Method for safely passing numbers up to int64 to bind params**
+
+Otherwise msgpack will encode anything bigger than int32 as a double number.
+
### tarantool.select(spaceId: Number or String, indexId: Number or String, limit: Number, offset: Number, iterator: Iterator, key: tuple) ⇒ Promise
[Iterators](http://tarantool.org/doc/book/box/box_index.html). Available iterators: 'eq', 'req', 'all', 'lt', 'le', 'ge', 'gt', 'bitsAllSet', 'bitsAnySet', 'bitsAllNotSet'.
@@ -163,13 +188,28 @@ It's just select. Promise resolve array of tuples.
Some examples:
-```
+```Javascript
conn.select(512, 0, 1, 0, 'eq', [50]);
//same as
conn.select('test', 'primary', 1, 0, 'eq', [50]);
```
-You can use space name or index name instead of id, but it will some requests for get this metadata. That information actual for delete, replace, insert, update too.
+You can use space name or index name instead of id, but this way some requests will be made to get and cache metadata. This stored information will be actual for delete, replace, insert, update too.
+
+For tests, we will create a Space named 'users' on the Tarantool server-side, where the 'id' index is of UUID type:
+
+```lua
+-- example schema of such space
+box.schema.space.create("users", {engine = 'memtx'})
+box.space.users:format({
+ {name = 'id', type = 'uuid', is_nullable = false},
+ {name = 'username', type = 'string', is_nullable = false}
+})
+```
+And then select some tuples on a client side:
+```Javascript
+conn.select('users', 'id', 1, 0, 'eq', [conn.packUuid('550e8400-e29b-41d4-a716-446655440000')]);
+```
### tarantool.selectCb(spaceId: Number or String, indexId: Number or String, limit: Number, offset: Number, iterator: Iterator, key: tuple, callback: function(success), callback: function(error))
@@ -210,7 +250,7 @@ Promise resolve a new or replaced tuple.
Call a function with arguments.
You can create function on tarantool side:
-```
+```Lua
function myget(id)
val = box.space.batched:select{id}
return val[1]
@@ -218,7 +258,7 @@ end
```
And then use something like this:
-```
+```Javascript
conn.call('myget', 4)
.then(function(value){
console.log(value);
@@ -226,7 +266,7 @@ conn.call('myget', 4)
```
If you have a 2 arguments function just send a second arguments in this way:
-```
+```Javascript
conn.call('my2argumentsfunc', 'first', 'second argument')
```
And etc like this.
@@ -242,14 +282,14 @@ Promise resolve result:any.
Example:
-```
+```Javascript
conn.eval('return box.session.user()')
.then(function(res){
console.log('current user is:' res[0])
})
```
-### tarantool.sql(query: String, bindParams: Array) -> Promise
+### tarantool.sql(query: String, bindParams: Array) ⇒ Promise
It's accessible only in 2.1 tarantool.
@@ -261,7 +301,7 @@ More about it [here](https://www.tarantool.io/en/doc/2.1/tutorials/sql_tutorial/
Example:
-```
+```Javascript
await connection.insert('tags', ['tag_1', 1])
await connection.insert('tags', ['tag_2', 50])
connection.sql('select * from "tags"')
@@ -277,6 +317,24 @@ P.S. If you using lowercase in your space name you need to use a double quote fo
It doesn't work for space without format.
+### tarantool.pipeline().<...>.exec()
+
+Queue some commands in memory and then send them simultaneously to the server in a single (or several, if request body is too big) network call(s).
+This way the performance is significantly improved by more than 300% (depending on the number of pipelined commands - the bigger, the better)
+
+Example:
+
+```Javascript
+tarantool.pipeline()
+.insert('tags', ['tag_1', 1])
+.insert('tags', ['tag_2', 50])
+.sql('update "tags" set "amount" = 10 where "tag_id" = \'tag_1\'')
+.update('tags', 'tag_id', ['tag_2'], [['=', 'amount', 30]])
+.sql('select * from "tags"')
+.call('truncateTags')
+.exec()
+```
+
### tarantool.ping() ⇒ Promise
Promise resolve true.
@@ -299,10 +357,24 @@ It's ok you can do whatever you need. I add log options for some technical infor
## Changelog
+### 3.1.0
+
+- Added 3 new msgpack extensions: UUID, Datetime, Decimal.
+- Connection object now accepts all options of `net.createConnection()`, including Unix socket path.
+- New `nonWritableHostPolicy` and related options, which improves a high availability capabilities without any 3rd parties.
+- Ability to disable the offline queue.
+- Fixed [bug with int32](https://github.com/tarantool/node-tarantool-driver/issues/48) numbers when it was encoded as floating. Use method `packInteger()` to solve this.
+- `selectCb()` now also accepts `spaceId` and `indexId` as their String names, not only their IDs.
+- Some performance improvements by caching internal values.
+- TLS (SSL) support.
+- New `pipeline()`+`exec()` methods kindly borrowed from the [ioredis](https://github.com/redis/ioredis?tab=readme-ov-file#pipelining), which lets you to queue some commands in memory and then send them simultaneously to the server in a single (or several, if request body is too big) network call(s). Thanks to the Tarantool, which [made this possible](https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/format/#packet-structure).
+This way the performance is significantly improved by 500-1600% - you can check it yourself by running `npm run benchmark-read` or `npm run benchmark-write`.
+Note that this feature doesn't replaces the Transaction model, which has some level of isolation.
+- Changed `const` declaration to `var` in order to support old Node.JS versions.
+
### 3.0.7
-Fix in header decoding to support latest Tarantool versions.
-Update to tests to support latest Tarantool versions.
+Fix in header decoding to support latest Tarantool versions. Update to tests to support latest Tarantool versions.
### 3.0.6
@@ -348,4 +420,7 @@ Key is now can be just a number.
## ToDo
-finish multihost feature
+1. Streams
+2. Events and subscriptions
+3. Graceful shutdown protocol
+4. Prepared SQL statements
\ No newline at end of file
diff --git a/benchmark/box.lua b/benchmark/box.lua
index 09b3319..7b614ef 100755
--- a/benchmark/box.lua
+++ b/benchmark/box.lua
@@ -1,5 +1,9 @@
box.cfg{listen=3301}
+if not box.schema.user.exists('test') then
+ box.schema.user.create('test')
+end
+
user = box.user
if not user then
box.schema.user.grant('test', 'execute', 'universe')
diff --git a/benchmark/read.js b/benchmark/read.js
index 46d05d2..d9d11ec 100755
--- a/benchmark/read.js
+++ b/benchmark/read.js
@@ -11,16 +11,15 @@ var conn = new Driver(process.argv[process.argv.length - 1], {lazyConnect: true}
conn.connect()
.then(function(){
-
suite.add('select cb', {defer: true, fn: function(defer){
function callback(){
defer.resolve();
}
- conn.selectCb(512, 0, 1, 0, 'eq', ['test'], callback, console.error);
+ conn.selectCb('counter', 0, 1, 0, 'eq', ['test'], callback, console.error);
}});
suite.add('select promise', {defer: true, fn: function(defer){
- conn.select(512, 0, 1, 0, 'eq', ['test'])
+ conn.select('counter', 0, 1, 0, 'eq', ['test'])
.then(function(){ defer.resolve();});
}});
@@ -28,7 +27,7 @@ conn.connect()
try{
promises = [];
for (let l=0;l<500;l++){
- promises.push(conn.select(512, 0, 1, 0, 'eq', ['test']));
+ promises.push(conn.select('counter', 0, 1, 0, 'eq', ['test']));
}
var chain = Promise.all(promises);
chain.then(function(){ defer.resolve(); })
@@ -51,7 +50,7 @@ conn.connect()
promises = [];
for (var l=0;l<10;l++){
promises.push(
- conn.select(512, 0, 1, 0, 'eq', ['test'])
+ conn.select('counter', 0, 1, 0, 'eq', ['test'])
);
}
return Promise.all(promises);
@@ -76,7 +75,7 @@ conn.connect()
promises = [];
for (var l=0;l<50;l++){
promises.push(
- conn.select(512, 0, 1, 0, 'eq', ['test'])
+ conn.select('counter', 0, 1, 0, 'eq', ['test'])
);
}
return Promise.all(promises);
@@ -91,6 +90,31 @@ conn.connect()
console.error(e, e.stack);
}
}});
+
+ suite.add('pipelined select by 10', {defer: true, fn: function(defer){
+ var pipelinedConn = conn.pipeline()
+
+ for (var i=0;i<10;i++) {
+ pipelinedConn.select('counter', 0, 1, 0, 'eq', ['test']);
+ }
+
+ pipelinedConn.exec()
+ .then(function(){ defer.resolve(); })
+ .catch(function(e){ defer.reject(e); });
+ }});
+
+ suite.add('pipelined select by 50', {defer: true, fn: function(defer){
+ var pipelinedConn = conn.pipeline()
+
+ for (var i=0;i<50;i++) {
+ pipelinedConn.select('counter', 0, 1, 0, 'eq', ['test']);
+ }
+
+ pipelinedConn.exec()
+ .then(function(){ defer.resolve(); })
+ .catch(function(e){ defer.reject(e); });
+ }});
+
suite
.on('cycle', function(event) {
console.log(String(event.target));
diff --git a/benchmark/write.js b/benchmark/write.js
index 75b3da0..fc195fb 100755
--- a/benchmark/write.js
+++ b/benchmark/write.js
@@ -35,6 +35,31 @@ conn.connect()
console.error(e, e.stack);
}
}});
+
+ suite.add('pipelined insert by 10', {defer: true, fn: function(defer){
+ var pipelinedConn = conn.pipeline()
+
+ for (var i=0;i<10;i++) {
+ pipelinedConn.insert('bench', [c++, {user: 'username', data: 'Some data.'}])
+ }
+
+ pipelinedConn.exec()
+ .then(function(){ defer.resolve(); })
+ .catch(function(e){ defer.reject(e); })
+ }});
+
+ suite.add('pipelined insert by 50', {defer: true, fn: function(defer){
+ var pipelinedConn = conn.pipeline()
+
+ for (var i=0;i<50;i++) {
+ pipelinedConn.insert('bench', [c++, {user: 'username', data: 'Some data.'}])
+ }
+
+ pipelinedConn.exec()
+ .then(function(){ defer.resolve(); })
+ .catch(function(e){ defer.reject(e); })
+ }});
+
suite
.on('cycle', function(event) {
console.log(String(event.target));
diff --git a/lib/commands.js b/lib/commands.js
index e3f711c..bd1b9fa 100755
--- a/lib/commands.js
+++ b/lib/commands.js
@@ -1,16 +1,19 @@
/* global Promise */
-var msgpack = require('msgpack-lite');
-var crypto = require('crypto');
+var { createHash } = require('crypto');
var tarantoolConstants = require('./const');
-var utils = require('./utils');
-var _this;
-
-const requestMethods = ['select', 'delete', 'insert', 'replace', 'update', 'eval', 'call', 'upsert'];
+var {
+ bufferFrom,
+ createBuffer,
+ TarantoolError,
+ bufferSubarrayPoly
+} = require('./utils');
+var { encode: msgpackEncode } = require('msgpack-lite');
+var { codec } = require('./msgpack-extensions');
function Commands() {}
Commands.prototype.sendCommand = function () {};
-const maxSmi = 1<<30
+var maxSmi = 1<<30
Commands.prototype._getRequestId = function(){
if (this._id > maxSmi)
@@ -19,7 +22,7 @@ Commands.prototype._getRequestId = function(){
};
Commands.prototype._getSpaceId = function(name){
- _this = this;
+ var _this = this;
return this.select(tarantoolConstants.Space.space, tarantoolConstants.IndexSpace.name, 1, 0,
'eq', [name])
.then(function(value){
@@ -40,12 +43,12 @@ Commands.prototype._getSpaceId = function(name){
}
else
{
- throw new utils.TarantoolError('Cannot read a space name or space is not defined');
+ throw new TarantoolError('Cannot read a space name or space is not defined');
}
});
};
Commands.prototype._getIndexId = function(spaceId, indexName){
- _this = this;
+ var _this = this;
return this.select(tarantoolConstants.Space.index, tarantoolConstants.IndexSpace.indexName, 1, 0,
'eq', [spaceId, indexName])
.then(function(value) {
@@ -59,11 +62,11 @@ Commands.prototype._getIndexId = function(spaceId, indexName){
return indexId;
}
else
- throw new utils.TarantoolError('Cannot read a space name indexes or index is not defined');
+ throw new TarantoolError('Cannot read a space name indexes or index is not defined');
});
};
-Commands.prototype.select = function(spaceId, indexId, limit, offset, iterator, key){
- _this = this;
+Commands.prototype.select = function(spaceId, indexId, limit, offset, iterator, key, _isPipelined) {
+ var _this = this;
if (!(key instanceof Array))
key = [key];
return new Promise(function(resolve, reject){
@@ -75,7 +78,7 @@ Commands.prototype.select = function(spaceId, indexId, limit, offset, iterator,
{
return _this._getMetadata(spaceId, indexId)
.then(function(info){
- return _this.select(info[0], info[1], limit, offset, iterator, key);
+ return _this.select(info[0], info[1], limit, offset, iterator, key, _isPipelined);
})
.then(resolve)
.catch(reject);
@@ -84,9 +87,9 @@ Commands.prototype.select = function(spaceId, indexId, limit, offset, iterator,
if (iterator == 'all')
key = [];
- var bufKey = _this.msgpack.encode(key);
+ var bufKey = msgpackEncode(key, {codec});
var len = 31+bufKey.length;
- var buffer = utils.createBuffer(5+len);
+ var buffer = createBuffer(5+len);
buffer[0] = 0xce;
buffer.writeUInt32BE(len, 1);
@@ -113,19 +116,27 @@ Commands.prototype.select = function(spaceId, indexId, limit, offset, iterator,
buffer[35] = tarantoolConstants.KeysCode.key;
bufKey.copy(buffer, 36);
- _this.sendCommand([tarantoolConstants.RequestCode.rqSelect, reqId, {resolve: resolve, reject: reject}], buffer);
+ _this.sendCommand([
+ tarantoolConstants.RequestCode.rqSelect,
+ reqId,
+ {resolve: resolve, reject: reject}
+ ],
+ buffer,
+ _isPipelined === true
+ );
});
};
Commands.prototype._getMetadata = function(spaceName, indexName){
- _this = this;
- if (this.namespace[spaceName])
+ var _this = this;
+ var spName = this.namespace[spaceName] // reduce overhead of lookup
+ if (spName)
{
- spaceName = this.namespace[spaceName].id;
+ spaceName = spName.id;
}
- if (typeof(this.namespace[spaceName]) != 'undefined' && typeof(this.namespace[spaceName].indexes[indexName])!='undefined')
+ if (typeof(spName) != 'undefined' && typeof(spName.indexes[indexName])!='undefined')
{
- indexName = this.namespace[spaceName].indexes[indexName];
+ indexName = spName.indexes[indexName];
}
if (typeof(spaceName)=='string' && typeof(indexName)=='string')
{
@@ -147,11 +158,11 @@ Commands.prototype._getMetadata = function(spaceName, indexName){
};
Commands.prototype.ping = function(){
- _this = this;
+ var _this = this;
return new Promise(function (resolve, reject) {
var reqId = _this._getRequestId();
var len = 9;
- var buffer = utils.createBuffer(len+5);
+ var buffer = createBuffer(len+5);
buffer[0] = 0xce;
buffer.writeUInt32BE(len, 1);
@@ -162,19 +173,39 @@ Commands.prototype.ping = function(){
buffer[9] = 0xce;
buffer.writeUInt32BE(reqId, 10);
- _this.sendCommand([tarantoolConstants.RequestCode.rqPing, reqId, {resolve: resolve, reject: reject}], buffer);
+ _this.sendCommand([
+ tarantoolConstants.RequestCode.rqPing,
+ reqId,
+ {resolve: resolve, reject: reject}
+ ], buffer);
});
};
Commands.prototype.selectCb = function(spaceId, indexId, limit, offset, iterator, key, success, error){
if (!(key instanceof Array))
key = [key];
+
+ var _this = this;
+
+ if (typeof(spaceId) == 'string' && _this.namespace[spaceId])
+ spaceId = _this.namespace[spaceId].id;
+ if (typeof(indexId)=='string' && _this.namespace[spaceId] && _this.namespace[spaceId].indexes[indexId])
+ indexId = _this.namespace[spaceId].indexes[indexId];
+ if (typeof(spaceId)=='string' || typeof(indexId)=='string')
+ {
+ return _this._getMetadata(spaceId, indexId)
+ .then(function(info){
+ return _this.selectCb(info[0], info[1], limit, offset, iterator, key, success, error);
+ })
+ .catch(error);
+ }
+
var reqId = this._getRequestId();
if (iterator == 'all')
key = [];
- var bufKey = this.msgpack.encode(key);
+ var bufKey = msgpackEncode(key, {codec});
var len = 31+bufKey.length;
- var buffer = utils.createBuffer(5+len);
+ var buffer = createBuffer(5+len);
buffer[0] = 0xce;
buffer.writeUInt32BE(len, 1);
@@ -201,11 +232,15 @@ Commands.prototype.selectCb = function(spaceId, indexId, limit, offset, iterator
buffer[35] = tarantoolConstants.KeysCode.key;
bufKey.copy(buffer, 36);
- this.sendCommand([tarantoolConstants.RequestCode.rqSelect, reqId, {resolve: success, reject: error}], buffer);
+ this.sendCommand([
+ tarantoolConstants.RequestCode.rqSelect,
+ reqId,
+ {resolve: success, reject: error}
+ ], buffer);
};
Commands.prototype.delete = function(spaceId, indexId, key){
- _this = this;
+ var _this = this;
if (Number.isInteger(key))
key = [key];
return new Promise(function (resolve, reject) {
@@ -221,10 +256,10 @@ Commands.prototype.delete = function(spaceId, indexId, key){
.catch(reject);
}
var reqId = _this._getRequestId();
- var bufKey = _this.msgpack.encode(key);
+ var bufKey = msgpackEncode(key, {codec});
var len = 17+bufKey.length;
- var buffer = utils.createBuffer(5+len);
+ var buffer = createBuffer(5+len);
buffer[0] = 0xce;
buffer.writeUInt32BE(len, 1);
@@ -243,15 +278,19 @@ Commands.prototype.delete = function(spaceId, indexId, key){
buffer[21] = tarantoolConstants.KeysCode.key;
bufKey.copy(buffer, 22);
- _this.sendCommand([tarantoolConstants.RequestCode.rqDelete, reqId, {resolve: resolve, reject: reject}], buffer);
+ _this.sendCommand([
+ tarantoolConstants.RequestCode.rqDelete,
+ reqId,
+ {resolve: resolve, reject: reject}
+ ], buffer);
}
else
- reject(new utils.TarantoolError('need array'));
+ reject(new TarantoolError('need array'));
});
};
Commands.prototype.update = function(spaceId, indexId, key, ops){
- _this = this;
+ var _this = this;
if (Number.isInteger(key))
key = [key];
return new Promise(function (resolve, reject) {
@@ -266,11 +305,11 @@ Commands.prototype.update = function(spaceId, indexId, key, ops){
.catch(reject);
}
var reqId = _this._getRequestId();
- var bufKey = _this.msgpack.encode(key);
- var bufOps = _this.msgpack.encode(ops);
+ var bufKey = msgpackEncode(key, {codec});
+ var bufOps = msgpackEncode(ops, {codec});
var len = 18+bufKey.length+bufOps.length;
- var buffer = utils.createBuffer(len+5);
+ var buffer = createBuffer(len+5);
buffer[0] = 0xce;
buffer.writeUInt32BE(len, 1);
@@ -291,15 +330,19 @@ Commands.prototype.update = function(spaceId, indexId, key, ops){
buffer[22+bufKey.length] = tarantoolConstants.KeysCode.tuple;
bufOps.copy(buffer, 23+bufKey.length);
- _this.sendCommand([tarantoolConstants.RequestCode.rqUpdate, reqId, {resolve: resolve, reject: reject}], buffer);
+ _this.sendCommand([
+ tarantoolConstants.RequestCode.rqUpdate,
+ reqId,
+ {resolve: resolve, reject: reject}
+ ], buffer);
}
else
- reject(new utils.TarantoolError('need array'));
+ reject(new TarantoolError('need array'));
});
};
Commands.prototype.upsert = function(spaceId, ops, tuple){
- _this = this;
+ var _this = this;
return new Promise(function (resolve, reject) {
if (Array.isArray(ops)){
if (typeof(spaceId)=='string')
@@ -312,11 +355,11 @@ Commands.prototype.upsert = function(spaceId, ops, tuple){
.catch(reject);
}
var reqId = _this._getRequestId();
- var bufTuple = _this.msgpack.encode(tuple);
- var bufOps = _this.msgpack.encode(ops);
+ var bufTuple = msgpackEncode(tuple, {codec});
+ var bufOps = msgpackEncode(ops, {codec});
var len = 16+bufTuple.length+bufOps.length;
- var buffer = utils.createBuffer(len+5);
+ var buffer = createBuffer(len+5);
buffer[0] = 0xce;
buffer.writeUInt32BE(len, 1);
@@ -335,23 +378,27 @@ Commands.prototype.upsert = function(spaceId, ops, tuple){
buffer[20+bufTuple.length] = tarantoolConstants.KeysCode.def_tuple;
bufOps.copy(buffer, 21+bufTuple.length);
- _this.sendCommand([tarantoolConstants.RequestCode.rqUpsert, reqId, {resolve: resolve, reject: reject}], buffer);
+ _this.sendCommand([
+ tarantoolConstants.RequestCode.rqUpsert,
+ reqId,
+ {resolve: resolve, reject: reject}
+ ], buffer);
}
else
- reject(new utils.TarantoolError('need ops array'));
+ reject(new TarantoolError('need ops array'));
});
};
Commands.prototype.eval = function(expression){
- _this = this;
+ var _this = this;
var tuple = Array.prototype.slice.call(arguments, 1);
return new Promise(function (resolve, reject) {
var reqId = _this._getRequestId();
- var bufExp = _this.msgpack.encode(expression);
- var bufTuple = _this.msgpack.encode(tuple ? tuple : []);
+ var bufExp = msgpackEncode(expression);
+ var bufTuple = msgpackEncode(tuple ? tuple : [], {codec});
var len = 12+bufExp.length + bufTuple.length;
- var buffer = utils.createBuffer(len+5);
+ var buffer = createBuffer(len+5);
buffer[0] = 0xce;
buffer.writeUInt32BE(len, 1);
@@ -367,19 +414,23 @@ Commands.prototype.eval = function(expression){
buffer[16+bufExp.length] = tarantoolConstants.KeysCode.tuple;
bufTuple.copy(buffer, 17+bufExp.length);
- _this.sendCommand([tarantoolConstants.RequestCode.rqEval, reqId, {resolve: resolve, reject: reject}], buffer);
+ _this.sendCommand([
+ tarantoolConstants.RequestCode.rqEval,
+ reqId,
+ {resolve: resolve, reject: reject}
+ ], buffer);
});
};
Commands.prototype.call = function(functionName){
- _this = this;
- var tuple = arguments.length > 1 ? Array.prototype.slice.call(arguments, 1): [];
+ var _this = this;
+ var tuple = arguments.length > 1 ? Array.prototype.slice.call(arguments, 1) : [];
return new Promise(function (resolve, reject) {
var reqId = _this._getRequestId();
- var bufName = _this.msgpack.encode(functionName);
- var bufTuple = _this.msgpack.encode(tuple ? tuple : []);
+ var bufName = msgpackEncode(functionName);
+ var bufTuple = msgpackEncode(tuple ? tuple : [], {codec});
var len = 12+bufName.length + bufTuple.length;
- var buffer = utils.createBuffer(len+5);
+ var buffer = createBuffer(len+5);
buffer[0] = 0xce;
buffer.writeUInt32BE(len, 1);
@@ -395,35 +446,44 @@ Commands.prototype.call = function(functionName){
buffer[16+bufName.length] = tarantoolConstants.KeysCode.tuple;
bufTuple.copy(buffer, 17+bufName.length);
- _this.sendCommand([tarantoolConstants.RequestCode.rqCall, reqId, {resolve: resolve, reject: reject}], buffer);
+ _this.sendCommand([
+ tarantoolConstants.RequestCode.rqCall,
+ reqId,
+ {resolve: resolve, reject: reject}
+ ], buffer);
});
};
Commands.prototype.sql = function(query, bindParams = []){
- _this = this;
- return new Promise(function (resolve, reject) {
- var reqId = _this._getRequestId();
- var bufQuery = _this.msgpack.encode(query);
- var bufParams = _this.msgpack.encode(bindParams);
- var len = 12+bufQuery.length + bufParams.length;
- var buffer = utils.createBuffer(len+5);
-
- buffer[0] = 0xce;
- buffer.writeUInt32BE(len, 1);
- buffer[5] = 0x82;
- buffer[6] = tarantoolConstants.KeysCode.code;
- buffer[7] = tarantoolConstants.RequestCode.rqExecute;
- buffer[8] = tarantoolConstants.KeysCode.sync;
- buffer[9] = 0xce;
- buffer.writeUInt32BE(reqId, 10);
- buffer[14] = 0x82;
- buffer.writeUInt8(tarantoolConstants.KeysCode.sql_text, 15);
- bufQuery.copy(buffer, 16);
- buffer[16+bufQuery.length] = tarantoolConstants.KeysCode.sql_bind;
- bufParams.copy(buffer, 17+bufQuery.length);
-
- _this.sendCommand([tarantoolConstants.RequestCode.rqExecute, reqId, {resolve: resolve, reject: reject}], buffer);
- });
+ var _this = this;
+ return new Promise(function (resolve, reject) {
+ var reqId = _this._getRequestId();
+ var bufQuery = msgpackEncode(query);
+
+ var bufParams = msgpackEncode(bindParams, {codec});
+ var len = 12+bufQuery.length + bufParams.length;
+ var buffer = createBuffer(len+5);
+
+ buffer[0] = 0xce;
+ buffer.writeUInt32BE(len, 1);
+ buffer[5] = 0x82;
+ buffer[6] = tarantoolConstants.KeysCode.code;
+ buffer[7] = tarantoolConstants.RequestCode.rqExecute;
+ buffer[8] = tarantoolConstants.KeysCode.sync;
+ buffer[9] = 0xce;
+ buffer.writeUInt32BE(reqId, 10);
+ buffer[14] = 0x82;
+ buffer.writeUInt8(tarantoolConstants.KeysCode.sql_text, 15);
+ bufQuery.copy(buffer, 16);
+ buffer[16+bufQuery.length] = tarantoolConstants.KeysCode.sql_bind;
+ bufParams.copy(buffer, 17+bufQuery.length);
+
+ _this.sendCommand([
+ tarantoolConstants.RequestCode.rqExecute,
+ reqId,
+ {resolve: resolve, reject: reject}
+ ], buffer);
+ });
};
Commands.prototype.insert = function(spaceId, tuple){
@@ -437,7 +497,7 @@ Commands.prototype.replace = function(spaceId, tuple){
};
Commands.prototype._replaceInsert = function(cmd, reqId, spaceId, tuple){
- _this = this;
+ var _this = this;
return new Promise(function (resolve, reject) {
if (Array.isArray(tuple)){
if (typeof(spaceId)=='string')
@@ -449,9 +509,9 @@ Commands.prototype._replaceInsert = function(cmd, reqId, spaceId, tuple){
.then(resolve)
.catch(reject);
}
- var bufTuple = _this.msgpack.encode(tuple);
+ var bufTuple = msgpackEncode(tuple);
var len = 15+bufTuple.length;
- var buffer = utils.createBuffer(len+5);
+ var buffer = createBuffer(len+5);
buffer[0] = 0xce;
buffer.writeUInt32BE(len, 1);
@@ -468,22 +528,26 @@ Commands.prototype._replaceInsert = function(cmd, reqId, spaceId, tuple){
buffer[19] = tarantoolConstants.KeysCode.tuple;
bufTuple.copy(buffer, 20);
- _this.sendCommand([cmd, reqId, {resolve: resolve, reject: reject}], buffer);
+ _this.sendCommand([
+ cmd,
+ reqId,
+ {resolve: resolve, reject: reject}
+ ], buffer);
}
else
- reject(new utils.TarantoolError('need array'));
+ reject(new TarantoolError('need array'));
});
};
Commands.prototype._auth = function(username, password){
- _this = this;
+ var _this = this;
return new Promise(function (resolve, reject) {
var reqId = _this._getRequestId();
- var user = _this.msgpack.encode(username);
+ var user = msgpackEncode(username);
var scrambled = scramble(password, _this.salt);
var len = 44+user.length;
- var buffer = utils.createBuffer(len+5);
+ var buffer = createBuffer(len+5);
buffer[0] = 0xce;
buffer.writeUInt32BE(len, 1);
@@ -502,18 +566,22 @@ Commands.prototype._auth = function(username, password){
buffer[28+user.length] = 0xb4;
scrambled.copy(buffer, 29+user.length);
- _this.commandsQueue.push([tarantoolConstants.RequestCode.rqAuth, reqId, {resolve: resolve, reject: reject}]);
+ _this.commandsQueue.push([
+ tarantoolConstants.RequestCode.rqAuth,
+ reqId,
+ {resolve: resolve, reject: reject},
+ ]);
_this.socket.write(buffer);
});
};
function shatransform(t){
- return crypto.createHash('sha1').update(t).digest();
+ return createHash('sha1').update(t).digest();
}
function xor(a, b) {
- if (!Buffer.isBuffer(a)) a = new Buffer(a);
- if (!Buffer.isBuffer(b)) b = new Buffer(b);
+ if (!Buffer.isBuffer(a)) a = bufferFrom(a);
+ if (!Buffer.isBuffer(b)) b = bufferFrom(b);
var res = [];
var i;
if (a.length > b.length) {
@@ -525,15 +593,15 @@ function xor(a, b) {
res.push(a[i] ^ b[i]);
}
}
- return new Buffer(res);
+ return bufferFrom(res);
}
function scramble(password, salt){
- var encSalt = new Buffer(salt, 'base64');
+ var encSalt = bufferFrom(salt, 'base64');
var step1 = shatransform(password);
var step2 = shatransform(step1);
- var step3 = shatransform(Buffer.concat([encSalt.slice(0, 20), step2]));
+ var step3 = shatransform(Buffer.concat([encSalt[bufferSubarrayPoly](0, 20), step2]));
return xor(step1, step3);
}
-module.exports = Commands;
+module.exports = Commands;
\ No newline at end of file
diff --git a/lib/connection.js b/lib/connection.js
index 3981914..dd3b02b 100755
--- a/lib/connection.js
+++ b/lib/connection.js
@@ -1,23 +1,22 @@
/* global Promise */
-
var EventEmitter = require('events').EventEmitter;
var util = require('util');
-var msgpack = require('msgpack-lite');
-var crypto = require('crypto');
var debug = require('debug')('tarantool-driver:main');
var _ = require('lodash');
-
-var utils = require('./utils');
+var {
+ parseURL,
+ TarantoolError,
+ findPipelineError,
+ findPipelineErrors
+} = require('./utils');
+var { packAs } = require('./msgpack-extensions');
var Denque = require('denque');
var tarantoolConstants = require('./const');
var Commands = require('./commands');
+var Pipeline = require('./pipeline');
var Connector = require('./connector');
var eventHandler = require('./event-handler');
var SliderBuffer = require('./sliderBuffer')
-var multiplierBuffer = 2;
-
-var Decoder = require("msgpack-lite").Decoder;
-var decoder = new Decoder();
var revertStates = {
0: 'connecting',
@@ -29,16 +28,29 @@ var revertStates = {
32: 'end',
64: 'reconnecting',
128: 'auth',
- 256: 'connect'
+ 256: 'connect',
+ 512: 'changing_host'
};
TarantoolConnection.defaultOptions = {
host: 'localhost',
port: 3301,
+ path: null,
username: null,
password: null,
- reserveHosts: null,
+ reserveHosts: [],
beforeReserve: 2,
timeout: 0,
+ noDelay: true,
+ keepAlive: true,
+ nonWritableHostPolicy: null, /* What to do when Tarantool server rejects write operation,
+ e.g. because of box.cfg.read_only set to 'true' or during fetching snapshot.
+ Possible values are:
+ - null: reject Promise
+ - 'changeHost': disconnect from the current host and connect to the next of 'reserveHosts'. Pending Promise will be rejected.
+ - 'changeAndRetry': same as 'changeHost', but after connecting tries to run the command again in order to fullfil the Promise
+ */
+ maxRetriesPerRequest: 5, // If 'nonWritableHostPolicy' specified, Promise will be rejected only after exceeding this setting
+ enableOfflineQueue: true,
retryStrategy: function (times) {
return Math.min(times * 50, 2000);
},
@@ -54,7 +66,6 @@ function TarantoolConnection (){
this.parseOptions(arguments[0], arguments[1], arguments[2]);
this.connector = new Connector(this.options);
this.schemaId = null;
- this.msgpack = msgpack;
this.states = {
CONNECTING: 0,
CONNECTED: 1,
@@ -65,7 +76,8 @@ function TarantoolConnection (){
END: 32,
RECONNECTING: 64,
AUTH: 128,
- CONNECT: 256
+ CONNECT: 256,
+ CHANGING_HOST: 512
};
this.dataState = this.states.PREHELLO;
this.commandsQueue = new Denque();
@@ -75,6 +87,8 @@ function TarantoolConnection (){
this.awaitingResponseLength = -1;
this.retryAttempts = 0;
this._id = 0;
+ this.findPipelineError = findPipelineError
+ this.findPipelineErrors = findPipelineErrors
if (this.options.lazyConnect) {
this.setState(this.states.INITED);
} else {
@@ -84,8 +98,13 @@ function TarantoolConnection (){
util.inherits(TarantoolConnection, EventEmitter);
_.assign(TarantoolConnection.prototype, Commands.prototype);
+_.assign(TarantoolConnection.prototype, Pipeline.prototype);
_.assign(TarantoolConnection.prototype, require('./parser'));
+for (var packerName of Object.keys(packAs)) {
+ TarantoolConnection.prototype['pack' + packerName] = packAs[packerName]
+}
+
TarantoolConnection.prototype.resetOfflineQueue = function () {
this.offlineQueue = new Denque();
};
@@ -105,22 +124,30 @@ TarantoolConnection.prototype.parseOptions = function(){
this.options.port = arg;
continue;
}
- _.defaults(this.options, utils.parseURL(arg));
+ _.defaults(this.options, parseURL(arg));
} else if (typeof arg === 'number') {
this.options.port = arg;
} else {
- throw new utils.TarantoolError('Invalid argument ' + arg);
+ throw new TarantoolError('Invalid argument ' + arg);
}
}
_.defaults(this.options, TarantoolConnection.defaultOptions);
+ var reserveHostsLength = this.options.reserveHosts && this.options.reserveHosts.length || 0
+ if ((this.options.nonWritableHostPolicy != null) && (reserveHostsLength == 0)) {
+ throw new TarantoolError('\'nonWritableHostPolicy\' option is specified, but there are no reserve hosts. Specify it in connection options via \'reserveHosts\'')
+ }
if (typeof this.options.port === 'string') {
this.options.port = parseInt(this.options.port, 10);
}
- if (this.options.reserveHosts){
+ if (this.options.path != null) {
+ delete this.options.port
+ delete this.options.host
+ }
+ if (reserveHostsLength > 0){
this.reserveIterator = 1;
- this.reserve.push(_.pick(this.options, ['port', 'host', 'username', 'password']));
+ this.reserve.push(_.pick(this.options, ['port', 'host', 'username', 'password', 'path']));
for(i = 0; i %s(%s)', command[0], command[1]);
this.offlineQueue.push([command, buffer]);
- }else{
+ } else {
+ if (this.options.nonWritableHostPolicy == 'changeAndRetry') {
+ command.push(buffer)
+ }
this.commandsQueue.push(command);
- this.socket.write(buffer);
+ if (!isPipelined) this.socket.write(buffer); // in pipelined mode data is written via its own function
}
break;
case this.states.END:
- command[2].reject(new utils.TarantoolError('Connection is closed.'));
+ command[2].reject(new TarantoolError('Connection is closed.'));
break;
default:
debug('queue -> %s(%s)', command[0], command[1]);
+ if (!this.options.enableOfflineQueue) {
+ return command[2].reject(new TarantoolError('Connection not established yet!'));
+ }
this.offlineQueue.push([command, buffer]);
}
};
@@ -159,7 +203,11 @@ TarantoolConnection.prototype.setState = function (state, arg) {
if (this.socket && this.socket.remoteAddress && this.socket.remotePort) {
address = this.socket.remoteAddress + ':' + this.socket.remotePort;
} else {
- address = this.options.host + ':' + this.options.port;
+ if (this.options.path != null) {
+ address = this.options.path
+ } else {
+ address = this.options.host + ':' + this.options.port;
+ }
}
debug('state[%s]: %s -> %s', address, revertStates[this.state] || '[empty]', revertStates[state]);
this.state = state;
@@ -169,7 +217,7 @@ TarantoolConnection.prototype.setState = function (state, arg) {
TarantoolConnection.prototype.connect = function(){
return new Promise(function (resolve, reject) {
if (this.state === this.states.CONNECTING || this.state === this.states.CONNECT || this.state === this.states.CONNECTED || this.state === this.states.AUTH) {
- reject(new utils.TarantoolError('Tarantool is already connecting/connected'));
+ reject(new TarantoolError('Tarantool is already connecting/connected'));
return;
}
this.setState(this.states.CONNECTING);
@@ -188,14 +236,12 @@ TarantoolConnection.prototype.connect = function(){
socket.once('close', eventHandler.closeHandler(_this));
socket.on('data', eventHandler.dataHandler(_this));
- socket.setNoDelay(true);
-
if (_this.options.timeout) {
socket.setTimeout(_this.options.timeout, function () {
socket.setTimeout(0);
socket.destroy();
- var error = new utils.TarantoolError('connect ETIMEDOUT');
+ var error = new TarantoolError('connect ETIMEDOUT');
error.errorno = 'ETIMEDOUT';
error.code = 'ETIMEDOUT';
error.syscall = 'connect';
@@ -241,7 +287,7 @@ TarantoolConnection.prototype.silentEmit = function (eventName) {
if (
error instanceof Error &&
(
- error.message === utils.CONNECTION_CLOSED_ERROR_MSG ||
+ error.message === 'Connection manually closed' ||
error.syscall === 'connect' ||
error.syscall === 'read'
)
diff --git a/lib/connector.js b/lib/connector.js
index 816cd08..33a8acd 100755
--- a/lib/connector.js
+++ b/lib/connector.js
@@ -1,8 +1,7 @@
/* global Promise */
var net = require('net');
-var _ = require('lodash');
-
-var utils = require('./utils');
+var tls = require('tls');
+var { TarantoolError } = require('./utils');
function Connector(options) {
this.options = options;
@@ -16,24 +15,28 @@ Connector.prototype.disconnect = function () {
};
Connector.prototype.connect = function (callback) {
- this.connecting = true;
- var connectionOptions = _.pick(this.options, ['port', 'host']);
+ this.connecting = true;
var _this = this;
process.nextTick(function () {
if (!_this.connecting) {
- callback(new utils.TarantoolError('Connection is closed.'));
+ callback(new TarantoolError('Connection is closed.'));
return;
}
- var socket;
try {
- socket = net.createConnection(connectionOptions);
+ var connectionModule
+ if (typeof _this.options.tls == 'object') {
+ connectionModule = tls
+ _this.options = Object.assign(_this.options, _this.options.tls)
+ } else {
+ connectionModule = net
+ }
+ _this.socket = connectionModule.connect(_this.options);
} catch (err) {
callback(err);
return;
}
- _this.socket = socket;
- callback(null, socket);
+ callback(null, _this.socket);
});
};
diff --git a/lib/const.js b/lib/const.js
index d183a31..d7754d0 100755
--- a/lib/const.js
+++ b/lib/const.js
@@ -1,7 +1,6 @@
-var msgpack = require('msgpack-lite');
-
+var {bufferFrom} = require('./utils')
// i steal it from go
-const RequestCode = {
+var RequestCode = {
rqConnect: 0x00, //fake for connect
rqSelect: 0x01,
rqInsert: 0x02,
@@ -18,7 +17,7 @@ const RequestCode = {
rqPing: 0x40
};
-const KeysCode = {
+var KeysCode = {
code: 0x00,
sync: 0x01,
schema_version: 0x05,
@@ -34,16 +33,16 @@ const KeysCode = {
expression: 0x27,
def_tuple: 0x28,
data: 0x30,
- error: 0x31,
+ iproto_error_24: 0x31,
meta: 0x32,
- sql_text: 0x40,
- sql_bind: 0x41,
- sql_info: 0x42,
+ sql_text: 0x40,
+ sql_bind: 0x41,
+ sql_info: 0x42,
+ iproto_error: 0x52
};
-
// https://github.com/fl00r/go-tarantool-1.6/issues/2
-const IteratorsType = {
+var IteratorsType = {
eq: 0,
req: 1,
all: 2,
@@ -56,13 +55,13 @@ const IteratorsType = {
bitsAllNotSet: 9
};
-const OkCode = 0;
-const NetErrCode = 0xfffffff1; // fake code to wrap network problems into response
-const TimeoutErrCode = 0xfffffff2; // fake code to wrap timeout error into repsonse
+var OkCode = 0;
+var NetErrCode = 0xfffffff1; // fake code to wrap network problems into response
+var TimeoutErrCode = 0xfffffff2; // fake code to wrap timeout error into repsonse
-const PacketLengthBytes = 5;
+var PacketLengthBytes = 5;
-const Space = {
+var Space = {
schema: 272,
space: 281,
index: 289,
@@ -72,7 +71,7 @@ const Space = {
cluster: 320
};
-const IndexSpace = {
+var IndexSpace = {
primary: 0,
name: 2,
indexPrimary: 0,
@@ -82,25 +81,25 @@ const IndexSpace = {
var BufferedIterators = {};
for(t in IteratorsType)
{
- BufferedIterators[t] = new Buffer([KeysCode.iterator, IteratorsType[t], KeysCode.key]);
+ BufferedIterators[t] = bufferFrom([KeysCode.iterator, IteratorsType[t], KeysCode.key]);
}
var BufferedKeys = {};
for(k in KeysCode)
{
- BufferedKeys[k] = new Buffer([KeysCode[k]]);
+ BufferedKeys[k] = bufferFrom([KeysCode[k]]);
}
-const ExportPackage = {
+var ExportPackage = {
RequestCode: RequestCode,
KeysCode: KeysCode,
IteratorsType: IteratorsType,
OkCode: OkCode,
- passEnter: msgpack.encode('chap-sha1'),
+ passEnter: bufferFrom('a9636861702d73686131', 'hex') /* from msgpack.encode('chap-sha1') */,
Space: Space,
IndexSpace: IndexSpace,
- BufferedIterators: BufferedIterators,
- BufferedKeys: BufferedKeys
+ BufferedIterators: BufferedIterators,
+ BufferedKeys: BufferedKeys
};
-module.exports = ExportPackage;
+module.exports = ExportPackage;
\ No newline at end of file
diff --git a/lib/event-handler.js b/lib/event-handler.js
index cf58d95..3c99ba1 100755
--- a/lib/event-handler.js
+++ b/lib/event-handler.js
@@ -1,11 +1,9 @@
-var msgpack = require('msgpack-lite');
var debug = require('debug')('tarantool-driver:handler');
-
-var utils = require('./utils');
-var tarantoolConstants = require('./const');
-
-var Decoder = msgpack.Decoder;
-var decoder = new Decoder();
+var {
+ TarantoolError,
+ bufferSubarrayPoly
+} = require('./utils');
+var _ = require('lodash');
exports.connectHandler = function (self) {
return function () {
@@ -15,11 +13,12 @@ exports.connectHandler = function (self) {
self.dataState = self.states.PREHELLO;
break;
case self.states.CONNECTED:
+ var connectionOptions = _.pick(self.options, ['port', 'host', 'path'])
if(self.options.password){
self.setState(self.states.AUTH);
self._auth(self.options.username, self.options.password)
.then(function(){
- self.setState(self.states.CONNECT, {host: self.options.host, port: self.options.port});
+ self.setState(self.states.CONNECT, connectionOptions);
debug('authenticated [%s]', self.options.username);
sendOfflineQueue(self);
}, function(err){
@@ -28,7 +27,7 @@ exports.connectHandler = function (self) {
self.disconnect(true);
});
} else {
- self.setState(self.states.CONNECT, {host: self.options.host, port: self.options.port});
+ self.setState(self.states.CONNECT, connectionOptions);
sendOfflineQueue(self);
}
break;
@@ -43,7 +42,10 @@ function sendOfflineQueue(self){
self.resetOfflineQueue();
while (offlineQueue.length > 0) {
var command = offlineQueue.shift();
- self.sendCommand(command[0], command[1]);
+ self.sendCommand(
+ command[0],
+ command[1]
+ );
}
}
}
@@ -52,7 +54,7 @@ exports.dataHandler = function(self){
return function(data){
switch(self.dataState){
case self.states.PREHELLO:
- self.salt = data.slice(64, 108).toString('utf8');
+ self.salt = data[bufferSubarrayPoly](64, 108).toString('utf8');
self.dataState = self.states.CONNECTED;
self.setState(self.states.CONNECTED);
exports.connectHandler(self)();
@@ -183,42 +185,42 @@ exports.errorHandler = function(self){
};
};
-exports.closeHandler = function(self){
+exports.closeHandler = function (self) {
+ function close () {
+ self.setState(self.states.END);
+ self.flushQueue(new TarantoolError('Connection is closed.'));
+ }
+
return function(){
process.nextTick(self.emit.bind(self, 'close'));
if (self.manuallyClosing) {
- self.manuallyClosing = false;
- debug('skip reconnecting since the connection is manually closed.');
- return close();
- }
+ self.manuallyClosing = false;
+ debug('skip reconnecting since the connection is manually closed.');
+ return close();
+ }
if (typeof self.options.retryStrategy !== 'function') {
- debug('skip reconnecting because `retryStrategy` is not a function');
- return close();
- }
- var retryDelay = self.options.retryStrategy(++self.retryAttempts);
+ debug('skip reconnecting because `retryStrategy` is not a function');
+ return close();
+ }
+ var retryDelay = self.options.retryStrategy(++self.retryAttempts);
- if (typeof retryDelay !== 'number') {
- debug('skip reconnecting because `retryStrategy` doesn\'t return a number');
- return close();
+ if (typeof retryDelay !== 'number') {
+ debug('skip reconnecting because `retryStrategy` doesn\'t return a number');
+ return close();
}
self.setState(self.states.RECONNECTING, retryDelay);
- if (self.options.reserveHosts){
- if(self.retryAttempts-1 == self.options.beforeReserve){
+ if ((self.options.reserveHosts && self.options.reserveHosts.length || 0) > 0) {
+ if (self.retryAttempts-1 == self.options.beforeReserve){
self.useNextReserve();
self.connect().catch(function(){});
return;
}
- }
- debug('reconnect in %sms', retryDelay);
+ }
+ debug('reconnect in %sms', retryDelay);
- self.reconnectTimeout = setTimeout(function () {
- self.reconnectTimeout = null;
- self.connect().catch(function(){});
- }, retryDelay);
+ self.reconnectTimeout = setTimeout(function () {
+ self.reconnectTimeout = null;
+ self.connect().catch(function(){});
+ }, retryDelay);
};
-
- function close() {
- self.setState(self.states.END);
- self.flushQueue(new utils.TarantoolError('Connection is closed.'));
- }
};
\ No newline at end of file
diff --git a/lib/msgpack-extensions.js b/lib/msgpack-extensions.js
new file mode 100644
index 0000000..83cb63e
--- /dev/null
+++ b/lib/msgpack-extensions.js
@@ -0,0 +1,128 @@
+var { createCodec: msgpackCreateCodec} = require('msgpack-lite');
+var {
+ TarantoolError,
+ createBuffer,
+ bufferFrom
+} = require('./utils');
+var {
+ parse: uuidParse,
+ stringify: uuidStringify
+} = require('uuid');
+var {
+ Uint64BE,
+ Int64BE
+} = require("int64-buffer");
+
+var packAs = {}
+var codec = msgpackCreateCodec();
+
+// Pack big integers correctly (fix for https://github.com/tarantool/node-tarantool-driver/issues/48)
+packAs.Integer = function (value) {
+ if (!Number.isInteger(value)) throw new TarantoolError("Passed value doesn't seems to be an integer")
+
+ if (value > 2147483647) return Uint64BE(value)
+ if (value < -2147483648) return Int64BE(value)
+
+ return value
+}
+
+// UUID extension
+packAs.Uuid = function TarantoolUuidExt (value) {
+ if (!(this instanceof TarantoolUuidExt)) {
+ return new TarantoolUuidExt(value)
+ }
+
+ this.value = value
+}
+
+codec.addExtPacker(0x02, packAs.Uuid, (data) => {
+ return uuidParse(data.value);
+});
+
+codec.addExtUnpacker(0x02, (buffer) => {
+ return uuidStringify(buffer)
+});
+
+// Decimal extension
+function isFloat(n){
+ return Number(n) === n && n % 1 !== 0;
+}
+
+packAs.Decimal = function TarantoolDecimalExt (value) {
+ if (!(this instanceof TarantoolDecimalExt)) {
+ return new TarantoolDecimalExt(value)
+ }
+
+ if (!(Number.isInteger(value) || isFloat(value))) {
+ throw new TarantoolError('Passed value cannot be packed as decimal: expected integer or floating number')
+ }
+
+ this.value = value
+}
+
+function isOdd (number) {
+ return number % 2 !== 0;
+}
+
+codec.addExtPacker(0x01, packAs.Decimal, (data) => {
+ var strNum = data.value.toString()
+ var rawNum = strNum.replace('-', '')
+ var scaleBuffer = createBuffer(1)
+ var rawNumSplitted1 = rawNum.split('.')[1]
+ scaleBuffer.writeInt8(rawNumSplitted1 && rawNum.split('.')[1].length || 0)
+ var bufHexed = scaleBuffer.toString('hex')
+ + rawNum.replace('.', '')
+ + (strNum.startsWith('-') ? 'b' : 'a')
+
+ if (isOdd(bufHexed.length)) {
+ bufHexed = bufHexed.slice(0, 2) + '0' + bufHexed.slice(2)
+ }
+
+ return bufferFrom(bufHexed, 'hex')
+});
+
+codec.addExtUnpacker(0x01, (buffer) => {
+ var scale = buffer.readIntBE(0, 1)
+ var hex = buffer.toString('hex')
+
+ var sign = ['b', 'd'].includes(hex.slice(-1)) ? '-' : '+'
+ var slicedValue = hex.slice(2).slice(0, -1)
+
+ if (scale > 0) {
+ var nScale = scale * -1
+ slicedValue = slicedValue.slice(0, nScale) + '.' + slicedValue.slice(nScale)
+ }
+
+ return parseFloat(sign + slicedValue)
+});
+
+// Datetime extension
+codec.addExtPacker(0x04, Date, (date) => {
+ var seconds = date.getTime() / 1000 | 0
+ var nanoseconds = date.getMilliseconds() * 1000
+
+ var buffer = createBuffer(16)
+ buffer.writeBigUInt64LE(BigInt(seconds))
+ buffer.writeUInt32LE(nanoseconds, 8)
+ buffer.writeUInt32LE(0, 12)
+ /*
+ Node.Js 'Date' doesn't provide nanoseconds, so just using milliseconds.
+ tzoffset is set to UTC, and tzindex is omitted.
+ */
+
+ return buffer;
+});
+
+codec.addExtUnpacker(0x04, (buffer) => {
+ var time = new Date(parseInt(buffer.readBigUInt64LE(0)) * 1000)
+
+ if (buffer.length > 8) {
+ var milliseconds = (buffer.readUInt32LE(8) / 1000 | 0)
+ time.setMilliseconds(milliseconds)
+ }
+
+ return time
+})
+
+exports.packAs = packAs
+exports.codec = codec
\ No newline at end of file
diff --git a/lib/parser.js b/lib/parser.js
index 0f5a22e..0e1fb44 100644
--- a/lib/parser.js
+++ b/lib/parser.js
@@ -1,19 +1,19 @@
-var msgpack = require('msgpack-lite');
-var debug = require('debug')('tarantool-driver:response');
+var { Decoder: msgpackDecoder } = require('msgpack-lite');
var tarantoolConstants = require('./const');
-var utils = require('./utils');
-var Decoder = msgpack.Decoder;
-var decoder = new Decoder();
+var { TarantoolError } = require('./utils');
+var { codec } = require('./msgpack-extensions');
-exports._processResponse = function(buffer, offset){
- offset=offset||0;
+var decoder = new msgpackDecoder({codec});
+
+exports._processResponse = function(buffer, offset){
decoder.buffer = buffer;
- decoder.offset = offset;
- var header = decoder.fetch();
- var schemaId = header[tarantoolConstants.KeysCode.schema_version];
- var reqId = header[tarantoolConstants.KeysCode.sync];
- var code = header[tarantoolConstants.KeysCode.code];
- var obj = decoder.fetch();
+ decoder.offset = offset || 0;
+ var headers = decoder.fetch();
+ var schemaId = headers[tarantoolConstants.KeysCode.schema_version]
+ var reqId = headers[tarantoolConstants.KeysCode.sync]
+ var code = headers[tarantoolConstants.KeysCode.code]
+ var data = decoder.fetch()
+
if (this.schemaId)
{
if (this.schemaId != schemaId)
@@ -37,10 +37,41 @@ exports._processResponse = function(buffer, offset){
}
var dfd = task[2];
var success = code == 0 ? true : false;
- if (success)
- dfd.resolve(_returnBool(task[0], obj));
- else
- dfd.reject(new utils.TarantoolError(obj[tarantoolConstants.KeysCode.error]));
+
+ if (success) {
+ dfd.resolve(_returnBool(task[0], data));
+ }
+ else {
+ var tarantoolErrorObject = data[tarantoolConstants.KeysCode.iproto_error] && data[tarantoolConstants.KeysCode.iproto_error][0x00][0]
+ var errorDecription = (tarantoolErrorObject && tarantoolErrorObject[0x03]) || data[tarantoolConstants.KeysCode.iproto_error_24]
+
+ if ([
+ 358 /* code of 'read-only' */,
+ "Can't modify data on a read-only instance - box.cfg.read_only is true",
+ 3859 /* code of 'bootstrap not finished' */
+ ].includes((tarantoolErrorObject && tarantoolErrorObject[0x02]) || errorDecription)) {
+ switch (this.options.nonWritableHostPolicy) {
+ case 'changeAndRetry':
+ var attemptsCount = task[2].attempt
+ if (attemptsCount) {
+ task[2].attempt++
+ } else {
+ task[2].attempt = 1
+ }
+
+ if (this.options.maxRetriesPerRequest <= attemptsCount) {
+ return dfd.reject(new TarantoolError(errorDecription));
+ }
+
+ this.offlineQueue.push([[task[0], task[1], task[2]], task[3]]);
+ return changeHost(this, errorDecription)
+ case 'changeHost':
+ changeHost(this, errorDecription)
+ }
+ }
+
+ dfd.reject(new TarantoolError(errorDecription));
+ }
};
function _returnBool(cmd, data){
@@ -68,3 +99,9 @@ function _returnBool(cmd, data){
return data[tarantoolConstants.KeysCode.data];
}
}
+
+function changeHost (self, errorDecription) {
+ self.setState(512, errorDecription) // event 'changing_host'
+ self.useNextReserve()
+ self.disconnect(true)
+}
\ No newline at end of file
diff --git a/lib/pipeline.js b/lib/pipeline.js
new file mode 100644
index 0000000..104136f
--- /dev/null
+++ b/lib/pipeline.js
@@ -0,0 +1,101 @@
+var {
+ prototype: commandsPrototype
+} = require('./commands');
+var {RequestCode} = require('./const');
+
+var commandsPrototypeKeys = Object.keys(commandsPrototype);
+
+function commandInterceptorFactory (method) {
+ return function commandInterceptor () {
+ this.pipelinedCommands.push([method, arguments]);
+ return this
+ }
+}
+
+var setOfCommandInterceptors = {};
+for (var commandKey of commandsPrototypeKeys) {
+ setOfCommandInterceptors[commandKey] = commandInterceptorFactory(commandKey)
+}
+
+function sendCommandInterceptorFactory (self) {
+ return function sendCommandInterceptor (command, buffer, isPipelined) {
+ // If the Select request was made to search for a system space/index name, then bypass this
+ if ((RequestCode.rqSelect == command[0]) && !isPipelined) {
+ self._parent.sendCommand(command, buffer)
+ return;
+ }
+
+ self._buffersConcatenedCounter++
+ if (self.pipelinedBuffer) {
+ self.pipelinedBuffer = Buffer.concat([self.pipelinedBuffer, buffer])
+ } else {
+ self.pipelinedBuffer = buffer
+ }
+
+ self._trySendConcatenedBuffer()
+ self._parent.sendCommand(command, buffer, true)
+ }
+}
+
+function _trySendConcatenedBuffer () {
+ if (this._buffersConcatenedCounter == this.pipelinedCommands.length) {
+ this._parent.socket.write(this.pipelinedBuffer)
+ this.flushPipelined()
+ }
+}
+
+function exec () {
+ var _this = this;
+ var promises = [];
+ for (var interceptedCommand of _this.pipelinedCommands) {
+ var args = interceptedCommand[1]
+ if (interceptedCommand[0] == 'select') {
+ args = Object.values(args) // Otherwise 'arguments' doesn't get applied to function below correctly
+ args[6] = true // Marks 'select' request as pipelined, otherwise '_getMetadata' will break the pipelined queue
+ }
+ promises.push(
+ new Promise(function (resolve) {
+ _this._originalMethods[interceptedCommand[0]].apply(_this._originalMethods, args)
+ .then(function (result) {
+ resolve([null, result])
+ })
+ .catch(function (error) {
+ _this._buffersConcatenedCounter++ // fake, because rejected promise doesn't have its request buffer
+ _this._trySendConcatenedBuffer()
+ resolve([error, null])
+ })
+ })
+ );
+ }
+
+ return Promise.all(promises)
+}
+
+function flushPipelined () {
+ this.pipelinedCommands = [];
+}
+
+function Pipeline (self) {
+ var _this = this;
+ this._parent = self;
+ this.pipelinedCommands = [];
+ this._buffersConcatenedCounter = 0;
+ this._originalMethods = Object.assign({},
+ commandsPrototype,
+ self,
+ {
+ _id: self._id,
+ sendCommand: sendCommandInterceptorFactory(_this)
+ }
+ )
+ this.exec = exec
+ this._trySendConcatenedBuffer = _trySendConcatenedBuffer
+ this.flushPipelined = flushPipelined
+ Object.assign(this, setOfCommandInterceptors);
+}
+
+Pipeline.prototype.pipeline = function () {
+ return new Pipeline(this);
+}
+
+module.exports = Pipeline;
\ No newline at end of file
diff --git a/lib/sliderBuffer.js b/lib/sliderBuffer.js
index 1ff77cf..422b82b 100644
--- a/lib/sliderBuffer.js
+++ b/lib/sliderBuffer.js
@@ -1,9 +1,9 @@
-var utils = require('./utils');
+var { createBuffer } = require('./utils');
function SliderBuffer(){
this.bufferOffset = 0;
this.bufferLength = 0;
- this.buffer = utils.createBuffer(1024*10);
+ this.buffer = createBuffer(1024*10);
}
SliderBuffer.prototype.add = function(data, from, size){
@@ -23,7 +23,7 @@ SliderBuffer.prototype.add = function(data, from, size){
destLen = size + this.bufferLength;
if (this.buffer.length > destLen)
{
- newBuffer = utils.createBuffer(this.buffer.length * multiplierBuffer);
+ newBuffer = createBuffer(this.buffer.length * multiplierBuffer);
this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset+this.bufferLength);
data.copy(newBuffer, this.bufferLength, from, from+size);
this.buffer = newBuffer;
@@ -33,7 +33,7 @@ SliderBuffer.prototype.add = function(data, from, size){
newLen = this.buffer.length*multiplierBuffer;
while(newLen < destLen)
newLen *= multiplierBuffer;
- newBuffer = utils.createBuffer(newLen);
+ newBuffer = createBuffer(newLen);
this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset+this.bufferLength);
data.copy(newBuffer, this.bufferLength, from, from+size);
this.buffer = newBuffer;
@@ -54,7 +54,7 @@ SliderBuffer.prototype.add = function(data, from, size){
destLen = data.length + this.bufferLength;
if (this.buffer.length > destLen)
{
- newBuffer = utils.createBuffer(this.buffer.length * multiplierBuffer);
+ newBuffer = createBuffer(this.buffer.length * multiplierBuffer);
this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset+this.bufferLength);
data.copy(newBuffer, this.bufferLength);
this.buffer = newBuffer;
@@ -64,7 +64,7 @@ SliderBuffer.prototype.add = function(data, from, size){
newLen = this.buffer.length*multiplierBuffer;
while(newLen < destLen)
newLen *= multiplierBuffer;
- newBuffer = utils.createBuffer(newLen);
+ newBuffer = createBuffer(newLen);
this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset+this.bufferLength);
data.copy(newBuffer, this.bufferLength);
this.buffer = newBuffer;
diff --git a/lib/utils.js b/lib/utils.js
index cd0c929..571df4d 100644
--- a/lib/utils.js
+++ b/lib/utils.js
@@ -1,17 +1,61 @@
-exports.createBuffer = function(size){
- if (Buffer.allocUnsafe)
- {
- return Buffer.allocUnsafe(size);
+// cache the method of Buffer allocation
+var createBufferMethod
+if (Buffer.allocUnsafe) {
+ createBufferMethod = Buffer.allocUnsafe;
+} else if (Buffer.alloc) {
+ createBufferMethod = Buffer.alloc;
+} else {
+ createBufferMethod = new Buffer;
+}
+exports.createBuffer = function (size){
+ return createBufferMethod(size);
+};
+
+// cache the method of Buffer creation using an array of bytes
+var bufferFromMethod
+if (Buffer.from) {
+ bufferFromMethod = Buffer.from;
+} else {
+ bufferFromMethod = new Buffer;
+}
+exports.bufferFrom = function (data, encoding) {
+ return bufferFromMethod(data, encoding);
+}
+
+// 'buf.slice' is deprecated since Node v17.5.0 / v16.15.0, so we should proceed with 'buf.subarray'
+var bufferSubarrayPolyName
+if (Buffer.prototype.subarray) {
+ bufferSubarrayPolyName = 'subarray'
+} else {
+ bufferSubarrayPolyName = 'slice'
+};
+exports.bufferSubarrayPoly = bufferSubarrayPolyName;
+
+exports.findPipelineError = function (array) {
+ var error = array.find(element => element[0])
+ if (error !== undefined) {
+ return error[0]
+ } else {
+ return null;
}
- if (Buffer.alloc)
- {
- return Buffer.alloc(size);
+};
+
+exports.findPipelineErrors = function (array) {
+ var array_of_errors = [];
+ for (var subarray of array) {
+ var errored_element = subarray[0]
+ if (errored_element) array_of_errors.push(errored_element)
}
- return new Buffer(size);
+
+ return array_of_errors
};
exports.parseURL = function(str, reserve){
var result = {};
+ if (str.startsWith('/')) {
+ result.path = str
+ return result
+ }
var parsed = str.split(':');
if(reserve){
result.username = null;
diff --git a/package.json b/package.json
index 6921719..0b8779e 100755
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "tarantool-driver",
- "version": "3.0.6",
+ "version": "3.1.0",
"description": "Tarantool driver for 1.7+",
"main": "index.js",
"scripts": {
@@ -18,7 +18,16 @@
"tarantool",
"driver",
"db",
- "msgpack"
+ "msgpack",
+ "tarantool client",
+ "tarantool connector",
+ "in memory",
+ "in-memory",
+ "in memory storage",
+ "in-memory storage",
+ "in memory database",
+ "in-memory database",
+ "database"
],
"author": "KlonD90",
"license": "MIT",
@@ -29,18 +38,21 @@
"dependencies": {
"debug": "^2.6.8",
"denque": "^1.2.1",
+ "int64-buffer": "^1.0.1",
"lodash": "^4.17.4",
- "msgpack-lite": "^0.1.20"
+ "msgpack-lite": "^0.1.20",
+ "uuid": "^9.0.0"
},
"devDependencies": {
- "benchmark": "^2.1.0",
+ "benchmark": "^2.1.4",
"chai": "^4.1.0",
- "coveralls": "^2.13.1",
+ "coveralls": "^2.13.3",
"eslint": "^2.5.3",
"ioredis": "^3.1.2",
"istanbul": "^0.4.5",
"mocha": "^2.2.4",
"nanotimer": "^0.3.14",
+ "nyc": "^15.1.0",
"sinon": "^3.0.0"
}
}
diff --git a/test/app.js b/test/app.js
index 7890acd..55f2dc5 100755
--- a/test/app.js
+++ b/test/app.js
@@ -52,6 +52,9 @@ describe('constructor', function () {
expect(option).to.have.property('username', 'notguest');
expect(option).to.have.property('password', 'sesame');
+ option = getOption('/var/run/tarantool/unix.sock');
+ expect(option).to.have.property('path', '/var/run/tarantool/unix.sock');
+
option = getOption({
port: 6380,
host: '192.168.1.1'
@@ -78,6 +81,7 @@ describe('constructor', function () {
{
port: 6380,
host: '192.168.1.1',
+ path: null,
username: null,
password: null
},
@@ -743,4 +747,4 @@ describe('connection test with custom msgpack implementation', function(){
});
});
describe('slider buffer', function(){
-})
+})
\ No newline at end of file
diff --git a/test/box.lua b/test/box.lua
index 9cdae95..a3fe7aa 100755
--- a/test/box.lua
+++ b/test/box.lua
@@ -1,5 +1,6 @@
#!/usr/bin/env tarantool
box.cfg{listen=33013}
+
lp = {
test = 'test',
test_empty = '',
@@ -115,4 +116,4 @@ end
function func_arg(arg)
return arg
-end
+end
\ No newline at end of file