From 29cce3a6c1d0c3c1fe54e210f15fe1e1305cb97c Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 20 Nov 2019 22:10:29 +0000 Subject: [PATCH 1/3] refactor: convert repo API to async await License: MIT Signed-off-by: Alan Shaw --- src/repo/gc.js | 48 +++++++++++++++++--------------------- src/repo/index.js | 17 ++++++-------- src/repo/stat.js | 44 +++++++++++++++++----------------- src/repo/version.js | 32 ++++++++++++------------- src/utils/load-commands.js | 2 +- 5 files changed, 68 insertions(+), 75 deletions(-) diff --git a/src/repo/gc.js b/src/repo/gc.js index 35d4288b0..ad49fc506 100644 --- a/src/repo/gc.js +++ b/src/repo/gc.js @@ -1,33 +1,29 @@ 'use strict' -const promisify = require('promisify-es6') -const streamToValueWithTransformer = require('../utils/stream-to-value-with-transformer') const CID = require('cids') +const ndjson = require('iterable-ndjson') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') -const transform = function (res, callback) { - callback(null, res.map(r => ({ - err: r.Err ? new Error(r.Err) : null, - cid: (r.Key || {})['/'] ? new CID(r.Key['/']) : null - }))) -} +module.exports = configure(({ ky }) => { + return (peerId, options) => (async function * () { + options = options || {} -module.exports = (send) => { - return promisify((opts, callback) => { - if (typeof (opts) === 'function') { - callback = opts - opts = {} - } - - const request = { - path: 'repo/gc', - qs: opts - } - send(request, (err, result) => { - if (err) { - return callback(err) - } + const searchParams = new URLSearchParams(options.searchParams) + if (options.streamErrors) searchParams.set('stream-errors', options.streamErrors) - streamToValueWithTransformer(result, transform, callback) + const res = await ky.get('repo/gc', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams }) - }) -} + + for await (const gcResult of ndjson(toIterable(res.body))) { + yield { + err: gcResult.Error ? new Error(gcResult.Error) : null, + cid: (gcResult.Key || {})['/'] ? new CID(gcResult.Key['/']) : null + } + } + })() +}) diff --git a/src/repo/index.js b/src/repo/index.js index d3ac72785..fe58504ad 100644 --- a/src/repo/index.js +++ b/src/repo/index.js @@ -1,13 +1,10 @@ 'use strict' -const moduleConfig = require('../utils/module-config') +const callbackify = require('callbackify') +const { collectify } = require('../lib/converters') -module.exports = (arg) => { - const send = moduleConfig(arg) - - return { - gc: require('./gc')(send), - stat: require('./stat')(send), - version: require('./version')(send) - } -} +module.exports = config => ({ + gc: callbackify.variadic(collectify(require('./gc')(config))), + stat: callbackify.variadic(require('./stat')(config)), + version: callbackify.variadic(require('./version')(config)) +}) diff --git a/src/repo/stat.js b/src/repo/stat.js index 2b922de03..4e4dfaf4e 100644 --- a/src/repo/stat.js +++ b/src/repo/stat.js @@ -1,28 +1,28 @@ 'use strict' -const promisify = require('promisify-es6') const Big = require('bignumber.js') +const configure = require('../lib/configure') -const transform = function (res, callback) { - callback(null, { - numObjects: new Big(res.NumObjects), - repoSize: new Big(res.RepoSize), - repoPath: res.RepoPath, - version: res.Version, - storageMax: new Big(res.StorageMax) - }) -} +module.exports = configure(({ ky }) => { + return async options => { + options = options || {} -module.exports = (send) => { - return promisify((opts, callback) => { - if (typeof (opts) === 'function') { - callback = opts - opts = {} - } + const searchParams = new URLSearchParams(options.searchParams) + if (options.sizeOnly) searchParams.set('size-only', options.sizeOnly) + + const res = await ky.get('repo/stat', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }).json() - send.andTransform({ - path: 'repo/stat', - qs: opts - }, transform, callback) - }) -} + return { + numObjects: new Big(res.NumObjects), + repoSize: new Big(res.RepoSize), + repoPath: res.RepoPath, + version: res.Version, + storageMax: new Big(res.StorageMax) + } + } +}) diff --git a/src/repo/version.js b/src/repo/version.js index 84e2a0008..436e2ae32 100644 --- a/src/repo/version.js +++ b/src/repo/version.js @@ -1,21 +1,21 @@ 'use strict' -const promisify = require('promisify-es6') +const configure = require('../lib/configure') -const transform = function (res, callback) { - callback(null, res.Version) -} +module.exports = configure(({ ky }) => { + return async options => { + options = options || {} -module.exports = (send) => { - return promisify((opts, callback) => { - if (typeof (opts) === 'function') { - callback = opts - opts = {} - } + const searchParams = new URLSearchParams(options.searchParams) + if (options.sizeOnly) searchParams.set('size-only', options.sizeOnly) - send.andTransform({ - path: 'repo/version', - qs: opts - }, transform, callback) - }) -} + const res = await ky.get('repo/version', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }).json() + + return res.Version + } +}) diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index a4a6dcd07..19ad3e2c6 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -107,6 +107,7 @@ function requireCommands (send, config) { refsReadableStream: streamify.readable(refs), refsPullStream: pullify.source(refs), _refsAsyncIterator: refs, + repo: require('../repo')(config), getEndpointConfig: require('../get-endpoint-config')(config), bitswap: require('../bitswap')(config), block: require('../block')(config), @@ -142,7 +143,6 @@ function requireCommands (send, config) { key: require('../key'), log: require('../log'), mount: require('../mount'), - repo: require('../repo'), stop: require('../stop'), shutdown: require('../stop'), stats: require('../stats'), From a891032fe45eb22ef45a7273a1dec18abec7e886 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 21 Nov 2019 09:41:34 +0000 Subject: [PATCH 2/3] refactor: convert stats API to async/await License: MIT Signed-off-by: Alan Shaw --- src/stats/bitswap.js | 32 --------------------- src/stats/bw-pull-stream.js | 31 -------------------- src/stats/bw-readable-stream.js | 31 -------------------- src/stats/bw-util.js | 12 -------- src/stats/bw.js | 51 ++++++++++++++++++--------------- src/stats/index.js | 22 ++++++++------ src/stats/repo.js | 28 ------------------ 7 files changed, 41 insertions(+), 166 deletions(-) delete mode 100644 src/stats/bitswap.js delete mode 100644 src/stats/bw-pull-stream.js delete mode 100644 src/stats/bw-readable-stream.js delete mode 100644 src/stats/bw-util.js delete mode 100644 src/stats/repo.js diff --git a/src/stats/bitswap.js b/src/stats/bitswap.js deleted file mode 100644 index 3f641f680..000000000 --- a/src/stats/bitswap.js +++ /dev/null @@ -1,32 +0,0 @@ -'use strict' - -const promisify = require('promisify-es6') -const Big = require('bignumber.js') - -const transform = function (res, callback) { - callback(null, { - provideBufLen: res.ProvideBufLen, - wantlist: res.Wantlist || [], - peers: res.Peers || [], - blocksReceived: new Big(res.BlocksReceived), - dataReceived: new Big(res.DataReceived), - blocksSent: new Big(res.BlocksSent), - dataSent: new Big(res.DataSent), - dupBlksReceived: new Big(res.DupBlksReceived), - dupDataReceived: new Big(res.DupDataReceived) - }) -} - -module.exports = (send) => { - return promisify((opts, callback) => { - if (typeof (opts) === 'function') { - callback = opts - opts = {} - } - - send.andTransform({ - path: 'stats/bitswap', - qs: opts - }, transform, callback) - }) -} diff --git a/src/stats/bw-pull-stream.js b/src/stats/bw-pull-stream.js deleted file mode 100644 index 26a4eb311..000000000 --- a/src/stats/bw-pull-stream.js +++ /dev/null @@ -1,31 +0,0 @@ -'use strict' - -const toPull = require('stream-to-pull-stream') -const map = require('pull-stream/throughs/map') -const pull = require('pull-stream/pull') -const transformChunk = require('./bw-util') -const deferred = require('pull-defer') - -module.exports = (send) => { - return (opts) => { - opts = opts || {} - - const p = deferred.source() - - send({ - path: 'stats/bw', - qs: opts - }, (err, stream) => { - if (err) { - return p.end(err) - } - - p.resolve(pull( - toPull.source(stream), - map(transformChunk) - )) - }) - - return p - } -} diff --git a/src/stats/bw-readable-stream.js b/src/stats/bw-readable-stream.js deleted file mode 100644 index c36fea518..000000000 --- a/src/stats/bw-readable-stream.js +++ /dev/null @@ -1,31 +0,0 @@ -'use strict' - -const Stream = require('readable-stream') -const pump = require('pump') -const transformChunk = require('./bw-util') - -module.exports = (send) => { - return (opts) => { - opts = opts || {} - - const pt = new Stream.Transform({ - objectMode: true, - transform (chunk, encoding, cb) { - cb(null, transformChunk(chunk)) - } - }) - - send({ - path: 'stats/bw', - qs: opts - }, (err, stream) => { - if (err) { - return pt.destroy(err) - } - - pump(stream, pt) - }) - - return pt - } -} diff --git a/src/stats/bw-util.js b/src/stats/bw-util.js deleted file mode 100644 index 928ee0c3c..000000000 --- a/src/stats/bw-util.js +++ /dev/null @@ -1,12 +0,0 @@ -'use strict' - -const Big = require('bignumber.js') - -module.exports = (chunk) => { - return { - totalIn: new Big(chunk.TotalIn), - totalOut: new Big(chunk.TotalOut), - rateIn: new Big(chunk.RateIn), - rateOut: new Big(chunk.RateOut) - } -} diff --git a/src/stats/bw.js b/src/stats/bw.js index 1db636a25..9ab358760 100644 --- a/src/stats/bw.js +++ b/src/stats/bw.js @@ -1,29 +1,34 @@ 'use strict' -const promisify = require('promisify-es6') -const streamToValue = require('../utils/stream-to-value') -const transformChunk = require('./bw-util') +const ndjson = require('iterable-ndjson') +const Big = require('bignumber.js') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') -const transform = (res, callback) => { - return streamToValue(res, (err, data) => { - if (err) { - return callback(err) - } +module.exports = configure(({ ky }) => { + return async function * (options) { + options = options || {} - callback(null, transformChunk(data[0])) - }) -} + const searchParams = new URLSearchParams(options.searchParams) + if (options.interval) searchParams.set('interval', options.interval) + if (options.peer) searchParams.set('peer', options.peer) + if (options.poll != null) searchParams.set('poll', options.poll) + if (options.proto) searchParams.set('proto', options.proto) -module.exports = (send) => { - return promisify((opts, callback) => { - if (typeof (opts) === 'function') { - callback = opts - opts = {} - } + const res = await ky.get('stats/bw', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) - send.andTransform({ - path: 'stats/bw', - qs: opts - }, transform, callback) - }) -} + for await (const stats of ndjson(toIterable(res.body))) { + yield { + totalIn: new Big(stats.TotalIn), + totalOut: new Big(stats.TotalOut), + rateIn: new Big(stats.RateIn), + rateOut: new Big(stats.RateOut) + } + } + } +}) diff --git a/src/stats/index.js b/src/stats/index.js index 445a39835..4351d79e2 100644 --- a/src/stats/index.js +++ b/src/stats/index.js @@ -1,15 +1,19 @@ 'use strict' -const moduleConfig = require('../utils/module-config') - -module.exports = (arg) => { - const send = moduleConfig(arg) +const callbackify = require('callbackify') +const { streamify, pullify } = require('../lib/converters') +module.exports = config => { + const bw = require('./bw')(config) return { - bitswap: require('./bitswap')(send), - bw: require('./bw')(send), - bwReadableStream: require('./bw-readable-stream')(send), - bwPullStream: require('./bw-pull-stream')(send), - repo: require('./repo')(send) + bitswap: callbackify.variadic(require('../bitswap/stat')(config)), + bw: callbackify.variadic(async options => { + for await (const stats of bw(options)) { + return stats + } + }), + bwReadableStream: streamify.readable(bw), + bwPullStream: pullify.source(bw), + repo: callbackify.variadic(require('../repo/stat')(config)) } } diff --git a/src/stats/repo.js b/src/stats/repo.js deleted file mode 100644 index 81f808951..000000000 --- a/src/stats/repo.js +++ /dev/null @@ -1,28 +0,0 @@ -'use strict' - -const promisify = require('promisify-es6') -const Big = require('bignumber.js') - -const transform = function (res, callback) { - callback(null, { - numObjects: new Big(res.NumObjects), - repoSize: new Big(res.RepoSize), - repoPath: res.RepoPath, - version: res.Version, - storageMax: new Big(res.StorageMax) - }) -} - -module.exports = (send) => { - return promisify((opts, callback) => { - if (typeof (opts) === 'function') { - callback = opts - opts = {} - } - - send.andTransform({ - path: 'stats/repo', - qs: opts - }, transform, callback) - }) -} From 136319975c73e6322fe04294b2916495c0d508d4 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 21 Nov 2019 10:44:41 +0000 Subject: [PATCH 3/3] fix: tests --- src/utils/load-commands.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index 19ad3e2c6..c893e778d 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -108,6 +108,7 @@ function requireCommands (send, config) { refsPullStream: pullify.source(refs), _refsAsyncIterator: refs, repo: require('../repo')(config), + stats: require('../stats')(config), getEndpointConfig: require('../get-endpoint-config')(config), bitswap: require('../bitswap')(config), block: require('../block')(config), @@ -145,7 +146,6 @@ function requireCommands (send, config) { mount: require('../mount'), stop: require('../stop'), shutdown: require('../stop'), - stats: require('../stats'), update: require('../update'), version: require('../version'), resolve: require('../resolve')