This repository was archived by the owner on Aug 12, 2020. It is now read-only.

Rename "stream" to "content" in tuples. #43

Merged: 6 commits, May 26, 2016

63 changes: 41 additions & 22 deletions README.md
@@ -1,7 +1,7 @@
IPFS unixFS Engine
===================

> Import data into an IPFS DAG Service.
> Import & Export data to/from an [IPFS DAG Service][]

[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
@@ -47,8 +47,8 @@ const res = []

const rs = fs.createReadStream(file)
const rs2 = fs.createReadStream(file2)
const input = {path: /tmp/foo/bar, stream: rs}
const input2 = {path: /tmp/foo/quxx, stream: rs2}
const input = {path: '/tmp/foo/bar', content: rs}
const input2 = {path: '/tmp/foo/quxx', content: rs2}

// Listen for the data event from the importer stream

@@ -74,41 +74,47 @@ When run, the stat of DAG Node is outputted for each file on data event until the

```
{ multihash: <Buffer 12 20 bd e2 2b 57 3f 6f bd 7c cc 5a 11 7f 28 6c a2 9a 9f c0 90 e1 d4 16 d0 5f 42 81 ec 0c 2a 7f 7f 93>,
Size: 39243,
size: 39243,
path: '/tmp/foo/bar' }

{ multihash: <Buffer 12 20 bd e2 2b 57 3f 6f bd 7c cc 5a 11 7f 28 6c a2 9a 9f c0 90 e1 d4 16 d0 5f 42 81 ec 0c 2a 7f 7f 93>,
Size: 59843,
size: 59843,
path: '/tmp/foo/quxx' }

{ multihash: <Buffer 12 20 bd e2 2b 57 3f 6f bd 7c cc 5a 11 7f 28 6c a2 9a 9f c0 90 e1 d4 16 d0 5f 42 81 ec 0c 2a 7f 7f 93>,
Size: 93242,
size: 93242,
path: '/tmp/foo' }

{ multihash: <Buffer 12 20 bd e2 2b 57 3f 6f bd 7c cc 5a 11 7f 28 6c a2 9a 9f c0 90 e1 d4 16 d0 5f 42 81 ec 0c 2a 7f 7f 93>,
Size: 94234,
size: 94234,
path: '/tmp' }

```

## API
## Importer API

```js
const Importer = require('ipfs-unixfs-engine').importer
```

### const add = new Importer(dag)

The importer is a duplex stream in object mode that writes inputs of tuples
of path and readable streams of data. You can stream an array of files to the
importer, just call the 'end' function to signal that you are done inputting file/s.
Listen to the 'data' for the returned information 'multihash, size and path' for
each file added. Listen to the 'end' event from the stream to know when the
importer has finished importing files. Input file paths with directory structure
will preserve the hierarchy in the dag node.
The importer is a Transform stream in object mode that accepts objects of the form

```js
{
path: 'a name',
content: (Buffer or Readable stream)
}
```

The stream will output IPFS DAG Node stats for the nodes it has added to the DAG
Service. When stats on a node are emitted they are guaranteed to have been
written into the DAG Service's storage mechanism.

The input's file paths and directory structure will be preserved in the DAG
Nodes.

Uses the [DAG Service](https://github.com/vijayee/js-ipfs-merkle-dag/) instance
`dagService`.
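
A minimal usage sketch, assuming `dagService` is an existing DAG Service instance and using placeholder file paths:

```js
const Importer = require('ipfs-unixfs-engine').importer
const fs = require('fs')

const add = new Importer(dagService)

// Each 'data' event carries the stats of a node that has already been
// written into the DAG Service's storage
add.on('data', (stat) => {
  console.log(stat.path, stat.size)
})

add.on('end', () => {
  console.log('done importing')
})

// 'content' may be a Readable stream or a Buffer
add.write({ path: 'foo/bar.txt', content: fs.createReadStream('/tmp/bar.txt') })
add.write({ path: 'foo/baz.txt', content: new Buffer('hello world') })
add.end()
```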

## Example Exporter

@@ -133,22 +139,35 @@ exportEvent.on('data', (result) => {
}
```

##API
## Exporter: API
```js
const Importer = require('ipfs-unixfs-engine').exporter
const Exporter = require('ipfs-unixfs-engine').exporter
```

The exporter is a readable stream in object mode that returns an object ```{ stream: stream, path: 'path' }``` by the multihash of the file from the dag service.
The exporter is a readable stream in object mode that outputs objects of the
form

```js
{
path: 'a name',
content: (Buffer or Readable stream)
}
```

by the multihash of the file from the DAG Service.
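
A minimal consumption sketch, assuming `hash` is the multihash of a previously imported file and `dagService` is an existing DAG Service instance:

```js
const Exporter = require('ipfs-unixfs-engine').exporter

const exportEvent = Exporter(hash, dagService)

exportEvent.on('data', (file) => {
  if (file.content === null) {
    // directories are emitted with a null 'content'
    console.log('directory:', file.path)
    return
  }
  // files are emitted with 'content' as a Readable stream of their bytes
  file.content.pipe(process.stdout)
})
```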

## install

## Install

With [npm](https://npmjs.org/) installed, run

```
$ npm install ipfs-unixfs-engine
```

## license
## License

ISC


[IPFS DAG Service]: https://github.com/vijayee/js-ipfs-merkle-dag/
6 changes: 4 additions & 2 deletions package.json
@@ -2,7 +2,7 @@
"name": "ipfs-unixfs-engine",
"version": "0.8.0",
"description": "JavaScript implementation of the unixfs Engine used by IPFS",
"main": "lib/index.js",
"main": "src/index.js",
"jsnext:main": "src/index.js",
"scripts": {
"lint": "aegir-lint",
@@ -56,8 +56,10 @@
"debug": "^2.2.0",
"ipfs-merkle-dag": "^0.5.0",
"ipfs-unixfs": "^0.1.0",
"isstream": "^0.1.2",
"readable-stream": "^1.1.13",
"run-series": "^1.1.4",
"streamifier": "^0.1.1",
"through2": "^2.0.0"
},
"contributors": [
@@ -68,4 +70,4 @@
"greenkeeperio-bot <support@greenkeeper.io>",
"nginnever <ginneversource@gmail.com>"
]
}
}
6 changes: 3 additions & 3 deletions src/exporter.js
@@ -42,7 +42,7 @@ function Exporter (hash, dagService, options) {
rs.push(unmarshaledData.data)
rs.push(null)
}
this.push({ stream: rs, path: name })
this.push({ content: rs, path: name })
callback()
return
} else {
@@ -75,7 +75,7 @@ function Exporter (hash, dagService, options) {
return
})
}
this.push({ stream: rs, path: name })
this.push({ content: rs, path: name })
callback()
return
}
@@ -97,7 +97,7 @@ function Exporter (hash, dagService, options) {
rs.push(node.data)
rs.push(null)
}
this.push({stream: null, path: name})
this.push({content: null, path: name})
callback()
return
} else {
55 changes: 41 additions & 14 deletions src/importer.js
@@ -10,6 +10,8 @@ const UnixFS = require('ipfs-unixfs')
const util = require('util')
const bs58 = require('bs58')
const Duplex = require('readable-stream').Duplex
const isStream = require('isstream')
const streamifier = require('streamifier')

exports = module.exports = Importer

@@ -36,7 +38,7 @@ function Importer (dagService, options) {
this._write = (fl, enc, next) => {
this.read()
counter++
if (!fl.stream) {
if (!fl.content) {
// 1. create the empty dir dag node
// 2. write it to the dag store
// 3. add to the files array {path: <>, hash: <>}
@@ -63,8 +65,20 @@
return
}

// Convert a buffer to a readable stream
if (Buffer.isBuffer(fl.content)) {
const r = streamifier.createReadStream(fl.content)
fl.content = r
}

// Bail if 'content' is not readable
if (!isStream.isReadable(fl.content)) {
this.emit('error', new Error('"content" is not a Buffer nor Readable stream'))
return
}

const leaves = []
fl.stream
fl.content
.pipe(fsc(CHUNK_SIZE))
.pipe(through2((chunk, enc, cb) => {
// 1. create the unixfs merkledag node
@@ -224,13 +238,15 @@ function Importer (dagService, options) {
// If the value is not an object
// add as a link to the dirNode

function traverse (tree, base) {
let pendingWrites = 0

function traverse (tree, path, done) {
const keys = Object.keys(tree)
let tmpTree = tree
keys.map((key) => {
if (typeof tmpTree[key] === 'object' &&
!Buffer.isBuffer(tmpTree[key])) {
tmpTree[key] = traverse.call(this, tmpTree[key], base ? base + '/' + key : key)
tmpTree[key] = traverse.call(this, tmpTree[key], path ? path + '/' + key : key, done)
}
})

@@ -250,28 +266,39 @@
})

n.data = d.marshal()

pendingWrites++
dagService.add(n, (err) => {
pendingWrites--
if (err) {
this.push({error: 'failed to store dirNode'})
} else if (path) {
const el = {
path: path,
multihash: n.multihash(),
yes: 'no',
size: n.size()
}
this.push(el)
}

if (pendingWrites <= 0) {
done()
}
})

if (!base) {
if (!path) {
return
}

const el = {
path: base,
multihash: n.multihash(),
size: n.size()
}
this.push(el)

mhIndex[bs58.encode(n.multihash())] = { size: n.size() }
return n.multihash()
}
/* const rootHash = */ traverse.call(this, fileTree)
this.push(null)

let self = this
/* const rootHash = */ traverse.call(this, fileTree, null, function () {
self.push(null)
})
}
}
}
8 changes: 4 additions & 4 deletions test/test-exporter.js
@@ -33,7 +33,7 @@ module.exports = function (repo) {
expect(err).to.not.exist
const testExport = exporter(hash, ds)
testExport.on('data', (file) => {
file.stream.pipe(bl((err, bldata) => {
file.content.pipe(bl((err, bldata) => {
expect(err).to.not.exist
expect(bldata).to.deep.equal(unmarsh.data)
done()
@@ -48,7 +48,7 @@
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('data', (file) => {
file.stream.pipe(bl((err, bldata) => {
file.content.pipe(bl((err, bldata) => {
expect(bldata).to.deep.equal(bigFile)
expect(err).to.not.exist
done()
@@ -63,7 +63,7 @@
const testExport = exporter(hash, ds)
testExport.on('data', (file) => {
expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
file.stream.pipe(bl((err, bldata) => {
file.content.pipe(bl((err, bldata) => {
expect(err).to.not.exist
done()
}))
@@ -94,7 +94,7 @@ module.exports = function (repo) {
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('data', (dir) => {
expect(dir.stream).to.equal(null)
expect(dir.content).to.equal(null)
done()
})
})