Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 1234218

Browse files
committed
feat: add multipart response stream
1 parent 6c85b4e commit 1234218

File tree

1 file changed

+75
-31
lines changed

1 file changed

+75
-31
lines changed

src/http/api/routes/files.js

Lines changed: 75 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,57 +6,107 @@ const fs = require('fs')
66
const path = require('path')
77
const tempy = require('tempy')
88
const multipart = require('ipfs-multipart')
9-
const pull = require('pull-stream')
109
const toPull = require('stream-to-pull-stream')
10+
const toStream = require('pull-stream-to-stream')
11+
const pull = require('pull-stream')
1112
const pushable = require('pull-pushable')
13+
const abortable = require('pull-abortable')
14+
const { serialize } = require('pull-ndjson')
1215

1316
const streams = []
1417
const filesDir = tempy.directory()
1518

16-
const createMultipartStream = (readStream, boundary, ipfs, cb) => {
19+
const responseError = (msg, code, request, abortStream) => {
20+
const err = JSON.stringify({ Message: msg, Code: code })
21+
request.raw.res.addTrailers({
22+
'X-Stream-Error': err
23+
})
24+
abortStream.abort()
25+
}
26+
const createMultipartStream = (readStream, boundary, ipfs, request, reply, cb) => {
27+
const fileAdder = pushable()
1728
const parser = new multipart.Parser({ boundary: boundary })
29+
let filesParsed = false
30+
1831
readStream.pipe(parser)
19-
const fileAdder = pushable()
2032

2133
parser.on('file', (fileName, fileStream) => {
22-
fileName = decodeURIComponent(fileName)
23-
24-
const filePair = {
25-
path: fileName,
34+
console.log('File: ', fileName)
35+
filesParsed = true
36+
fileAdder.push({
37+
path: decodeURIComponent(fileName),
2638
content: toPull(fileStream)
27-
}
28-
console.log(filePair)
29-
fileAdder.push(filePair)
39+
})
3040
})
3141

3242
parser.on('directory', (directory) => {
33-
directory = decodeURIComponent(directory)
3443
fileAdder.push({
35-
path: directory,
44+
path: decodeURIComponent(directory),
3645
content: ''
3746
})
3847
})
3948

4049
parser.on('end', () => {
50+
console.log('multipart end')
4151
fileAdder.end()
52+
if (!filesParsed) {
53+
reply({
54+
Message: "File argument 'data' is required.",
55+
Code: 0,
56+
Type: 'error'
57+
}).code(400).takeover()
58+
}
4259
})
4360

61+
const pushStream = pushable()
62+
const abortStream = abortable()
63+
const replyStream = toStream.source(pull(
64+
pushStream,
65+
abortStream,
66+
serialize()
67+
))
68+
69+
// Fix Hapi Error: Stream must have a streams2 readable interface
70+
if (!replyStream._read) {
71+
replyStream._read = () => {}
72+
replyStream._readableState = {}
73+
replyStream.unpipe = () => {}
74+
}
75+
76+
// setup reply
77+
reply(replyStream)
78+
.header('x-chunked-output', '1')
79+
.header('content-type', 'application/json')
80+
.header('Trailer', 'X-Stream-Error')
81+
82+
const progressHandler = (bytes) => {
83+
pushStream.push({ Bytes: bytes })
84+
}
85+
// ipfs add options
86+
const options = {
87+
cidVersion: request.query['cid-version'],
88+
rawLeaves: request.query['raw-leaves'],
89+
progress: request.query.progress ? progressHandler : null,
90+
onlyHash: request.query['only-hash'],
91+
hashAlg: request.query.hash,
92+
wrapWithDirectory: request.query['wrap-with-directory'],
93+
pin: request.query.pin,
94+
chunker: request.query.chunker
95+
}
96+
4497
pull(
4598
fileAdder,
46-
ipfs.files.addPullStream(),
47-
pull.map((file) => {
48-
return {
49-
Name: file.path, // addPullStream already turned this into a hash if it wanted to
50-
Hash: file.hash,
51-
Size: file.size
52-
}
53-
}),
99+
ipfs.files.addPullStream(options),
54100
pull.collect((err, files) => {
55101
if (err) {
56-
cb(err)
57-
return
102+
return responseError(err.msg, 0, request)
58103
}
59-
cb(null, files)
104+
if (files.length === 0) {
105+
return responseError('Failed to add files.', 0, request)
106+
}
107+
console.log(files)
108+
files.forEach((f) => pushStream.push(f))
109+
pushStream.end()
60110
})
61111
)
62112

@@ -111,7 +161,7 @@ module.exports = (server) => {
111161
config: {
112162
payload: {
113163
parse: false,
114-
maxBytes: 10048576
164+
maxBytes: 10485760
115165
},
116166
handler: (request, reply) => {
117167
console.log('received')
@@ -166,13 +216,7 @@ module.exports = (server) => {
166216
stream.on('finish', function () {
167217
console.log('add to ipfs from the file')
168218
var readStream = fs.createReadStream(file)
169-
createMultipartStream(readStream, boundary, ipfs, (err, files) => {
170-
if (err) {
171-
console.error(err)
172-
}
173-
console.log('finished adding to ipfs', files)
174-
reply({files})
175-
})
219+
createMultipartStream(readStream, boundary, ipfs, request, reply)
176220
})
177221

178222
stream.end()

0 commit comments

Comments
 (0)