Skip to content

Commit f662fdc

Browse files
authored
fix: ensure identify streams are closed (#551)
* fix: ensure identify streams are closed * fix: call connection.addStream properly * chore: simplify stream closure * test: improve durability of identify push test
1 parent 5608178 commit f662fdc

File tree

3 files changed

+18
-9
lines changed

3 files changed

+18
-9
lines changed

src/identify/index.js

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const debug = require('debug')
44
const pb = require('it-protocol-buffers')
55
const lp = require('it-length-prefixed')
66
const pipe = require('it-pipe')
7-
const { collect, take } = require('streaming-iterables')
7+
const { collect, take, consume } = require('streaming-iterables')
88

99
const PeerInfo = require('peer-info')
1010
const PeerId = require('peer-id')
@@ -114,7 +114,8 @@ class IdentifyService {
114114
protocols: Array.from(this._protocols.keys())
115115
}],
116116
pb.encode(Message),
117-
stream
117+
stream,
118+
consume
118119
)
119120
} catch (err) {
120121
// Just log errors
@@ -153,6 +154,7 @@ class IdentifyService {
153154
async identify (connection) {
154155
const { stream } = await connection.newStream(MULTICODEC_IDENTIFY)
155156
const [data] = await pipe(
157+
[],
156158
stream,
157159
lp.decode(),
158160
take(1),
@@ -242,7 +244,8 @@ class IdentifyService {
242244
pipe(
243245
[message],
244246
lp.encode(),
245-
stream
247+
stream,
248+
consume
246249
)
247250
}
248251

@@ -255,6 +258,7 @@ class IdentifyService {
255258
*/
256259
async _handlePush ({ connection, stream }) {
257260
const [data] = await pipe(
261+
[],
258262
stream,
259263
lp.decode(),
260264
take(1),

src/upgrader.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ class Upgrader {
231231
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
232232
log('%s: incoming stream opened on %s', direction, protocol)
233233
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
234-
connection.addStream(stream, protocol)
234+
connection.addStream(muxedStream, { protocol })
235235
this._onStream({ connection, stream, protocol })
236236
} catch (err) {
237237
log.error(err)

test/identify/index.spec.js

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const PeerId = require('peer-id')
1212
const PeerInfo = require('peer-info')
1313
const duplexPair = require('it-pair/duplex')
1414
const multiaddr = require('multiaddr')
15+
const pWaitFor = require('p-wait-for')
1516

1617
const { codes: Errors } = require('../../src/errors')
1718
const { IdentifyService, multicodecs } = require('../../src/identify')
@@ -203,16 +204,17 @@ describe('Identify', () => {
203204
})
204205

205206
sinon.spy(libp2p.identifyService, 'identify')
206-
sinon.spy(libp2p.peerStore, 'replace')
207+
const peerStoreSpy = sinon.spy(libp2p.peerStore, 'replace')
207208

208209
const connection = await libp2p.dialer.connectToPeer(remoteAddr)
209210
expect(connection).to.exist()
210-
// Wait for nextTick to trigger the identify call
211-
await delay(1)
211+
212+
// Wait for peer store to be updated
213+
await pWaitFor(() => peerStoreSpy.callCount === 1)
212214
expect(libp2p.identifyService.identify.callCount).to.equal(1)
213-
await libp2p.identifyService.identify.firstCall.returnValue
214215

215-
expect(libp2p.peerStore.replace.callCount).to.equal(1)
216+
// The connection should have no open streams
217+
expect(connection.streams).to.have.length(0)
216218
await connection.close()
217219
})
218220

@@ -247,6 +249,9 @@ describe('Identify', () => {
247249
const results = await call.returnValue
248250
expect(results.length).to.equal(1)
249251
}
252+
253+
// Verify the streams close
254+
await pWaitFor(() => connection.streams.length === 0)
250255
})
251256
})
252257
})

0 commit comments

Comments
 (0)