This repository was archived by the owner on Feb 12, 2024. It is now read-only.

perf: expose importer concurrency controls when adding files #2637

Merged · 12 commits · Dec 11, 2019
2 changes: 1 addition & 1 deletion .aegir.js
@@ -9,7 +9,7 @@ const preloadNode = MockPreloadNode.createNode()
const echoServer = EchoServer.createServer()

module.exports = {
- bundlesize: { maxSize: '650kB' },
+ bundlesize: { maxSize: '651kB' },
webpack: {
resolve: {
mainFields: ['browser', 'main'],
7 changes: 3 additions & 4 deletions package.json
@@ -66,8 +66,6 @@
"@hapi/joi": "^15.0.0",
"abort-controller": "^3.0.0",
"array-shuffle": "^1.0.1",
- "async-iterator-all": "^1.0.0",
- "async-iterator-first": "^1.0.0",
"async-iterator-to-pull-stream": "^1.3.0",
"async-iterator-to-stream": "^1.1.0",
"base32.js": "~0.1.0",
@@ -100,14 +98,14 @@
"ipfs-bitswap": "^0.26.0",
"ipfs-block": "~0.8.1",
"ipfs-block-service": "~0.16.0",
- "ipfs-http-client": "^40.0.1",
+ "ipfs-http-client": "^40.1.0",
"ipfs-http-response": "~0.4.0",
"ipfs-mfs": "^0.13.2",
"ipfs-multipart": "^0.2.0",
"ipfs-repo": "^0.30.0",
"ipfs-unixfs": "~0.1.16",
"ipfs-unixfs-exporter": "^0.38.0",
- "ipfs-unixfs-importer": "^0.40.0",
+ "ipfs-unixfs-importer": "^0.42.0",
"ipfs-utils": "~0.4.0",
"ipld": "~0.25.0",
"ipld-bitcoin": "~0.3.0",
@@ -123,6 +121,7 @@
"is-pull-stream": "~0.0.0",
"is-stream": "^2.0.0",
"iso-url": "~0.4.6",
+ "it-all": "^1.0.1",
"it-pipe": "^1.0.1",
"it-to-stream": "^0.1.1",
"jsondiffpatch": "~0.3.11",
15 changes: 14 additions & 1 deletion src/cli/commands/add.js
@@ -49,10 +49,20 @@ module.exports = {
default: false,
describe: 'Only chunk and hash, do not write'
},
+ 'block-write-concurrency': {
+ type: 'integer',
+ default: 10,
+ describe: 'After a file has been chunked, this controls how many chunks to hash and add to the block store concurrently'
+ },
chunker: {
default: 'size-262144',
describe: 'Chunking algorithm to use, formatted like [size-{size}, rabin, rabin-{avg}, rabin-{min}-{avg}-{max}]'
},
+ 'file-import-concurrency': {
+ type: 'integer',
+ default: 50,
+ describe: 'How many files to import at once'
+ },
'enable-sharding-experiment': {
type: 'boolean',
default: false
@@ -124,7 +134,10 @@ module.exports = {
wrapWithDirectory: argv.wrapWithDirectory,
pin: argv.pin,
chunker: argv.chunker,
- preload: argv.preload
+ preload: argv.preload,
+ nonatomic: argv.nonatomic,
+ fileImportConcurrency: argv.fileImportConcurrency,
+ blockWriteConcurrency: argv.blockWriteConcurrency
}

if (options.enableShardingExperiment && argv.isDaemonOn()) {
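A minimal usage sketch for the two new knobs, not taken from this PR: the CLI flags are the ones declared above (`--file-import-concurrency`, `--block-write-concurrency`) and the core-API option names match what the command handler passes through; the node setup, file contents and concurrency values are illustrative only.

// Sketch only: assumes an in-process js-ipfs node created via IPFS.create()
// and the promise-returning ipfs.add() of this release line.
// CLI equivalent: ipfs add -r --file-import-concurrency=20 --block-write-concurrency=20 <dir>
const IPFS = require('ipfs')

async function main () {
  const ipfs = await IPFS.create()

  const results = await ipfs.add([
    { path: 'a.txt', content: Buffer.from('hello') },
    { path: 'b.txt', content: Buffer.from('world') }
  ], {
    fileImportConcurrency: 20, // import up to 20 files at once
    blockWriteConcurrency: 20  // hash/write up to 20 blocks of a file concurrently
  })

  console.log(results) // array of { path, hash, size } entries (shape varies by release)
}

main()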
2 changes: 1 addition & 1 deletion src/core/components/block.js
@@ -5,7 +5,7 @@ const multihashing = require('multihashing-async')
const CID = require('cids')
const callbackify = require('callbackify')
const errCode = require('err-code')
- const all = require('async-iterator-all')
+ const all = require('it-all')
const { PinTypes } = require('./pin/pin-manager')

module.exports = function block (self) {
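The `async-iterator-all` dependency is swapped for `it-all` here and in the files that follow. A small sketch of the drop-in replacement, using a hypothetical async generator in place of a real IPFS source:

const all = require('it-all')

// Hypothetical source standing in for any async iterable (e.g. the output of an ipfs API call)
async function * source () {
  yield 'a'
  yield 'b'
  yield 'c'
}

async function main () {
  // it-all drains an (async) iterable into an array, the same contract as async-iterator-all
  const items = await all(source())
  console.log(items) // ['a', 'b', 'c']
}

main()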
2 changes: 1 addition & 1 deletion src/core/components/dag.js
@@ -2,7 +2,7 @@

const callbackify = require('callbackify')
const CID = require('cids')
- const all = require('async-iterator-all')
+ const all = require('it-all')
const errCode = require('err-code')
const multicodec = require('multicodec')

2 changes: 1 addition & 1 deletion src/core/components/files-mfs.js
@@ -5,7 +5,7 @@ const isPullStream = require('is-pull-stream')
const toPullStream = require('async-iterator-to-pull-stream')
const toReadableStream = require('async-iterator-to-stream')
const pullStreamToAsyncIterator = require('pull-stream-to-async-iterator')
- const all = require('async-iterator-all')
+ const all = require('it-all')
const nodeify = require('promise-nodeify')
const PassThrough = require('stream').PassThrough
const pull = require('pull-stream/pull')
3 changes: 1 addition & 2 deletions src/core/components/files-regular/add-async-iterator.js
@@ -22,8 +22,7 @@ module.exports = function (self) {
: Infinity
}, options, {
strategy: 'balanced',
- chunker: chunkerOptions.chunker,
- chunkerOptions: chunkerOptions.chunkerOptions
+ ...chunkerOptions
})

// CID v0 is for multihashes encoded with sha2-256
2 changes: 1 addition & 1 deletion src/core/components/files-regular/add.js
@@ -1,6 +1,6 @@
'use strict'

- const all = require('async-iterator-all')
+ const all = require('it-all')

module.exports = function (self) {
// can't use callbackify because if `data` is a pull stream
2 changes: 1 addition & 1 deletion src/core/components/files-regular/cat.js
@@ -1,7 +1,7 @@
'use strict'

const callbackify = require('callbackify')
- const all = require('async-iterator-all')
+ const all = require('it-all')

module.exports = function (self) {
return callbackify.variadic(async function cat (ipfsPath, options) {
2 changes: 1 addition & 1 deletion src/core/components/files-regular/get.js
@@ -1,7 +1,7 @@
'use strict'

const callbackify = require('callbackify')
- const all = require('async-iterator-all')
+ const all = require('it-all')

module.exports = function (self) {
return callbackify.variadic(async function get (ipfsPath, options) { // eslint-disable-line require-await
2 changes: 1 addition & 1 deletion src/core/components/files-regular/ls.js
@@ -1,7 +1,7 @@
'use strict'

const callbackify = require('callbackify')
- const all = require('async-iterator-all')
+ const all = require('it-all')

module.exports = function (self) {
return callbackify.variadic(async function ls (ipfsPath, options) { // eslint-disable-line require-await
2 changes: 1 addition & 1 deletion src/core/components/files-regular/refs-local.js
@@ -1,7 +1,7 @@
'use strict'

const callbackify = require('callbackify')
- const all = require('async-iterator-all')
+ const all = require('it-all')

module.exports = function (self) {
return callbackify.variadic(async function refsLocal (ipfsPath, options) { // eslint-disable-line require-await
2 changes: 1 addition & 1 deletion src/core/components/files-regular/refs.js
@@ -1,7 +1,7 @@
'use strict'

const callbackify = require('callbackify')
- const all = require('async-iterator-all')
+ const all = require('it-all')

module.exports = function (self) {
return callbackify.variadic(async function refs (ipfsPath, options) { // eslint-disable-line require-await
6 changes: 2 additions & 4 deletions src/core/components/files-regular/utils.js
@@ -45,14 +45,12 @@ const parseChunkerString = (chunker) => {
}
return {
chunker: 'fixed',
- chunkerOptions: {
- maxChunkSize: size
- }
+ maxChunkSize: size
}
} else if (chunker.startsWith('rabin')) {
return {
chunker: 'rabin',
- chunkerOptions: parseRabinString(chunker)
+ ...parseRabinString(chunker)
}
} else {
throw new Error(`Unrecognized chunker option: ${chunker}`)
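With this change `parseChunkerString()` returns a flat object that callers can spread directly into the importer options (see `add-async-iterator.js` above) instead of nesting them under `chunkerOptions`. A rough sketch of the old and new shapes, mirroring the unit tests at the end of this diff:

// Illustration only, not code from the diff.
const before = { // pre-PR: chunker settings nested under chunkerOptions
  chunker: 'rabin',
  chunkerOptions: { minChunkSize: 42, avgChunkSize: 92, maxChunkSize: 184 }
}

const after = { // post-PR: flat, so `...parseChunkerString(chunker)` can be spread straight in
  chunker: 'rabin',
  minChunkSize: 42,
  avgChunkSize: 92,
  maxChunkSize: 184
}

console.log(before, after)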
2 changes: 1 addition & 1 deletion src/core/runtime/add-from-fs-nodejs.js
@@ -2,7 +2,7 @@

const callbackify = require('callbackify')
const globSource = require('ipfs-utils/src/files/glob-source')
- const all = require('async-iterator-all')
+ const all = require('it-all')

module.exports = self => {
return callbackify.variadic(async (...args) => { // eslint-disable-line require-await
2 changes: 1 addition & 1 deletion src/http/api/resources/block.js
@@ -7,7 +7,7 @@ const multibase = require('multibase')
const Boom = require('@hapi/boom')
const { cidToString } = require('../../../utils/cid')
const debug = require('debug')
- const all = require('async-iterator-all')
+ const all = require('it-all')
const streamResponse = require('../../utils/stream-response')
const log = debug('ipfs:http-api:block')
log.error = debug('ipfs:http-api:block:error')
2 changes: 1 addition & 1 deletion src/http/api/resources/config.js
@@ -9,7 +9,7 @@ const multipart = require('ipfs-multipart')
const Boom = require('@hapi/boom')
const Joi = require('@hapi/joi')
const { profiles } = require('../../../core/components/config')
- const all = require('async-iterator-all')
+ const all = require('it-all')

exports.getOrSet = {
// pre request handler that parses the args and returns `key` & `value` which are assigned to `request.pre.args`
2 changes: 1 addition & 1 deletion src/http/api/resources/dag.js
@@ -11,7 +11,7 @@ const debug = require('debug')
const {
cidToString
} = require('../../../utils/cid')
- const all = require('async-iterator-all')
+ const all = require('it-all')
const log = debug('ipfs:http-api:dag')
log.error = debug('ipfs:http-api:dag:error')

10 changes: 9 additions & 1 deletion src/http/api/resources/files-regular.js
@@ -159,6 +159,8 @@ exports.add = {
'only-hash': Joi.boolean(),
pin: Joi.boolean().default(true),
'wrap-with-directory': Joi.boolean(),
+ 'file-import-concurrency': Joi.number().integer().min(0).default(50),
+ 'block-write-concurrency': Joi.number().integer().min(0).default(10),
chunker: Joi.string(),
trickle: Joi.boolean(),
preload: Joi.boolean().default(true)
@@ -218,7 +220,13 @@
pin: request.query.pin,
chunker: request.query.chunker,
trickle: request.query.trickle,
- preload: request.query.preload
+ preload: request.query.preload,
+
+ // this has to be hardcoded to 1 because we can only read one file
+ // at a time from a http request and we have to consume it completely
+ // before we can read the next file
+ fileImportConcurrency: 1,
+ blockWriteConcurrency: request.query['block-write-concurrency']
})
},
async function (source) {
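Over the HTTP API, `block-write-concurrency` is honoured as a query parameter, while `file-import-concurrency` is pinned to 1 by the handler because multipart parts can only be consumed one at a time. A sketch of how such a request URL might be built; the host/port are the assumed local-daemon defaults and the multipart upload itself is elided:

// Sketch only: the parameter names come from the Joi schema above,
// the address is the assumed default API endpoint of a local daemon.
const query = new URLSearchParams({
  'block-write-concurrency': '20', // forwarded to the importer
  'file-import-concurrency': '5'   // accepted, but overridden to 1 server-side
})

const url = `http://127.0.0.1:5001/api/v0/add?${query}`

// POST `url` with a multipart/form-data body containing the files to add
// (for example via the bumped ipfs-http-client dependency).
console.log(url)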
2 changes: 1 addition & 1 deletion src/http/api/resources/object.js
@@ -2,7 +2,7 @@

const CID = require('cids')
const multipart = require('ipfs-multipart')
- const all = require('async-iterator-all')
+ const all = require('it-all')
const dagPB = require('ipld-dag-pb')
const { DAGNode, DAGLink } = dagPB
const Joi = require('@hapi/joi')
12 changes: 6 additions & 6 deletions test/core/files-regular-utils.js
@@ -20,27 +20,27 @@ describe('files-regular/utils', () => {
it('parses a fixed size string', () => {
const options = utils.parseChunkerString('size-512')
expect(options.chunker).to.equal('fixed')
- expect(options.chunkerOptions.maxChunkSize).to.equal(512)
+ expect(options.maxChunkSize).to.equal(512)
})

it('parses a rabin string without size', () => {
const options = utils.parseChunkerString('rabin')
expect(options.chunker).to.equal('rabin')
- expect(options.chunkerOptions.avgChunkSize).to.equal(262144)
+ expect(options.avgChunkSize).to.equal(262144)
})

it('parses a rabin string with only avg size', () => {
const options = utils.parseChunkerString('rabin-512')
expect(options.chunker).to.equal('rabin')
- expect(options.chunkerOptions.avgChunkSize).to.equal(512)
+ expect(options.avgChunkSize).to.equal(512)
})

it('parses a rabin string with min, avg, and max', () => {
const options = utils.parseChunkerString('rabin-42-92-184')
expect(options.chunker).to.equal('rabin')
- expect(options.chunkerOptions.minChunkSize).to.equal(42)
- expect(options.chunkerOptions.avgChunkSize).to.equal(92)
- expect(options.chunkerOptions.maxChunkSize).to.equal(184)
+ expect(options.minChunkSize).to.equal(42)
+ expect(options.avgChunkSize).to.equal(92)
+ expect(options.maxChunkSize).to.equal(184)
})

it('throws an error for unsupported chunker type', () => {