 'use strict'

 const Block = require('ipfs-block')
-const pull = require('pull-stream')
 const Lock = require('lock')
 const base32 = require('base32.js')
 const path = require('path')
-const pullWrite = require('pull-write')
 const parallel = require('run-parallel')
+const pull = require('pull-stream')
+const pullWrite = require('pull-write')
 const pullDefer = require('pull-defer/source')

 const PREFIX_LENGTH = 5
+const EXTENSION = 'data'

 exports = module.exports

 function multihashToPath (multihash) {
-  const extension = 'data'
   const encoder = new base32.Encoder()
   const hash = encoder.write(multihash).finalize()
-  const filename = `${hash}.${extension}`
+  const filename = `${hash}.${EXTENSION}`
   const folder = filename.slice(0, PREFIX_LENGTH)

   return path.join(folder, filename)
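A sketch of what this function yields (the hash value is illustrative, not from this diff): the base32-encoded multihash becomes the filename, its first PREFIX_LENGTH (5) characters become the shard folder, and EXTENSION supplies the suffix:

    // e.g. a sha2-256 multihash whose base32 encoding is 'CIQFTFEEH...PVXGY'
    multihashToPath(multihash)
    // => 'CIQFT/CIQFTFEEH...PVXGY.data'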
@@ -87,7 +87,7 @@ exports.setUp = (basePath, BlobStore, locks) => {
     },

     /*
-     * returns a pull-stream to write blockBlob into
+     * returns a pull-stream that expects blockBlobs
      * NOTE: blockBlob is a { data: <>, key: <> } and not a
      * ipfs-block instance. This is because Block instances support
      * several types of hashing and it is up to the BlockService
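A hypothetical consumer sketch (the names store and blob are assumptions, not part of this diff): putStream() returns a { source, sink } pull-stream duplex, so blockBlobs are piped into the sink and the write receipts are read back from the source:

    const stream = store.putStream()
    pull(pull.values([{ data: blob, key: multihash }]), stream.sink)
    pull(stream.source, pull.collect((err, metas) => {
      // one meta per written block
    }))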
@@ -101,26 +101,7 @@ exports.setUp = (basePath, BlobStore, locks) => {
       let push = null

       const sink = pullWrite((blockBlobs, cb) => {
-        const tasks = blockBlobs.map((blockBlob) => {
-          return (cb) => {
-            writeBlock(blockBlob, (err, meta) => {
-              if (err) {
-                return cb(err)
-              }
-
-              if (push) {
-                const read = push
-                push = null
-                read(null, meta)
-                return cb()
-              }
-
-              written.push(meta)
-              cb()
-            })
-          }
-        })
-
+        const tasks = writeTasks(blockBlobs)
         parallel(tasks, cb)
       }, null, 100, (err) => {
         ended = err || true
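A note on the pull-write call above, based on pull-write's documented signature pullWrite(write, reduce, max, done): with reduce left as null, queued writes are handed to the write function as an array, so blockBlobs arrives as a batch of up to 100 blobs per call.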
@@ -129,7 +110,7 @@ exports.setUp = (basePath, BlobStore, locks) => {
         }
       })

-      // TODO ??Why does a putStream need to be a source as well??
+      // TODO ?? Why does a putStream need to be a source as well??
       const source = (end, cb) => {
         if (end) {
           ended = end
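The source half coordinates with writeTasks (added below): each completed write either answers a reader that is already waiting (the callback saved in push) or queues its meta in written until the source is read again.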
@@ -145,7 +126,36 @@ exports.setUp = (basePath, BlobStore, locks) => {
         push = cb
       }

-      return { source: source, sink: sink }
+      /*
+       * Creates individual tasks to write each block blob that can be
+       * executed in parallel
+       */
+      function writeTasks (blockBlobs) {
+        return blockBlobs.map((blockBlob) => {
+          return (cb) => {
+            writeBlock(blockBlob, (err, meta) => {
+              if (err) {
+                return cb(err)
+              }
+
+              if (push) {
+                const read = push
+                push = null
+                read(null, meta)
+                return cb()
+              }
+
+              written.push(meta)
+              cb()
+            })
+          }
+        })
+      }
+
+      return {
+        source: source,
+        sink: sink
+      }
     },

     has (key, callback) {
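writeTasks returns exactly the shape run-parallel expects, an array of functions that each take a node-style callback:

    // minimal run-parallel usage sketch
    parallel([
      (cb) => cb(null, 'meta-1'),
      (cb) => cb(null, 'meta-2')
    ], (err, results) => {
      // results arrive in task order
    })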