Skip to content

Commit 3c058b4

Browse files
committed
Introduce Task abstraction
Tasks are Futures that have not started running yet. A task can be turned into a future by calling its `run` method. Futures are now always eager, so `Future.start()` was dropped. Tasks can be composed with `par` for parallel computations of pairs and `alt` for alternative computations in case of errors.
1 parent a448793 commit 3c058b4

File tree

1 file changed

+36
-55
lines changed

1 file changed

+36
-55
lines changed

tests/pos/suspend-strawman-2/futures.scala

Lines changed: 36 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,15 @@ import scala.util.boundary, boundary.Label
55
import scala.compiletime.uninitialized
66
import scala.util.{Try, Success, Failure}
77
import java.util.concurrent.atomic.AtomicBoolean
8-
import runtime.*
8+
import runtime.suspend
9+
10+
/** A hypothetical task scheduler trait */
11+
trait Scheduler:
12+
def schedule(task: Runnable): Unit = ???
13+
14+
object Scheduler extends Scheduler:
15+
given fromAsync(using async: Async): Scheduler = async.client.scheduler
16+
end Scheduler
917

1018
trait Async:
1119

@@ -24,15 +32,16 @@ trait Async:
2432
/** The future computed by this async computation. */
2533
def client: Future[?]
2634

35+
object Async:
36+
inline def current(using async: Async): Async = async
2737
end Async
2838

29-
class Future[+T](body: Async ?=> T):
39+
class Future[+T](body: Async ?=> T)(using val scheduler: Scheduler):
3040
import Future.{Status, Cancellation}, Status.*
3141

32-
@volatile private var status: Status = Initial
42+
@volatile private var status: Status = Started
3343
private var result: Try[T] = uninitialized
3444
private var waiting: ListBuffer[Try[T] => Unit] = ListBuffer()
35-
private var scheduler: Scheduler = uninitialized
3645
private var children: mutable.Set[Future[?]] = mutable.Set()
3746

3847
private def addWaiting(k: Try[T] => Unit): Unit = synchronized:
@@ -63,9 +72,6 @@ class Future[+T](body: Async ?=> T):
6372
given Async with
6473

6574
private def resultOption[T](f: Future[T]): Option[Try[T]] = f.status match
66-
case Initial =>
67-
f.ensureStarted()(using scheduler)
68-
resultOption(f)
6975
case Started =>
7076
None
7177
case Completed =>
@@ -110,30 +116,13 @@ class Future[+T](body: Async ?=> T):
110116

111117
private def complete(): Unit =
112118
async:
113-
val result =
119+
result =
114120
try Success(body)
115121
catch case ex: Exception => Failure(ex)
116122
status = Completed
117-
for task <- currentWaiting() do task(result)
118-
cancelChildren()
119-
notifyAll()
120-
121-
/** Ensure future's execution has started */
122-
def ensureStarted()(using scheduler: Scheduler): this.type =
123-
synchronized:
124-
if status == Initial then start()
125-
this
126-
127-
/** Start future's execution
128-
* @pre future has not yet started
129-
*/
130-
def start()(using scheduler: Scheduler): this.type =
131-
synchronized:
132-
assert(status == Initial)
133-
this.scheduler = scheduler
134-
scheduler.schedule(() => complete())
135-
status = Started
136-
this
123+
for task <- currentWaiting() do task(result)
124+
cancelChildren()
125+
notifyAll()
137126

138127
/** Links the future as a child to the current async client.
139128
* This means the future will be cancelled when the async client
@@ -162,52 +151,44 @@ class Future[+T](body: Async ?=> T):
162151
while status != Completed do wait()
163152
result.get
164153

165-
object Future:
166-
167-
class Cancellation extends Exception
154+
scheduler.schedule(() => complete())
155+
end Future
168156

157+
object Future:
169158
enum Status:
170159
// Transitions always go left to right.
171160
// Cancelled --> Completed with Failure(Cancellation()) result
172-
case Initial, Started, Cancelled, Completed
161+
case Started, Cancelled, Completed
173162

174-
/** Construct a future and start it so that ion runs in parallel with the
175-
* current thread.
176-
*/
177-
def spawn[T](body: Async ?=> T)(using Scheduler): Future[T] =
178-
Future(body).start()
163+
class Cancellation extends Exception
164+
end Future
179165

180-
/** The conjuntion of two futures with given bodies `body1` and `body2`.
181-
* If both futures succeed, suceed with their values in a pair. Otherwise,
182-
* fail with the failure that was returned first.
183-
*/
184-
def both[T1, T2](body1: Async ?=> T1, body2: Async ?=> T2): Future[(T1, T2)] =
185-
Future: async ?=>
186-
val f1 = Future(body1).linked
187-
val f2 = Future(body2).linked
166+
class Task[+T](val body: Async ?=> T):
167+
def run(using Scheduler): Future[T] = Future(body)
168+
169+
def par[U](other: Task[U]): Task[(T, U)] =
170+
Task: async ?=>
171+
val f1 = Future(this.body).linked
172+
val f2 = Future(other.body).linked
188173
async.awaitEither(f1, f2) match
189174
case Left(Success(x1)) => (x1, f2.value)
190175
case Right(Success(x2)) => (f1.value, x2)
191176
case Left(Failure(ex)) => throw ex
192177
case Right(Failure(ex)) => throw ex
193178

194-
/** The disjuntion of two futures with given bodies `body1` and `body2`.
195-
* If either future succeeds, suceed with the success that was returned first.
196-
* Otherwise, fail with the failure that was returned last.
197-
*/
198-
def either[T](body1: Async ?=> T, body2: Async ?=> T): Future[T] =
199-
Future: async ?=>
200-
val f1 = Future(body1).linked
201-
val f2 = Future(body2).linked
179+
def alt[U >: T](other: Task[U]): Task[U] =
180+
Task: async ?=>
181+
val f1 = Future(this.body).linked
182+
val f2 = Future(other.body).linked
202183
async.awaitEither(f1, f2) match
203184
case Left(Success(x1)) => x1
204185
case Right(Success(x2)) => x2
205186
case Left(_: Failure[?]) => f2.value
206187
case Right(_: Failure[?]) => f1.value
207-
end Future
188+
end Task
208189

209190
def Test(x: Future[Int], xs: List[Future[Int]])(using Scheduler): Future[Int] =
210-
Future.spawn:
191+
Future:
211192
x.value + xs.map(_.value).sum
212193

213194
def Main(x: Future[Int], xs: List[Future[Int]])(using Scheduler): Int =

0 commit comments

Comments
 (0)