1
1
import { AbortError , CodeError } from '@libp2p/interface/errors'
2
2
import { setMaxListeners } from '@libp2p/interface/events'
3
3
import { logger } from '@libp2p/logger'
4
+ import { PeerMap } from '@libp2p/peer-collections'
4
5
import { defaultAddressSort } from '@libp2p/utils/address-sort'
5
6
import { type Multiaddr , type Resolver , resolvers } from '@multiformats/multiaddr'
6
7
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
@@ -35,6 +36,7 @@ export interface PendingDialTarget {
35
36
36
37
export interface DialOptions extends AbortOptions {
37
38
priority ?: number
39
+ force ?: boolean
38
40
}
39
41
40
42
interface PendingDialInternal extends PendingDial {
@@ -48,6 +50,7 @@ interface DialerInit {
48
50
maxParallelDialsPerPeer ?: number
49
51
dialTimeout ?: number
50
52
resolvers ?: Record < string , Resolver >
53
+ connections ?: PeerMap < Connection [ ] >
51
54
}
52
55
53
56
const defaultOptions = {
@@ -83,12 +86,14 @@ export class DialQueue {
83
86
private readonly inProgressDialCount ?: Metric
84
87
private readonly pendingDialCount ?: Metric
85
88
private readonly shutDownController : AbortController
89
+ private readonly connections : PeerMap < Connection [ ] >
86
90
87
91
constructor ( components : DialQueueComponents , init : DialerInit = { } ) {
88
92
this . addressSorter = init . addressSorter ?? defaultOptions . addressSorter
89
93
this . maxPeerAddrsToDial = init . maxPeerAddrsToDial ?? defaultOptions . maxPeerAddrsToDial
90
94
this . maxParallelDialsPerPeer = init . maxParallelDialsPerPeer ?? defaultOptions . maxParallelDialsPerPeer
91
95
this . dialTimeout = init . dialTimeout ?? defaultOptions . dialTimeout
96
+ this . connections = init . connections ?? new PeerMap ( )
92
97
93
98
this . peerId = components . peerId
94
99
this . peerStore = components . peerStore
@@ -187,6 +192,23 @@ export class DialQueue {
187
192
throw err
188
193
}
189
194
195
+ // make sure we don't have an existing connection to any of the addresses we
196
+ // are about to dial
197
+ let existingConnection = Array . from ( this . connections . values ( ) ) . flat ( ) . find ( conn => {
198
+ if ( options . force === true ) {
199
+ return false
200
+ }
201
+
202
+ return addrsToDial . find ( addr => {
203
+ return addr . multiaddr . equals ( conn . remoteAddr )
204
+ } )
205
+ } )
206
+
207
+ if ( existingConnection != null ) {
208
+ log ( 'already connected to %a' , existingConnection . remoteAddr )
209
+ return existingConnection
210
+ }
211
+
190
212
// ready to dial, all async work finished - make sure we don't have any
191
213
// pending dials in progress for this peer or set of multiaddrs
192
214
const existingDial = this . pendingDials . find ( dial => {
@@ -257,7 +279,28 @@ export class DialQueue {
257
279
// let other dials join this one
258
280
this . pendingDials . push ( pendingDial )
259
281
260
- return pendingDial . promise
282
+ const connection = await pendingDial . promise
283
+
284
+ // we may have been dialing a multiaddr without a peer id attached but by
285
+ // this point we have upgraded the connection so the remote peer information
286
+ // should be available - check again that we don't already have a connection
287
+ // to the remote multiaddr
288
+ existingConnection = Array . from ( this . connections . values ( ) ) . flat ( ) . find ( conn => {
289
+ if ( options . force === true ) {
290
+ return false
291
+ }
292
+
293
+ return conn . id !== connection . id && conn . remoteAddr . equals ( connection . remoteAddr )
294
+ } )
295
+
296
+ if ( existingConnection != null ) {
297
+ log ( 'already connected to %a' , existingConnection . remoteAddr )
298
+ await connection . close ( )
299
+ return existingConnection
300
+ }
301
+
302
+ log ( 'connection opened to %a' , connection . remoteAddr )
303
+ return connection
261
304
}
262
305
263
306
private createDialAbortControllers ( userSignal ?: AbortSignal ) : ClearableSignal {
0 commit comments