1
1
'use strict'
2
2
3
3
const Block = require ( 'ipfs-block' )
4
- const pull = require ( 'pull-stream' )
5
4
const Lock = require ( 'lock' )
6
5
const base32 = require ( 'base32.js' )
7
6
const path = require ( 'path' )
8
- const pullWrite = require ( 'pull-write' )
9
7
const parallel = require ( 'run-parallel' )
8
+ const pull = require ( 'pull-stream' )
9
+ const pullWrite = require ( 'pull-write' )
10
10
const pullDefer = require ( 'pull-defer/source' )
11
11
12
12
const PREFIX_LENGTH = 5
13
+ const EXTENSION = 'data'
13
14
14
15
exports = module . exports
15
16
16
17
function multihashToPath ( multihash ) {
17
- const extension = 'data'
18
18
const encoder = new base32 . Encoder ( )
19
19
const hash = encoder . write ( multihash ) . finalize ( )
20
- const filename = `${ hash } .${ extension } `
20
+ const filename = `${ hash } .${ EXTENSION } `
21
21
const folder = filename . slice ( 0 , PREFIX_LENGTH )
22
22
23
23
return path . join ( folder , filename )
@@ -27,17 +27,19 @@ exports.setUp = (basePath, BlobStore, locks) => {
27
27
const store = new BlobStore ( basePath + '/blocks' )
28
28
const lock = new Lock ( )
29
29
30
- function writeBlock ( block , callback ) {
31
- if ( ! block || ! block . data ) {
30
+ // blockBlob is an object with:
31
+ // { data: <>, key: <> }
32
+ function writeBlock ( blockBlob , callback ) {
33
+ if ( ! blockBlob || ! blockBlob . data ) {
32
34
return callback ( new Error ( 'Invalid block' ) )
33
35
}
34
36
35
- const key = multihashToPath ( block . key ( ) )
37
+ const key = multihashToPath ( blockBlob . key )
36
38
37
39
lock ( key , ( release ) => {
38
40
pull (
39
41
pull . values ( [
40
- block . data
42
+ blockBlob . data
41
43
] ) ,
42
44
store . write ( key , release ( released ) )
43
45
)
@@ -84,35 +86,32 @@ exports.setUp = (basePath, BlobStore, locks) => {
84
86
return deferred
85
87
} ,
86
88
87
- // returns a pull-stream to write blocks into
88
- // TODO use a more explicit name, given that getStream is just for
89
- // one block, multiple blocks should have different naming
89
+ /*
90
+ * putStream - write multiple blocks
91
+ *
92
+ * returns a pull-stream that expects blockBlobs
93
+ *
94
+ * NOTE: blockBlob is a { data: <>, key: <> } and not a
95
+ * ipfs-block instance. This is because Block instances support
96
+ * several types of hashing and it is up to the BlockService
97
+ * to understand the right one to use (given the CID)
98
+ */
99
+ // TODO
100
+ // consider using a more explicit name, this can cause some confusion
101
+ // since the natural association is
102
+ // getStream - createReadStream - read one
103
+ // putStream - createWriteStream - write one
104
+ // where in fact it is:
105
+ // getStream - createReadStream - read one (the same)
106
+ // putStream - createFilesWriteStream = write several
107
+ //
90
108
putStream ( ) {
91
109
let ended = false
92
110
let written = [ ]
93
111
let push = null
94
112
95
- const sink = pullWrite ( ( blocks , cb ) => {
96
- const tasks = blocks . map ( ( block ) => {
97
- return ( cb ) => {
98
- writeBlock ( block , ( err , meta ) => {
99
- if ( err ) {
100
- return cb ( err )
101
- }
102
-
103
- if ( push ) {
104
- const read = push
105
- push = null
106
- read ( null , meta )
107
- return cb ( )
108
- }
109
-
110
- written . push ( meta )
111
- cb ( )
112
- } )
113
- }
114
- } )
115
-
113
+ const sink = pullWrite ( ( blockBlobs , cb ) => {
114
+ const tasks = writeTasks ( blockBlobs )
116
115
parallel ( tasks , cb )
117
116
} , null , 100 , ( err ) => {
118
117
ended = err || true
@@ -121,7 +120,6 @@ exports.setUp = (basePath, BlobStore, locks) => {
121
120
}
122
121
} )
123
122
124
- // TODO ??Why does a putStream need to be a source as well??
125
123
const source = ( end , cb ) => {
126
124
if ( end ) {
127
125
ended = end
@@ -137,7 +135,36 @@ exports.setUp = (basePath, BlobStore, locks) => {
137
135
push = cb
138
136
}
139
137
140
- return { source : source , sink : sink }
138
+ /*
139
+ * Creates individual tasks to write each block blob that can be
140
+ * exectured in parallel
141
+ */
142
+ function writeTasks ( blockBlobs ) {
143
+ return blockBlobs . map ( ( blockBlob ) => {
144
+ return ( cb ) => {
145
+ writeBlock ( blockBlob , ( err , meta ) => {
146
+ if ( err ) {
147
+ return cb ( err )
148
+ }
149
+
150
+ if ( push ) {
151
+ const read = push
152
+ push = null
153
+ read ( null , meta )
154
+ return cb ( )
155
+ }
156
+
157
+ written . push ( meta )
158
+ cb ( )
159
+ } )
160
+ }
161
+ } )
162
+ }
163
+
164
+ return {
165
+ source : source ,
166
+ sink : sink
167
+ }
141
168
} ,
142
169
143
170
has ( key , callback ) {
0 commit comments