Skip to content

Commit ad18fcf

Browse files
committed
Drop distinction between filterable and normal sync channels
Filterable sync channels don't require significantly more complexity than normal channels. The only thing a normal channel could do that a filterable channel could not, was to discard a reader and sender immediately if there was no match. If there are no filters on the source, then the only way a source could not match is by losing a race. But then the source will be removed anyway by the `race` method. So the only improvement would be had if there was a data race condition where the removal by race happens after a second channel was triggered.
1 parent e1c60a4 commit ad18fcf

File tree

4 files changed

+91
-126
lines changed

4 files changed

+91
-126
lines changed

tests/pos/suspend-strawman-1/futures.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,3 @@ object Future:
4242
def Test(x: Future[Int], xs: List[Future[Int]]) =
4343
Future:
4444
x.await + xs.map(_.await).sum
45-
46-
47-
48-
49-
50-
51-

tests/pos/suspend-strawman-2/Async.scala

Lines changed: 44 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,43 +4,52 @@ import scala.collection.mutable
44
import runtime.suspend
55
import scala.util.boundary
66

7-
/** The underlying configuration of an async block */
8-
trait AsyncConfig:
9-
10-
/** The cancellable async source underlying this async computation */
11-
def root: Cancellable
12-
13-
/** The scheduler for runnables defined in this async computation */
14-
def scheduler: Scheduler
15-
16-
object AsyncConfig:
17-
18-
/** A toplevel async group with given scheduler and a synthetic root that
19-
* ignores cancellation requests
20-
*/
21-
given fromScheduler(using s: Scheduler): AsyncConfig with
22-
def root = Cancellable.empty
23-
def scheduler = s
24-
25-
end AsyncConfig
26-
27-
/** A context that allows to suspend waiting for asynchronous data sources */
28-
trait Async extends AsyncConfig:
7+
/** A context that allows to suspend waiting for asynchronous data sources
8+
*/
9+
trait Async extends Async.Config:
2910

3011
/** Wait for completion of async source `src` and return the result */
3112
def await[T](src: Async.Source[T]): T
3213

3314
object Async:
3415

35-
/** A marker type for Source#CanFilter */
36-
opaque type Yes = Unit
16+
/** The underlying configuration of an async block */
17+
trait Config:
18+
19+
/** The cancellable async source underlying this async computation */
20+
def root: Cancellable
21+
22+
/** The scheduler for runnables defined in this async computation */
23+
def scheduler: Scheduler
3724

25+
object Config:
26+
27+
/** A toplevel async group with given scheduler and a synthetic root that
28+
* ignores cancellation requests
29+
*/
30+
given fromScheduler(using s: Scheduler): AsyncConfig with
31+
def root = Cancellable.empty
32+
def scheduler = s
33+
34+
end Config
35+
36+
/** A possible implementation of Async. Defines an `await` method based
37+
* on a method to check for cancellation that needs to be implemented by
38+
* subclasses.
39+
*
40+
* @param root the root of the Async's config
41+
* @param scheduler the scheduler of the Async's config
42+
* @param label the label of the boundary that defines the representedd async block
43+
*/
3844
abstract class Impl(val root: Cancellable, val scheduler: Scheduler)
39-
(using boundary.Label[Unit]) extends Async:
45+
(using label: boundary.Label[Unit]) extends Async:
4046

4147
protected def checkCancellation(): Unit
4248

43-
def await[T](src: Async.Source[T]): T =
49+
/** Await a source first by polling it, and, if that fails, by suspending
50+
* in a onComplete call.
51+
*/
52+
def await[T](src: Source[T]): T =
4453
checkCancellation()
4554
var resultOpt: Option[T] = None
4655
src.poll: x =>
@@ -76,7 +85,7 @@ object Async:
7685
/** A listener for values that are processed directly in an async block.
7786
* Closures of type `T => Boolean` can be SAM converted to this type.
7887
*/
79-
abstract case class FinalListener[T]() extends Listener[T]
88+
abstract case class FinalListener[T](apply: T => Boolean) extends Listener[T]
8089

8190
/** An asynchronous data source. Sources can be persistent or ephemeral.
8291
* A persistent source will always pass same data to calls of `poll and `onComplete`.
@@ -86,10 +95,8 @@ object Async:
8695
*/
8796
trait Source[+T]:
8897

89-
type CanFilter
90-
9198
/** If data is available at present, pass it to function `k`
92-
* and return the result if this call.
99+
* and return the result of this call.
93100
* `k` returns true iff the data was consumed in an async block.
94101
* Calls to `poll` are always synchronous.
95102
*/
@@ -104,14 +111,13 @@ object Async:
104111

105112
/** Signal that listener `k` is dead (i.e. will always return `false` from now on).
106113
* This permits original, (i.e. non-derived) sources like futures or channels
107-
* to drop the listener from their `waiting` sets.
114+
* to drop the listener from their waiting sets.
108115
*/
109116
def dropListener(k: Listener[T]): Unit
110117

111118
end Source
112119

113-
/** As source that transforms an original source in some way */
114-
120+
/** A source that transforms an original source in some way */
115121
abstract class DerivedSource[T, U](val original: Source[T]) extends Source[U]:
116122

117123
/** Handle a value `x` passed to the original source by possibly
@@ -134,24 +140,18 @@ object Async:
134140
extension [T](src: Source[T])
135141

136142
/** Pass on data transformed by `f` */
137-
def map[U](f: T => U): Source[U] { type CanFilter = src.CanFilter } =
143+
def map[U](f: T => U): Source[U] =
138144
new DerivedSource[T, U](src):
139-
type CanFilter = src.CanFilter
140145
def listen(x: T, k: Listener[U]) = k(f(x))
141146

142-
extension [T](src: Source[T] { type CanFilter = Yes })
143-
144147
/** Pass on only data matching the predicate `p` */
145-
def filter(p: T => Boolean): Source[T] { type CanFilter = src.CanFilter } =
148+
def filter(p: T => Boolean): Source[T] =
146149
new DerivedSource[T, T](src):
147-
type CanFilter = src.CanFilter
148150
def listen(x: T, k: Listener[T]) = p(x) && k(x)
149151

150-
151152
/** Pass first result from any of `sources` to the continuation */
152-
def race[T, CF](sources: Source[T] { type CanFilter <: CF} *): Source[T] { type CanFilter <: CF } =
153+
def race[T](sources: Source[T]*): Source[T] =
153154
new Source[T]:
154-
type CanFilter <: CF
155155

156156
def poll(k: Listener[T]): Boolean =
157157
val it = sources.iterator
@@ -185,11 +185,8 @@ object Async:
185185
/** If left (respectively, right) source succeeds with `x`, pass `Left(x)`,
186186
* (respectively, Right(x)) on to the continuation.
187187
*/
188-
def either[T, U, CF](
189-
src1: Source[T] { type CanFilter <: CF },
190-
src2: Source[U] { type CanFilter <: CF })
191-
: Source[Either[T, U]] { type CanFilter <: CF } =
192-
race[Either[T, U], CF](src1.map(Left(_)), src2.map(Right(_)))
188+
def either[T, U](src1: Source[T], src2: Source[U]): Source[Either[T, U]] =
189+
race[Either[T, U]](src1.map(Left(_)), src2.map(Right(_)))
193190

194191
end Async
195192

tests/pos/suspend-strawman-2/channels.scala

Lines changed: 23 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,20 @@ package concurrent
22
import scala.collection.mutable, mutable.ListBuffer
33
import scala.util.boundary.Label
44
import runtime.suspend
5-
import Async.{Listener, await, Yes}
5+
import Async.{Listener, await}
66

7-
/** An unbounded channel
8-
* Unbounded channels are composable async sources.
7+
/** An unbounded asynchronous channel. Senders do not wait for matching
8+
* readers.
99
*/
1010
class UnboundedChannel[T] extends Async.Source[T]:
11-
type CanFilter = Yes
1211

1312
private val pending = ListBuffer[T]()
1413
private val waiting = mutable.Set[Listener[T]]()
1514

1615
private def drainWaiting(x: T): Boolean =
17-
val it = waiting.iterator
18-
var sent = false
19-
while it.hasNext && !sent do
20-
val k = it.next()
21-
sent = k(x)
22-
if sent then waiting -= k
23-
sent
16+
waiting.iterator.find(_(x)) match
17+
case Some(k) => waiting -= k; true
18+
case None => false
2419

2520
private def drainPending(k: Listener[T]): Boolean =
2621
val sent = pending.nonEmpty && k(pending.head)
@@ -50,92 +45,59 @@ class UnboundedChannel[T] extends Async.Source[T]:
5045
end UnboundedChannel
5146

5247
/** An unbuffered, synchronous channel. Senders and readers both block
53-
* until a communication between them happens.
54-
* The channel provides two async sources, one for reading and one for
55-
* sending. The two sources are not composable. This allows a simple
56-
* implementation strategy where at each point either some senders
57-
* are waiting for matching readers, or some readers are waiting for matching
58-
* senders, or the channel is idle, i.e. there are no waiting readers or senders.
59-
* If a send operation encounters some waiting readers, or a read operation
60-
* encounters some waiting sender the data is transmitted directly. Otherwise
61-
* we add the operation to the corresponding waiting pending set.
48+
* until a communication between them happens. The channel provides two
49+
* async sources, one for reading and one for sending. If a send operation
50+
* encounters some waiting readers, or a read operation encounters some
51+
* waiting sender the data is transmitted directly. Otherwise we add
52+
* the operation to the corresponding pending set.
6253
*/
6354
trait SyncChannel[T]:
64-
thisCannel =>
65-
66-
type CanFilter
6755

68-
val canRead: Async.Source[T] { type CanFilter = thisCannel.CanFilter }
69-
val canSend: Async.Source[Listener[T]] { type CanFilter = thisCannel.CanFilter }
56+
val canRead: Async.Source[T]
57+
val canSend: Async.Source[Listener[T]]
7058

7159
def send(x: T)(using Async): Unit = await(canSend)(x)
7260

7361
def read()(using Async): T = await(canRead)
7462

7563
object SyncChannel:
76-
def apply[T](): SyncChannel[T] = Impl[T]()
7764

78-
class Impl[T] extends SyncChannel[T]:
65+
def apply[T](): SyncChannel[T] = new SyncChannel[T]:
7966

8067
private val pendingReads = mutable.Set[Listener[T]]()
8168
private val pendingSends = mutable.Set[Listener[Listener[T]]]()
8269

83-
protected def link[T](pending: mutable.Set[T], op: T => Boolean): Boolean =
84-
pending.headOption match
85-
case Some(elem) =>
86-
val ok = op(elem)
87-
if !ok then
88-
// Since sources are not filterable, we can be here only if a race
89-
// was lost and the entry was not yet removed. In that case, remove
90-
// it here.
91-
pending -= pending.head
92-
link(pending, op)
93-
ok
70+
private def link[T](pending: mutable.Set[T], op: T => Boolean): Boolean =
71+
// Since sources are filterable, we have to match all pending readers or writers
72+
// against the incoming request
73+
pending.iterator.find(op) match
74+
case Some(elem) => pending -= elem; true
9475
case None => false
9576

9677
private def collapse[T](k2: Listener[Listener[T]]): Option[T] =
9778
var r: Option[T] = None
9879
if k2 { x => r = Some(x); true } then r else None
9980

100-
private class ReadSource extends Async.Source[T]:
101-
type CanFilter = Impl.this.CanFilter
81+
val canRead = new Async.Source[T]:
10282
def poll(k: Listener[T]): Boolean =
10383
link(pendingSends, sender => collapse(sender).map(k) == Some(true))
10484
def onComplete(k: Listener[T]): Unit =
10585
if !poll(k) then pendingReads += k
10686
def dropListener(k: Listener[T]): Unit =
10787
pendingReads -= k
10888

109-
private class SendSource extends Async.Source[Listener[T]]:
110-
type CanFilter = Impl.this.CanFilter
89+
val canSend = new Async.Source[Listener[T]]:
11190
def poll(k: Listener[Listener[T]]): Boolean =
11291
link(pendingReads, k(_))
11392
def onComplete(k: Listener[Listener[T]]): Unit =
11493
if !poll(k) then pendingSends += k
11594
def dropListener(k: Listener[Listener[T]]): Unit =
11695
pendingSends -= k
11796

118-
val canRead = new ReadSource
119-
val canSend = new SendSource
120-
end Impl
12197
end SyncChannel
12298

123-
object FilterableSyncChannel:
124-
def apply[T](): SyncChannel[T] { type CanFilter = Yes } = Impl[T]()
125-
126-
class Impl[T] extends SyncChannel.Impl[T]:
127-
type CanFilter = Yes
128-
override protected def link[T](pending: mutable.Set[T], op: T => Boolean): Boolean =
129-
// Since sources are filterable, we have to match all pending readers or writers
130-
// against the incoming request
131-
pending.iterator.find(op) match
132-
case Some(elem) => pending -= elem; true
133-
case None => false
134-
135-
end FilterableSyncChannel
136-
13799
def TestRace =
138-
val c1, c2 = FilterableSyncChannel[Int]()
100+
val c1, c2 = SyncChannel[Int]()
139101
val s = c1.canSend
140102
val c3 = Async.race(c1.canRead, c2.canRead)
141103
val c4 = c3.filter(_ >= 0)
@@ -148,9 +110,7 @@ def TestRace =
148110
case Right(x) => x
149111
.filter(_ >= 0)
150112

151-
//val d3bad = d1.filter(_ >= 0)
152-
val d5 = Async.either(c1.canRead, d2)
113+
val d5 = Async.either(c1.canRead, d2)
153114
.map:
154115
case Left(x) => -x
155116
case Right(x) => x
156-
//val d6bad = d5.filter(_ >= 0)

0 commit comments

Comments
 (0)