Skip to content

Commit 27b766b

Browse files
committed
WIP implemention of reifying nested streams for zip
1 parent 3231eb7 commit 27b766b

File tree

1 file changed

+88
-1
lines changed

1 file changed

+88
-1
lines changed

tests/run-with-compiler-custom-args/staged-streams_1.scala

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,8 +315,95 @@ object Test {
315315
}
316316
}
317317

318+
/**
319+
*
320+
* @param stream
321+
* @tparam A
322+
* @return
323+
*/
318324
private def makeLinear[A](stream: StagedStream[A]): StagedStream[A] = {
319-
???
325+
stream match {
326+
case Linear(producer) => stream
327+
case Nested(producer, nestedf) => {
328+
329+
/** Helper function that orchestrates the handling of the function that represents an `advance: Unit => Unit`.
330+
* It reifies a nested stream as calls to `advance`. Advance encodes the step function of each nested stream.
331+
* It is used in the init of a producer of a nested stream. When an inner stream finishes, the
332+
* `currentAdvance` holds the function to the `advance` function of the earlier stream.
333+
* `makeAdvanceFunction`, for each nested stream, installs a new `advance` function that after
334+
* the stream finishes it will restore the earlier one.
335+
*
336+
* When `advance` is called the result is consumed in the continuation. Within this continuation
337+
* the resulting value should be saved in a variable.
338+
*
339+
* @param currentAdvance variable that holds a function that represents the stream at each level.
340+
* @param k the continuation that consumes a variable.
341+
* @return the quote of the orchestrated code that will be executed as
342+
*/
343+
def makeAdvanceFunction[A](currentAdvance: Var[Unit => Unit], k: A => Expr[Unit], stream: StagedStream[A]): Expr[Unit] = {
344+
stream match {
345+
case Linear(producer) =>
346+
producer.card match {
347+
case AtMost1 => producer.init(st => '{
348+
if(~producer.hasNext(st)) {
349+
~producer.step(st, k)
350+
}
351+
})
352+
case Many => producer.init(st => '{
353+
val oldAdvance : Unit => Unit = ~currentAdvance.get
354+
val newAdvance : Unit => Unit = { _: Unit => {
355+
if(~producer.hasNext(st)) {
356+
~producer.step(st, k)
357+
}
358+
else {
359+
~currentAdvance.update('{oldAdvance})
360+
}
361+
~currentAdvance.update('{newAdvance})
362+
newAdvance(_)
363+
}}
364+
})
365+
}
366+
case nested: Nested[A, bt] =>
367+
makeAdvanceFunction(currentAdvance, (a: bt) => makeAdvanceFunction(currentAdvance, k, nested.nestedf(a)), Linear(nested.producer))
368+
}
369+
}
370+
371+
val newProducer = new Producer[A] {
372+
// _1: if the stream has ended,
373+
// _2: the current element,
374+
// _3: the step of the inner most steam
375+
type St = (Var[Boolean], Var[A], Var[Unit => Unit])
376+
val card: Cardinality = Many
377+
378+
def init(k: St => Expr[Unit]): Expr[Unit] = {
379+
producer.init(st => '{
380+
Var('{ (_: Unit) => ()}){ advf => {
381+
Var('{ true }) { hasNext => {
382+
// Var('{ null.asInstanceOf[A] }) { curr => {
383+
// def adv() = {
384+
// ~hasNext.update(producer.hasNext(st))
385+
// if(~hasNext.get) {
386+
// ~producer.step(st, el => '{
387+
// ~makeAdvanceFunction(advf, ((a: A) => curr.update('{a})), nestedf(el))
388+
// })
389+
// }
390+
// }
391+
// ~k((flag, current, advf))
392+
???
393+
// }}
394+
}}
395+
}}
396+
})
397+
}
398+
399+
def step(st: St, k: A => Expr[Unit]): Expr[Unit] = ???
400+
401+
def hasNext(st: St): Expr[Boolean] = ???
402+
}
403+
404+
Linear(newProducer)
405+
}
406+
}
320407
}
321408

322409
private def pushLinear[A, B, C](producer: Producer[A], nestedProducer: Producer[B], nestedf: (B => StagedStream[C])): StagedStream[(A, C)] = {

0 commit comments

Comments
 (0)