Skip to content

Commit 90e793e

Browse files
authored
fix(@libp2p/webtransport): be more thorough about closing sessions (#1969)
Refactors session closing to happen in one function and call that function when the session has closed or failed to init. Doesn't quite solve the "Too many pending WebTransport Sessions" problem but does slow it down a little bit. Refs: #1896
1 parent d30f09f commit 90e793e

File tree

2 files changed

+160
-90
lines changed

2 files changed

+160
-90
lines changed

packages/transport-webtransport/src/index.ts

Lines changed: 123 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@ declare global {
1616
var WebTransport: any
1717
}
1818

19+
// https://www.w3.org/TR/webtransport/#web-transport-close-info
20+
interface WebTransportCloseInfo {
21+
closeCode: number
22+
reason: string
23+
}
24+
25+
interface WebTransportSessionCleanup {
26+
(closeInfo?: WebTransportCloseInfo): void
27+
}
28+
1929
const log = logger('libp2p:webtransport')
2030

2131
export interface WebTransportInit {
@@ -42,6 +52,8 @@ class WebTransportTransport implements Transport {
4252
readonly [symbol] = true
4353

4454
async dial (ma: Multiaddr, options: DialOptions): Promise<Connection> {
55+
options?.signal?.throwIfAborted()
56+
4557
log('dialing %s', ma)
4658
const localPeer = this.components.peerId
4759
if (localPeer === undefined) {
@@ -52,60 +64,122 @@ class WebTransportTransport implements Transport {
5264

5365
const { url, certhashes, remotePeer } = parseMultiaddr(ma)
5466

55-
if (certhashes.length === 0) {
56-
throw new Error('Expected multiaddr to contain certhashes')
57-
}
58-
59-
const wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, {
60-
serverCertificateHashes: certhashes.map(certhash => ({
61-
algorithm: 'sha-256',
62-
value: certhash.digest
63-
}))
64-
})
65-
wt.closed.catch((error: Error) => {
66-
log.error('WebTransport transport closed due to:', error)
67-
})
68-
await wt.ready
69-
7067
if (remotePeer == null) {
7168
throw new Error('Need a target peerid')
7269
}
7370

74-
if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) {
75-
throw new Error('Failed to authenticate webtransport')
71+
if (certhashes.length === 0) {
72+
throw new Error('Expected multiaddr to contain certhashes')
7673
}
7774

78-
const maConn: MultiaddrConnection = {
79-
close: async (options?: AbortOptions) => {
80-
log('Closing webtransport')
81-
await wt.close()
82-
},
83-
abort: (err: Error) => {
84-
log('Aborting webtransport with err:', err)
85-
wt.close()
86-
},
87-
remoteAddr: ma,
88-
timeline: {
89-
open: Date.now()
90-
},
91-
// This connection is never used directly since webtransport supports native streams.
92-
...inertDuplex()
93-
}
75+
let abortListener: (() => void) | undefined
76+
let maConn: MultiaddrConnection | undefined
9477

95-
wt.closed.catch((err: Error) => {
96-
log.error('WebTransport connection closed:', err)
97-
// This is how we specify the connection is closed and shouldn't be used.
98-
maConn.timeline.close = Date.now()
99-
})
78+
let cleanUpWTSession: (closeInfo?: WebTransportCloseInfo) => void = () => {}
10079

10180
try {
81+
let closed = false
82+
const wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, {
83+
serverCertificateHashes: certhashes.map(certhash => ({
84+
algorithm: 'sha-256',
85+
value: certhash.digest
86+
}))
87+
})
88+
89+
cleanUpWTSession = (closeInfo?: WebTransportCloseInfo) => {
90+
try {
91+
if (maConn != null) {
92+
if (maConn.timeline.close != null) {
93+
// already closed session
94+
return
95+
}
96+
97+
// This is how we specify the connection is closed and shouldn't be used.
98+
maConn.timeline.close = Date.now()
99+
}
100+
101+
if (closed) {
102+
// already closed session
103+
return
104+
}
105+
106+
wt.close(closeInfo)
107+
} catch (err) {
108+
log.error('error closing wt session', err)
109+
} finally {
110+
closed = true
111+
}
112+
}
113+
114+
// this promise resolves/throws when the session is closed or failed to init
115+
wt.closed
116+
.then(async () => {
117+
await maConn?.close()
118+
})
119+
.catch((err: Error) => {
120+
log.error('error on remote wt session close', err)
121+
maConn?.abort(err)
122+
})
123+
.finally(() => {
124+
// if we never got as far as creating the maConn, just clean up the session
125+
if (maConn == null) {
126+
cleanUpWTSession()
127+
}
128+
})
129+
130+
// if the dial is aborted before we are ready, close the WebTransport session
131+
abortListener = () => {
132+
if (abortListener != null) {
133+
options.signal?.removeEventListener('abort', abortListener)
134+
}
135+
136+
cleanUpWTSession()
137+
}
138+
options.signal?.addEventListener('abort', abortListener)
139+
140+
await wt.ready
141+
142+
if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) {
143+
throw new Error('Failed to authenticate webtransport')
144+
}
145+
146+
maConn = {
147+
close: async (options?: AbortOptions) => {
148+
log('Closing webtransport')
149+
cleanUpWTSession()
150+
},
151+
abort: (err: Error) => {
152+
log('aborting webtransport due to passed err', err)
153+
cleanUpWTSession({
154+
closeCode: 0,
155+
reason: err.message
156+
})
157+
},
158+
remoteAddr: ma,
159+
timeline: {
160+
open: Date.now()
161+
},
162+
// This connection is never used directly since webtransport supports native streams.
163+
...inertDuplex()
164+
}
165+
102166
options?.signal?.throwIfAborted()
103-
} catch (e) {
104-
wt.close()
105-
throw e
106-
}
107167

108-
return options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt), skipProtection: true })
168+
return await options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt, cleanUpWTSession), skipProtection: true })
169+
} catch (err: any) {
170+
log.error('caught wt session err', err)
171+
172+
cleanUpWTSession({
173+
closeCode: 0,
174+
reason: err.message
175+
})
176+
177+
throw err
178+
} finally {
179+
if (abortListener != null) {
180+
options.signal?.removeEventListener('abort', abortListener)
181+
}
182+
}
109183
}
110184

111185
async authenticateWebTransport (wt: InstanceType<typeof WebTransport>, localPeer: PeerId, remotePeer: PeerId, certhashes: Array<MultihashDigest<number>>): Promise<boolean> {
@@ -156,7 +230,7 @@ class WebTransportTransport implements Transport {
156230
return true
157231
}
158232

159-
webtransportMuxer (wt: InstanceType<typeof WebTransport>): StreamMuxerFactory {
233+
webtransportMuxer (wt: InstanceType<typeof WebTransport>, cleanUpWTSession: WebTransportSessionCleanup): StreamMuxerFactory {
160234
let streamIDCounter = 0
161235
const config = this.config
162236
return {
@@ -217,11 +291,14 @@ class WebTransportTransport implements Transport {
217291
*/
218292
close: async (options?: AbortOptions) => {
219293
log('Closing webtransport muxer')
220-
await wt.close()
294+
cleanUpWTSession()
221295
},
222296
abort: (err: Error) => {
223297
log('Aborting webtransport muxer with err:', err)
224-
wt.close()
298+
cleanUpWTSession({
299+
closeCode: 0,
300+
reason: err.message
301+
})
225302
},
226303
// This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex.
227304
...inertDuplex()

packages/transport-webtransport/test/browser.ts

Lines changed: 37 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import { noise } from '@chainsafe/libp2p-noise'
55
import { multiaddr } from '@multiformats/multiaddr'
66
import { expect } from 'aegir/chai'
7-
import { createLibp2p } from 'libp2p'
7+
import { createLibp2p, type Libp2p } from 'libp2p'
88
import { webTransport } from '../src/index.js'
99

1010
declare global {
@@ -14,22 +14,34 @@ declare global {
1414
}
1515

1616
describe('libp2p-webtransport', () => {
17-
it('webtransport connects to go-libp2p', async () => {
18-
if (process.env.serverAddr == null) {
19-
throw new Error('serverAddr not found')
20-
}
17+
let node: Libp2p
2118

22-
const maStr: string = process.env.serverAddr
23-
const ma = multiaddr(maStr)
24-
const node = await createLibp2p({
19+
beforeEach(async () => {
20+
node = await createLibp2p({
2521
transports: [webTransport()],
2622
connectionEncryption: [noise()],
2723
connectionGater: {
2824
denyDialMultiaddr: async () => false
2925
}
3026
})
27+
})
3128

32-
await node.start()
29+
afterEach(async () => {
30+
if (node != null) {
31+
await node.stop()
32+
33+
const conns = node.getConnections()
34+
expect(conns.length).to.equal(0)
35+
}
36+
})
37+
38+
it('webtransport connects to go-libp2p', async () => {
39+
if (process.env.serverAddr == null) {
40+
throw new Error('serverAddr not found')
41+
}
42+
43+
const maStr: string = process.env.serverAddr
44+
const ma = multiaddr(maStr)
3345

3446
// Ping many times
3547
for (let index = 0; index < 100; index++) {
@@ -70,10 +82,6 @@ describe('libp2p-webtransport', () => {
7082

7183
expect(res).to.be.greaterThan(-1)
7284
}
73-
74-
await node.stop()
75-
const conns = node.getConnections()
76-
expect(conns.length).to.equal(0)
7785
})
7886

7987
it('fails to connect without certhashes', async () => {
@@ -86,19 +94,25 @@ describe('libp2p-webtransport', () => {
8694
const maStrP2p = maStr.split('/p2p/')[1]
8795
const ma = multiaddr(maStrNoCerthash + '/p2p/' + maStrP2p)
8896

89-
const node = await createLibp2p({
90-
transports: [webTransport()],
91-
connectionEncryption: [noise()],
92-
connectionGater: {
93-
denyDialMultiaddr: async () => false
94-
}
95-
})
96-
await node.start()
97-
9897
const err = await expect(node.dial(ma)).to.eventually.be.rejected()
9998
expect(err.toString()).to.contain('Expected multiaddr to contain certhashes')
99+
})
100100

101-
await node.stop()
101+
it('fails to connect due to an aborted signal', async () => {
102+
if (process.env.serverAddr == null) {
103+
throw new Error('serverAddr not found')
104+
}
105+
106+
const maStr: string = process.env.serverAddr
107+
const ma = multiaddr(maStr)
108+
109+
const controller = new AbortController()
110+
controller.abort()
111+
112+
const err = await expect(node.dial(ma, {
113+
signal: controller.signal
114+
})).to.eventually.be.rejected()
115+
expect(err.toString()).to.contain('aborted')
102116
})
103117

104118
it('connects to ipv6 addresses', async function () {
@@ -110,21 +124,10 @@ describe('libp2p-webtransport', () => {
110124
}
111125

112126
const ma = multiaddr(process.env.serverAddr6)
113-
const node = await createLibp2p({
114-
transports: [webTransport()],
115-
connectionEncryption: [noise()],
116-
connectionGater: {
117-
denyDialMultiaddr: async () => false
118-
}
119-
})
120-
121-
await node.start()
122127

123128
// the address is unreachable but we can parse it correctly
124129
const stream = await node.dialProtocol(ma, '/ipfs/ping/1.0.0')
125130
await stream.close()
126-
127-
await node.stop()
128131
})
129132

130133
it('closes writes of streams after they have sunk a source', async () => {
@@ -135,13 +138,6 @@ describe('libp2p-webtransport', () => {
135138

136139
const maStr: string = process.env.serverAddr
137140
const ma = multiaddr(maStr)
138-
const node = await createLibp2p({
139-
transports: [webTransport()],
140-
connectionEncryption: [noise()],
141-
connectionGater: {
142-
denyDialMultiaddr: async () => false
143-
}
144-
})
145141

146142
async function * gen (): AsyncGenerator<Uint8Array, void, unknown> {
147143
yield new Uint8Array([0])
@@ -151,7 +147,6 @@ describe('libp2p-webtransport', () => {
151147
yield new Uint8Array([12, 13, 14, 15])
152148
}
153149

154-
await node.start()
155150
const stream = await node.dialProtocol(ma, 'echo')
156151

157152
await stream.sink(gen())
@@ -168,7 +163,5 @@ describe('libp2p-webtransport', () => {
168163
await stream.closeRead()
169164

170165
expect(stream.timeline.close).to.be.greaterThan(0)
171-
172-
await node.stop()
173166
})
174167
})

0 commit comments

Comments
 (0)