@@ -12,103 +12,164 @@ trait Scheduler:
12
12
def schedule (task : Runnable ): Unit = ???
13
13
14
14
object Scheduler extends Scheduler :
15
- given fromAsync (using async : Async ): Scheduler = async.client .scheduler
15
+ given fromAsync (using async : Async ): Scheduler = async.runner .scheduler
16
16
end Scheduler
17
17
18
+ /** A context that allows one to suspend waiting for asynchronous data sources */
18
19
trait Async :
19
20
20
- /** Wait for completion of future `f`. This means:
21
- * - ensure that computing `f` has started
22
- * - wait for the completion and return the completed Try
23
- */
24
- def await [T ](f : Future [T ]): Try [T ]
21
+ /** Wait for completion of async source `src` and return the result */
22
+ def await [T ](src : Async .Source [T ]): T
25
23
26
- /** Wait for completion of the first of the futures `f1 `, `f2 `
27
- * @return `Left(r1)` if `f1 ` completed first with `r1`
28
- * `Right(r2)` if `f2 ` completed first with `r2`
24
+ /** Wait for completion of the first of the sources `src1 `, `src2 `
25
+ * @return `Left(r1)` if `src1 ` completed first with `r1`
26
+ * `Right(r2)` if `src2 ` completed first with `r2`
29
27
*/
30
- def awaitEither [T1 , T2 ](f1 : Future [T1 ], f2 : Future [T2 ]): Either [Try [ T1 ], Try [ T2 ] ]
28
+ def awaitEither [T1 , T2 ](src1 : Async . Source [T1 ], src2 : Async . Source [T2 ]): Either [T1 , T2 ]
31
29
32
- /** The future computed by this async computation. */
33
- def client : Future [ ? ]
30
+ /** The runner underlying this async computation. */
31
+ def runner : Async . Runner
34
32
35
33
object Async :
34
+
35
+ /** The currently executing Async context */
36
36
inline def current (using async : Async ): Async = async
37
+
38
+ /** An asynchronous data source. Sources can be persistent or ephemeral.
39
+ * A persistent source will always return the same data to calls to `poll`
40
+ * and pass the same data to calls of `handle`. An ephemeral source might pass new
41
+ * data in every call. An example of a persistent source is `Future`. An
42
+ * example of an ephemeral source is `Channel`.
43
+ */
44
+ trait Source [+ T ]:
45
+
46
+ /** Poll whether data is available
47
+ * @return The data or None in an option. Depending on the nature of the
48
+ * source, data might be returned only once in a poll. E.g. if
49
+ * the source is a channel, a Some result might skip to the next
50
+ * entry.
51
+ */
52
+ def poll : Option [T ]
53
+
54
+ /** When data is available, pass it to function `k`.
55
+ */
56
+ def handleWith (k : T => Unit ): Unit
57
+
58
+ end Source
59
+
60
+ /** A thread-like entity that can be cancelled */
61
+ trait Runner :
62
+
63
+ /** The scheduler on which this computation is running */
64
+ def scheduler : Scheduler
65
+
66
+ /** Cancel computation for this runner and all its children */
67
+ def cancel (): Unit
68
+
69
+ /** Add a given child to this runner */
70
+ def addChild (child : Runner ): Unit
71
+ end Runner
72
+
37
73
end Async
38
74
39
- class Future [+ T ](body : Async ?=> T )(using val scheduler : Scheduler ):
75
+
76
+ class Future [+ T ](body : Async ?=> T )(using val scheduler : Scheduler )
77
+ extends Async .Source [Try [T ]], Async .Runner :
40
78
import Future .{Status , Cancellation }, Status .*
41
79
42
80
@ volatile private var status : Status = Started
43
81
private var result : Try [T ] = uninitialized
44
82
private var waiting : ListBuffer [Try [T ] => Unit ] = ListBuffer ()
45
- private var children : mutable.Set [Future [? ]] = mutable.Set ()
46
-
47
- private def addWaiting (k : Try [T ] => Unit ): Unit = synchronized :
48
- if status == Completed then k(result)
49
- else waiting += k
83
+ private var children : mutable.Set [Async .Runner ] = mutable.Set ()
50
84
51
85
private def currentWaiting (): List [Try [T ] => Unit ] = synchronized :
52
86
val ws = waiting.toList
53
87
waiting.clear()
54
88
ws
55
89
56
- private def currentChildren (): List [Future [ ? ] ] = synchronized :
90
+ private def currentChildren (): List [Async . Runner ] = synchronized :
57
91
val cs = children.toList
58
92
children.clear()
59
93
cs
60
94
61
- private def checkCancellation (): Unit =
62
- if status == Cancelled then throw Cancellation ()
95
+ def poll : Option [Try [T ]] = status match
96
+ case Started => None
97
+ case Completed => Some (result)
98
+ case Cancelled => Some (Failure (Cancellation ()))
99
+
100
+ def handleWith (k : Try [T ] => Unit ): Unit = synchronized :
101
+ if status == Completed then k(result)
102
+ else waiting += k
103
+
104
+ /** Eventually stop computation of this future and fail with
105
+ * a `Cancellation` exception. Also cancel all linked children.
106
+ */
107
+ def cancel (): Unit = synchronized :
108
+ if status != Completed && status != Cancelled then
109
+ status = Cancelled
110
+ for f <- currentChildren() do f.cancel()
111
+
112
+ def addChild (child : Async .Runner ): Unit = synchronized :
113
+ if status == Completed then child.cancel()
114
+ else children += this
115
+
116
+ /** Links the future as a child to the current async client.
117
+ * This means the future will be cancelled when the async client
118
+ * completes.
119
+ */
120
+ def linked (using async : Async ): this .type = synchronized :
121
+ if status != Completed then async.runner.addChild(this )
122
+ this
63
123
64
124
/** Wait for this future to be completed, return its value in case of success,
65
125
* or rethrow exception in case of failure.
66
126
*/
67
127
def value (using async : Async ): T = async.await(this ).get
68
128
129
+ /** Block thread until future is completed and return result
130
+ * N.B. This should be parameterized with a timeout.
131
+ */
132
+ def force (): T =
133
+ while status != Completed do wait()
134
+ result.get
135
+
69
136
// a handler for Async
70
137
private def async (body : Async ?=> Unit ): Unit =
71
138
boundary [Unit ]:
72
139
given Async with
73
-
74
- private def resultOption [T ](f : Future [T ]): Option [Try [T ]] = f.status match
75
- case Started =>
76
- None
77
- case Completed =>
78
- Some (f.result)
79
- case Cancelled =>
80
- Some (Failure (Cancellation ()))
140
+ private def checkCancellation (): Unit =
141
+ if status == Cancelled then throw Cancellation ()
81
142
82
143
private inline def cancelChecked [T ](op : => T ): T =
83
144
checkCancellation()
84
145
val res = op
85
146
checkCancellation()
86
147
res
87
148
88
- def await [T ](f : Future [T ]): Try [ T ] =
149
+ def await [T ](src : Async . Source [T ]): T =
89
150
cancelChecked :
90
- resultOption(f) .getOrElse:
91
- suspend[Try [ T ] , Unit ]: s =>
92
- f.addWaiting : result =>
151
+ src.poll .getOrElse:
152
+ suspend[T , Unit ]: k =>
153
+ src.handleWith : result =>
93
154
scheduler.schedule: () =>
94
- s .resume(result)
155
+ k .resume(result)
95
156
96
- def awaitEither [T1 , T2 ](f1 : Future [T1 ], f2 : Future [T2 ]): Either [Try [ T1 ], Try [ T2 ] ] =
157
+ def awaitEither [T1 , T2 ](src1 : Async . Source [T1 ], src2 : Async . Source [T2 ]): Either [T1 , T2 ] =
97
158
cancelChecked :
98
- resultOption(f1) .map(Left (_)).getOrElse:
99
- resultOption(f2) .map(Right (_)).getOrElse:
100
- suspend[Either [Try [ T1 ], Try [ T2 ]] , Unit ]: s =>
159
+ src1.poll .map(Left (_)).getOrElse:
160
+ src2.poll .map(Right (_)).getOrElse:
161
+ suspend[Either [T1 , T2 ], Unit ]: k =>
101
162
var found = AtomicBoolean ()
102
- f1.addWaiting : result =>
163
+ src1.handleWith : result =>
103
164
if ! found.getAndSet(true ) then
104
165
scheduler.schedule: () =>
105
- s .resume(Left (result))
106
- f2.addWaiting : result =>
166
+ k .resume(Left (result))
167
+ src2.handleWith : result =>
107
168
if ! found.getAndSet(true ) then
108
169
scheduler.schedule: () =>
109
- s .resume(Right (result))
170
+ k .resume(Right (result))
110
171
111
- def client = Future .this
172
+ def runner = Future .this
112
173
end given
113
174
114
175
body
@@ -121,36 +182,9 @@ class Future[+T](body: Async ?=> T)(using val scheduler: Scheduler):
121
182
catch case ex : Exception => Failure (ex)
122
183
status = Completed
123
184
for task <- currentWaiting() do task(result)
124
- cancelChildren ()
185
+ cancel ()
125
186
notifyAll()
126
187
127
- /** Links the future as a child to the current async client.
128
- * This means the future will be cancelled when the async client
129
- * completes.
130
- */
131
- def linked (using async : Async ): this .type = synchronized :
132
- if status != Completed then
133
- async.client.children += this
134
- this
135
-
136
- private def cancelChildren (): Unit =
137
- for f <- currentChildren() do f.cancel()
138
-
139
- /** Eventually stop computation of this future and fail with
140
- * a `Cancellation` exception. Also cancel all linked children.
141
- */
142
- def cancel (): Unit = synchronized :
143
- if status != Completed && status != Cancelled then
144
- status = Cancelled
145
- cancelChildren()
146
-
147
- /** Block thread until future is completed and return result
148
- * N.B. This should be parameterized with a timeout.
149
- */
150
- def force (): T =
151
- while status != Completed do wait()
152
- result.get
153
-
154
188
scheduler.schedule(() => complete())
155
189
end Future
156
190
@@ -187,6 +221,30 @@ class Task[+T](val body: Async ?=> T):
187
221
case Right (_ : Failure [? ]) => f1.value
188
222
end Task
189
223
224
+ /** An unbounded channel */
225
+ class Channel [T ] extends Async .Source [T ]:
226
+ private val pending = ListBuffer [T ]()
227
+ private val waiting = ListBuffer [T => Unit ]()
228
+ def send (x : T ): Unit = synchronized :
229
+ if waiting.isEmpty then pending += x
230
+ else
231
+ val k = waiting.head
232
+ waiting.dropInPlace(1 )
233
+ k(x)
234
+ def poll : Option [T ] = synchronized :
235
+ if pending.isEmpty then None
236
+ else
237
+ val x = pending.head
238
+ pending.dropInPlace(1 )
239
+ Some (x)
240
+ def handleWith (k : T => Unit ): Unit = synchronized :
241
+ if pending.isEmpty then waiting += k
242
+ else
243
+ val x = pending.head
244
+ pending.dropInPlace(1 )
245
+ k(x)
246
+ end Channel
247
+
190
248
def Test (x : Future [Int ], xs : List [Future [Int ]])(using Scheduler ): Future [Int ] =
191
249
Future :
192
250
x.value + xs.map(_.value).sum
0 commit comments