4
4
/* :: import type {Batch, Query, QueryResult, Callback} from 'interface-datastore' */
5
5
6
6
const fs = require ( 'graceful-fs' )
7
- const pull = require ( 'pull-stream' )
8
7
const glob = require ( 'glob' )
9
- const setImmediate = require ( 'async/setImmediate' )
10
- const waterfall = require ( 'async/series' )
11
- const each = require ( 'async/each' )
12
8
const mkdirp = require ( 'mkdirp' )
13
- const writeFile = require ( 'fast-write-atomic' )
9
+ const promisify = require ( 'util' ) . promisify
10
+ const writeFile = promisify ( require ( 'fast-write-atomic' ) )
14
11
const path = require ( 'path' )
15
12
16
- const asyncFilter = require ( 'interface-datastore' ) . utils . asyncFilter
17
- const asyncSort = require ( 'interface-datastore' ) . utils . asyncSort
13
+ const filter = require ( 'interface-datastore' ) . utils . filter
14
+ const take = require ( 'interface-datastore' ) . utils . take
15
+ const map = require ( 'interface-datastore' ) . utils . map
16
+ const sortAll = require ( 'interface-datastore' ) . utils . sortAll
18
17
const IDatastore = require ( 'interface-datastore' )
18
+
19
+ const asyncMkdirp = promisify ( require ( 'mkdirp' ) )
20
+ const fsAccess = promisify ( fs . access )
21
+ const fsReadFile = promisify ( fs . readFile )
22
+ const fsUnlink = promisify ( fs . unlink )
23
+
19
24
const Key = IDatastore . Key
20
25
const Errors = IDatastore . Errors
21
26
@@ -57,9 +62,8 @@ class FsDatastore {
57
62
}
58
63
}
59
64
60
- open ( callback /* : Callback<void> */ ) /* : void */ {
65
+ open ( ) /* : void */ {
61
66
this . _openOrCreate ( )
62
- setImmediate ( callback )
63
67
}
64
68
65
69
/**
@@ -150,104 +154,97 @@ class FsDatastore {
150
154
*
151
155
* @param {Key } key
152
156
* @param {Buffer } val
153
- * @param {function(Error) } callback
154
- * @returns {void }
157
+ * @returns {Promise<void> }
155
158
*/
156
- putRaw ( key /* : Key */ , val /* : Buffer */ , callback /* : Callback<void> */) /* : void */ {
159
+ async putRaw ( key /* : Key */ , val /* : Buffer */ ) /* : void */ {
157
160
const parts = this . _encode ( key )
158
161
const file = parts . file . slice ( 0 , - this . opts . extension . length )
159
- waterfall ( [
160
- ( cb ) => mkdirp ( parts . dir , { fs : fs } , cb ) ,
161
- ( cb ) => writeFile ( file , val , cb )
162
- ] , ( err ) => callback ( err ) )
162
+ await asyncMkdirp ( parts . dir , { fs : fs } )
163
+ await writeFile ( file , val )
163
164
}
164
165
165
166
/**
166
167
* Store the given value under the key.
167
168
*
168
169
* @param {Key } key
169
170
* @param {Buffer } val
170
- * @param {function(Error) } callback
171
- * @returns {void }
171
+ * @returns {Promise<void> }
172
172
*/
173
- put ( key /* : Key */ , val /* : Buffer */ , callback /* : Callback<void> */) /* : void */ {
173
+ async put ( key /* : Key */ , val /* : Buffer */ ) /* : void */ {
174
174
const parts = this . _encode ( key )
175
- waterfall ( [
176
- ( cb ) => mkdirp ( parts . dir , { fs : fs } , cb ) ,
177
- ( cb ) => writeFile ( parts . file , val , cb )
178
- ] , ( err ) => {
179
- if ( err ) {
180
- return callback ( Errors . dbWriteFailedError ( err ) )
181
- }
182
- callback ( )
183
- } )
175
+ try {
176
+ await asyncMkdirp ( parts . dir , { fs : fs } )
177
+ await writeFile ( parts . file , val )
178
+ } catch ( err ) {
179
+ throw Errors . dbWriteFailedError ( err )
180
+ }
184
181
}
185
182
186
183
/**
187
184
* Read from the file system without extension.
188
185
*
189
186
* @param {Key } key
190
- * @param {function(Error, Buffer) } callback
191
- * @returns {void }
187
+ * @returns {Promise<Buffer> }
192
188
*/
193
- getRaw ( key /* : Key */ , callback /* : Callback<Buffer> */) /* : void */ {
189
+ async getRaw ( key /* : Key */ ) /* : void */ {
194
190
const parts = this . _encode ( key )
195
191
let file = parts . file
196
192
file = file . slice ( 0 , - this . opts . extension . length )
197
- fs . readFile ( file , ( err , data ) => {
198
- if ( err ) {
199
- return callback ( Errors . notFoundError ( err ) )
200
- }
201
- callback ( null , data )
202
- } )
193
+ let data
194
+ try {
195
+ data = await fsReadFile ( file )
196
+ } catch ( err ) {
197
+ throw Errors . notFoundError ( err )
198
+ }
199
+ return data
203
200
}
204
201
205
202
/**
206
203
* Read from the file system.
207
204
*
208
205
* @param {Key } key
209
- * @param {function(Error, Buffer) } callback
210
- * @returns {void }
206
+ * @returns {Promise<Buffer> }
211
207
*/
212
- get ( key /* : Key */ , callback /* : Callback<Buffer> */) /* : void */ {
208
+ async get ( key /* : Key */ ) /* : void */ {
213
209
const parts = this . _encode ( key )
214
- fs . readFile ( parts . file , ( err , data ) => {
215
- if ( err ) {
216
- return callback ( Errors . notFoundError ( err ) )
217
- }
218
- callback ( null , data )
219
- } )
210
+ let data
211
+ try {
212
+ data = await fsReadFile ( parts . file )
213
+ } catch ( err ) {
214
+ throw Errors . notFoundError ( err )
215
+ }
216
+ return data
220
217
}
221
218
222
219
/**
223
220
* Check for the existence of the given key.
224
221
*
225
222
* @param {Key } key
226
- * @param {function(Error, bool) } callback
227
- * @returns {void }
223
+ * @returns {Promise<bool> }
228
224
*/
229
- has ( key /* : Key */ , callback /* : Callback<bool> */) /* : void */ {
225
+ async has ( key /* : Key */ ) /* : void */ {
230
226
const parts = this . _encode ( key )
231
- fs . access ( parts . file , err => {
232
- callback ( null , ! err )
233
- } )
227
+ try {
228
+ await fsAccess ( parts . file )
229
+ } catch ( err ) {
230
+ return false
231
+ }
232
+ return true
234
233
}
235
234
236
235
/**
237
236
* Delete the record under the given key.
238
237
*
239
238
* @param {Key } key
240
- * @param {function(Error) } callback
241
- * @returns {void }
239
+ * @returns {Promise<void> }
242
240
*/
243
- delete ( key /* : Key */ , callback /* : Callback<void> */) /* : void */ {
241
+ async delete ( key /* : Key */ ) /* : void */ {
244
242
const parts = this . _encode ( key )
245
- fs . unlink ( parts . file , ( err ) => {
246
- if ( err ) {
247
- return callback ( Errors . dbDeleteFailedError ( err ) )
248
- }
249
- callback ( )
250
- } )
243
+ try {
244
+ await fsUnlink ( parts . file )
245
+ } catch ( err ) {
246
+ throw Errors . dbDeleteFailedError ( err )
247
+ }
251
248
}
252
249
253
250
/**
@@ -265,15 +262,9 @@ class FsDatastore {
265
262
delete ( key /* : Key */ ) /* : void */ {
266
263
deletes . push ( key )
267
264
} ,
268
- commit : ( callback /* : (err: ?Error) => void */ ) => {
269
- waterfall ( [
270
- ( cb ) => each ( puts , ( p , cb ) => {
271
- this . put ( p . key , p . value , cb )
272
- } , cb ) ,
273
- ( cb ) => each ( deletes , ( k , cb ) => {
274
- this . delete ( k , cb )
275
- } , cb )
276
- ] , ( err ) => callback ( err ) )
265
+ commit : async ( ) /* : Promise<void> */ => {
266
+ await Promise . all ( ( puts . map ( ( put ) => this . put ( put . key , put . value ) ) ) )
267
+ await Promise . all ( ( deletes . map ( ( del ) => this . delete ( del ) ) ) )
277
268
}
278
269
}
279
270
}
@@ -282,7 +273,7 @@ class FsDatastore {
282
273
* Query the store.
283
274
*
284
275
* @param {Object } q
285
- * @returns {PullStream }
276
+ * @returns {Iterable }
286
277
*/
287
278
query ( q /* : Query<Buffer> */ ) /* : QueryResult<Buffer> */ {
288
279
// glob expects a POSIX path
@@ -291,53 +282,46 @@ class FsDatastore {
291
282
. join ( this . path , prefix , '*' + this . opts . extension )
292
283
. split ( path . sep )
293
284
. join ( '/' )
294
- let tasks = [ pull . values ( glob . sync ( pattern ) ) ]
295
-
285
+ let files = glob . sync ( pattern )
286
+ let it
296
287
if ( ! q . keysOnly ) {
297
- tasks . push ( pull . asyncMap ( ( f , cb ) => {
298
- fs . readFile ( f , ( err , buf ) => {
299
- if ( err ) {
300
- return cb ( err )
301
- }
302
- cb ( null , {
303
- key : this . _decode ( f ) ,
304
- value : buf
305
- } )
306
- } )
307
- } ) )
288
+ it = map ( files , async ( f ) => {
289
+ const buf = await fsReadFile ( f )
290
+ return {
291
+ key : this . _decode ( f ) ,
292
+ value : buf
293
+ }
294
+ } )
308
295
} else {
309
- tasks . push ( pull . map ( f => ( { key : this . _decode ( f ) } ) ) )
296
+ it = map ( files , f => ( { key : this . _decode ( f ) } ) )
310
297
}
311
298
312
- if ( q . filters != null ) {
313
- tasks = tasks . concat ( q . filters . map ( asyncFilter ) )
299
+ if ( Array . isArray ( q . filters ) ) {
300
+ it = q . filters . reduce ( ( it , f ) => filter ( it , f ) , it )
314
301
}
315
302
316
- if ( q . orders != null ) {
317
- tasks = tasks . concat ( q . orders . map ( asyncSort ) )
303
+ if ( Array . isArray ( q . orders ) ) {
304
+ it = q . orders . reduce ( ( it , f ) => sortAll ( it , f ) , it )
318
305
}
319
306
320
307
if ( q . offset != null ) {
321
308
let i = 0
322
- tasks . push ( pull . filter ( ( ) => i ++ >= q . offset ) )
309
+ it = filter ( it , ( ) => i ++ >= q . offset )
323
310
}
324
311
325
312
if ( q . limit != null ) {
326
- tasks . push ( pull . take ( q . limit ) )
313
+ it = take ( it , q . limit )
327
314
}
328
315
329
- return pull . apply ( null , tasks )
316
+ return it
330
317
}
331
318
332
319
/**
333
320
* Close the store.
334
321
*
335
- * @param {function(Error) } callback
336
- * @returns {void }
322
+ * @returns {Promise<void> }
337
323
*/
338
- close ( callback /* : (err: ?Error) => void */ ) /* : void */ {
339
- setImmediate ( callback )
340
- }
324
+ async close ( ) /* : Promise<void> */ { }
341
325
}
342
326
343
327
module . exports = FsDatastore
0 commit comments