Skip to content

Commit 4cf756c

Browse files
committed
Add derived sources and channels
1 parent 62c2f21 commit 4cf756c

File tree

4 files changed

+379
-145
lines changed

4 files changed

+379
-145
lines changed
Lines changed: 164 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,189 @@
11
package concurrent
2+
import java.util.concurrent.atomic.AtomicBoolean
3+
import scala.collection.mutable
4+
import runtime.suspend
5+
import scala.util.boundary
6+
7+
/** The underlying configuration of an async block */
8+
trait AsyncConfig:
9+
10+
/** The cancellable async source underlying this async computation */
11+
def root: Cancellable
12+
13+
/** The scheduler for runnables defined in this async computation */
14+
def scheduler: Scheduler
15+
16+
object AsyncConfig:
17+
18+
/** A toplevel async group with given scheduler and a synthetic root that
19+
* ignores cancellation requests
20+
*/
21+
given fromScheduler(using s: Scheduler): AsyncConfig with
22+
def root = Cancellable.empty
23+
def scheduler = s
24+
25+
end AsyncConfig
226

327
/** A context that allows to suspend waiting for asynchronous data sources */
4-
trait Async:
28+
trait Async extends AsyncConfig:
529

630
/** Wait for completion of async source `src` and return the result */
731
def await[T](src: Async.Source[T]): T
832

9-
/** Wait for completion of the first of the sources `src1`, `src2`
10-
* @return `Left(r1)` if `src1` completed first with `r1`
11-
* `Right(r2)` if `src2` completed first with `r2`
12-
*/
13-
def awaitEither[T1, T2](src1: Async.Source[T1], src2: Async.Source[T2]): Either[T1, T2]
33+
object Async:
1434

15-
/** The cancellable runner underlying this async computation. */
16-
def runner: Cancellable
35+
abstract class AsyncImpl(val root: Cancellable, val scheduler: Scheduler)
36+
(using boundary.Label[Unit]) extends Async:
1737

18-
/** The scheduler for runnables defined in this async computation */
19-
def scheduler: Scheduler
38+
protected def checkCancellation(): Unit
2039

21-
object Async:
40+
private var result: T
41+
42+
def await[T](src: Async.Source[T]): T =
43+
checkCancellation()
44+
var resultOpt: Option[T] = None
45+
if src.poll: x =>
46+
result = x
47+
true
48+
then result
49+
else
50+
try suspend[T, Unit]: k =>
51+
src.onComplete: x =>
52+
scheduler.schedule: () =>
53+
k.resume(x)
54+
true // signals to `src` that result `x` was consumed
55+
finally checkCancellation()
56+
57+
end AsyncImpl
2258

2359
/** The currently executing Async context */
2460
inline def current(using async: Async): Async = async
2561

26-
/** An asynchronous data source. Sources can be persistent or ephemeral.
27-
* A persistent source will always return the same data to calls to `poll`
28-
* and pass the same data to calls of `handle`. An ephemeral source might pass new
29-
* data in every call. An example of a persistent source is `Future`. An
30-
* example of an ephemeral source is `Channel`.
62+
/** Await source result in currently executing Async context */
63+
inline def await[T](src: Source[T])(using async: Async): T = async.await(src)
64+
65+
/** A function `T => Boolean` whose lineage is recorded by its implementing
66+
* classes. The Listener function accepts values of type `T` and returns
67+
* `true` iff the value was consumed by an async block.
3168
*/
32-
trait Source[+T]:
33-
thisSource =>
69+
trait Listener[-T] extends Function[T, Boolean]
3470

35-
/** Poll whether data is available
36-
* @return The data or None in an option. Depending on the nature of the
37-
* source, data might be returned only once in a poll. E.g. if
38-
* the source is a channel, a Some result might skip to the next
71+
/** A listener for values that are processed by the given source `src` and
72+
* that are demanded by the continuation listener `continue`.
73+
*/
74+
abstract case class ForwardingListener[T](src: Source[?], continue: Listener[?]) extends Listener[T]
75+
76+
/** A listener for values that are processed directly in an async block.
77+
* Closures of type `T => Boolean` can be SAM converted to this type.
78+
*/
79+
abstract case class FinalListener[T]() extends Listener[T]
80+
81+
/** A source that cannot be mapped, filtered, or raced. In other words,
82+
* an item coming from a direct source must be immediately consumed in
83+
* another async computation; no rejection of this item is possible.
84+
*/
85+
trait DirectSource[+T]:
86+
87+
/** If data is available at present, pass it to function `k`
88+
* and return the result if this call.
89+
* `k` returns true iff the data was consumed in an async block.
90+
* Calls to `poll` are always synchronous.
91+
*/
92+
def poll(k: Listener[T]): Boolean
93+
94+
/** Once data is available, pass it to function `k`.
95+
* `k` returns true iff the data was consumed in an async block.
96+
* Calls to `onComplete` are usually asynchronous, meaning that
97+
* the passed continuation `k` is a suspension.
3998
*/
40-
def poll: Option[T]
99+
def onComplete(k: Listener[T]): Unit
41100

42-
/** When data is available, pass it to function `k`.
101+
/** Signal that listener `k` is dead (i.e. will always return `false` from now on).
102+
* This permits original, (i.e. non-derived) sources like futures or channels
103+
* to drop the listener from their `waiting` sets.
43104
*/
44-
def handleWith(k: T => Unit): Unit
105+
def dropListener(k: Listener[T]): Unit
45106

46-
def map[U](f: T => U): Source[U] = new Source:
47-
def poll = thisSource.poll.map(f)
48-
def handleWith(k: U => Unit): Unit = thisSource.handleWith(f.andThen(k))
107+
end DirectSource
108+
109+
/** An asynchronous data source. Sources can be persistent or ephemeral.
110+
* A persistent source will always pass same data to calls of `poll and `onComplete`.
111+
* An ephememral source can pass new data in every call.
112+
* An example of a persistent source is `Future`.
113+
* An example of an ephemeral source is `Channel`.
114+
*/
115+
trait Source[+T] extends DirectSource[T]:
116+
117+
/** Pass on data transformed by `f` */
118+
def map[U](f: T => U): Source[U] =
119+
new DerivedSource[T, U](this):
120+
def listen(x: T, k: Listener[U]) = k(f(x))
121+
122+
/** Pass on only data matching the predicate `p` */
123+
def filter(p: T => Boolean): Source[T] =
124+
new DerivedSource[T, T](this):
125+
def listen(x: T, k: Listener[T]) = p(x) && k(x)
49126

50127
end Source
51128

129+
/** As source that transforms an original source in some way */
130+
131+
abstract class DerivedSource[T, U](src: Source[T]) extends Source[U]:
132+
133+
/** Handle a value `x` passed to the original source by possibly
134+
* invokiong the continuation for this source.
135+
*/
136+
protected def listen(x: T, k: Listener[U]): Boolean
137+
138+
private def transform(k: Listener[U]): Listener[T] =
139+
new ForwardingListener[T](this, k):
140+
def apply(x: T): Boolean = listen(x, k)
141+
142+
def poll(k: Listener[U]): Boolean =
143+
src.poll(transform(k))
144+
def onComplete(k: Listener[U]): Unit =
145+
src.onComplete(transform(k))
146+
def dropListener(k: Listener[U]): Unit =
147+
src.dropListener(transform(k))
148+
end DerivedSource
149+
150+
/** Pass first result from any of `sources` to the continuation */
151+
def race[T](sources: Source[T]*): Source[T] = new Source:
152+
153+
def poll(k: Listener[T]): Boolean =
154+
val it = sources.iterator
155+
var found = false
156+
while it.hasNext && !found do
157+
it.next.poll: x =>
158+
found = k(x)
159+
found
160+
found
161+
162+
def onComplete(k: Listener[T]): Unit =
163+
val listener = new ForwardingListener[T](this, k):
164+
var foundBefore = false
165+
def continueIfFirst(x: T): Boolean = synchronized:
166+
if foundBefore then false else { foundBefore = k(x); foundBefore }
167+
def apply(x: T): Boolean =
168+
val found = continueIfFirst(x)
169+
if found then sources.foreach(_.dropListener(this))
170+
found
171+
sources.foreach(_.onComplete(listener))
172+
173+
def dropListener(k: Listener[T]): Unit =
174+
val listener = new ForwardingListener[T](this, k):
175+
def apply(x: T): Boolean = ???
176+
// not to be called, we need the listener only for its
177+
// hashcode and equality test.
178+
sources.foreach(_.dropListener(listener))
179+
180+
end race
181+
182+
/** If left (respectively, right) source succeeds with `x`, pass `Left(x)`,
183+
* (respectively, Right(x)) on to the continuation.
184+
*/
185+
def either[T, U](src1: Source[T], src2: Source[U]): Source[Either[T, U]] =
186+
race[Either[T, U]](src1.map(Left(_)), src2.map(Right(_)))
187+
52188
end Async
53189

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
package concurrent
22

3-
/** A trait for cancellable entiries that can be grouped */
3+
/** A trait for cancellable entities that can be grouped */
44
trait Cancellable:
55

6+
/** Issue a cancel request */
67
def cancel(): Unit
78

89
/** Add a given child to this Cancellable, so that the child will be cancelled
910
* when the Cancellable itself is cancelled.
1011
*/
1112
def addChild(child: Cancellable): Unit
1213

13-
def isCancelled: Boolean
14+
object Cancellable:
15+
16+
/** A cancelled entity that ignores all `cancel` and `addChild` requests */
17+
object empty extends Cancellable:
18+
def cancel() = ()
19+
def addChild(child: Cancellable) = ()
1420

1521
end Cancellable
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package concurrent
2+
import scala.collection.mutable, mutable.ListBuffer
3+
import scala.util.boundary.Label
4+
import runtime.suspend
5+
import Async.{Listener, await}
6+
7+
/** An unbounded channel */
8+
class UnboundedChannel[T] extends Async.Source[T]:
9+
private val pending = ListBuffer[T]()
10+
private val waiting = mutable.Set[Listener[T]]()
11+
12+
private def drainWaiting(x: T): Boolean =
13+
val it = waiting.iterator
14+
var sent = false
15+
while it.hasNext && !sent do
16+
val k = it.next()
17+
sent = k(x)
18+
if sent then waiting -= k
19+
sent
20+
21+
private def drainPending(k: Listener[T]): Boolean =
22+
val sent = pending.nonEmpty && k(pending.head)
23+
if sent then
24+
while
25+
pending.dropInPlace(1)
26+
pending.nonEmpty && drainWaiting(pending.head)
27+
do ()
28+
sent
29+
30+
def read()(using Async): T = synchronized:
31+
await(this)
32+
33+
def send(x: T): Unit = synchronized:
34+
val sent = pending.isEmpty && drainWaiting(x)
35+
if !sent then pending += x
36+
37+
def poll(k: Listener[T]): Boolean = synchronized:
38+
drainPending(k)
39+
40+
def onComplete(k: Listener[T]): Unit = synchronized:
41+
if !drainPending(k) then waiting += k
42+
43+
def dropListener(k: Listener[T]): Unit = synchronized:
44+
waiting -= k
45+
46+
end UnboundedChannel
47+
48+
class SyncChannel[T]:
49+
50+
private val pendingReads = mutable.Set[Listener[T]]()
51+
private val pendingSends = mutable.Set[Listener[Listener[T]]]()
52+
53+
private def collapse[T](k2: Listener[Listener[T]]): Option[T] =
54+
var r: Option[T] = None
55+
if k2 { x => r = Some(x); true } then r else None
56+
57+
protected def link[T](pending: mutable.Set[T], op: T => Boolean): Boolean =
58+
pending.iterator.find(op) match
59+
case Some(elem) => pending -= elem; true
60+
case None => false
61+
62+
val canRead = new Async.Source[T]:
63+
def poll(k: Listener[T]): Boolean =
64+
link(pendingSends, sender => collapse(sender).map(k) == Some(true))
65+
def onComplete(k: Listener[T]): Unit =
66+
if !poll(k) then pendingReads += k
67+
def dropListener(k: Listener[T]): Unit =
68+
pendingReads -= k
69+
70+
val canSend = new Async.Source[Listener[T]]:
71+
def poll(k: Listener[Listener[T]]): Boolean =
72+
link(pendingReads, k)
73+
def onComplete(k: Listener[Listener[T]]): Unit =
74+
if !poll(k) then pendingSends += k
75+
def dropListener(k: Listener[Listener[T]]): Unit =
76+
pendingSends -= k
77+
78+
def send(x: T)(using Async): Unit =
79+
await(canSend)(x)
80+
81+
def read()(using Async): T =
82+
await(canRead)
83+
84+
end SyncChannel
85+
86+
class DirectSyncChannel[T]:
87+
88+
private val pendingReads = mutable.Set[Listener[T]]()
89+
private val pendingSends = mutable.Set[Listener[Listener[T]]]()
90+
91+
private def collapse[T](k2: Listener[Listener[T]]): Option[T] =
92+
var r: Option[T] = None
93+
if k2 { x => r = Some(x); true } then r else None
94+
95+
private def link[T](pending: mutable.Set[T], op: T => Unit): Boolean =
96+
pending.headOption match
97+
case Some(elem) => op(elem); true
98+
case None => false
99+
100+
val canRead = new Async.Source[T]:
101+
def poll(k: Listener[T]): Boolean =
102+
link(pendingSends, sender => collapse(sender).map(k))
103+
def onComplete(k: Listener[T]): Unit =
104+
if !poll(k) then pendingReads += k
105+
def dropListener(k: Listener[T]): Unit =
106+
pendingReads -= k
107+
108+
val canSend = new Async.Source[Listener[T]]:
109+
def poll(k: Listener[Listener[T]]): Boolean =
110+
link(pendingReads, k(_))
111+
def onComplete(k: Listener[Listener[T]]): Unit =
112+
if !poll(k) then pendingSends += k
113+
def dropListener(k: Listener[Listener[T]]): Unit =
114+
pendingSends -= k
115+
116+
def send(x: T)(using Async): Unit =
117+
await(canSend)(x)
118+
119+
def read()(using Async): T =
120+
await(canRead)
121+
122+
end DirectSyncChannel

0 commit comments

Comments
 (0)