Skip to content

Commit 0ef4ab6

Browse files
committed
Reorganize channels around Sources vs ComposableSources
1 parent 4cf756c commit 0ef4ab6

File tree

3 files changed

+112
-127
lines changed

3 files changed

+112
-127
lines changed

tests/pos/suspend-strawman-2/Async.scala

Lines changed: 51 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -32,29 +32,26 @@ trait Async extends AsyncConfig:
3232

3333
object Async:
3434

35-
abstract class AsyncImpl(val root: Cancellable, val scheduler: Scheduler)
35+
abstract class Impl(val root: Cancellable, val scheduler: Scheduler)
3636
(using boundary.Label[Unit]) extends Async:
3737

3838
protected def checkCancellation(): Unit
3939

40-
private var result: T
41-
4240
def await[T](src: Async.Source[T]): T =
4341
checkCancellation()
4442
var resultOpt: Option[T] = None
45-
if src.poll: x =>
46-
result = x
43+
src.poll: x =>
44+
resultOpt = Some(x)
4745
true
48-
then result
49-
else
46+
resultOpt.getOrElse:
5047
try suspend[T, Unit]: k =>
5148
src.onComplete: x =>
5249
scheduler.schedule: () =>
5350
k.resume(x)
5451
true // signals to `src` that result `x` was consumed
5552
finally checkCancellation()
5653

57-
end AsyncImpl
54+
end Impl
5855

5956
/** The currently executing Async context */
6057
inline def current(using async: Async): Async = async
@@ -78,11 +75,13 @@ object Async:
7875
*/
7976
abstract case class FinalListener[T]() extends Listener[T]
8077

81-
/** A source that cannot be mapped, filtered, or raced. In other words,
82-
* an item coming from a direct source must be immediately consumed in
83-
* another async computation; no rejection of this item is possible.
78+
/** An asynchronous data source. Sources can be persistent or ephemeral.
79+
* A persistent source will always pass same data to calls of `poll and `onComplete`.
80+
* An ephememral source can pass new data in every call.
81+
* An example of a persistent source is `Future`.
82+
* An example of an ephemeral source is `Channel`.
8483
*/
85-
trait DirectSource[+T]:
84+
trait Source[+T]:
8685

8786
/** If data is available at present, pass it to function `k`
8887
* and return the result if this call.
@@ -104,31 +103,29 @@ object Async:
104103
*/
105104
def dropListener(k: Listener[T]): Unit
106105

107-
end DirectSource
106+
end Source
108107

109-
/** An asynchronous data source. Sources can be persistent or ephemeral.
110-
* A persistent source will always pass same data to calls of `poll and `onComplete`.
111-
* An ephememral source can pass new data in every call.
112-
* An example of a persistent source is `Future`.
113-
* An example of an ephemeral source is `Channel`.
114-
*/
115-
trait Source[+T] extends DirectSource[T]:
108+
/** A source that can be mapped, filtered, or raced. Only ComposableSources
109+
* can pass `false` to the `Listener` in `poll` or `onComplete`. They do
110+
* that if the data is rejected by a filter or did not come first in a race.
111+
*/
112+
trait ComposableSource[+T] extends Source[T]:
116113

117114
/** Pass on data transformed by `f` */
118-
def map[U](f: T => U): Source[U] =
115+
def map[U](f: T => U): ComposableSource[U] =
119116
new DerivedSource[T, U](this):
120117
def listen(x: T, k: Listener[U]) = k(f(x))
121118

122119
/** Pass on only data matching the predicate `p` */
123-
def filter(p: T => Boolean): Source[T] =
120+
def filter(p: T => Boolean): ComposableSource[T] =
124121
new DerivedSource[T, T](this):
125122
def listen(x: T, k: Listener[T]) = p(x) && k(x)
126123

127-
end Source
124+
end ComposableSource
128125

129126
/** As source that transforms an original source in some way */
130127

131-
abstract class DerivedSource[T, U](src: Source[T]) extends Source[U]:
128+
abstract class DerivedSource[T, U](src: Source[T]) extends ComposableSource[U]:
132129

133130
/** Handle a value `x` passed to the original source by possibly
134131
* invokiong the continuation for this source.
@@ -148,41 +145,42 @@ object Async:
148145
end DerivedSource
149146

150147
/** Pass first result from any of `sources` to the continuation */
151-
def race[T](sources: Source[T]*): Source[T] = new Source:
152-
153-
def poll(k: Listener[T]): Boolean =
154-
val it = sources.iterator
155-
var found = false
156-
while it.hasNext && !found do
157-
it.next.poll: x =>
158-
found = k(x)
159-
found
160-
found
161-
162-
def onComplete(k: Listener[T]): Unit =
163-
val listener = new ForwardingListener[T](this, k):
164-
var foundBefore = false
165-
def continueIfFirst(x: T): Boolean = synchronized:
166-
if foundBefore then false else { foundBefore = k(x); foundBefore }
167-
def apply(x: T): Boolean =
168-
val found = continueIfFirst(x)
169-
if found then sources.foreach(_.dropListener(this))
170-
found
171-
sources.foreach(_.onComplete(listener))
172-
173-
def dropListener(k: Listener[T]): Unit =
174-
val listener = new ForwardingListener[T](this, k):
175-
def apply(x: T): Boolean = ???
176-
// not to be called, we need the listener only for its
177-
// hashcode and equality test.
178-
sources.foreach(_.dropListener(listener))
148+
def race[T](sources: ComposableSource[T]*): ComposableSource[T] =
149+
new ComposableSource:
150+
151+
def poll(k: Listener[T]): Boolean =
152+
val it = sources.iterator
153+
var found = false
154+
while it.hasNext && !found do
155+
it.next.poll: x =>
156+
found = k(x)
157+
found
158+
found
159+
160+
def onComplete(k: Listener[T]): Unit =
161+
val listener = new ForwardingListener[T](this, k):
162+
var foundBefore = false
163+
def continueIfFirst(x: T): Boolean = synchronized:
164+
if foundBefore then false else { foundBefore = k(x); foundBefore }
165+
def apply(x: T): Boolean =
166+
val found = continueIfFirst(x)
167+
if found then sources.foreach(_.dropListener(this))
168+
found
169+
sources.foreach(_.onComplete(listener))
170+
171+
def dropListener(k: Listener[T]): Unit =
172+
val listener = new ForwardingListener[T](this, k):
173+
def apply(x: T): Boolean = ???
174+
// not to be called, we need the listener only for its
175+
// hashcode and equality test.
176+
sources.foreach(_.dropListener(listener))
179177

180178
end race
181179

182180
/** If left (respectively, right) source succeeds with `x`, pass `Left(x)`,
183181
* (respectively, Right(x)) on to the continuation.
184182
*/
185-
def either[T, U](src1: Source[T], src2: Source[U]): Source[Either[T, U]] =
183+
def either[T, U](src1: ComposableSource[T], src2: ComposableSource[U]): ComposableSource[Either[T, U]] =
186184
race[Either[T, U]](src1.map(Left(_)), src2.map(Right(_)))
187185

188186
end Async

tests/pos/suspend-strawman-2/channels.scala

Lines changed: 59 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import runtime.suspend
55
import Async.{Listener, await}
66

77
/** An unbounded channel */
8-
class UnboundedChannel[T] extends Async.Source[T]:
8+
class UnboundedChannel[T] extends Async.ComposableSource[T]:
99
private val pending = ListBuffer[T]()
1010
private val waiting = mutable.Set[Listener[T]]()
1111

@@ -45,78 +45,69 @@ class UnboundedChannel[T] extends Async.Source[T]:
4545

4646
end UnboundedChannel
4747

48-
class SyncChannel[T]:
49-
50-
private val pendingReads = mutable.Set[Listener[T]]()
51-
private val pendingSends = mutable.Set[Listener[Listener[T]]]()
52-
53-
private def collapse[T](k2: Listener[Listener[T]]): Option[T] =
54-
var r: Option[T] = None
55-
if k2 { x => r = Some(x); true } then r else None
56-
57-
protected def link[T](pending: mutable.Set[T], op: T => Boolean): Boolean =
58-
pending.iterator.find(op) match
59-
case Some(elem) => pending -= elem; true
60-
case None => false
61-
62-
val canRead = new Async.Source[T]:
63-
def poll(k: Listener[T]): Boolean =
64-
link(pendingSends, sender => collapse(sender).map(k) == Some(true))
65-
def onComplete(k: Listener[T]): Unit =
66-
if !poll(k) then pendingReads += k
67-
def dropListener(k: Listener[T]): Unit =
68-
pendingReads -= k
69-
70-
val canSend = new Async.Source[Listener[T]]:
71-
def poll(k: Listener[Listener[T]]): Boolean =
72-
link(pendingReads, k)
73-
def onComplete(k: Listener[Listener[T]]): Unit =
74-
if !poll(k) then pendingSends += k
75-
def dropListener(k: Listener[Listener[T]]): Unit =
76-
pendingSends -= k
77-
78-
def send(x: T)(using Async): Unit =
79-
await(canSend)(x)
80-
81-
def read()(using Async): T =
82-
await(canRead)
48+
trait SyncChannel[T]:
49+
def canRead: Async.Source[T]
50+
def canSend: Async.Source[Listener[T]]
51+
52+
def send(x: T)(using Async): Unit = await(canSend)(x)
53+
54+
def read()(using Async): T = await(canRead)
55+
56+
object SyncChannel:
57+
def apply[T](): SyncChannel[T] = new Impl[T]:
58+
val canRead = new ReadSource
59+
val canSend = new SendSource
60+
61+
abstract class Impl[T] extends SyncChannel[T]:
62+
protected val pendingReads = mutable.Set[Listener[T]]()
63+
protected val pendingSends = mutable.Set[Listener[Listener[T]]]()
64+
65+
protected def link[T](pending: mutable.Set[T], op: T => Boolean): Boolean =
66+
pending.headOption match
67+
case Some(elem) => op(elem); true
68+
case None => false
69+
70+
private def collapse[T](k2: Listener[Listener[T]]): Option[T] =
71+
var r: Option[T] = None
72+
if k2 { x => r = Some(x); true } then r else None
73+
74+
protected class ReadSource extends Async.Source[T]:
75+
def poll(k: Listener[T]): Boolean =
76+
link(pendingSends, sender => collapse(sender).map(k) == Some(true))
77+
def onComplete(k: Listener[T]): Unit =
78+
if !poll(k) then pendingReads += k
79+
def dropListener(k: Listener[T]): Unit =
80+
pendingReads -= k
81+
82+
protected class SendSource extends Async.Source[Listener[T]]:
83+
def poll(k: Listener[Listener[T]]): Boolean =
84+
link(pendingReads, k(_))
85+
def onComplete(k: Listener[Listener[T]]): Unit =
86+
if !poll(k) then pendingSends += k
87+
def dropListener(k: Listener[Listener[T]]): Unit =
88+
pendingSends -= k
89+
end Impl
8390

8491
end SyncChannel
8592

86-
class DirectSyncChannel[T]:
87-
88-
private val pendingReads = mutable.Set[Listener[T]]()
89-
private val pendingSends = mutable.Set[Listener[Listener[T]]]()
90-
91-
private def collapse[T](k2: Listener[Listener[T]]): Option[T] =
92-
var r: Option[T] = None
93-
if k2 { x => r = Some(x); true } then r else None
94-
95-
private def link[T](pending: mutable.Set[T], op: T => Unit): Boolean =
96-
pending.headOption match
97-
case Some(elem) => op(elem); true
98-
case None => false
93+
trait ComposableSyncChannel[T] extends SyncChannel[T]:
94+
def canRead: Async.ComposableSource[T]
95+
def canSend: Async.ComposableSource[Listener[T]]
9996

100-
val canRead = new Async.Source[T]:
101-
def poll(k: Listener[T]): Boolean =
102-
link(pendingSends, sender => collapse(sender).map(k))
103-
def onComplete(k: Listener[T]): Unit =
104-
if !poll(k) then pendingReads += k
105-
def dropListener(k: Listener[T]): Unit =
106-
pendingReads -= k
97+
object ComposableSyncChannel:
98+
def apply[T](): ComposableSyncChannel[T] = new Impl[T]:
99+
val canRead = new ComposableReadSource
100+
val canSend = new ComposableSendSource
107101

108-
val canSend = new Async.Source[Listener[T]]:
109-
def poll(k: Listener[Listener[T]]): Boolean =
110-
link(pendingReads, k(_))
111-
def onComplete(k: Listener[Listener[T]]): Unit =
112-
if !poll(k) then pendingSends += k
113-
def dropListener(k: Listener[Listener[T]]): Unit =
114-
pendingSends -= k
102+
abstract class Impl[T] extends SyncChannel.Impl[T], ComposableSyncChannel[T]:
115103

116-
def send(x: T)(using Async): Unit =
117-
await(canSend)(x)
104+
override protected def link[T](pending: mutable.Set[T], op: T => Boolean): Boolean =
105+
pending.iterator.find(op) match
106+
case Some(elem) => pending -= elem; true
107+
case None => false
118108

119-
def read()(using Async): T =
120-
await(canRead)
109+
class ComposableReadSource extends ReadSource, Async.ComposableSource[T]
110+
class ComposableSendSource extends SendSource, Async.ComposableSource[Listener[T]]
111+
end Impl
121112

122-
end DirectSyncChannel
113+
end ComposableSyncChannel

tests/pos/suspend-strawman-2/futures.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import runtime.suspend
1010

1111
/** A cancellable future that can suspend waiting for other synchronous sources
1212
*/
13-
trait Future[+T] extends Async.Source[Try[T]], Cancellable:
13+
trait Future[+T] extends Async.ComposableSource[Try[T]], Cancellable:
1414

1515
/** Wait for this future to be completed, return its value in case of success,
1616
* or rethrow exception in case of failure.
@@ -40,10 +40,6 @@ trait Future[+T] extends Async.Source[Try[T]], Cancellable:
4040

4141
object Future:
4242

43-
private enum Status:
44-
case Initial, Completed
45-
import Status.*
46-
4743
/** The core part of a future that is compled explicitly by calling its
4844
* `complete` method. There are two implementations
4945
*
@@ -118,7 +114,7 @@ object Future:
118114
// a handler for Async
119115
private def async(body: Async ?=> Unit): Unit =
120116
boundary [Unit]:
121-
given Async = new Async.AsyncImpl(this, scheduler):
117+
given Async = new Async.Impl(this, scheduler):
122118
def checkCancellation() =
123119
if hasCompleted then throw new CancellationException()
124120
body

0 commit comments

Comments
 (0)