From 1b07f583329bfdf649783ebed7d76a6488477f51 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Wed, 28 Nov 2018 15:55:57 +0000
Subject: [PATCH] feat: adds ls streaming methods

Implementation of ipfs/interface-ipfs-core#401

License: MIT
Signed-off-by: achingbrain
---
 src/cli/ls.js                  |  93 ++++++++++++-----------
 src/core/index.js              |   4 +-
 src/core/ls-pull-stream.js     | 131 +++++++++++++++++++++++++++++++++
 src/core/ls-readable-stream.js |  10 +++
 src/core/ls.js                 | 120 ++++-------------------------
 5 files changed, 207 insertions(+), 151 deletions(-)
 create mode 100644 src/core/ls-pull-stream.js
 create mode 100644 src/core/ls-readable-stream.js

diff --git a/src/cli/ls.js b/src/cli/ls.js
index 0c0cdc7..c8c8ea9 100644
--- a/src/cli/ls.js
+++ b/src/cli/ls.js
@@ -1,5 +1,8 @@
 'use strict'
 
+const pull = require('pull-stream/pull')
+const onEnd = require('pull-stream/sinks/on-end')
+const through = require('pull-stream/throughs/through')
 const {
   print,
   asBoolean
@@ -21,12 +24,12 @@ module.exports = {
       coerce: asBoolean,
       describe: 'Use long listing format.'
     },
-    unsorted: {
-      alias: 'U',
+    sort: {
+      alias: 's',
       type: 'boolean',
       default: false,
       coerce: asBoolean,
-      describe: 'Do not sort; list entries in directory order.'
+      describe: 'Sort entries by name.'
     },
     'cid-base': {
       default: 'base58btc',
@@ -39,55 +42,55 @@ module.exports = {
       path,
       ipfs,
       long,
-      unsorted,
+      sort,
       cidBase
     } = argv
 
     argv.resolve(
-      ipfs.files.ls(path || FILE_SEPARATOR, {
-        long,
-        unsorted,
-        cidBase
-      })
-        .then(files => {
-          if (long) {
-            const table = []
-            const lengths = {}
-
-            files.forEach(link => {
-              const row = {
-                name: `${link.name}`,
-                hash: `${link.hash}`,
-                size: `${link.size}`
+      new Promise((resolve, reject) => {
+        if (sort) {
+          ipfs.files.ls(path || FILE_SEPARATOR, {
+            long,
+            sort,
+            cidBase
+          })
+            .then(files => {
+              if (long) {
+                files.forEach(link => {
+                  print(`${link.name}\t${link.hash}\t${link.size}`)
+                })
+              } else {
+                files.forEach(link => print(link.name))
               }
 
-              Object.keys(row).forEach(key => {
-                const value = row[key]
-
-                lengths[key] = lengths[key] > value.length ? lengths[key] : value.length
-              })
-
-              table.push(row)
-            })
-
-            table.forEach(row => {
-              let line = ''
-
-              Object.keys(row).forEach(key => {
-                const value = row[key]
-
-                line += value.padEnd(lengths[key])
-                line += '\t'
-              })
-
-              print(line)
+              resolve()
             })
-
-            return
-          }
-
-          files.forEach(link => print(link.name))
-        })
+            .catch(reject)
+
+          return
+        }
+
+        pull(
+          ipfs.files.lsPullStream(path || FILE_SEPARATOR, {
+            long,
+            cidBase
+          }),
+          through(file => {
+            if (long) {
+              print(`${file.name}\t${file.hash}\t${file.size}`)
+            } else {
+              print(file.name)
+            }
+          }),
+          onEnd((error) => {
+            if (error) {
+              return reject(error)
+            }
+
+            resolve()
+          })
+        )
+      })
     )
   }
 }
diff --git a/src/core/index.js b/src/core/index.js
index 496fa7c..ee99c09 100644
--- a/src/core/index.js
+++ b/src/core/index.js
@@ -30,7 +30,9 @@ const unwrappedOperations = {
 // These operations are synchronous and manage their own locking
 const unwrappedSynchronousOperations = {
   readPullStream: require('./read-pull-stream'),
-  readReadableStream: require('./read-readable-stream')
+  readReadableStream: require('./read-readable-stream'),
+  lsPullStream: require('./ls-pull-stream'),
+  lsReadableStream: require('./ls-readable-stream')
 }
 
 const wrap = ({
diff --git a/src/core/ls-pull-stream.js b/src/core/ls-pull-stream.js
new file mode 100644
index 0000000..12f6ad0
--- /dev/null
+++ b/src/core/ls-pull-stream.js
@@ -0,0 +1,131 @@
+'use strict'
+
+const waterfall = require('async/waterfall')
+const UnixFs = require('ipfs-unixfs')
+const exporter = require('ipfs-unixfs-exporter')
+const {
+  loadNode,
+  formatCid,
+  toMfsPath,
+  FILE_SEPARATOR,
+  FILE_TYPES
+} = require('./utils')
+const pull = require('pull-stream/pull')
+const collect = require('pull-stream/sinks/collect')
+const asyncMap = require('pull-stream/throughs/async-map')
+const filter = require('pull-stream/throughs/filter')
+const once = require('pull-stream/sources/once')
+const error = require('pull-stream/sources/error')
+const defer = require('pull-defer')
+
+const defaultOptions = {
+  long: false,
+  cidBase: 'base58btc'
+}
+
+module.exports = (context) => {
+  return function mfsLs (path, options = {}) {
+    if (typeof path === 'object') {
+      options = path
+      path = FILE_SEPARATOR
+    }
+
+    options = Object.assign({}, defaultOptions, options)
+
+    options.long = options.l || options.long
+
+    const deferred = defer.source()
+
+    waterfall([
+      (cb) => toMfsPath(context, path, cb),
+      ({ mfsPath, depth }, cb) => {
+        pull(
+          exporter(mfsPath, context.ipld, {
+            maxDepth: depth
+          }),
+
+          collect((err, files) => {
+            if (err) {
+              return cb(err)
+            }
+
+            if (files.length > 1) {
+              return cb(new Error(`Path ${path} had ${files.length} roots`))
+            }
+
+            const file = files[0]
+
+            if (!file) {
+              return cb(new Error(`${path} does not exist`))
+            }
+
+            if (file.type !== 'dir') {
+              return cb(null, once(file))
+            }
+
+            let first = true
+
+            return cb(null, pull(
+              exporter(mfsPath, context.ipld, {
+                maxDepth: depth + 1
+              }),
+              // first item in list is the directory node
+              filter(() => {
+                if (first) {
+                  first = false
+                  return false
+                }
+
+                return true
+              })
+            ))
+          })
+        )
+      },
+      (source, cb) => {
+        cb(null,
+          pull(
+            source,
+
+            // load DAGNodes for each file
+            asyncMap((file, cb) => {
+              if (!options.long) {
+                return cb(null, {
+                  name: file.name,
+                  type: 0,
+                  size: 0,
+                  hash: ''
+                })
+              }
+
+              loadNode(context, {
+                cid: file.hash
+              }, (err, result) => {
+                if (err) {
+                  return cb(err)
+                }
+
+                const meta = UnixFs.unmarshal(result.node.data)
+
+                cb(null, {
+                  name: file.name,
+                  type: FILE_TYPES[meta.type],
+                  hash: formatCid(file.hash, options.cidBase),
+                  size: meta.fileSize() || 0
+                })
+              })
+            })
+          )
+        )
+      }
+    ], (err, source) => {
+      if (err) {
+        return deferred.resolve(error(err))
+      }
+
+      deferred.resolve(source)
+    })
+
+    return deferred
+  }
+}
diff --git a/src/core/ls-readable-stream.js b/src/core/ls-readable-stream.js
new file mode 100644
index 0000000..69d546d
--- /dev/null
+++ b/src/core/ls-readable-stream.js
@@ -0,0 +1,10 @@
+'use strict'
+
+const lsPullStream = require('./ls-pull-stream')
+const toStream = require('pull-stream-to-stream')
+
+module.exports = (context) => {
+  return function mfsLsReadableStream (path, options = {}) {
+    return toStream.source(lsPullStream(context)(path, options))
+  }
+}
diff --git a/src/core/ls.js b/src/core/ls.js
index 4ecd96b..b6622e4 100644
--- a/src/core/ls.js
+++ b/src/core/ls.js
@@ -1,25 +1,11 @@
 'use strict'
 
-const waterfall = require('async/waterfall')
-const UnixFs = require('ipfs-unixfs')
-const exporter = require('ipfs-unixfs-exporter')
 const {
-  loadNode,
-  formatCid,
-  toMfsPath,
-  FILE_SEPARATOR,
-  FILE_TYPES
+  FILE_SEPARATOR
 } = require('./utils')
 const pull = require('pull-stream/pull')
 const collect = require('pull-stream/sinks/collect')
-const asyncMap = require('pull-stream/throughs/async-map')
-const filter = require('pull-stream/throughs/filter')
-
-const defaultOptions = {
-  long: false,
-  cidBase: 'base58btc',
-  unsorted: false
-}
+const lsPullStream = require('./ls-pull-stream')
 
 module.exports = (context) => {
   return function mfsLs (path, options, callback) {
@@ -34,98 +20,22 @@ module.exports = (context) => {
       options = {}
     }
 
-    options = Object.assign({}, defaultOptions, options)
-
-    options.long = options.l || options.long
-
-    // if we are listing a file we want ls to return something
-    // if it's a directory it's ok for the results to be empty
-    let errorOnMissing = true
-
-    waterfall([
-      (cb) => toMfsPath(context, path, cb),
-      ({ mfsPath, depth }, cb) => {
-        const maxDepth = depth + 1
-
-        pull(
-          exporter(mfsPath, context.ipld, {
-            maxDepth
-          }),
-          filter(node => {
-            if (node.depth === depth && node.type === 'dir') {
-              errorOnMissing = false
-            }
-
-            if (errorOnMissing) {
-              return node.depth === depth
-            }
-
-            return node.depth === maxDepth
-          }),
-
-          // load DAGNodes for each file
-          asyncMap((file, cb) => {
-            if (!options.long) {
-              return cb(null, {
-                name: file.name,
-                type: 0,
-                size: 0,
-                hash: ''
-              })
-            }
-
-            loadNode(context, {
-              cid: file.hash
-            }, (err, result) => {
-              if (err) {
-                return cb(err)
-              }
-
-              const meta = UnixFs.unmarshal(result.node.data)
-
-              cb(null, {
-                name: file.name,
-                type: meta.type,
-                hash: formatCid(file.hash, options.cidBase),
-                size: meta.fileSize() || 0
-              })
-            })
-          }),
-          collect(cb)
-        )
-      },
-
-      // https://github.com/ipfs/go-ipfs/issues/5181
-      (files, cb) => {
-        if (options.unsorted) {
-          return cb(null, files)
+    pull(
+      lsPullStream(context)(path, options),
+      collect((err, files) => {
+        if (err) {
+          return callback(err)
         }
 
-        return cb(null, files.sort((a, b) => {
-          return b.name.localeCompare(a.name)
-        }))
-      },
-
-      // https://github.com/ipfs/go-ipfs/issues/5026
-      (files, cb) => cb(null, files.map(file => {
-        if (!options.long) {
-          return file
-        }
-
-        if (FILE_TYPES.hasOwnProperty(file.type)) {
-          file.type = FILE_TYPES[file.type]
-        }
-
-        return file
-      })),
-
-      (files, cb) => {
-        if (!files.length && errorOnMissing) {
-          return cb(new Error(path + ' does not exist'))
+        // https://github.com/ipfs/go-ipfs/issues/5181
+        if (options.sort) {
+          return callback(null, files.sort((a, b) => {
+            return a.name.localeCompare(b.name)
+          }))
         }
 
-        cb(null, files)
-      }
-    ], callback)
+        return callback(null, files)
+      })
+    )
   }
 }
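
A minimal sketch of how the new methods might be consumed, assuming an initialised ipfs instance whose files API is wired up with the core methods added above; the '/my-dir' path and the pull-stream drain sink are illustrative only and not part of this patch:

    'use strict'

    const pull = require('pull-stream/pull')
    const drain = require('pull-stream/sinks/drain')

    // lsPullStream: directory entries arrive one at a time instead of
    // being buffered into a single array the way ipfs.files.ls() returns them
    pull(
      ipfs.files.lsPullStream('/my-dir', { long: true }),
      drain(file => console.log(`${file.name}\t${file.hash}\t${file.size}`))
    )

    // lsReadableStream: the same entries as a Node.js object-mode stream
    ipfs.files.lsReadableStream('/my-dir', { long: true })
      .on('data', file => console.log(file.name))
      .on('error', err => console.error(err))

Without long: true each entry carries only its name (type, size and hash default to 0 and ''), which is why the CLI only requests metadata when the -l flag is passed.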