This repository was archived by the owner on Feb 12, 2024. It is now read-only.

[WIP] feat: support chunked add requests #1540

Closed · wants to merge 10 commits
5 changes: 4 additions & 1 deletion package.json
@@ -93,15 +93,16 @@
"byteman": "^1.3.5",
"cids": "~0.5.3",
"debug": "^3.1.0",
"del": "^3.0.0",
"err-code": "^1.1.2",
"fast-glob": "^2.2.2",
Member:
Can we use glob or replace glob with fast-glob?

"file-type": "^8.1.0",
"filesize": "^3.6.1",
"fnv1a": "^1.0.1",
"fsm-event": "^2.1.0",
"get-folder-size": "^2.0.0",
"glob": "^7.1.2",
"hapi": "^16.6.2",
"hapi-set-header": "^1.0.2",
"hoek": "^5.0.3",
"human-to-milliseconds": "^1.0.0",
"interface-datastore": "~0.4.2",
@@ -167,9 +168,11 @@
"read-pkg-up": "^4.0.0",
"readable-stream": "2.3.6",
"receptacle": "^1.3.2",
"stream-concat": "^0.3.0",
Member:
Please use ~ for pre 1.0 dependencies

"stream-to-pull-stream": "^1.7.2",
"tar-stream": "^1.6.1",
"temp": "~0.8.3",
"tempy": "^0.2.1",
Member:
Can we just use temp?

"through2": "^2.0.3",
"update-notifier": "^2.5.0",
"yargs": "^12.0.1",
4 changes: 2 additions & 2 deletions src/http/api/resources/files.js
@@ -182,6 +182,7 @@ exports.add = {

parser.on('file', (fileName, fileStream) => {
fileName = decodeURIComponent(fileName)

const filePair = {
path: fileName,
content: toPull(fileStream)
@@ -192,7 +193,6 @@

parser.on('directory', (directory) => {
directory = decodeURIComponent(directory)

fileAdder.push({
path: directory,
content: ''
@@ -220,7 +220,7 @@ exports.add = {
rawLeaves: request.query['raw-leaves'],
progress: request.query.progress ? progressHandler : null,
onlyHash: request.query['only-hash'],
hashAlg: request.query['hash'],
hashAlg: request.query.hash,
wrapWithDirectory: request.query['wrap-with-directory'],
pin: request.query.pin,
chunker: request.query.chunker
213 changes: 211 additions & 2 deletions src/http/api/routes/files.js
@@ -1,8 +1,121 @@
'use strict'

const resources = require('./../resources')
const fs = require('fs')
const path = require('path')
const tempy = require('tempy')
const del = require('del')
const StreamConcat = require('stream-concat')
const boom = require('boom')
const pump = require('pump')
const glob = require('fast-glob')
const Joi = require('joi')
const content = require('content')
const { Parser } = require('ipfs-multipart')
const toPull = require('stream-to-pull-stream')
const toStream = require('pull-stream-to-stream')
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const abortable = require('pull-abortable')
const { serialize } = require('pull-ndjson')
const mfs = require('ipfs-mfs/http')
const resources = require('./../resources')

const filesDir = tempy.directory()
Member (@lidel, Sep 3, 2018):
Just so that we don't forget: we should remove temporary files after a successful upload, but also have a plan for what happens with temp files produced by failed/aborted uploads. I don't think we can assume the temp dir will be cleared by the OS.
Leaving them around forever may be a self-administered DoS attack (eventually running out of disk space).

Member Author:
Yes, it should definitely go like this:

  • successful add? delete the file
  • errors during the session? define a way to remove only the last chunk from the file (or have one file per chunk); this would allow for resumable adds
  • on new session start, asynchronously check the temp folder and delete all files older than 1 hour (a sketch follows below)
  • we also need to handle open streams; too many can become a problem
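
A minimal sketch of the "delete stale chunks" idea from the thread above, reusing the del dependency and the filesDir temp directory already present in this diff; MAX_CHUNK_AGE_MS and cleanupStaleChunks are hypothetical names, not part of the PR:

const fs = require('fs')
const path = require('path')
const del = require('del')

// hypothetical constant: one hour, as suggested in the thread above
const MAX_CHUNK_AGE_MS = 60 * 60 * 1000

// Hypothetical helper: remove chunk files in `dir` that have not been modified
// for MAX_CHUNK_AGE_MS. Could be kicked off asynchronously whenever a new
// chunked-add session starts.
const cleanupStaleChunks = (dir, callback) => {
  fs.readdir(dir, (err, names) => {
    if (err) return callback(err)

    const now = Date.now()
    const stale = names
      .map((name) => path.join(dir, name))
      .filter((file) => {
        try {
          return now - fs.statSync(file).mtimeMs > MAX_CHUNK_AGE_MS
        } catch (err) {
          return false // the file may already have been removed
        }
      })

    del(stale, { force: true })
      .then((deleted) => callback(null, deleted))
      .catch(callback)
  })
}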

const parseChunkedInput = (request) => {
const input = request.headers['x-chunked-input']
const regex = /^uuid="([^"]+)";\s*index=(\d*)/i

if (!input) {
return null
}
const match = input.match(regex)

return [match[1], Number(match[2])]
}
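
// Illustration (not part of the PR): the x-chunked-input header this helper
// parses looks like the following; the UUID value is a made-up example.
//
//   x-chunked-input: uuid="9a2b6a0c-6f54-4f5e-8f2d-2f1f0c7c1a11"; index=3
//
// parseChunkedInput(request) would then return
//   ['9a2b6a0c-6f54-4f5e-8f2d-2f1f0c7c1a11', 3]
// and it returns null when the header is absent.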

const createMultipartReply = (readStream, request, reply, cb) => {
const fileAdder = pushable()
const boundary = content.type(request.headers['content-type']).boundary
const ipfs = request.server.app.ipfs
const query = request.query
const parser = new Parser({ boundary: boundary })
readStream.pipe(parser)

parser.on('file', (fileName, fileStream) => {
fileAdder.push({
path: decodeURIComponent(fileName),
content: toPull(fileStream)
})
})

parser.on('directory', (directory) => {
fileAdder.push({
path: decodeURIComponent(directory),
content: ''
})
})

parser.on('end', () => {
fileAdder.end()
})

parser.on('error', err => cb(err))

const pushStream = pushable()
const abortStream = abortable()
const replyStream = toStream.source(pull(
pushStream,
abortStream,
serialize()
))

// Fix Hapi Error: Stream must have a streams2 readable interface
if (!replyStream._read) {
replyStream._read = () => {}
replyStream._readableState = {}
replyStream.unpipe = () => {}
}

// setup reply
reply(replyStream)
.header('x-chunked-output', '1')
.header('content-type', 'application/json')

const progressHandler = (bytes) => {
pushStream.push({ Bytes: bytes })
}
// ipfs add options
const options = {
cidVersion: query['cid-version'],
rawLeaves: query['raw-leaves'],
progress: query.progress ? progressHandler : null,
onlyHash: query['only-hash'],
hashAlg: query.hash,
wrapWithDirectory: query['wrap-with-directory'],
pin: query.pin,
chunker: query.chunker
}

pull(
fileAdder,
ipfs.files.addPullStream(options),
pull.collect((err, files) => {
if (err) {
pushStream.push({
Message: err.toString(),
Code: 0,
Member (@lidel, Sep 9, 2018):
FYI, I've tracked down the source of the values used in the Code field in the Go implementation (at go-ipfs-cmdkit/error.go#L9-L25).

Translating from iota notation, it is an enum:

	ErrNormal         //  ErrorType = 0
	ErrClient         //  ErrorType = 1
	ErrImplementation //  ErrorType = 2
	ErrNotFound       //  ErrorType = 3
	ErrFatal          //  ErrorType = 4

Not sure if that is a problem; hardcoding 0 here is probably fine.

Type: 'error'
})
pushStream.end()
return
}
files.forEach((f) => pushStream.push(f))
pushStream.end()
cb()
})
)
}
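
For reference (inferred from the handler above, not separately documented in the PR): the reply produced by createMultipartReply is an ndjson stream serialized by pull-ndjson, and the x-chunked-output: 1 header tells clients to expect it. It interleaves progress entries pushed by progressHandler, one line per object emitted by ipfs.files.addPullStream, and, on failure, a single error entry, roughly:

{"Bytes":262144}
{"Bytes":524288}
...one line per added file, with whatever shape ipfs.files.addPullStream emits...
{"Message":"<error message>","Code":0,"Type":"error"}
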
module.exports = (server) => {
const api = server.select('API')

@@ -37,13 +150,109 @@ module.exports = (server) => {
config: {
payload: {
parse: false,
output: 'stream'
// output: 'stream',
maxBytes: 10048576
},
handler: resources.files.add.handler,
validate: resources.files.add.validate
}
})

api.route({
method: 'POST',
path: '/api/v0/add-chunked',
config: {
payload: {
parse: false,
output: 'stream',
maxBytes: 1000 * 1024 * 1024
Member:

  • How do we pick the default max chunk size? E.g. why 1 GB and not 256 MB?
  • Is this going to be a hardcoded limit, or a configuration option that can be passed to the js-ipfs constructor?

Member Author:

> How do we pick the default max chunk size? E.g. why 1 GB and not 256 MB?

We need this value to be high for the non-chunked path.

> Is this going to be a hardcoded limit, or a configuration option that can be passed to the js-ipfs constructor?

I don't know. What do you think?

Per the hapi documentation, maxBytes shouldn't matter when output=stream, but that doesn't seem to be the case here. Needs further investigation; maybe it's a matter of upgrading hapi.

Member:
I can't think of anything right now.
Unless you see a good use case for customizing it, let's go with a hardcoded value for now. Exposing it as a config property can be contributed later in a separate PR.

Member:
This should just be the maximum size we're prepared to accept for /api/v0/add.

// maxBytes: 10485760
},
validate: {
headers: {
'content-range': Joi.string().regex(/(\d+)-(\d+)\/(\d+|\*)/),
'x-chunked-input': Joi.string().regex(/^uuid="([^"]+)";\s*index=(\d*)/i)
},
options: {
allowUnknown: true
}
},
handler: (request, reply) => {
const chunkedInput = parseChunkedInput(request)

if (boom.isBoom(chunkedInput)) {
return reply(chunkedInput)
}

// non chunked
if (!chunkedInput) {
createMultipartReply(
request.payload,
request,
reply,
(err) => {
if (err) {
return reply(err)
}
console.log('Finished adding')
}
)

return
}

// chunked
const [uuid, index] = chunkedInput
const [, start, , total] = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/)
const file = path.join(filesDir, uuid) + '-' + index

// TODO validate duplicates, missing chunks

if (start === total) {
/* all chunks have been received */
const base = path.join(filesDir, uuid) + '-'
const pattern = base + '*'
const files = glob.sync(pattern)

files.sort((a, b) => {
return Number(a.replace(base, '')) - Number(b.replace(base, ''))
})

let fileIndex = 0
const nextStream = () => fileIndex === files.length ? null : fs.createReadStream(files[fileIndex++])
createMultipartReply(
new StreamConcat(nextStream),
request,
reply,
(err) => {
if (err) {
return reply(err)
}

console.log('Finished adding')
del(pattern, { force: true })
.then(paths => {
console.log('Deleted files and folders:\n', paths.join('\n'))
})
.catch(console.error)
}
)
} else {
pump(
request.payload,
fs.createWriteStream(file),
(err) => {
if (err) {
return reply(err)
}
reply({ Bytes: total })
}
)
}
}
}
})
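
To make the chunked flow concrete, here is a hedged client-side sketch of the protocol this route appears to implement: each chunk of a single multipart body is POSTed with the same uuid and an increasing index, and a final request whose content-range start equals the total signals that all chunks have been uploaded. The host, port, uuid, boundary, chunk size and payload below are made-up illustration values, not part of the PR.

// Hypothetical client sketch (illustration only, not part of this PR)
const http = require('http')
const crypto = require('crypto')

const uuid = crypto.randomBytes(16).toString('hex') // stand-in for a real UUID
const payload = Buffer.from('...full multipart/form-data body built elsewhere...')
const CHUNK_SIZE = 256 * 1024
const total = payload.length

const post = (index, range, body, cb) => {
  const req = http.request({
    host: '127.0.0.1',
    port: 5002, // assumed js-ipfs API port
    path: '/api/v0/add-chunked',
    method: 'POST',
    headers: {
      'content-type': 'multipart/form-data; boundary=----example', // same boundary on every request
      'x-chunked-input': `uuid="${uuid}"; index=${index}`,
      'content-range': range
    }
  }, (res) => {
    res.resume()
    res.on('end', cb)
  })
  req.end(body)
}

// send chunks sequentially, then an empty request with start === total
let offset = 0
let index = 0
const sendNext = () => {
  if (offset >= total) {
    return post(index, `${total}-${total}/${total}`, Buffer.alloc(0), () => {
      console.log('all chunks sent')
    })
  }
  const end = Math.min(offset + CHUNK_SIZE, total)
  post(index++, `${offset}-${end - 1}/*`, payload.slice(offset, end), () => {
    offset = end
    sendNext()
  })
}
sendNext()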

api.route({
// TODO fix method
method: '*',
15 changes: 5 additions & 10 deletions src/http/index.js
@@ -4,7 +4,6 @@ const series = require('async/series')
const Hapi = require('hapi')
const debug = require('debug')
const multiaddr = require('multiaddr')
const setHeader = require('hapi-set-header')
const once = require('once')

const IPFS = require('../core')
@@ -103,7 +102,11 @@ function HttpApi (repo, config, cliArgs) {
this.server = new Hapi.Server({
connections: {
routes: {
cors: true
cors: {
origin: ['*'],
additionalHeaders: ['X-Stream-Output, X-Chunked-Output, X-Content-Length', 'Content-Type', 'Content-Range', 'X-Chunked-Input'],
additionalExposedHeaders: ['X-Stream-Output, X-Chunked-Output, X-Content-Length', 'X-Chunked-Input']
}
}
},
debug: process.env.DEBUG ? {
@@ -137,14 +140,6 @@ function HttpApi (repo, config, cliArgs) {
// load gateway routes
require('./gateway/routes')(this.server)

// Set default headers
setHeader(this.server,
'Access-Control-Allow-Headers',
'X-Stream-Output, X-Chunked-Output, X-Content-Length')
setHeader(this.server,
'Access-Control-Expose-Headers',
'X-Stream-Output, X-Chunked-Output, X-Content-Length')

this.server.start(cb)
})
},