This repository was archived by the owner on Aug 12, 2020. It is now read-only.

Commit 879f9a8

End export stream on completion.

1 parent 4e11dfc commit 879f9a8
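
In effect: the exporter stream previously never signalled end-of-stream, so consumers could not tell when an export had finished (the directory test below had to poll with setTimeout). After this change the DAG traversal runs through field-trip, and its completion callback pushes null, ending the stream so that downstream consumers get an ordinary 'end' event.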

3 files changed: 75 additions, 65 deletions

package.json

Lines changed: 3 additions & 1 deletion

@@ -35,10 +35,12 @@
   "homepage": "https://github.com/diasdavid/js-ipfs-data-importing#readme",
   "devDependencies": {
     "aegir": "^3.0.1",
+    "async": "^1.5.2",
     "block-stream2": "^1.1.0",
     "bs58": "^3.0.0",
     "buffer-loader": "0.0.1",
     "chai": "^3.5.0",
+    "concat-stream": "^1.5.1",
     "fs-blob-store": "^5.2.1",
     "idb-plus-blob-store": "^1.1.2",
     "ipfs-repo": "^0.7.5",
@@ -51,9 +53,9 @@
     "string-to-stream": "^1.0.1"
   },
   "dependencies": {
-    "async": "^1.5.2",
     "block-stream2": "^1.1.0",
     "debug": "^2.2.0",
+    "field-trip": "0.0.2",
     "ipfs-merkle-dag": "^0.5.0",
     "ipfs-unixfs": "^0.1.0",
     "isstream": "^0.1.2",

src/exporter.js

Lines changed: 44 additions & 46 deletions

@@ -5,10 +5,10 @@ const log = debug('exporter')
 log.err = debug('exporter:error')
 const UnixFS = require('ipfs-unixfs')
 const series = require('run-series')
-const async = require('async')
 const Readable = require('readable-stream').Readable
 const pathj = require('path')
 const util = require('util')
+const fieldtrip = require('field-trip')

 exports = module.exports = Exporter

@@ -25,11 +25,12 @@ function Exporter (hash, dagService, options) {

   this._read = (n) => {}

-  let fileExporter = (node, name, callback) => {
+  let fileExporter = (node, name, done) => {
     let init

-    if (!callback) { callback = function noop () {} }
+    if (!done) throw new Error('done must be set')

+    // Logic to export a single (possibly chunked) unixfs file.
     var rs = new Readable()
     if (node.links.length === 0) {
       const unmarshaledData = UnixFS.unmarshal(node.data)
@@ -43,8 +44,7 @@ function Exporter (hash, dagService, options) {
         rs.push(null)
       }
       this.push({ content: rs, path: name })
-      callback()
-      return
+      done()
     } else {
       init = false
       rs._read = () => {
@@ -57,7 +57,7 @@ function Exporter (hash, dagService, options) {
         return (cb) => {
           dagService.get(link.hash, (err, res) => {
             if (err) {
-              cb(err)
+              return cb(err)
             }
             var unmarshaledData = UnixFS.unmarshal(res.data)
             rs.push(unmarshaledData.data)
@@ -67,26 +67,28 @@ function Exporter (hash, dagService, options) {
         })
         series(array, (err, res) => {
           if (err) {
-            callback()
+            rs.emit('error', err)
             return
           }
           rs.push(null)
-          callback()
           return
         })
       }
       this.push({ content: rs, path: name })
-      callback()
-      return
+      done()
     }
   }

-  let dirExporter = (node, name, callback) => {
+  // Logic to export a unixfs directory.
+  let dirExporter = (node, name, add, done) => {
     let init

-    if (!callback) { callback = function noop () {} }
+    if (!add) throw new Error('add must be set')
+    if (!done) throw new Error('done must be set')

     var rs = new Readable()
+
+    // Directory has no links
     if (node.links.length === 0) {
       init = false
       rs._read = () => {
@@ -98,49 +100,45 @@ function Exporter (hash, dagService, options) {
         rs.push(null)
       }
       this.push({content: null, path: name})
-      callback()
-      return
+      done()
     } else {
-      async.forEachSeries(node.links, (link, callback) => {
-        dagService.get(link.hash, (err, res) => {
-          if (err) {
-            callback(err)
-          }
-          var unmarshaledData = UnixFS.unmarshal(res.data)
-          if (unmarshaledData.type === 'file') {
-            return (fileExporter(res, pathj.join(name, link.name), callback))
-          }
-          if (unmarshaledData.type === 'directory') {
-            return (dirExporter(res, pathj.join(name, link.name), callback))
-          }
-          callback()
-        })
-      }, (err) => {
-        if (err) {
-          callback()
-          return
-        }
-        callback()
-        return
+      node.links.forEach((link) => {
+        add({ path: pathj.join(name, link.name), hash: link.hash })
       })
+      done()
     }
   }

-  dagService.get(hash, (err, fetchedNode) => {
+  // Traverse the DAG asynchronously
+  var self = this
+  fieldtrip([{ path: hash, hash: hash }], visit, (err) => {
     if (err) {
-      this.emit('error', err)
+      self.emit('error', err)
       return
     }
-    const data = UnixFS.unmarshal(fetchedNode.data)
-    const type = data.type
-
-    if (type === 'directory') {
-      dirExporter(fetchedNode, hash)
-    }
-    if (type === 'file') {
-      fileExporter(fetchedNode, hash)
-    }
+    self.push(null)
   })

+  // Visit function: called once per node in the exported graph
+  function visit (item, add, done) {
+    dagService.get(item.hash, (err, fetchedNode) => {
+      if (err) {
+        self.emit('error', err)
+        return
+      }
+
+      const data = UnixFS.unmarshal(fetchedNode.data)
+      const type = data.type
+
+      if (type === 'directory') {
+        dirExporter(fetchedNode, item.path, add, done)
+      }
+
+      if (type === 'file') {
+        fileExporter(fetchedNode, item.path, done)
+      }
+    })
+  }
+
   return this
 }
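
For readers unfamiliar with field-trip, here is a minimal sketch of the traversal contract as this commit uses it. The shape of the API is inferred from the call sites above, not from field-trip's own documentation: fieldtrip(initialItems, visit, done) calls visit(item, add, done) once per queued item, visit may enqueue further items via add(item), and the final callback fires once every queued item has been visited or an error occurs. The toy graph below is hypothetical; the exporter's items carry { path, hash } instead.

const fieldtrip = require('field-trip')

// Hypothetical adjacency list standing in for a unixfs DAG.
const children = {
  root: ['a', 'b'],
  a: ['a1'],
  b: [],
  a1: []
}

fieldtrip([{ path: 'root' }], function visit (item, add, done) {
  // Enqueue this node's children, then mark this node as handled.
  children[item.path].forEach((child) => add({ path: child }))
  done()
}, (err) => {
  if (err) throw err
  // Traversal finished: this is the point where the exporter calls self.push(null).
  console.log('all nodes visited')
})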

test/test-exporter.js

Lines changed: 28 additions & 18 deletions

@@ -7,7 +7,7 @@ const expect = require('chai').expect
 const BlockService = require('ipfs-block-service')
 const DAGService = require('ipfs-merkle-dag').DAGService
 const UnixFS = require('ipfs-unixfs')
-const bl = require('bl')
+const concat = require('concat-stream')
 const fs = require('fs')
 const path = require('path')

@@ -32,13 +32,16 @@ module.exports = function (repo) {
         const unmarsh = UnixFS.unmarshal(fetchedNode.data)
         expect(err).to.not.exist
         const testExport = exporter(hash, ds)
-        testExport.on('data', (file) => {
-          file.content.pipe(bl((err, bldata) => {
-            expect(err).to.not.exist
+        testExport.on('error', (err) => {
+          expect(err).to.not.exist
+        })
+        testExport.pipe(concat((files) => {
+          expect(files).to.be.length(1)
+          files[0].content.pipe(concat((bldata) => {
             expect(bldata).to.deep.equal(unmarsh.data)
             done()
           }))
-        })
+        }))
       })
     })

@@ -47,10 +50,12 @@ module.exports = function (repo) {
       const bs = new BlockService(repo)
       const ds = new DAGService(bs)
       const testExport = exporter(hash, ds)
+      testExport.on('error', (err) => {
+        expect(err).to.not.exist
+      })
       testExport.on('data', (file) => {
-        file.content.pipe(bl((err, bldata) => {
+        file.content.pipe(concat((bldata) => {
           expect(bldata).to.deep.equal(bigFile)
-          expect(err).to.not.exist
           done()
         }))
       })
@@ -61,10 +66,13 @@ module.exports = function (repo) {
       const bs = new BlockService(repo)
       const ds = new DAGService(bs)
       const testExport = exporter(hash, ds)
+      testExport.on('error', (err) => {
+        expect(err).to.not.exist
+      })
       testExport.on('data', (file) => {
         expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
-        file.content.pipe(bl((err, bldata) => {
-          expect(err).to.not.exist
+        file.content.pipe(concat((bldata) => {
+          expect(bldata).to.exist
           done()
         }))
       })
@@ -75,24 +83,26 @@ module.exports = function (repo) {
       const bs = new BlockService(repo)
       const ds = new DAGService(bs)
       const testExport = exporter(hash, ds)
-      var fsa = []
-      testExport.on('data', (files) => {
-        fsa.push(files)
+      testExport.on('error', (err) => {
+        expect(err).to.not.exist
       })
-      setTimeout(() => {
-        expect(fsa[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
-        expect(fsa[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
-        expect(fsa[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
-        expect(fsa[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
+      testExport.pipe(concat((files) => {
+        expect(files[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
+        expect(files[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
+        expect(files[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
+        expect(files[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
         done()
-      }, 1000)
+      }))
     })

     it('returns a null stream for dir', (done) => {
       const hash = 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn' // This hash doesn't exist in the repo
       const bs = new BlockService(repo)
       const ds = new DAGService(bs)
       const testExport = exporter(hash, ds)
+      testExport.on('error', (err) => {
+        expect(err).to.not.exist
+      })
       testExport.on('data', (dir) => {
        expect(dir.content).to.equal(null)
        done()
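
The move from bl to concat-stream ties directly into the commit's point: concat-stream only fires its callback once the source stream ends, so these tests could drop the setTimeout polling as soon as the exporter started pushing null on completion. A minimal sketch of that behaviour, with a plain object-mode stream standing in for the exporter:

const concat = require('concat-stream')
const { Readable } = require('stream')

// Stand-in for the exporter: an object-mode stream of { path, content } entries.
const source = new Readable({ objectMode: true, read () {} })
source.push({ path: 'dir/200Bytes.txt' })
source.push({ path: 'dir/level-1' })
source.push(null) // without this end signal, concat's callback would never fire

source.pipe(concat((files) => {
  // For object streams, concat-stream collects every chunk into an array.
  console.log(files.map((f) => f.path)) // [ 'dir/200Bytes.txt', 'dir/level-1' ]
}))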
