[WIP] feat: support chunked add requests #1540
Changes from 9 commits: 24fb89f, 6c85b4e, 1234218, 95dfd21, eee889b, 0d4581a, cd3d25e, 405a7e6, e5c1d80, a0d731b
@@ -93,15 +93,16 @@
"byteman": "^1.3.5", | ||
"cids": "~0.5.3", | ||
"debug": "^3.1.0", | ||
"del": "^3.0.0", | ||
"err-code": "^1.1.2", | ||
"fast-glob": "^2.2.2", | ||
"file-type": "^8.1.0", | ||
"filesize": "^3.6.1", | ||
"fnv1a": "^1.0.1", | ||
"fsm-event": "^2.1.0", | ||
"get-folder-size": "^2.0.0", | ||
"glob": "^7.1.2", | ||
"hapi": "^16.6.2", | ||
"hapi-set-header": "^1.0.2", | ||
"hoek": "^5.0.3", | ||
"human-to-milliseconds": "^1.0.0", | ||
"interface-datastore": "~0.4.2", | ||
|
@@ -167,9 +168,11 @@
"read-pkg-up": "^4.0.0", | ||
"readable-stream": "2.3.6", | ||
"receptacle": "^1.3.2", | ||
"stream-concat": "^0.3.0", | ||
Review comment: Please use …
"stream-to-pull-stream": "^1.7.2", | ||
"tar-stream": "^1.6.1", | ||
"temp": "~0.8.3", | ||
"tempy": "^0.2.1", | ||
Review comment: Can we just use …
"through2": "^2.0.3", | ||
"update-notifier": "^2.5.0", | ||
"yargs": "^12.0.1", | ||
|
@@ -1,8 +1,121 @@
'use strict'

const resources = require('./../resources')
const fs = require('fs')
const path = require('path')
const tempy = require('tempy')
const del = require('del')
const StreamConcat = require('stream-concat')
const boom = require('boom')
const pump = require('pump')
const glob = require('fast-glob')
const Joi = require('joi')
const content = require('content')
const { Parser } = require('ipfs-multipart')
const toPull = require('stream-to-pull-stream')
const toStream = require('pull-stream-to-stream')
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const abortable = require('pull-abortable')
const { serialize } = require('pull-ndjson')
const mfs = require('ipfs-mfs/http')
const resources = require('./../resources')

const filesDir = tempy.directory()
Review comment: Just so that we don't forget: we should remove temporary files after a successful upload, but also have a plan for what happens with temp files produced by failed/aborted uploads. I don't think we can assume the temp dir will be cleared by the OS.
Reply: yes, definitely should go like this
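To make the cleanup idea concrete, here is a minimal sketch (not part of this PR; the helper names and the idea of a startup sweep are assumptions) using the same del package the handler already depends on:

```js
// Sketch only: remove one upload's chunk files (after success, failure or abort),
// and sweep anything left behind (e.g. from crashed uploads) when the daemon starts.
const del = require('del')
const path = require('path')

// delete "<uuid>-0", "<uuid>-1", ... for a single upload
const removeUploadChunks = (filesDir, uuid) =>
  del(path.join(filesDir, uuid) + '-*', { force: true })

// delete everything in the temp dir, e.g. on daemon (re)start
const sweepTempDir = (filesDir) =>
  del(path.join(filesDir, '*'), { force: true })
```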

const parseChunkedInput = (request) => {
  const input = request.headers['x-chunked-input']
  const regex = /^uuid="([^"]+)";\s*index=(\d*)/i

  if (!input) {
    return null
  }
  const match = input.match(regex)

  return [match[1], Number(match[2])]
}
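For illustration only (the uuid below is made up), a request carrying the third chunk would be parsed like this:

```js
// Hypothetical header sent by a client for chunk index 2:
//   x-chunked-input: uuid="0f7aa9b3-6b9f-4b6e-9c2a-0d3c7a1f2e4b"; index=2
const exampleRequest = {
  headers: { 'x-chunked-input': 'uuid="0f7aa9b3-6b9f-4b6e-9c2a-0d3c7a1f2e4b"; index=2' }
}
parseChunkedInput(exampleRequest) // => ['0f7aa9b3-6b9f-4b6e-9c2a-0d3c7a1f2e4b', 2]
```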

const createMultipartReply = (readStream, request, reply, cb) => {
  const fileAdder = pushable()
  const boundary = content.type(request.headers['content-type']).boundary
  const ipfs = request.server.app.ipfs
  const query = request.query
  const parser = new Parser({ boundary: boundary })
  readStream.pipe(parser)

  parser.on('file', (fileName, fileStream) => {
    fileAdder.push({
      path: decodeURIComponent(fileName),
      content: toPull(fileStream)
    })
  })

  parser.on('directory', (directory) => {
    fileAdder.push({
      path: decodeURIComponent(directory),
      content: ''
    })
  })

  parser.on('end', () => {
    fileAdder.end()
  })

  parser.on('error', err => cb(err))

  const pushStream = pushable()
  const abortStream = abortable()
  const replyStream = toStream.source(pull(
    pushStream,
    abortStream,
    serialize()
  ))

  // Fix Hapi Error: Stream must have a streams2 readable interface
  if (!replyStream._read) {
    replyStream._read = () => {}
    replyStream._readableState = {}
    replyStream.unpipe = () => {}
  }

  // setup reply
  reply(replyStream)
    .header('x-chunked-output', '1')
    .header('content-type', 'application/json')

  const progressHandler = (bytes) => {
    pushStream.push({ Bytes: bytes })
  }
  // ipfs add options
  const options = {
    cidVersion: query['cid-version'],
    rawLeaves: query['raw-leaves'],
    progress: query.progress ? progressHandler : null,
    onlyHash: query['only-hash'],
    hashAlg: query.hash,
    wrapWithDirectory: query['wrap-with-directory'],
    pin: query.pin,
    chunker: query.chunker
  }

  pull(
    fileAdder,
    ipfs.files.addPullStream(options),
    pull.collect((err, files) => {
      if (err) {
        pushStream.push({
          Message: err.toString(),
          Code: 0,
Review comment: FYI I've tracked down the source of the values used here. Translating from iota notation, it is an enum:
ErrNormal // ErrorType = 0
ErrClient // ErrorType = 1
ErrImplementation // ErrorType = 2
ErrNotFound // ErrorType = 3
ErrFatal // ErrorType = 4
Not sure if that is a problem, probably hardcoding …
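If hardcoding is a concern, a tiny constants object mirroring that enum would avoid the magic number (a sketch; these names are not an existing js-ipfs export):

```js
// ErrorType values as used in go-ipfs HTTP API error responses (see enum above)
const ErrorType = {
  ErrNormal: 0,
  ErrClient: 1,
  ErrImplementation: 2,
  ErrNotFound: 3,
  ErrFatal: 4
}
// e.g. Code: ErrorType.ErrNormal instead of Code: 0
```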
          Type: 'error'
        })
        pushStream.end()
        return
      }
      files.forEach((f) => pushStream.push(f))
      pushStream.end()
      cb()
    })
  )
}
module.exports = (server) => {
  const api = server.select('API')

@@ -37,13 +150,109 @@ module.exports = (server) => {
    config: {
      payload: {
        parse: false,
        output: 'stream'
        // output: 'stream',
        maxBytes: 10048576
      },
      handler: resources.files.add.handler,
      validate: resources.files.add.validate
    }
  })

  api.route({
    method: 'POST',
    path: '/api/v0/add-chunked',
    config: {
      payload: {
        parse: false,
        output: 'stream',
        maxBytes: 1000 * 1024 * 1024
Review comment: we need this value to be high for the non-chunked path.
Reply: Don't know, what do you think? From the hapi documentation, when output=stream maxBytes shouldn't matter, but that doesn't seem to be the case. Needs further investigation; maybe it's a matter of upgrading hapi.
Reply: I can't think of anything right now.
Reply: This should just be the maximum size we're prepared to accept for …
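A minimal sketch of where this thread seems to be heading, assuming per-route limits (the numbers are placeholders, not values agreed in the PR):

```js
// Placeholder limits, for illustration only: the chunked route receives at most
// one chunk per request, while the plain /add route still gets the whole body at once.
const MAX_CHUNK_BYTES = 10 * 1024 * 1024
const MAX_BODY_BYTES = 1024 * 1024 * 1024

const chunkedPayloadConfig = { parse: false, output: 'stream', maxBytes: MAX_CHUNK_BYTES }
const addPayloadConfig = { parse: false, output: 'stream', maxBytes: MAX_BODY_BYTES }
```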
        // maxBytes: 10485760
      },
      validate: {
        headers: {
          'content-range': Joi.string().regex(/(\d+)-(\d+)\/(\d+|\*)/),
          'x-chunked-input': Joi.string().regex(/^uuid="([^"]+)";\s*index=(\d*)/i)
        },
        options: {
          allowUnknown: true
        }
      },
      handler: (request, reply) => {
        const chunkedInput = parseChunkedInput(request)

        if (boom.isBoom(chunkedInput)) {
          return reply(chunkedInput)
        }

        // non chunked
        if (!chunkedInput) {
          createMultipartReply(
            request.payload,
            request,
            reply,
            (err) => {
              if (err) {
                return reply(err)
              }
              console.log('Finished adding')
            }
          )

          return
        }

        // chunked
        const [uuid, index] = chunkedInput
        const [, start, , total] = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/)
        const file = path.join(filesDir, uuid) + '-' + index

        // TODO validate duplicates, missing chunks

        if (start === total) {
          /* all chunks have been received */
          const base = path.join(filesDir, uuid) + '-'
          const pattern = base + '*'
          const files = glob.sync(pattern)

          files.sort((a, b) => {
            return Number(a.replace(base, '')) - Number(b.replace(base, ''))
          })

          let fileIndex = 0
          const nextStream = () => fileIndex === files.length ? null : fs.createReadStream(files[fileIndex++])
          createMultipartReply(
            new StreamConcat(nextStream),
            request,
            reply,
            (err) => {
              if (err) {
                return reply(err)
              }

              console.log('Finished adding')
              del(pattern, { force: true })
                .then(paths => {
                  console.log('Deleted files and folders:\n', paths.join('\n'))
                })
                .catch(console.error)
            }
          )
        } else {
          pump(
            request.payload,
            fs.createWriteStream(file),
            (err) => {
              if (err) {
                return reply(err)
              }
              reply({ Bytes: total })
            }
          )
        }
      }
    }
  })

  api.route({
    // TODO fix method
    method: '*',
Review comment: Can we use glob, or replace glob with fast-glob?
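For comparison, both libraries can produce the chunk listing the handler needs, so the choice is mostly about keeping a single glob dependency (a sketch; the pattern is illustrative):

```js
// Listing an upload's chunk files with either library
const fastGlob = require('fast-glob') // what the new handler uses as `glob`
const glob = require('glob')          // the dependency already in package.json

const pattern = '/tmp/example-upload-dir/example-uuid-*' // illustrative path
const viaFastGlob = fastGlob.sync(pattern)
const viaGlob = glob.sync(pattern)
// both return an array of matching paths, which the handler then sorts by chunk index
```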