This repository was archived by the owner on Aug 12, 2020. It is now read-only.

chore: Conform to spec for stream slices #209

Merged

merged 1 commit on Apr 23, 2018
13 changes: 4 additions & 9 deletions README.md
@@ -178,13 +178,9 @@ Creates a new readable stream in object mode that outputs objects of the form
}
```

#### `begin` and `end`
#### `offset` and `length`

`begin` and `end` arguments can optionally be passed to the reader function. These follow the same semantics as the JavaScript [`Array.slice(begin, end)`][] method.

That is: `begin` is the index in the stream to start sending data, `end` is the index *before* which to stop sending data.

A negative `begin` starts the slice from the end of the stream and a negative `end` ends the slice by subtracting `end` from the total stream length.
`offset` and `length` arguments can optionally be passed to the reader function. When supplied, the returned stream emits only `length` bytes of the file, starting at byte `offset`.

See [the tests](test/reader.js) for examples of using these arguments.
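
For readers migrating from the old `begin`/`end` arguments, here is a minimal sketch of the mapping between the two conventions. The helper name is hypothetical and not part of this module; it assumes non-negative indexes, since the new API no longer supports negative values.

```js
// Hypothetical helper: maps Array.slice-style (begin, end) arguments
// to the new (offset, length) arguments.
function sliceArgsToRange (begin, end, fileSize) {
  const offset = begin || 0
  const stop = (end === undefined || end > fileSize) ? fileSize : end
  return { offset, length: stop - offset }
}

// e.g. the old { begin: 0, end: 10 } becomes { offset: 0, length: 10 }
```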

@@ -195,8 +191,8 @@ const drain = require('pull-stream/sinks/drain')

pull(
exporter(cid, ipldResolver, {
begin: 0,
end: 10
offset: 0,
length: 10
}),
drain((file) => {
// file.content() is a pull stream containing only the first 10 bytes of the file
@@ -225,7 +221,6 @@ pull(
[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
[pull-stream]: https://www.npmjs.com/package/pull-stream
[`Array.slice(begin, end)`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/slice

## Contribute

69 changes: 38 additions & 31 deletions src/exporter/file.js
@@ -7,7 +7,7 @@ const pull = require('pull-stream')
const paramap = require('pull-paramap')

// Logic to export a single (possibly chunked) unixfs file.
module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth, begin, end) => {
module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth, offset, length) => {
const accepts = pathRest[0]

if (accepts !== undefined && accepts !== path) {
@@ -16,46 +16,50 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth,

const file = UnixFS.unmarshal(node.data)
const fileSize = size || file.fileSize()
const content = streamBytes(dag, node, fileSize, findByteRange(fileSize, begin, end))

return pull.values([{
depth: depth,
content: content,
name: name,
path: path,
hash: node.multihash,
size: fileSize,
type: 'file'
}])
}
if (offset < 0) {
return pull.error(new Error('Offset must be greater than 0'))
}

function findByteRange (fileSize, begin, end) {
if (!begin) {
begin = 0
if (offset > fileSize) {
return pull.error(new Error('Offset must be less than the file size'))
}

if (!end || end > fileSize) {
end = fileSize
if (length < 0) {
return pull.error(new Error('Length must be greater than or equal to 0'))
}

if (begin < 0) {
begin = fileSize + begin
if (length === 0) {
return pull.empty()
}

if (end < 0) {
end = fileSize + end
if (!offset) {
offset = 0
}

return {
begin, end
if (!length || (offset + length > fileSize)) {
length = fileSize - offset
}

const content = streamBytes(dag, node, fileSize, offset, length)

return pull.values([{
depth: depth,
content: content,
name: name,
path: path,
hash: node.multihash,
size: fileSize,
type: 'file'
}])
}

function streamBytes (dag, node, fileSize, { begin, end }) {
if (begin === end) {
function streamBytes (dag, node, fileSize, offset, length) {
if (offset === fileSize || length === 0) {
return pull.empty()
}

const end = offset + length
let streamPosition = 0

function getData ({ node, start }) {
@@ -70,11 +74,13 @@ function streamBytes (dag, node, fileSize, { begin, end }) {
return
}

const block = extractDataFromBlock(file.data, start, begin, end)
const block = extractDataFromBlock(file.data, start, offset, end)

streamPosition += block.length

return block
} catch (err) {
throw new Error('Failed to unmarshal node')
} catch (error) {
throw new Error(`Failed to unmarshal node - ${error.message}`)
}
}

@@ -95,9 +101,9 @@ function streamBytes (dag, node, fileSize, { begin, end }) {
return child
})
.filter((child, index) => {
return (begin >= child.start && begin < child.end) || // child has begin byte
return (offset >= child.start && offset < child.end) || // child has offset byte
(end > child.start && end <= child.end) || // child has end byte
(begin < child.start && end > child.end) // child is between begin and end bytes
(offset < child.start && end > child.end) // child is between offset and end bytes
})

if (filteredLinks.length) {
@@ -111,7 +117,8 @@ function streamBytes (dag, node, fileSize, { begin, end }) {
dag.get(new CID(child.link.multihash), (error, result) => cb(error, {
start: child.start,
end: child.end,
node: result && result.value
node: result && result.value,
size: child.size
}))
})
)
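
Taken together, the bounds checks added in this file amount to the following normalisation, shown here as a standalone helper (hypothetical, for illustration only; the exporter itself returns `pull.error(...)` where this sketch throws).

```js
// Condensed restatement of the offset/length validation in file.js.
function normaliseRange (fileSize, offset, length) {
  if (offset < 0) throw new Error('Offset must be greater than 0')
  if (offset > fileSize) throw new Error('Offset must be less than the file size')
  if (length < 0) throw new Error('Length must be greater than or equal to 0')
  if (length === 0) return { offset: offset || 0, length: 0 } // empty stream
  if (!offset) offset = 0 // undefined offset defaults to the start of the file
  if (!length || offset + length > fileSize) {
    length = fileSize - offset // undefined or oversized length is clamped
  }
  return { offset, length }
}
```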
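The three-clause child filter above is the standard half-open interval overlap test. A compact equivalent, valid because the earlier checks guarantee `length > 0` (the function name is illustrative):

```js
// Two half-open byte ranges [offset, end) and [start, stop) overlap
// exactly when each one starts before the other one stops.
function rangesOverlap (offset, end, start, stop) {
  return offset < stop && start < end
}

// e.g. for a request of offset 300000, length 100 (so end = 300100),
// a child covering bytes [262144, 524288) is kept:
// 300000 < 524288 && 262144 < 300100 → true
```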
4 changes: 2 additions & 2 deletions src/exporter/index.js
@@ -37,8 +37,8 @@ function pathBaseAndRest (path) {

const defaultOptions = {
maxDepth: Infinity,
begin: undefined,
end: undefined
offset: undefined,
length: undefined
}

module.exports = (path, dag, options) => {
12 changes: 6 additions & 6 deletions src/exporter/resolve.js
@@ -32,33 +32,33 @@ function createResolver (dag, options, depth, parent) {
return pull.error(new Error('no depth'))
}
if (item.object) {
return cb(null, resolveItem(item.object, item, options.begin, options.end))
return cb(null, resolveItem(item.object, item, options.offset, options.length))
}
dag.get(new CID(item.multihash), (err, node) => {
if (err) {
return cb(err)
}
// const name = item.fromPathRest ? item.name : item.path
cb(null, resolveItem(node.value, item, options.begin, options.end))
cb(null, resolveItem(node.value, item, options.offset, options.length))
})
}),
pull.flatten(),
pull.filter(Boolean),
pull.filter((node) => node.depth <= options.maxDepth)
)

function resolveItem (node, item, begin, end) {
return resolve(node, item.name, item.path, item.pathRest, item.size, dag, item.parent || parent, item.depth, begin, end)
function resolveItem (node, item, offset, length) {
return resolve(node, item.name, item.path, item.pathRest, item.size, dag, item.parent || parent, item.depth, offset, length)
}

function resolve (node, name, path, pathRest, size, dag, parentNode, depth, begin, end) {
function resolve (node, name, path, pathRest, size, dag, parentNode, depth, offset, length) {
const type = typeOf(node)
const nodeResolver = resolvers[type]
if (!nodeResolver) {
return pull.error(new Error('Unknown node type ' + type))
}
const resolveDeep = createResolver(dag, options, depth, node)
return nodeResolver(node, name, path, pathRest, resolveDeep, size, dag, parentNode, depth, begin, end)
return nodeResolver(node, name, path, pathRest, resolveDeep, size, dag, parentNode, depth, offset, length)
}
}

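A short usage sketch in the style of the tests referenced in the README, showing the new options threading end to end (the module entry point, `cid`, and `ipldResolver` are assumptions for illustration):

```js
const pull = require('pull-stream')
const exporter = require('ipfs-unixfs-engine').exporter // assumed entry point

pull(
  exporter(cid, ipldResolver, { offset: 1024, length: 256 }),
  pull.collect((err, files) => {
    if (err) throw err
    pull(
      files[0].content(),
      pull.collect((err, chunks) => {
        if (err) throw err
        const data = Buffer.concat(chunks)
        // data holds bytes 1024..1279 of the exported file
      })
    )
  })
)
```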