Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 6ed431c

Browse files
author
Alan Shaw
committed
refactor: async iterables wip
1 parent d95b8f1 commit 6ed431c

File tree

31 files changed

+142
-412
lines changed

31 files changed

+142
-412
lines changed

package.json

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,27 +42,22 @@
4242
},
4343
"dependencies": {
4444
"abort-controller": "^3.0.0",
45-
"async-iterator-all": "^1.0.0",
46-
"async-iterator-to-pull-stream": "^1.3.0",
4745
"bignumber.js": "^9.0.0",
48-
"bl": "^4.0.0",
4946
"bs58": "^4.0.1",
5047
"buffer": "^5.4.2",
51-
"callbackify": "^1.1.0",
5248
"cids": "~0.7.1",
5349
"debug": "^4.1.0",
5450
"err-code": "^2.0.0",
5551
"explain-error": "^1.0.4",
5652
"form-data": "^3.0.0",
5753
"ipfs-block": "~0.8.1",
58-
"ipfs-utils": "^0.4.0",
54+
"ipfs-utils": "github:ipfs/js-ipfs-utils#refactor/async-iterators",
5955
"ipld-dag-cbor": "~0.15.0",
6056
"ipld-dag-pb": "^0.18.1",
6157
"ipld-raw": "^4.0.0",
6258
"is-ipfs": "~0.6.1",
6359
"it-glob": "0.0.7",
6460
"it-tar": "^1.1.1",
65-
"it-to-stream": "^0.1.1",
6661
"iterable-ndjson": "^1.1.0",
6762
"ky": "^0.15.0",
6863
"ky-universal": "^0.3.0",
@@ -72,10 +67,7 @@
7267
"multibase": "~0.6.0",
7368
"multicodec": "~0.5.1",
7469
"multihashes": "~0.4.14",
75-
"parse-duration": "^0.1.1",
76-
"peer-id": "~0.12.3",
77-
"peer-info": "~0.15.1",
78-
"promise-nodeify": "^3.0.1"
70+
"parse-duration": "^0.1.1"
7971
},
8072
"devDependencies": {
8173
"aegir": "^20.4.1",

src/bitswap/index.js

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
'use strict'
22

3-
const callbackify = require('callbackify')
4-
5-
module.exports = (config) => ({
6-
wantlist: callbackify.variadic(require('./wantlist')(config)),
7-
stat: callbackify.variadic(require('./stat')(config)),
8-
unwant: callbackify.variadic(require('./unwant')(config))
3+
module.exports = config => ({
4+
wantlist: require('./wantlist')(config),
5+
stat: require('./stat')(config),
6+
unwant: require('./unwant')(config)
97
})

src/block/index.js

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,8 @@
11
'use strict'
22

3-
const nodeify = require('promise-nodeify')
4-
const callbackify = require('callbackify')
5-
const { collectify } = require('../lib/converters')
6-
7-
module.exports = config => {
8-
const rm = require('./rm-async-iterator')(config)
9-
10-
return {
11-
get: callbackify.variadic(require('./get')(config)),
12-
stat: callbackify.variadic(require('./stat')(config)),
13-
put: callbackify.variadic(require('./put')(config)),
14-
rm: (input, options, callback) => {
15-
if (typeof options === 'function') {
16-
callback = options
17-
options = {}
18-
}
19-
return nodeify(collectify(rm)(input, options), callback)
20-
},
21-
_rmAsyncIterator: rm
22-
}
23-
}
3+
module.exports = config => ({
4+
get: require('./get')(config),
5+
stat: require('./stat')(config),
6+
put: require('./put')(config),
7+
rm: require('./rm')(config)
8+
})
File renamed without changes.

src/bootstrap/index.js

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
'use strict'
22

3-
const callbackify = require('callbackify')
4-
53
module.exports = config => ({
6-
add: callbackify.variadic(require('./add')(config)),
7-
rm: callbackify.variadic(require('./rm')(config)),
8-
list: callbackify.variadic(require('./list')(config))
4+
add: require('./add')(config),
5+
rm: require('./rm')(config),
6+
list: require('./list')(config)
97
})

src/config/index.js

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
'use strict'
22

3-
const callbackify = require('callbackify')
4-
53
module.exports = config => ({
6-
get: callbackify.variadic(require('./get')(config)),
7-
set: callbackify.variadic(require('./set')(config)),
8-
replace: callbackify.variadic(require('./replace')(config)),
4+
get: require('./get')(config),
5+
set: require('./set')(config),
6+
replace: require('./replace')(config),
97
profiles: require('./profiles')(config)
108
})

src/config/profiles/index.js

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
'use strict'
22

3-
const callbackify = require('callbackify')
4-
53
module.exports = config => ({
6-
apply: callbackify.variadic(require('./apply')(config)),
7-
list: callbackify.variadic(require('./list')(config))
4+
apply: require('./apply')(config),
5+
list: require('./list')(config)
86
})

src/dag/index.js

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
'use strict'
22

3-
const callbackify = require('callbackify')
4-
53
module.exports = config => ({
6-
get: callbackify.variadic(require('./get')(config)),
7-
put: callbackify.variadic(require('./put')(config)),
8-
resolve: callbackify.variadic(require('./resolve')(config))
4+
get: require('./get')(config),
5+
put: require('./put')(config),
6+
resolve: require('./resolve')(config)
97
})

src/dht/find-peer.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
'use strict'
22

3-
const PeerId = require('peer-id')
4-
const PeerInfo = require('peer-info')
3+
const CID = require('cids')
54
const multiaddr = require('multiaddr')
65
const ndjson = require('iterable-ndjson')
76
const configure = require('../lib/configure')
@@ -27,9 +26,10 @@ module.exports = configure(({ ky }) => {
2726
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18
2827
if (message.Type === 2 && message.Responses) {
2928
for (const { ID, Addrs } of message.Responses) {
30-
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
31-
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
32-
yield peerInfo
29+
yield {
30+
id: new CID(ID),
31+
addrs: (Addrs || []).map(a => multiaddr(a))
32+
}
3333
}
3434
}
3535
}

src/dht/find-provs.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
'use strict'
22

3-
const PeerId = require('peer-id')
4-
const PeerInfo = require('peer-info')
3+
const CID = require('cids')
54
const multiaddr = require('multiaddr')
65
const ndjson = require('iterable-ndjson')
76
const configure = require('../lib/configure')
@@ -28,9 +27,10 @@ module.exports = configure(({ ky }) => {
2827
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20
2928
if (message.Type === 4 && message.Responses) {
3029
for (const { ID, Addrs } of message.Responses) {
31-
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
32-
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
33-
yield peerInfo
30+
yield {
31+
id: new CID(ID),
32+
addrs: (Addrs || []).map(a => multiaddr(a))
33+
}
3434
}
3535
}
3636
}

src/dht/index.js

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,11 @@
11
'use strict'
22

3-
const callbackify = require('callbackify')
4-
const errCode = require('err-code')
5-
const { collectify } = require('../lib/converters')
6-
7-
module.exports = config => {
8-
const get = require('./get')(config)
9-
const findPeer = require('./find-peer')(config)
10-
11-
return {
12-
get: callbackify.variadic(async (key, options) => {
13-
for await (const value of get(key, options)) {
14-
return value
15-
}
16-
throw errCode(new Error('value not found'), 'ERR_TYPE_5_NOT_FOUND')
17-
}),
18-
put: callbackify.variadic(collectify(require('./put')(config))),
19-
findProvs: callbackify.variadic(collectify(require('./find-provs')(config))),
20-
findPeer: callbackify.variadic(async (peerId, options) => {
21-
for await (const peerInfo of findPeer(peerId, options)) {
22-
return peerInfo
23-
}
24-
throw errCode(new Error('final peer not found'), 'ERR_TYPE_2_NOT_FOUND')
25-
}),
26-
provide: callbackify.variadic(collectify(require('./provide')(config))),
27-
// find closest peerId to given peerId
28-
query: callbackify.variadic(collectify(require('./query')(config)))
29-
}
30-
}
3+
module.exports = config => ({
4+
get: require('./get')(config),
5+
put: require('./put')(config),
6+
findProvs: require('./find-provs')(config),
7+
findPeer: require('./find-peer')(config),
8+
provide: require('./provide')(config),
9+
// find closest peerId to given peerId
10+
query: require('./query')(config)
11+
})

src/dht/provide.js

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
'use strict'
22

3-
const PeerId = require('peer-id')
4-
const PeerInfo = require('peer-info')
3+
const CID = require('cids')
54
const multiaddr = require('multiaddr')
65
const ndjson = require('iterable-ndjson')
76
const configure = require('../lib/configure')
@@ -28,11 +27,10 @@ module.exports = configure(({ ky }) => {
2827
for await (let message of ndjson(toIterable(res.body))) {
2928
message = toCamel(message)
3029
if (message.responses) {
31-
message.responses = message.responses.map(({ ID, Addrs }) => {
32-
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
33-
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
34-
return peerInfo
35-
})
30+
message.responses = message.responses.map(({ ID, Addrs }) => ({
31+
id: new CID(ID),
32+
addrs: (Addrs || []).map(a => multiaddr(a))
33+
}))
3634
}
3735
yield message
3836
}

src/dht/put.js

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
'use strict'
22

3-
const PeerId = require('peer-id')
4-
const PeerInfo = require('peer-info')
3+
const CID = require('cids')
54
const multiaddr = require('multiaddr')
65
const ndjson = require('iterable-ndjson')
76
const configure = require('../lib/configure')
@@ -29,11 +28,10 @@ module.exports = configure(({ ky }) => {
2928
for await (let message of ndjson(toIterable(res.body))) {
3029
message = toCamel(message)
3130
if (message.responses) {
32-
message.responses = message.responses.map(({ ID, Addrs }) => {
33-
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
34-
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
35-
return peerInfo
36-
})
31+
message.responses = message.responses.map(({ ID, Addrs }) => ({
32+
id: new CID(ID),
33+
addrs: (Addrs || []).map(a => multiaddr(a))
34+
}))
3735
}
3836
yield message
3937
}

src/dht/query.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
'use strict'
22

3-
const PeerId = require('peer-id')
4-
const PeerInfo = require('peer-info')
3+
const CID = require('cids')
54
const ndjson = require('iterable-ndjson')
65
const configure = require('../lib/configure')
76
const toIterable = require('../lib/stream-to-iterable')
@@ -22,7 +21,10 @@ module.exports = configure(({ ky }) => {
2221
})
2322

2423
for await (const message of ndjson(toIterable(res.body))) {
25-
yield new PeerInfo(PeerId.createFromB58String(message.ID))
24+
yield {
25+
id: new CID(message.ID),
26+
addrs: []
27+
}
2628
}
2729
}
2830
})

src/diag/index.js

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
'use strict'
22

3-
const callbackify = require('callbackify')
4-
53
module.exports = config => ({
6-
net: callbackify.variadic(require('./net')(config)),
7-
sys: callbackify.variadic(require('./sys')(config)),
8-
cmds: callbackify.variadic(require('./cmds')(config))
4+
net: require('./net')(config),
5+
sys: require('./sys')(config),
6+
cmds: require('./cmds')(config)
97
})

src/files/index.js

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,13 @@
11
'use strict'
22

3-
const callbackify = require('callbackify')
4-
const { collectify, streamify, pullify, concatify } = require('../lib/converters')
5-
6-
module.exports = config => {
7-
const ls = require('./ls')(config)
8-
const read = require('./read')(config)
9-
10-
return {
11-
cp: callbackify.variadic(require('./cp')(config)),
12-
mkdir: callbackify.variadic(require('./mkdir')(config)),
13-
flush: callbackify.variadic(require('./flush')(config)),
14-
stat: callbackify.variadic(require('./stat')(config)),
15-
rm: callbackify.variadic(require('./rm')(config)),
16-
ls: callbackify.variadic(collectify(ls)),
17-
lsReadableStream: streamify.readable(ls),
18-
lsPullStream: pullify.source(ls),
19-
read: callbackify.variadic(concatify(read)),
20-
readReadableStream: streamify.readable(read),
21-
readPullStream: pullify.source(read),
22-
write: callbackify.variadic(require('./write')(config)),
23-
mv: callbackify.variadic(require('./mv')(config))
24-
}
25-
}
3+
module.exports = config => ({
4+
cp: require('./cp')(config),
5+
mkdir: require('./mkdir')(config),
6+
flush: require('./flush')(config),
7+
stat: require('./stat')(config),
8+
rm: require('./rm')(config),
9+
ls: require('./ls')(config),
10+
read: require('./read')(config),
11+
write: require('./write')(config),
12+
mv: require('./mv')(config)
13+
})

0 commit comments

Comments
 (0)