Skip to content

Commit fffd21b

Browse files
committed
Tweaks
- Add first-order poll method for convenience - Drop link() method
1 parent 60847c0 commit fffd21b

File tree

3 files changed

+37
-34
lines changed

3 files changed

+37
-34
lines changed

tests/pos/suspend-strawman-2/Async.scala

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,13 @@ object Async:
5151
*/
5252
def await[T](src: Source[T]): T =
5353
checkCancellation()
54-
var resultOpt: Option[T] = None
55-
src.poll: x =>
56-
resultOpt = Some(x)
57-
true
58-
resultOpt.getOrElse:
59-
try suspend[T, Unit]: k =>
60-
src.onComplete: x =>
61-
scheduler.schedule: () =>
62-
k.resume(x)
63-
true // signals to `src` that result `x` was consumed
54+
src.poll().getOrElse:
55+
try
56+
suspend[T, Unit]: k =>
57+
src.onComplete: x =>
58+
scheduler.schedule: () =>
59+
k.resume(x)
60+
true // signals to `src` that result `x` was consumed
6461
finally checkCancellation()
6562

6663
end Impl
@@ -115,6 +112,12 @@ object Async:
115112
*/
116113
def dropListener(k: Listener[T]): Unit
117114

115+
/** Utililty method for direct polling. */
116+
def poll(): Option[T] =
117+
var resultOpt: Option[T] = None
118+
poll { x => resultOpt = Some(x); true }
119+
resultOpt
120+
118121
end Source
119122

120123
/** A source that transforms an original source in some way */
@@ -135,6 +138,7 @@ object Async:
135138
original.onComplete(transform(k))
136139
def dropListener(k: Listener[U]): Unit =
137140
original.dropListener(transform(k))
141+
138142
end DerivedSource
139143

140144
extension [T](src: Source[T])

tests/pos/suspend-strawman-2/futures.scala

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import scala.compiletime.uninitialized
66
import scala.util.{Try, Success, Failure}
77
import scala.annotation.unchecked.uncheckedVariance
88
import java.util.concurrent.CancellationException
9-
import runtime.suspend
109

1110
/** A cancellable future that can suspend waiting for other asynchronous sources
1211
*/
@@ -22,14 +21,8 @@ trait Future[+T] extends Async.Source[Try[T]], Cancellable:
2221
*/
2322
def force(): T
2423

25-
/** Links the future as a child to the current async root.
26-
* This means the future will be cancelled if the async root
27-
* is canceled.
28-
*/
29-
def linked(using async: Async): this.type
30-
3124
/** Eventually stop computation of this future and fail with
32-
* a `Cancellation` exception. Also cancel all linked children.
25+
* a `Cancellation` exception. Also cancel all children.
3326
*/
3427
def cancel(): Unit
3528

@@ -40,8 +33,8 @@ trait Future[+T] extends Async.Source[Try[T]], Cancellable:
4033

4134
object Future:
4235

43-
/** The core part of a future that is completed explicitly by calling its
44-
* `complete` method. There are two implementations
36+
/** A future that is completed explicitly by calling its
37+
* `complete` method. There are two public implementations
4538
*
4639
* - RunnableFuture: Completion is done by running a block of code
4740
* - Promise.future: Completion is done by external request.
@@ -50,20 +43,27 @@ object Future:
5043

5144
@volatile protected var hasCompleted: Boolean = false
5245
private var result: Try[T] = uninitialized // guaranteed to be set if hasCompleted = true
53-
private var waiting: mutable.Set[Try[T] => Boolean] = mutable.Set()
54-
private var children: mutable.Set[Cancellable] = mutable.Set()
46+
private val waiting: mutable.Set[Try[T] => Boolean] = mutable.Set()
47+
private val children: mutable.Set[Cancellable] = mutable.Set()
5548

5649
private def extract[T](s: mutable.Set[T]): List[T] = synchronized:
5750
val xs = s.toList
5851
s.clear()
5952
xs
6053

54+
// Async.Source method implementations
55+
6156
def poll(k: Async.Listener[Try[T]]): Boolean =
6257
hasCompleted && k(result)
6358

6459
def onComplete(k: Async.Listener[Try[T]]): Unit = synchronized:
6560
if !poll(k) then waiting += k
6661

62+
def dropListener(k: Async.Listener[Try[T]]): Unit =
63+
waiting -= k
64+
65+
// Cancellable method implementations
66+
6767
def cancel(): Unit =
6868
val othersToCancel = synchronized:
6969
if hasCompleted then Nil
@@ -76,17 +76,12 @@ object Future:
7676
def addChild(child: Cancellable): Unit = synchronized:
7777
if !hasCompleted then children += this
7878

79-
def linked(using async: Async): this.type =
80-
if !hasCompleted then async.root.addChild(this)
81-
this
82-
83-
def dropListener(k: Async.Listener[Try[T]]): Unit =
84-
waiting -= k
79+
// Future method implementations
8580

8681
def value(using async: Async): T =
8782
async.await(this).get
8883

89-
def force(): T =
84+
def force(): T = synchronized:
9085
while !hasCompleted do wait()
9186
result.get
9287

@@ -104,10 +99,14 @@ object Future:
10499
this.result = result
105100
hasCompleted = true
106101
for listener <- extract(waiting) do listener(result)
107-
notifyAll()
102+
synchronized:
103+
notifyAll()
108104

109105
end CoreFuture
110106

107+
/** A future that is completed by evaluating `body` as a separate
108+
* asynchronous operation in the given `scheduler`
109+
*/
111110
private class RunnableFuture[+T](body: Async ?=> T)(using scheduler: Scheduler)
112111
extends CoreFuture[T]:
113112

@@ -191,7 +190,7 @@ class Task[+T](val body: Async ?=> T):
191190

192191
end Task
193192

194-
def Test(x: Future[Int], xs: List[Future[Int]])(using Scheduler): Future[Int] =
193+
def add(x: Future[Int], xs: List[Future[Int]])(using Scheduler): Future[Int] =
195194
val b = x.zip:
196195
Future:
197196
xs.headOption.toString
@@ -208,8 +207,8 @@ def Test(x: Future[Int], xs: List[Future[Int]])(using Scheduler): Future[Int] =
208207
x.value * 2
209208
x.value + xs.map(_.value).sum
210209

211-
end Test
210+
end add
212211

213212
def Main(x: Future[Int], xs: List[Future[Int]])(using Scheduler): Int =
214-
Test(x, xs).force()
213+
add(x, xs).force()
215214

tests/pos/suspend-strawman-2/scheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ trait Scheduler:
55
def schedule(task: Runnable): Unit = ???
66

77
object Scheduler extends Scheduler:
8-
given fromAsync(using async: Async): Scheduler = async.scheduler
8+
given fromAsyncConfig(using ac: Async.Config): Scheduler = ac.scheduler
99
end Scheduler
1010

0 commit comments

Comments
 (0)