Skip to content

Support async iterables #256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conversation

kimamula
Copy link

@kimamula kimamula commented Jun 7, 2020

Resolves #251

I gave up supporting non-async iterables since there are JavaScript built-in iterables (such as strings and arrays and array-like objects) that we do not want to handle in this way.

@kimamula
Copy link
Author

kimamula commented Jun 7, 2020

Test fails since async iterables are not supported by Node.js 8.

A workaround I can think of is to skip test cases for async iterables conditionally, e.g.:

if (Number(/^v(\d+)/.exec(process.version)![1]) >= 10) {
  // test cases for async iterables
}

However, at least unit testing on Node.js 10 should be also added to CI then.

@kimamula kimamula force-pushed the feature/251-support-async-generator-functions branch from 8af3350 to c9bbab9 Compare June 7, 2020 12:57
@andywer
Copy link
Owner

andywer commented Jun 7, 2020

Great stuff, will check it out! Thanks for sharing 🙂

However, at least unit testing on Node.js 10 should be also added to CI then.

The test matrix is currently: node 8 on linux, node latest on linux, node 8 on windows, node latest on windows 😉

@andywer
Copy link
Owner

andywer commented Jun 9, 2020

Sorry, but I need a bit more time for this. I gave it a first quick review and it looks like a good start, but the code isn't really straight forward, so let's re-iterate 🙂

My main concern right now is the mixinAsyncIterableIterator() function, as it introduces a noticeable overhead applied to every thread call (even if the called function does not return an async iterator).
It also quite hard to read and should probably be split into two parts: One for recognizing/handling an async iterator result + wrapping the observable promise in an async iterator "costume".

Need to think about it some more, but my best idea so far is to extend the ObservablePromise class once more (even though I am usually not a proponent of class-based inheritance in JS/TS at all). This could reduce garbage collection and on-call runtime overhead by quite a bit, I think.

@kimamula
Copy link
Author

kimamula commented Jun 9, 2020

Thanks for your time and consideration.

I understand your concern.
Another solution would be returning Promise<AsyncIterable> instead of AsyncIterable so that the wrapping process can be delayed until receiving WorkerJobStartMessage whose type is asyncIterable. This behavior is in line with how threads.js works for ordinary functions.
However, it is likely this also requires significant modifications of ObservablePromise implementation.

@andywer
Copy link
Owner

andywer commented Jun 9, 2020

Your idea to asynchronously return the async iterable sounds pretty interesting!

Maybe not 100% as neat, but we can probably pull that of with way less code. Just need to extend the default serializer, I think.

I say we try that out and see what it looks like :)

@kimamula
Copy link
Author

kimamula commented Jun 18, 2020

I looked into how serializers work but have no idea how I can use it to implement this feature. AsyncIterable should be created when the main thread receives WorkerJobStartMessage which does not have any payload and would look weird if deserialize() is executed on it.

My idea is to create a class which implements the async iterator/iterable protocol and use it in invocation-proxy.ts like this.

function createObservableForJob<ResultType>(worker: WorkerType, jobUID: number): Observable<ResultType> {
  return new Observable(observer => {
    let asyncType: WorkerJobStartMessage["resultType"] | undefined

    const messageHandler = ((event: MessageEvent) => {
      debugMessages("Message from worker:", event.data)
      if (!event.data || event.data.uid !== jobUID) return

+     let asyncIterableWrapper: AsyncIterableWrapper;
      if (isJobStartMessage(event.data)) {
        asyncType = event.data.resultType
+       if (asyncType === "asyncIterable") {
+         asyncIterableWrapper = new AsyncIterableWrapper(worker, jobUID)
+         observer.next(asyncIterableWrapper)
+         observer.complete()
+       }
      } else if (isJobResultMessage(event.data)) {
+       if (asyncIterableWrapper) {
+         asyncIterableWrapper.yield(deserialize(event.data.payload))
+         if (event.data.complete) {
+           worker.removeEventListener("message", messageHandler)
+         }
-       if (asyncType === "promise") {
+       } else if (asyncType === "promise") {
          // ...

What do you think?

@andywer
Copy link
Owner

andywer commented Jun 18, 2020

You are still aiming for a usage like const iterable = myWorker(), but I think this will definitely lead to major headaches and additional runtime overhead as I outlined before.

I think our best bet, even though not 100% elegant, is to aim for using it as const iterable = await myWorker(). This way we don't need to know if the function is gonna return an async iterable before the first result message is in.

This also means that we don't need to overload the ObservablePromise even further to act like an async iterable as well.

I will try to propose some code in the next few days.

@kimamula
Copy link
Author

Well, I believe that my code above fulfills all of your requirements:

  1. It is intended to be used as const iterable = await myWorker()
  2. It does not introduce any overhead if the function does not return an async iterable as it creates AsyncIterableWrapper instance only when the resultType is "asyncIterable".
  3. It does not require any modification of ObservablePromise class and its instance.

However, I'd love to see your solution.

@andywer
Copy link
Owner

andywer commented Jun 19, 2020

Just started poking the thing myself a bit and realized that my serializer-based proposal wouldn't work, because right now there is no way to allow the master thread to call-back the iterator in the worker.

As I believe this approach would lead to much better maintainable code and there is an active discussion around supporting callbacks, anyway (#145), I am now trying to tackle the callbacks now, so I can give the serializer approach a shot then.

@kimamula
Copy link
Author

I am not 100% sure how Callback is to be implemented, but I guess it would serialize functions with Function.prototype.toString() and deserialize them with Function()? Then, I'm afraid that it cannot be used under Content Security Policy without 'unsafe-eval' (see https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Security-Policy/script-src#Unsafe_eval_expressions).
I think it'd be better not to add such restriction to this feature.

@andywer
Copy link
Owner

andywer commented Jun 20, 2020

I guess there is multiple ways how to play this. I am aiming for a generalization of the logic that let’s you call expose()-ed worker functions.

It’s a major change to the code base, but it will allow you to dynamically make functions callable from other threadS and it will allow the master thread to expose functions to the worker.

So instead of serializing the function itself, it will rather expose() them under a unique ID and send this ID to the other thread, so that the function is still run on the thread where its code resides and the other thread can call it arbitrarily.

What do you think? (I came pretty far, but am not done yet)

@kimamula
Copy link
Author

kimamula commented Jun 20, 2020

Not depending on function serialization is great, and then probably the following limitations described in #145 do not exist any more, right?

  • Cannot access any variable declared outside the functions scope
  • Cannot import or require() other modules in the function (at least not in the browser when bundling)

I have been thinking the async iterable pattern as a way of bidirectional communication between main and worker threads.
Though I'm still not sure how exactly callbacks work, achieving the async iterable pattern by using some lower level API for bidirectional communication sounds natural.

@andywer
Copy link
Owner

andywer commented Jun 21, 2020

and then probably the following limitations described in #145 do not exist any more, right?

Exactly!

achieving the async iterable pattern by using some lower level API for bidirectional communication sounds natural

I totally get your stand point, but I have to disagree, nevertheless. It's subtle, but the main reason for me is that you either have to overload the ObservablePromise even further which might easily turn into a bottom-less pit over time.
Or we live with the additional await in iterator = await generatorWorker() (as you suggested), but then we have to extend the built-in code for the different result types, too, but in a rather inconsistent manner – we have to put the additional code nested into the if (result is rather promise than observable).

Being able to solve generator support by only extending the serializer looks like a cleaner and better maintainable solution (which makes me sleep much better at night 😉) as that logic can be easily extended in a modular fashion.


I was able to put something together on a feature branch: feature/callbacks

It turned out to be a major refactoring of the whole code base, but once the callbacks were implemented, adding support for generators was pretty straight forward: 300342e

@kimamula
Copy link
Author

It turned out to be a major refactoring of the whole code base, but once the callbacks were implemented, adding support for generators was pretty straight forward: 300342e

Thank you for sharing! That looks great.

I totally get your stand point, but I have to disagree, nevertheless.

I'm sorry that probably my comment was misleading. I actually expected something like your implementation in 300342e: supporting iterators via a serializer that uses new Callback API.
Now I close this PR.

My minor concern is isIterator function, though.

export const isIterator = (thing: any): thing is Iterator<any> | AsyncIterator<any> =>
  thing && typeof thing === "object" && "next" in thing && typeof thing.next === "function"

The reason why I wrote typeof (obj as AsyncIterable<unknown>)[Symbol.asyncIterator] === "function" in the corresponding function in this PR is because I thought it would be better to also support ReadableStream (both Web and Node.js), which does not have next() method. Moreover, it is possible that the expose()-ed function returns an object which is not an iterator but has next() method. On the contrary, it is reasonable enough to regard an object with [Symbol.asyncIterator]() method as AsyncIterable.

@kimamula kimamula closed this Jun 23, 2020
@kimamula kimamula deleted the feature/251-support-async-generator-functions branch June 23, 2020 12:41
@andywer
Copy link
Owner

andywer commented Jun 23, 2020

The reason why I wrote typeof (obj as AsyncIterable<unknown>)[Symbol.asyncIterator] === "function" in the corresponding function in this PR is because I thought it would be better to also support ReadableStream (...)

Great point! Let me check if this works as a simple drop-in replacement. I admit that I neglected the iterator detection a bit after spending so much time on the callbacks last weekend.

andywer added a commit that referenced this pull request Jun 23, 2020
andywer added a commit that referenced this pull request Jun 23, 2020
andywer added a commit that referenced this pull request Jun 30, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support (async) generator functions
2 participants