-
-
Notifications
You must be signed in to change notification settings - Fork 166
Support async iterables #256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support async iterables #256
Conversation
Test fails since async iterables are not supported by Node.js 8. A workaround I can think of is to skip test cases for async iterables conditionally, e.g.: if (Number(/^v(\d+)/.exec(process.version)![1]) >= 10) {
// test cases for async iterables
} However, at least unit testing on Node.js 10 should be also added to CI then. |
8af3350
to
c9bbab9
Compare
Great stuff, will check it out! Thanks for sharing 🙂
The test matrix is currently: |
Sorry, but I need a bit more time for this. I gave it a first quick review and it looks like a good start, but the code isn't really straight forward, so let's re-iterate 🙂 My main concern right now is the Need to think about it some more, but my best idea so far is to extend the |
Thanks for your time and consideration. I understand your concern. |
Your idea to asynchronously return the async iterable sounds pretty interesting! Maybe not 100% as neat, but we can probably pull that of with way less code. Just need to extend the default serializer, I think. I say we try that out and see what it looks like :) |
I looked into how serializers work but have no idea how I can use it to implement this feature. My idea is to create a class which implements the async iterator/iterable protocol and use it in function createObservableForJob<ResultType>(worker: WorkerType, jobUID: number): Observable<ResultType> {
return new Observable(observer => {
let asyncType: WorkerJobStartMessage["resultType"] | undefined
const messageHandler = ((event: MessageEvent) => {
debugMessages("Message from worker:", event.data)
if (!event.data || event.data.uid !== jobUID) return
+ let asyncIterableWrapper: AsyncIterableWrapper;
if (isJobStartMessage(event.data)) {
asyncType = event.data.resultType
+ if (asyncType === "asyncIterable") {
+ asyncIterableWrapper = new AsyncIterableWrapper(worker, jobUID)
+ observer.next(asyncIterableWrapper)
+ observer.complete()
+ }
} else if (isJobResultMessage(event.data)) {
+ if (asyncIterableWrapper) {
+ asyncIterableWrapper.yield(deserialize(event.data.payload))
+ if (event.data.complete) {
+ worker.removeEventListener("message", messageHandler)
+ }
- if (asyncType === "promise") {
+ } else if (asyncType === "promise") {
// ... What do you think? |
You are still aiming for a usage like I think our best bet, even though not 100% elegant, is to aim for using it as This also means that we don't need to overload the ObservablePromise even further to act like an async iterable as well. I will try to propose some code in the next few days. |
Well, I believe that my code above fulfills all of your requirements:
However, I'd love to see your solution. |
Just started poking the thing myself a bit and realized that my serializer-based proposal wouldn't work, because right now there is no way to allow the master thread to call-back the iterator in the worker. As I believe this approach would lead to much better maintainable code and there is an active discussion around supporting callbacks, anyway (#145), I am now trying to tackle the callbacks now, so I can give the serializer approach a shot then. |
I am not 100% sure how |
I guess there is multiple ways how to play this. I am aiming for a generalization of the logic that let’s you call expose()-ed worker functions. It’s a major change to the code base, but it will allow you to dynamically make functions callable from other threadS and it will allow the master thread to expose functions to the worker. So instead of serializing the function itself, it will rather expose() them under a unique ID and send this ID to the other thread, so that the function is still run on the thread where its code resides and the other thread can call it arbitrarily. What do you think? (I came pretty far, but am not done yet) |
Not depending on function serialization is great, and then probably the following limitations described in #145 do not exist any more, right?
I have been thinking the async iterable pattern as a way of bidirectional communication between main and worker threads. |
Exactly!
I totally get your stand point, but I have to disagree, nevertheless. It's subtle, but the main reason for me is that you either have to overload the ObservablePromise even further which might easily turn into a bottom-less pit over time. Being able to solve generator support by only extending the serializer looks like a cleaner and better maintainable solution (which makes me sleep much better at night 😉) as that logic can be easily extended in a modular fashion. I was able to put something together on a feature branch: It turned out to be a major refactoring of the whole code base, but once the callbacks were implemented, adding support for generators was pretty straight forward: 300342e |
Thank you for sharing! That looks great.
I'm sorry that probably my comment was misleading. I actually expected something like your implementation in 300342e: supporting iterators via a serializer that uses new My minor concern is export const isIterator = (thing: any): thing is Iterator<any> | AsyncIterator<any> =>
thing && typeof thing === "object" && "next" in thing && typeof thing.next === "function" The reason why I wrote |
Great point! Let me check if this works as a simple drop-in replacement. I admit that I neglected the iterator detection a bit after spending so much time on the callbacks last weekend. |
Resolves #251
I gave up supporting non-async iterables since there are JavaScript built-in iterables (such as strings and arrays and array-like objects) that we do not want to handle in this way.