1
1
package chrome .interop
2
2
3
3
import _root_ .fs2 ._
4
- import _root_ .fs2 .async . mutable .Queue
5
- import cats .effect .{Async , Effect , IO }
4
+ import _root_ .fs2 .concurrent .Queue
5
+ import cats .effect .{Async , Concurrent , Effect , IO }
6
6
import cats .effect .implicits ._
7
7
import cats .implicits ._
8
8
import chrome .events .bindings .Event
9
9
10
- import scala .concurrent .ExecutionContext
11
10
import scala .language .higherKinds
12
11
import scala .scalajs .js
13
12
@@ -28,20 +27,18 @@ package object fs2 {
28
27
}
29
28
}
30
29
31
- def toStream [F [_]: Effect ]( implicit EC : ExecutionContext ) : Stream [F , T1 ] =
32
- toStream(async.unboundedQueue [F , T1 ])
30
+ def toStream [F [_]: Effect : Concurrent ] : Stream [F , T1 ] =
31
+ toStream(Queue .unbounded [F , T1 ])
33
32
34
- def toStream [F [_]: Effect ](queue : F [Queue [F , T1 ]]): Stream [F , T1 ] = {
33
+ def toStream [F [_]: Effect ](queue : F [Queue [F , T1 ]]): Stream [F , T1 ] =
35
34
Stream .bracket {
36
35
queue.map { q =>
37
- val callback = (t : T1 ) => q.offer1(t).runAsync(_ => IO .unit).unsafeRunAsync(_ => ())
36
+ val callback = (t : T1 ) => q.offer1(t).runAsync(_ => IO .unit).unsafeRunSync
38
37
event.addListener(callback)
39
38
val release = Async [F ].delay(event.removeListener(callback))
40
39
(q, release)
41
40
}
42
- }(_._1.dequeue, _._2)
43
- }
44
-
41
+ }(_._2).flatMap(_._1.dequeue)
45
42
}
46
43
47
44
implicit class Event2FS2Ops [T1 , T2 ](val event : Event [js.Function2 [T1 , T2 , _]])
@@ -59,20 +56,18 @@ package object fs2 {
59
56
}
60
57
}
61
58
62
- def toStream [F [_]: Effect ]( implicit EC : ExecutionContext ) : Stream [F , (T1 , T2 )] =
63
- toStream(async.unboundedQueue [F , (T1 , T2 )])
59
+ def toStream [F [_]: Effect : Concurrent ] : Stream [F , (T1 , T2 )] =
60
+ toStream(Queue .unbounded [F , (T1 , T2 )])
64
61
65
- def toStream [F [_]: Effect ](queue : F [Queue [F , (T1 , T2 )]]): Stream [F , (T1 , T2 )] = {
62
+ def toStream [F [_]: Effect ](queue : F [Queue [F , (T1 , T2 )]]): Stream [F , (T1 , T2 )] =
66
63
Stream .bracket {
67
64
queue.map { q =>
68
- val callback = (t1 : T1 , t2 : T2 ) => q.offer1((t1, t2)).runAsync(_ => IO .unit).unsafeRunAsync(_ => ())
65
+ val callback = (t1 : T1 , t2 : T2 ) => q.offer1((t1, t2)).runAsync(_ => IO .unit).unsafeRunSync
69
66
event.addListener(callback)
70
67
val release = Async [F ].delay(event.removeListener(callback))
71
68
(q, release)
72
69
}
73
- }(_._1.dequeue, _._2)
74
- }
75
-
70
+ }(_._2).flatMap(_._1.dequeue)
76
71
}
77
72
78
73
implicit class Event3FS2Ops [T1 , T2 , T3 ](val event : Event [js.Function3 [T1 , T2 , T3 , _]])
@@ -90,19 +85,18 @@ package object fs2 {
90
85
}
91
86
}
92
87
93
- def toStream [F [_]: Effect ]( implicit EC : ExecutionContext ) : Stream [F , (T1 , T2 , T3 )] =
94
- toStream(async.unboundedQueue [F , (T1 , T2 , T3 )])
88
+ def toStream [F [_]: Effect : Concurrent ] : Stream [F , (T1 , T2 , T3 )] =
89
+ toStream(Queue .unbounded [F , (T1 , T2 , T3 )])
95
90
96
- def toStream [F [_]: Effect ](queue : F [Queue [F , (T1 , T2 , T3 )]]): Stream [F , (T1 , T2 , T3 )] = {
91
+ def toStream [F [_]: Effect ](queue : F [Queue [F , (T1 , T2 , T3 )]]): Stream [F , (T1 , T2 , T3 )] =
97
92
Stream .bracket {
98
93
queue.map { q =>
99
- val callback = (t1 : T1 , t2 : T2 , t3 : T3 ) => q.offer1((t1, t2, t3)).runAsync(_ => IO .unit).unsafeRunAsync(_ => ())
94
+ val callback = (t1 : T1 , t2 : T2 , t3 : T3 ) => q.offer1((t1, t2, t3)).runAsync(_ => IO .unit).unsafeRunSync
100
95
event.addListener(callback)
101
96
val release = Async [F ].delay(event.removeListener(callback))
102
97
(q, release)
103
98
}
104
- }(_._1.dequeue, _._2)
105
- }
99
+ }(_._2).flatMap(_._1.dequeue)
106
100
107
101
}
108
102
0 commit comments