This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit eee889b

feat: use multiple files and on success delete those
1 parent 95dfd21 commit eee889b

File tree: 2 files changed (+48, −52 lines)

package.json

Lines changed: 3 additions & 0 deletions

@@ -93,7 +93,9 @@
   "byteman": "^1.3.5",
   "cids": "~0.5.3",
   "debug": "^3.1.0",
+  "del": "^3.0.0",
   "err-code": "^1.1.2",
+  "fast-glob": "^2.2.2",
   "file-type": "^8.1.0",
   "filesize": "^3.6.1",
   "fnv1a": "^1.0.1",
@@ -167,6 +169,7 @@
   "read-pkg-up": "^4.0.0",
   "readable-stream": "2.3.6",
   "receptacle": "^1.3.2",
+  "stream-concat": "^0.3.0",
   "stream-to-pull-stream": "^1.7.2",
   "tar-stream": "^1.6.1",
   "temp": "~0.8.3",

src/http/api/routes/files.js

Lines changed: 45 additions & 52 deletions

@@ -1,29 +1,31 @@
 'use strict'

-const resources = require('./../resources')
-const mfs = require('ipfs-mfs/http')
 const fs = require('fs')
 const path = require('path')
 const tempy = require('tempy')
+const del = require('del')
+const StreamConcat = require('stream-concat')
+const boom = require('boom')
+const glob = require('fast-glob')
 const multipart = require('ipfs-multipart')
 const toPull = require('stream-to-pull-stream')
 const toStream = require('pull-stream-to-stream')
 const pull = require('pull-stream')
 const pushable = require('pull-pushable')
 const abortable = require('pull-abortable')
 const { serialize } = require('pull-ndjson')
+const mfs = require('ipfs-mfs/http')
+const resources = require('./../resources')

-const streams = []
 const filesDir = tempy.directory()

-const createMultipartReply = (readStream, boundary, ipfs, query, reply) => {
+const createMultipartReply = (readStream, boundary, ipfs, query, reply, cb) => {
   const fileAdder = pushable()
   const parser = new multipart.Parser({ boundary: boundary })

   readStream.pipe(parser)

   parser.on('file', (fileName, fileStream) => {
-    console.log('File: ', fileName)
     fileAdder.push({
       path: decodeURIComponent(fileName),
       content: toPull(fileStream)
@@ -38,7 +40,6 @@ const createMultipartReply = (readStream, boundary, ipfs, query, reply) => {
   })

   parser.on('end', () => {
-    console.log('multipart end')
     fileAdder.end()
   })

@@ -94,10 +95,9 @@ const createMultipartReply = (readStream, boundary, ipfs, query, reply) => {
       }
       files.forEach((f) => pushStream.push(f))
       pushStream.end()
+      cb()
     })
   )
-
-  return parser
 }
 module.exports = (server) => {
   const api = server.select('API')
@@ -142,7 +142,6 @@ module.exports = (server) => {
   })

   api.route({
-    // TODO fix method
     method: 'POST',
     path: '/api/v0/add-chunked',
     config: {
@@ -151,65 +150,59 @@
         maxBytes: 10485760
       },
       handler: (request, reply) => {
-        console.log('received')
-        console.log(request.headers['content-range'])
-        console.log(request.headers['x-ipfs-chunk-index'])
-        console.log(request.headers['x-ipfs-chunk-group-uuid'])
+        // console.log('received')
+        // console.log(request.headers['content-range'])
+        // console.log(request.headers['x-ipfs-chunk-index'])
+        // console.log(request.headers['x-ipfs-chunk-group-uuid'])
         const boundary = request.headers['x-ipfs-chunk-boundary']
-        const id = request.headers['x-ipfs-chunk-group-uuid'] // change name to id
+        const id = request.headers['x-ipfs-chunk-group-uuid']
         const index = Number(request.headers['x-ipfs-chunk-index'])
-        const file = path.join(filesDir, id)
+        const file = path.join(filesDir, id) + '-' + index
         const match = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/)
         const ipfs = request.server.app.ipfs
-        // if (!match || !match[1] || !match[2] || !match[3]) {
-          /* malformed content-range header */
-        //   res.send('Bad Request', 400)
-        //   return;
-        // }
+
+        if (!match || !match[1] || !match[2] || !match[3]) {
+          return boom.badRequest('malformed content-range header')
+        }

         const start = parseInt(match[1])
         const end = parseInt(match[2])
         const total = parseInt(match[3])
-        // console.log(start, end, total, index, boundary)

-        let stream = streams[id]
-        if (!stream) {
-          console.log('create new stream', file)
-          stream = fs.createWriteStream(file, {flags: 'a+'})
-          streams[id] = stream
-        }
-
-        console.log('stream', file)
-        let size = 0
-        if (fs.existsSync(file)) {
-          size = fs.statSync(file).size
-        }
-
-        if ((end + 1) === size) {
-          /* duplicate chunk */
-          // res.send('Created', 201)
-          // return;
-        }
-
-        if (start !== size) {
-          /* missing chunk */
-          // res.send('Bad Request', 400)
-          // return;
-        }
+        // TODO validate duplicates, missing chunks

         if (start === total) {
-          // check if size + payload.length === total
           /* all chunks have been received */
-          stream.on('finish', function () {
-            console.log('add to ipfs from the file')
-            var readStream = fs.createReadStream(file)
-            createMultipartReply(readStream, boundary, ipfs, request.query, reply)
+          const base = path.join(filesDir, id) + '-'
+          const pattern = base + '*'
+          const files = glob.sync(pattern)
+
+          files.sort((a, b) => {
+            return Number(a.replace(base, '')) - Number(b.replace(base, ''))
           })

-          stream.end()
+          let fileIndex = 0
+          const nextStream = () => fileIndex === files.length ? null : fs.createReadStream(files[fileIndex++])
+          createMultipartReply(
+            new StreamConcat(nextStream),
+            boundary,
+            ipfs,
+            request.query,
+            reply,
+            () => {
+              console.log('Finished adding')
+              del(pattern, { force: true })
+                .then(paths => {
+                  console.log('Deleted files and folders:\n', paths.join('\n'))
+                })
+                .catch(console.error)
+            }
+          )
         } else {
+          const stream = fs.createWriteStream(file)
           stream.write(request.payload)
-          /* this chunk has been processed successfully */
+
+          // TODO handle errors
          reply({ Bytes: request.payload.length })
        }
      }
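
For context, here is a rough sketch (not from the commit) of a client that could drive this endpoint, inferred from the headers the handler reads: each chunk is POSTed to /api/v0/add-chunked with the x-ipfs-chunk-* headers and a Content-Range, and a final request whose Content-Range start equals the total size triggers the server-side glob, concat and add. The port, chunk size, and framing of that final request are assumptions; `body` is assumed to already be a complete multipart/form-data payload delimited by `boundary`.

const http = require('http')
const crypto = require('crypto')

// POST one chunk and wait for the response to finish.
function postChunk (payload, headers) {
  return new Promise((resolve, reject) => {
    const req = http.request({
      host: '127.0.0.1',
      port: 5002, // assumed js-ipfs HTTP API port
      method: 'POST',
      path: '/api/v0/add-chunked',
      headers
    }, (res) => {
      res.resume()
      res.on('end', resolve)
    })
    req.on('error', reject)
    req.end(payload)
  })
}

async function uploadChunked (body, boundary, chunkSize = 256 * 1024) {
  const id = crypto.randomBytes(16).toString('hex')
  let index = 0

  for (let start = 0; start < body.length; start += chunkSize) {
    const chunk = body.slice(start, start + chunkSize)
    await postChunk(chunk, {
      'content-range': `${start}-${start + chunk.length - 1}/${body.length}`,
      'x-ipfs-chunk-group-uuid': id,
      'x-ipfs-chunk-index': index++,
      'x-ipfs-chunk-boundary': boundary
    })
  }

  // Final request: a Content-Range whose start equals the total size makes the
  // handler glob, sort and concatenate the chunk files and add them to IPFS.
  await postChunk(Buffer.alloc(0), {
    'content-range': `${body.length}-${body.length}/${body.length}`,
    'x-ipfs-chunk-group-uuid': id,
    'x-ipfs-chunk-index': index,
    'x-ipfs-chunk-boundary': boundary
  })
}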
