diff --git a/packages/libp2p/src/connection-manager/auto-dial.ts b/packages/libp2p/src/connection-manager/auto-dial.ts index 49eab00ef3..134d6fa3d2 100644 --- a/packages/libp2p/src/connection-manager/auto-dial.ts +++ b/packages/libp2p/src/connection-manager/auto-dial.ts @@ -2,7 +2,7 @@ import { logger } from '@libp2p/logger' import { PeerMap, PeerSet } from '@libp2p/peer-collections' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { PeerJobQueue } from '../utils/peer-job-queue.js' -import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js' +import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_CONNECTED_TIMESTAMP, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js' import type { Libp2pEvents } from '@libp2p/interface' import type { EventEmitter } from '@libp2p/interface/events' import type { PeerStore } from '@libp2p/interface/peer-store' @@ -249,9 +249,36 @@ export class AutoDial implements Startable { return Date.now() - lastDialFailureTimestamp > this.autoDialPeerRetryThresholdMs }) - log('selected %d/%d peers to dial', peersThatHaveNotFailed.length, peers.length) + // sort peers that have been connected recently to the front of the queue + const peersToDial = peersThatHaveNotFailed.sort((a, b) => { + let lastDialTimestampA = 0 + let lastDialTimestampB = 0 - for (const peer of peersThatHaveNotFailed) { + const peerAMetadata = a.metadata.get(LAST_CONNECTED_TIMESTAMP) + const peerBMetadata = b.metadata.get(LAST_CONNECTED_TIMESTAMP) + + if (peerAMetadata !== undefined) { + lastDialTimestampA = parseInt(uint8ArrayToString(peerAMetadata)) + } + + if (peerBMetadata !== undefined) { + lastDialTimestampB = parseInt(uint8ArrayToString(peerBMetadata)) + } + + if (lastDialTimestampA > lastDialTimestampB) { + return -1 + } + + if (lastDialTimestampB > lastDialTimestampA) { + return 1 + } + + return 0 + }) + + log('selected %d/%d peers to dial', peersToDial.length, peers.length) + + for (const peer of peersToDial) { this.queue.add(async () => { const numConnections = this.connectionManager.getConnectionsMap().size diff --git a/packages/libp2p/src/connection-manager/constants.defaults.ts b/packages/libp2p/src/connection-manager/constants.defaults.ts index 13f1f18957..dc0726f243 100644 --- a/packages/libp2p/src/connection-manager/constants.defaults.ts +++ b/packages/libp2p/src/connection-manager/constants.defaults.ts @@ -62,3 +62,5 @@ export const MAX_INCOMING_PENDING_CONNECTIONS = 10 * failed to dial. */ export const LAST_DIAL_FAILURE_KEY = 'last-dial-failure' + +export const LAST_CONNECTED_TIMESTAMP = 'last-connected-timestamp' diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index d94612fd9f..a376d310a2 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -6,11 +6,12 @@ import { defaultAddressSort } from '@libp2p/utils/address-sort' import { type Multiaddr, type Resolver, multiaddr } from '@multiformats/multiaddr' import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers' import { RateLimiterMemory } from 'rate-limiter-flexible' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { codes } from '../errors.js' import { getPeerAddress } from '../get-peer.js' import { AutoDial } from './auto-dial.js' import { ConnectionPruner } from './connection-pruner.js' -import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PARALLEL_DIALS_PER_PEER, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js' +import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, LAST_CONNECTED_TIMESTAMP, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PARALLEL_DIALS_PER_PEER, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js' import { DialQueue } from './dial-queue.js' import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions } from '@libp2p/interface' import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection' @@ -540,6 +541,17 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { peerConnections.push(connection) } + // Set connected peers as recently dialled + try { + await this.peerStore.patch(connection.remotePeer, { + metadata: { + [LAST_CONNECTED_TIMESTAMP]: uint8ArrayFromString(Date.now().toString()) + } + }) + } catch (err: any) { + log.error('could not update last connected timestamp for peer %p', connection.remotePeer, err) + } + return connection } diff --git a/packages/libp2p/test/connection-manager/auto-dial.spec.ts b/packages/libp2p/test/connection-manager/auto-dial.spec.ts index 3c5968bdf1..d5f69c14b4 100644 --- a/packages/libp2p/test/connection-manager/auto-dial.spec.ts +++ b/packages/libp2p/test/connection-manager/auto-dial.spec.ts @@ -13,7 +13,7 @@ import Sinon from 'sinon' import { stubInterface } from 'sinon-ts' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { AutoDial } from '../../src/connection-manager/auto-dial.js' -import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js' +import { LAST_CONNECTED_TIMESTAMP, LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js' import { matchPeerId } from '../fixtures/match-peer-id.js' import type { Libp2pEvents } from '@libp2p/interface' import type { Connection } from '@libp2p/interface/connection' @@ -291,4 +291,55 @@ describe('auto-dial', () => { // should have retried the unreachable peer expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.true() }) + + it('should prioritize peers which have been successfully connected', async () => { + const recentlyConnectedPeer: Peer = { + id: await createEd25519PeerId(), + protocols: [], + addresses: [{ + multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), + isCertified: true + }], + metadata: new Map([[LAST_CONNECTED_TIMESTAMP, uint8ArrayFromString(`${Date.now()}`)]]), + tags: new Map() + } + + const neverConnectedPeer: Peer = { + id: await createEd25519PeerId(), + protocols: [], + addresses: [{ + multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'), + isCertified: true + }], + metadata: new Map(), + tags: new Map() + } + + await peerStore.save(neverConnectedPeer.id, neverConnectedPeer) + await peerStore.save(recentlyConnectedPeer.id, recentlyConnectedPeer) + + const connectionManager = stubInterface({ + getConnectionsMap: new PeerMap(), + getDialQueue: [] + }) + + autoDialler = new AutoDial({ + peerStore, + connectionManager, + events + }, { + minConnections: 10 + }) + + autoDialler.start() + + void autoDialler.autoDial() + + await pWaitFor(() => { + return connectionManager.openConnection.callCount === 2 + }) + + expect(connectionManager.openConnection.getCall(0).args[0].toString()).to.equal(recentlyConnectedPeer.id.toString()) + expect(connectionManager.openConnection.getCall(1).args[0].toString()).to.equal(neverConnectedPeer.id.toString()) + }) }) diff --git a/packages/libp2p/test/connection-manager/direct.spec.ts b/packages/libp2p/test/connection-manager/direct.spec.ts index 780c45f605..e531e8bad4 100644 --- a/packages/libp2p/test/connection-manager/direct.spec.ts +++ b/packages/libp2p/test/connection-manager/direct.spec.ts @@ -20,7 +20,7 @@ import { pEvent } from 'p-event' import sinon from 'sinon' import { stubInterface } from 'sinon-ts' import { defaultComponents, type Components } from '../../src/components.js' -import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js' +import { LAST_CONNECTED_TIMESTAMP, LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js' import { DefaultConnectionManager } from '../../src/connection-manager/index.js' import { codes as ErrorCodes } from '../../src/errors.js' import { type IdentifyService, identifyService } from '../../src/identify/index.js' @@ -117,6 +117,23 @@ describe('dialing (direct, WebSockets)', () => { expect(peer.metadata.has(LAST_DIAL_FAILURE_KEY)).to.be.true() }) + it('should mark a peer as having recently connected', async () => { + connectionManager = new DefaultConnectionManager(localComponents) + await connectionManager.start() + + const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') + await localComponents.peerStore.patch(remotePeerId, { + multiaddrs: [remoteAddr] + }) + + const connection = await connectionManager.openConnection(remotePeerId) + expect(connection).to.exist() + + const peer = await localComponents.peerStore.get(remoteComponents.peerId) + + expect(peer.metadata.has(LAST_CONNECTED_TIMESTAMP)).to.be.true() + }) + it('should be able to connect to a given peer', async () => { connectionManager = new DefaultConnectionManager(localComponents) await connectionManager.start() @@ -412,7 +429,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => { expect(identifySpy.callCount).to.equal(1) await identifySpy.firstCall.returnValue - expect(peerStorePatchSpy.callCount).to.equal(1) + // Account for the peer store being patched by recently dialed peer + expect(peerStorePatchSpy.callCount).to.equal(2) await libp2p.stop() })