@@ -18,21 +18,28 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect
18
18
const { sink, source } = stream
19
19
const log = logger . forComponent ( 'libp2p:stream:converter' )
20
20
21
+ let closedRead = false
22
+ let closedWrite = false
23
+
21
24
const mapSource = ( async function * ( ) {
22
- for await ( const list of source ) {
23
- if ( list instanceof Uint8Array ) {
24
- yield list
25
- } else {
26
- yield * list
25
+ try {
26
+ for await ( const list of source ) {
27
+ if ( list instanceof Uint8Array ) {
28
+ yield list
29
+ } else {
30
+ yield * list
31
+ }
27
32
}
33
+ } finally {
34
+ closedRead = true
35
+ close ( )
28
36
}
29
37
} ( ) )
30
38
31
39
const maConn : MultiaddrConnection = {
32
40
async sink ( source ) {
33
41
try {
34
42
await sink ( source )
35
- close ( )
36
43
} catch ( err : any ) {
37
44
// If aborted we can safely ignore
38
45
if ( err . type !== 'aborted' ) {
@@ -41,23 +48,31 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect
41
48
// destroyed. There's nothing to do here except log the error & return.
42
49
log ( err )
43
50
}
51
+ } finally {
52
+ closedWrite = true
53
+ close ( )
44
54
}
45
55
} ,
46
56
source : mapSource ,
47
57
remoteAddr,
48
58
timeline : { open : Date . now ( ) , close : undefined } ,
49
59
async close ( options ?: AbortOptions ) {
50
- close ( )
60
+ close ( true )
51
61
await stream . close ( options )
52
62
} ,
53
63
abort ( err : Error ) : void {
54
- close ( )
64
+ close ( true )
55
65
stream . abort ( err )
56
66
}
57
67
}
58
68
59
- function close ( ) : void {
60
- if ( maConn . timeline . close == null ) {
69
+ function close ( force ?: boolean ) : void {
70
+ if ( force === true ) {
71
+ closedRead = true
72
+ closedWrite = true
73
+ }
74
+
75
+ if ( closedRead && closedWrite && maConn . timeline . close == null ) {
61
76
maConn . timeline . close = Date . now ( )
62
77
}
63
78
}
0 commit comments