Skip to content

Commit 444d837

Browse files
authored
fix: perform find peer during dial if peer has no multiaddrs (#2345)
A recent change to go-libp2p started yielding peers with no multiaddrs during DHT queries. These peer ids have addresses that can be found during a find peer query but performing this query for every peer is expensive, so do the lookup only when dialing the peer.
1 parent 53e83ee commit 444d837

File tree

10 files changed

+128
-86
lines changed

10 files changed

+128
-86
lines changed

packages/libp2p/src/connection-manager/dial-queue.ts

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
LAST_DIAL_FAILURE_KEY
1717
} from './constants.js'
1818
import { resolveMultiaddrs } from './utils.js'
19-
import type { AddressSorter, AbortOptions, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore } from '@libp2p/interface'
19+
import type { AddressSorter, AbortOptions, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting } from '@libp2p/interface'
2020
import type { TransportManager } from '@libp2p/interface-internal'
2121

2222
export interface PendingDialTarget {
@@ -57,17 +57,15 @@ interface DialQueueComponents {
5757
peerId: PeerId
5858
metrics?: Metrics
5959
peerStore: PeerStore
60+
peerRouting: PeerRouting
6061
transportManager: TransportManager
6162
connectionGater: ConnectionGater
6263
logger: ComponentLogger
6364
}
6465

6566
export class DialQueue {
6667
public queue: Queue<Connection, DialQueueJobOptions>
67-
private readonly peerId: PeerId
68-
private readonly peerStore: PeerStore
69-
private readonly connectionGater: ConnectionGater
70-
private readonly transportManager: TransportManager
68+
private readonly components: DialQueueComponents
7169
private readonly addressSorter: AddressSorter
7270
private readonly maxPeerAddrsToDial: number
7371
private readonly dialTimeout: number
@@ -82,10 +80,7 @@ export class DialQueue {
8280
this.connections = init.connections ?? new PeerMap()
8381
this.log = components.logger.forComponent('libp2p:connection-manager:dial-queue')
8482

85-
this.peerId = components.peerId
86-
this.peerStore = components.peerStore
87-
this.connectionGater = components.connectionGater
88-
this.transportManager = components.transportManager
83+
this.components = components
8984
this.shutDownController = new AbortController()
9085

9186
setMaxListeners(Infinity, this.shutDownController.signal)
@@ -218,15 +213,15 @@ export class DialQueue {
218213

219214
for (const address of addrsToDial) {
220215
if (dialed === this.maxPeerAddrsToDial) {
221-
this.log('dialed %d addresses for %p, not trying any others', dialed, peerId)
216+
this.log('dialed maxPeerAddrsToDial (%d) addresses for %p, not trying any others', dialed, peerId)
222217

223218
throw new CodeError('Peer had more than maxPeerAddrsToDial', codes.ERR_TOO_MANY_ADDRESSES)
224219
}
225220

226221
dialed++
227222

228223
try {
229-
const conn = await this.transportManager.dial(address.multiaddr, {
224+
const conn = await this.components.transportManager.dial(address.multiaddr, {
230225
...options,
231226
signal
232227
})
@@ -240,7 +235,7 @@ export class DialQueue {
240235
if (peerId != null) {
241236
// record the failed dial
242237
try {
243-
await this.peerStore.patch(peerId, {
238+
await this.components.peerStore.patch(peerId, {
244239
metadata: {
245240
[LAST_DIAL_FAILURE_KEY]: uint8ArrayFromString(Date.now().toString())
246241
}
@@ -299,19 +294,20 @@ export class DialQueue {
299294

300295
// if a peer id or multiaddr(s) with a peer id, make sure it isn't our peer id and that we are allowed to dial it
301296
if (peerId != null) {
302-
if (this.peerId.equals(peerId)) {
297+
if (this.components.peerId.equals(peerId)) {
303298
throw new CodeError('Tried to dial self', codes.ERR_DIALED_SELF)
304299
}
305300

306-
if ((await this.connectionGater.denyDialPeer?.(peerId)) === true) {
301+
if ((await this.components.connectionGater.denyDialPeer?.(peerId)) === true) {
307302
throw new CodeError('The dial request is blocked by gater.allowDialPeer', codes.ERR_PEER_DIAL_INTERCEPTED)
308303
}
309304

310-
// if just a peer id was passed, load available multiaddrs for this peer from the address book
305+
// if just a peer id was passed, load available multiaddrs for this peer
306+
// from the peer store
311307
if (addrs.length === 0) {
312308
this.log('loading multiaddrs for %p', peerId)
313309
try {
314-
const peer = await this.peerStore.get(peerId)
310+
const peer = await this.components.peerStore.get(peerId)
315311
addrs.push(...peer.addresses)
316312
this.log('loaded multiaddrs for %p', peerId, addrs.map(({ multiaddr }) => multiaddr.toString()))
317313
} catch (err: any) {
@@ -320,9 +316,31 @@ export class DialQueue {
320316
}
321317
}
322318
}
319+
320+
// if we still don't have any addresses for this peer, try a lookup
321+
// using the peer routing
322+
if (addrs.length === 0) {
323+
this.log('looking up multiaddrs for %p in the peer routing', peerId)
324+
325+
try {
326+
const peerInfo = await this.components.peerRouting.findPeer(peerId)
327+
328+
this.log('found multiaddrs for %p in the peer routing', peerId, addrs.map(({ multiaddr }) => multiaddr.toString()))
329+
330+
addrs.push(...peerInfo.multiaddrs.map(multiaddr => ({
331+
multiaddr,
332+
isCertified: false
333+
})))
334+
} catch (err: any) {
335+
if (err.code !== codes.ERR_NO_ROUTERS_AVAILABLE) {
336+
this.log.error('looking up multiaddrs for %p in the peer routing failed', peerId, err)
337+
}
338+
}
339+
}
323340
}
324341

325-
// resolve addresses - this can result in a one-to-many translation when dnsaddrs are resolved
342+
// resolve addresses - this can result in a one-to-many translation when
343+
// dnsaddrs are resolved
326344
let resolvedAddresses = (await Promise.all(
327345
addrs.map(async addr => {
328346
const result = await resolveMultiaddrs(addr.multiaddr, {
@@ -367,7 +385,7 @@ export class DialQueue {
367385

368386
const filteredAddrs = resolvedAddresses.filter(addr => {
369387
// filter out any multiaddrs that we do not have transports for
370-
if (this.transportManager.transportForMultiaddr(addr.multiaddr) == null) {
388+
if (this.components.transportManager.transportForMultiaddr(addr.multiaddr) == null) {
371389
return false
372390
}
373391

@@ -407,7 +425,7 @@ export class DialQueue {
407425
const gatedAdrs: Address[] = []
408426

409427
for (const addr of dedupedMultiaddrs) {
410-
if (this.connectionGater.denyDialMultiaddr != null && await this.connectionGater.denyDialMultiaddr(addr.multiaddr)) {
428+
if (this.components.connectionGater.denyDialMultiaddr != null && await this.components.connectionGater.denyDialMultiaddr(addr.multiaddr)) {
411429
continue
412430
}
413431

packages/libp2p/src/connection-manager/index.ts

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { AutoDial } from './auto-dial.js'
1010
import { ConnectionPruner } from './connection-pruner.js'
1111
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
1212
import { DialQueue } from './dial-queue.js'
13-
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, Peer, PeerStore, Startable, PendingDialStatus } from '@libp2p/interface'
13+
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, Peer, PeerStore, Startable, PendingDialStatus, PeerRouting } from '@libp2p/interface'
1414
import type { ConnectionManager, OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal'
1515
import type { JobStatus } from '@libp2p/utils/queue'
1616

@@ -82,8 +82,9 @@ export interface ConnectionManagerInit {
8282
maxParallelDials?: number
8383

8484
/**
85-
* Maximum number of addresses allowed for a given peer - if a peer has more
86-
* addresses than this then the dial will fail. (default: 25)
85+
* Maximum number of addresses allowed for a given peer before giving up
86+
*
87+
* @default 25
8788
*/
8889
maxPeerAddrsToDial?: number
8990

@@ -144,6 +145,7 @@ export interface DefaultConnectionManagerComponents {
144145
peerId: PeerId
145146
metrics?: Metrics
146147
peerStore: PeerStore
148+
peerRouting: PeerRouting
147149
transportManager: TransportManager
148150
connectionGater: ConnectionGater
149151
events: TypedEventTarget<Libp2pEvents>
@@ -233,14 +235,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
233235
allow: this.allow
234236
})
235237

236-
this.dialQueue = new DialQueue({
237-
peerId: components.peerId,
238-
metrics: components.metrics,
239-
peerStore: components.peerStore,
240-
transportManager: components.transportManager,
241-
connectionGater: components.connectionGater,
242-
logger: components.logger
243-
}, {
238+
this.dialQueue = new DialQueue(components, {
244239
addressSorter: init.addressSorter ?? defaultAddressSort,
245240
maxParallelDials: init.maxParallelDials ?? MAX_PARALLEL_DIALS,
246241
maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL,

packages/libp2p/src/content-routing.ts

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import { CodeError } from '@libp2p/interface'
22
import { PeerSet } from '@libp2p/peer-collections'
33
import merge from 'it-merge'
4-
import parallel from 'it-parallel'
54
import { codes, messages } from './errors.js'
6-
import type { AbortOptions, ComponentLogger, ContentRouting, Logger, PeerInfo, PeerRouting, PeerStore, RoutingOptions, Startable } from '@libp2p/interface'
5+
import type { AbortOptions, ComponentLogger, ContentRouting, PeerInfo, PeerRouting, PeerStore, RoutingOptions, Startable } from '@libp2p/interface'
76
import type { CID } from 'multiformats/cid'
87

98
export interface CompoundContentRoutingInit {
@@ -17,13 +16,11 @@ export interface CompoundContentRoutingComponents {
1716
}
1817

1918
export class CompoundContentRouting implements ContentRouting, Startable {
20-
private readonly log: Logger
2119
private readonly routers: ContentRouting[]
2220
private started: boolean
2321
private readonly components: CompoundContentRoutingComponents
2422

2523
constructor (components: CompoundContentRoutingComponents, init: CompoundContentRoutingInit) {
26-
this.log = components.logger.forComponent('libp2p:content-routing')
2724
this.routers = init.routers ?? []
2825
this.started = false
2926
this.components = components
@@ -52,48 +49,22 @@ export class CompoundContentRouting implements ContentRouting, Startable {
5249
const self = this
5350
const seen = new PeerSet()
5451

55-
for await (const peer of parallel(
56-
async function * () {
57-
const source = merge(
58-
...self.routers.map(router => router.findProviders(key, options))
59-
)
60-
61-
for await (let peer of source) {
62-
yield async () => {
63-
// find multiaddrs if they are missing
64-
if (peer.multiaddrs.length === 0) {
65-
try {
66-
peer = await self.components.peerRouting.findPeer(peer.id, {
67-
...options,
68-
useCache: false
69-
})
70-
} catch (err) {
71-
self.log.error('could not find peer multiaddrs', err)
72-
return
73-
}
74-
}
75-
76-
return peer
77-
}
78-
}
79-
}()
52+
for await (const peer of merge(
53+
...self.routers.map(router => router.findProviders(key, options))
8054
)) {
8155
// the peer was yielded by a content router without multiaddrs and we
8256
// failed to load them
8357
if (peer == null) {
8458
continue
8559
}
8660

87-
// skip peers without addresses
88-
if (peer.multiaddrs.length === 0) {
89-
continue
61+
// store the addresses for the peer if found
62+
if (peer.multiaddrs.length > 0) {
63+
await this.components.peerStore.merge(peer.id, {
64+
multiaddrs: peer.multiaddrs
65+
})
9066
}
9167

92-
// ensure we have the addresses for a given peer
93-
await this.components.peerStore.merge(peer.id, {
94-
multiaddrs: peer.multiaddrs
95-
})
96-
9768
// deduplicate peers
9869
if (seen.has(peer.id)) {
9970
continue

packages/libp2p/src/peer-routing.ts

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ export class DefaultPeerRouting implements PeerRouting {
2222
private readonly peerStore: PeerStore
2323
private readonly routers: PeerRouting[]
2424

25-
constructor (components: DefaultPeerRoutingComponents, init: PeerRoutingInit) {
25+
constructor (components: DefaultPeerRoutingComponents, init: PeerRoutingInit = {}) {
2626
this.log = components.logger.forComponent('libp2p:peer-routing')
2727
this.peerId = components.peerId
2828
this.peerStore = components.peerStore
@@ -57,10 +57,12 @@ export class DefaultPeerRouting implements PeerRouting {
5757
continue
5858
}
5959

60-
// ensure we have the addresses for a given peer
61-
await this.peerStore.merge(peer.id, {
62-
multiaddrs: peer.multiaddrs
63-
})
60+
// store the addresses for the peer if found
61+
if (peer.multiaddrs.length > 0) {
62+
await this.peerStore.merge(peer.id, {
63+
multiaddrs: peer.multiaddrs
64+
})
65+
}
6466

6567
return peer
6668
}
@@ -105,22 +107,17 @@ export class DefaultPeerRouting implements PeerRouting {
105107
}
106108
}()
107109
)) {
108-
// the peer was yielded by a content router without multiaddrs and we
109-
// failed to load them
110110
if (peer == null) {
111111
continue
112112
}
113113

114-
// skip peers without addresses
115-
if (peer.multiaddrs.length === 0) {
116-
continue
114+
// store the addresses for the peer if found
115+
if (peer.multiaddrs.length > 0) {
116+
await this.peerStore.merge(peer.id, {
117+
multiaddrs: peer.multiaddrs
118+
})
117119
}
118120

119-
// ensure we have the addresses for a given peer
120-
await this.peerStore.merge(peer.id, {
121-
multiaddrs: peer.multiaddrs
122-
})
123-
124121
// deduplicate peers
125122
if (seen.has(peer.id)) {
126123
continue

packages/libp2p/test/connection-manager/auto-dial.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
1616
import { defaultComponents } from '../../src/components.js'
1717
import { AutoDial } from '../../src/connection-manager/auto-dial.js'
1818
import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js'
19-
import { matchPeerId } from '../fixtures/match-peer-id.js'
19+
import { matchPeerId } from '../fixtures/matchers.js'
2020
import type { ConnectionManager } from '@libp2p/interface-internal'
2121

2222
describe('auto-dial', () => {

0 commit comments

Comments
 (0)