From 268f613f00a16b05046f0b90674e7fff8cec8a21 Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Thu, 4 Jul 2024 18:54:39 +0900 Subject: [PATCH 01/12] Initial WebWorkerTaskExecutor WebWorkerTaskExecutor is an implementation of `TaskExecutor` protocol, which is introduced by [SE-0417] since Swift 6.0. This task executor runs tasks on Worker threads, which is useful for offloading computationally expensive tasks from the main thread. The `WebWorkerTaskExecutor` is designed to work with [Web Workers API] and Node.js's [`worker_threads` module]. [SE-0417]: https://github.com/swiftlang/swift-evolution/blob/main/proposals/0417-task-executor-preference.md [Web Workers API]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API [`worker_threads` module]: https://nodejs.org/api/worker_threads.html --- IntegrationTests/lib.js | 149 ++++-- Package.swift | 8 + Runtime/src/index.ts | 173 ++++++- Runtime/src/object-heap.ts | 18 +- Runtime/src/types.ts | 8 + .../JavaScriptEventLoop.swift | 26 +- Sources/JavaScriptEventLoop/JobQueue.swift | 2 +- .../WebWorkerTaskExecutor.swift | 455 ++++++++++++++++++ .../FundamentalObjects/JSClosure.swift | 20 +- .../_CJavaScriptEventLoop.c | 2 + .../include/_CJavaScriptEventLoop.h | 2 + Sources/_CJavaScriptKit/_CJavaScriptKit.c | 2 + .../_CJavaScriptKit/include/_CJavaScriptKit.h | 15 + .../WebWorkerTaskExecutorTests.swift | 154 ++++++ scripts/test-harness.mjs | 2 + 15 files changed, 999 insertions(+), 37 deletions(-) create mode 100644 Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift create mode 100644 Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift diff --git a/IntegrationTests/lib.js b/IntegrationTests/lib.js index fe25cf679..6f6ea4139 100644 --- a/IntegrationTests/lib.js +++ b/IntegrationTests/lib.js @@ -3,26 +3,30 @@ import { WASI as NodeWASI } from "wasi" import { WASI as MicroWASI, useAll } from "uwasi" import * as fs from "fs/promises" import path from "path"; +import { Worker, parentPort } from "node:worker_threads"; const WASI = { - MicroWASI: ({ programName }) => { + MicroWASI: ({ args }) => { const wasi = new MicroWASI({ - args: [path.basename(programName)], + args: args, env: {}, features: [useAll()], }) return { wasiImport: wasi.wasiImport, + setInstance(instance) { + wasi.instance = instance; + }, start(instance, swift) { wasi.initialize(instance); swift.main(); } } }, - Node: ({ programName }) => { + Node: ({ args }) => { const wasi = new NodeWASI({ - args: [path.basename(programName)], + args: args, env: {}, preopens: { "/": "./", @@ -44,12 +48,9 @@ const WASI = { const selectWASIBackend = () => { const value = process.env["JAVASCRIPTKIT_WASI_BACKEND"] if (value) { - const backend = WASI[value]; - if (backend) { - return backend; - } + return value; } - return WASI.Node; + return "Node" }; function isUsingSharedMemory(module) { @@ -62,33 +63,125 @@ function isUsingSharedMemory(module) { return false; } -export const startWasiTask = async (wasmPath, wasiConstructor = selectWASIBackend()) => { - const swift = new SwiftRuntime(); - // Fetch our Wasm File - const wasmBinary = await fs.readFile(wasmPath); - const wasi = wasiConstructor({ programName: wasmPath }); - - const module = await WebAssembly.compile(wasmBinary); - - const importObject = { +function constructBaseImportObject(wasi, swift) { + return { wasi_snapshot_preview1: wasi.wasiImport, - javascript_kit: swift.importObjects(), + javascript_kit: swift.wasmImports, benchmark_helper: { noop: () => {}, noop_with_int: (_) => {}, + }, + } +} + +export async function startWasiChildThread(event) { + const { module, programName, memory, tid, startArg } = event; + const swift = new SwiftRuntime({ + sharedMemory: true, + threadChannel: { + wakeUpMainThread: parentPort.postMessage.bind(parentPort), + listenWakeEventFromMainThread: (listener) => { + parentPort.on("message", listener) + } + } + }); + // Use uwasi for child threads because Node.js WASI cannot be used without calling + // `WASI.start` or `WASI.initialize`, which is already called in the main thread and + // will cause an error if called again. + const wasi = WASI.MicroWASI({ programName }); + + const importObject = constructBaseImportObject(wasi, swift); + + importObject["wasi"] = { + "thread-spawn": () => { + throw new Error("Cannot spawn a new thread from a worker thread") } }; + importObject["env"] = { memory }; + importObject["JavaScriptEventLoopTestSupportTests"] = { + "isMainThread": () => false, + } + + const instance = await WebAssembly.instantiate(module, importObject); + swift.setInstance(instance); + wasi.setInstance(instance); + swift.startThread(tid, startArg); +} + +class ThreadRegistry { + workers = new Map(); + nextTid = 1; + + spawnThread(module, programName, memory, startArg) { + const tid = this.nextTid++; + const selfFilePath = new URL(import.meta.url).pathname; + const worker = new Worker(` + const { parentPort } = require('node:worker_threads'); + + Error.stackTraceLimit = 100; + parentPort.once("message", async (event) => { + const { selfFilePath } = event; + const { startWasiChildThread } = await import(selfFilePath); + await startWasiChildThread(event); + }) + `, { type: "module", eval: true }) + + worker.on("error", (error) => { + console.error(`Worker thread ${tid} error:`, error); + }); + this.workers.set(tid, worker); + worker.postMessage({ selfFilePath, module, programName, memory, tid, startArg }); + return tid; + } + + worker(tid) { + return this.workers.get(tid); + } + + wakeUpWorkerThread(tid) { + const worker = this.workers.get(tid); + worker.postMessage(null); + } +} + +export const startWasiTask = async (wasmPath, wasiConstructorKey = selectWASIBackend()) => { + // Fetch our Wasm File + const wasmBinary = await fs.readFile(wasmPath); + const programName = wasmPath; + const args = [path.basename(programName)]; + args.push(...process.argv.slice(3)); + const wasi = WASI[wasiConstructorKey]({ args }); + + const module = await WebAssembly.compile(wasmBinary); + + const sharedMemory = isUsingSharedMemory(module); + const threadRegistry = new ThreadRegistry(); + const swift = new SwiftRuntime({ + sharedMemory, + threadChannel: { + wakeUpWorkerThread: threadRegistry.wakeUpWorkerThread.bind(threadRegistry), + listenMainJobFromWorkerThread: (tid, listener) => { + const worker = threadRegistry.worker(tid); + worker.on("message", listener); + } + } + }); + + const importObject = constructBaseImportObject(wasi, swift); + + importObject["JavaScriptEventLoopTestSupportTests"] = { + "isMainThread": () => true, + } - if (isUsingSharedMemory(module)) { - importObject["env"] = { - // We don't have JS API to get memory descriptor of imported memory - // at this moment, so we assume 256 pages (16MB) memory is enough - // large for initial memory size. - memory: new WebAssembly.Memory({ initial: 256, maximum: 16384, shared: true }), - }; + if (sharedMemory) { + // We don't have JS API to get memory descriptor of imported memory + // at this moment, so we assume 256 pages (16MB) memory is enough + // large for initial memory size. + const memory = new WebAssembly.Memory({ initial: 256, maximum: 16384, shared: true }) + importObject["env"] = { memory }; importObject["wasi"] = { - "thread-spawn": () => { - throw new Error("thread-spawn not implemented"); + "thread-spawn": (startArg) => { + return threadRegistry.spawnThread(module, programName, memory, startArg); } } } diff --git a/Package.swift b/Package.swift index d9f33839e..aa529c772 100644 --- a/Package.swift +++ b/Package.swift @@ -26,6 +26,14 @@ let package = Package( name: "JavaScriptEventLoop", dependencies: ["JavaScriptKit", "_CJavaScriptEventLoop"] ), + .testTarget( + name: "JavaScriptEventLoopTests", + dependencies: [ + "JavaScriptEventLoop", + "JavaScriptKit", + "JavaScriptEventLoopTestSupport", + ] + ), .target(name: "_CJavaScriptEventLoop"), .target( name: "JavaScriptEventLoopTestSupport", diff --git a/Runtime/src/index.ts b/Runtime/src/index.ts index 605ce2d06..f5cfb1ba6 100644 --- a/Runtime/src/index.ts +++ b/Runtime/src/index.ts @@ -10,8 +10,92 @@ import { import * as JSValue from "./js-value.js"; import { Memory } from "./memory.js"; -type SwiftRuntimeOptions = { +/** + * A thread channel is a set of functions that are used to communicate between + * the main thread and the worker thread. The main thread and the worker thread + * can send jobs to each other using these functions. + * + * @example + * ```javascript + * // worker.js + * const runtime = new SwiftRuntime({ + * threadChannel: { + * wakeUpMainThread: (unownedJob) => { + * // Send the job to the main thread + * postMessage({ type: "job", unownedJob }); + * }, + * listenWakeEventFromMainThread: (listener) => { + * self.onmessage = (event) => { + * if (event.data.type === "wake") { + * listener(); + * } + * }; + * } + * } + * }); + * + * // main.js + * const worker = new Worker("worker.js"); + * const runtime = new SwiftRuntime({ + * threadChannel: { + * wakeUpWorkerThread: (tid) => { + * worker.postMessage({ type: "wake" }); + * }, + * listenMainJobFromWorkerThread: (tid, listener) => { + * worker.onmessage = (event) => { + * if (event.data.type === "job") { + * listener(event.data.unownedJob); + * } + * }; + * } + * } + * }); + * ``` + */ +export type SwiftRuntimeThreadChannel = + | { + /** + * This function is called when the Web Worker thread sends a job to the main thread. + * The unownedJob is the pointer to the unowned job object in the Web Worker thread. + * The job submitted by this function expected to be listened by `listenMainJobFromWorkerThread`. + */ + wakeUpMainThread: (unownedJob: number) => void; + /** + * This function is expected to be set in the worker thread and should listen + * to the wake event from the main thread sent by `wakeUpWorkerThread`. + * The passed listener function awakes the Web Worker thread. + */ + listenWakeEventFromMainThread: (listener: () => void) => void; + } + | { + /** + * This function is expected to be set in the main thread and called + * when the main thread sends a wake event to the Web Worker thread. + * The `tid` is the thread ID of the worker thread to be woken up. + * The wake event is expected to be listened by `listenWakeEventFromMainThread`. + */ + wakeUpWorkerThread: (tid: number) => void; + /** + * This function is expected to be set in the main thread and shuold listen + * to the main job sent by `wakeUpMainThread` from the worker thread. + */ + listenMainJobFromWorkerThread: ( + tid: number, + listener: (unownedJob: number) => void + ) => void; + }; + +export type SwiftRuntimeOptions = { + /** + * If `true`, the memory space of the WebAssembly instance can be shared + * between the main thread and the worker thread. + */ sharedMemory?: boolean; + /** + * The thread channel is a set of functions that are used to communicate + * between the main thread and the worker thread. + */ + threadChannel?: SwiftRuntimeThreadChannel; }; export class SwiftRuntime { @@ -23,11 +107,14 @@ export class SwiftRuntime { private textDecoder = new TextDecoder("utf-8"); private textEncoder = new TextEncoder(); // Only support utf-8 + /** The thread ID of the current thread. */ + private tid: number | null; constructor(options?: SwiftRuntimeOptions) { this._instance = null; this._memory = null; this._closureDeallocator = null; + this.tid = null; this.options = options || {}; } @@ -72,6 +159,32 @@ export class SwiftRuntime { } } + /** + * Start a new thread with the given `tid` and `startArg`, which + * is forwarded to the `wasi_thread_start` function. + * This function is expected to be called from the spawned Web Worker thread. + */ + startThread(tid: number, startArg: number) { + this.tid = tid; + const instance = this.instance; + try { + if (typeof instance.exports.wasi_thread_start === "function") { + instance.exports.wasi_thread_start(tid, startArg); + } else { + throw new Error( + `The WebAssembly module is not built for wasm32-unknown-wasip1-threads target.` + ); + } + } catch (error) { + if (error instanceof UnsafeEventLoopYield) { + // Ignore the error + return; + } + // Rethrow other errors + throw error; + } + } + private get instance() { if (!this._instance) throw new Error("WebAssembly instance is not set yet"); @@ -462,6 +575,64 @@ export class SwiftRuntime { swjs_unsafe_event_loop_yield: () => { throw new UnsafeEventLoopYield(); }, + // This function is called by WebWorkerTaskExecutor on Web Worker thread. + swjs_send_job_to_main_thread: (unowned_job) => { + const threadChannel = this.options.threadChannel; + if (threadChannel && "wakeUpMainThread" in threadChannel) { + threadChannel.wakeUpMainThread(unowned_job); + } else { + throw new Error( + "wakeUpMainThread is not set in options given to SwiftRuntime. Please set it to send jobs to the main thread." + ); + } + }, + swjs_listen_wake_event_from_main_thread: () => { + // After the thread is started, + const swjs_wake_worker_thread = + this.exports.swjs_wake_worker_thread; + const threadChannel = this.options.threadChannel; + if ( + threadChannel && + "listenWakeEventFromMainThread" in threadChannel + ) { + threadChannel.listenWakeEventFromMainThread(() => { + swjs_wake_worker_thread(); + }); + } else { + throw new Error( + "listenWakeEventFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread." + ); + } + }, + swjs_wake_up_worker_thread: (tid) => { + const threadChannel = this.options.threadChannel; + if (threadChannel && "wakeUpWorkerThread" in threadChannel) { + threadChannel.wakeUpWorkerThread(tid); + } else { + throw new Error( + "wakeUpWorkerThread is not set in options given to SwiftRuntime. Please set it to wake up worker threads." + ); + } + }, + swjs_listen_main_job_from_worker_thread: (tid) => { + const threadChannel = this.options.threadChannel; + if ( + threadChannel && + "listenMainJobFromWorkerThread" in threadChannel + ) { + threadChannel.listenMainJobFromWorkerThread( + tid, this.exports.swjs_enqueue_main_job_from_worker, + ); + } else { + throw new Error( + "listenMainJobFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads." + ); + } + }, + swjs_get_worker_thread_id: () => { + // Main thread's tid is always -1 + return this.tid || -1; + }, }; } } diff --git a/Runtime/src/object-heap.ts b/Runtime/src/object-heap.ts index d59f5101e..98281b5ca 100644 --- a/Runtime/src/object-heap.ts +++ b/Runtime/src/object-heap.ts @@ -4,6 +4,7 @@ import { ref } from "./types.js"; type SwiftRuntimeHeapEntry = { id: number; rc: number; + released: boolean; }; export class SwiftRuntimeHeap { private _heapValueById: Map; @@ -15,7 +16,11 @@ export class SwiftRuntimeHeap { this._heapValueById.set(0, globalVariable); this._heapEntryByValue = new Map(); - this._heapEntryByValue.set(globalVariable, { id: 0, rc: 1 }); + this._heapEntryByValue.set(globalVariable, { + id: 0, + rc: 1, + released: false, + }); // Note: 0 is preserved for global this._heapNextKey = 1; @@ -29,13 +34,22 @@ export class SwiftRuntimeHeap { } const id = this._heapNextKey++; this._heapValueById.set(id, value); - this._heapEntryByValue.set(value, { id: id, rc: 1 }); + this._heapEntryByValue.set(value, { id: id, rc: 1, released: false }); return id; } release(ref: ref) { const value = this._heapValueById.get(ref); const entry = this._heapEntryByValue.get(value)!; + if (entry.released) { + console.error( + "Double release detected for reference " + ref, + entry + ); + throw new ReferenceError( + "Double release detected for reference " + ref + ); + } entry.rc--; if (entry.rc != 0) return; diff --git a/Runtime/src/types.ts b/Runtime/src/types.ts index 55f945b64..ed61555a8 100644 --- a/Runtime/src/types.ts +++ b/Runtime/src/types.ts @@ -19,6 +19,9 @@ export interface ExportedFunctions { ): bool; swjs_free_host_function(host_func_id: number): void; + + swjs_enqueue_main_job_from_worker(unowned_job: number): void; + swjs_wake_worker_thread(): void; } export interface ImportedFunctions { @@ -103,6 +106,11 @@ export interface ImportedFunctions { swjs_bigint_to_i64(ref: ref, signed: bool): bigint; swjs_i64_to_bigint_slow(lower: number, upper: number, signed: bool): ref; swjs_unsafe_event_loop_yield: () => void; + swjs_send_job_to_main_thread: (unowned_job: number) => void; + swjs_listen_wake_event_from_main_thread: () => void; + swjs_wake_up_worker_thread: (tid: number) => void; + swjs_listen_main_job_from_worker_thread: (tid: number) => void; + swjs_get_worker_thread_id: () => number; } export const enum LibraryFeatures { diff --git a/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift b/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift index 7a0364a5c..4ba186df5 100644 --- a/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift +++ b/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift @@ -1,6 +1,7 @@ import JavaScriptKit import _CJavaScriptEventLoop import _CJavaScriptKit +import Synchronization // NOTE: `@available` annotations are semantically wrong, but they make it easier to develop applications targeting WebAssembly in Xcode. @@ -56,7 +57,7 @@ public final class JavaScriptEventLoop: SerialExecutor, @unchecked Sendable { self.setTimeout = setTimeout } - /// A singleton instance of the Executor + /// A per-thread singleton instance of the Executor public static var shared: JavaScriptEventLoop { return _shared } @@ -142,6 +143,7 @@ public final class JavaScriptEventLoop: SerialExecutor, @unchecked Sendable { typealias swift_task_enqueueMainExecutor_hook_Fn = @convention(thin) (UnownedJob, swift_task_enqueueMainExecutor_original) -> Void let swift_task_enqueueMainExecutor_hook_impl: swift_task_enqueueMainExecutor_hook_Fn = { job, original in + assert(false) JavaScriptEventLoop.shared.unsafeEnqueue(job) } swift_task_enqueueMainExecutor_hook = unsafeBitCast(swift_task_enqueueMainExecutor_hook_impl, to: UnsafeMutableRawPointer?.self) @@ -149,9 +151,8 @@ public final class JavaScriptEventLoop: SerialExecutor, @unchecked Sendable { didInstallGlobalExecutor = true } - private func enqueue(_ job: UnownedJob, withDelay nanoseconds: UInt64) { - let milliseconds = nanoseconds / 1_000_000 - setTimeout(Double(milliseconds), { + func enqueue(_ job: UnownedJob, withDelay nanoseconds: UInt64) { + enqueue(withDelay: nanoseconds, job: { #if compiler(>=5.9) job.runSynchronously(on: self.asUnownedSerialExecutor()) #else @@ -160,6 +161,23 @@ public final class JavaScriptEventLoop: SerialExecutor, @unchecked Sendable { }) } + func enqueue(withDelay nanoseconds: UInt64, job: @escaping () -> Void) { + let milliseconds = nanoseconds / 1_000_000 + setTimeout(Double(milliseconds), job) + } + + func enqueue( + withDelay seconds: Int64, _ nanoseconds: Int64, + _ toleranceSec: Int64, _ toleranceNSec: Int64, + _ clock: Int32, job: @escaping () -> Void + ) { + var nowSec: Int64 = 0 + var nowNSec: Int64 = 0 + swift_get_time(&nowSec, &nowNSec, clock) + let delayNanosec = (seconds - nowSec) * 1_000_000_000 + (nanoseconds - nowNSec) + enqueue(withDelay: delayNanosec <= 0 ? 0 : UInt64(delayNanosec), job: job) + } + private func unsafeEnqueue(_ job: UnownedJob) { insertJobQueue(job: job) } diff --git a/Sources/JavaScriptEventLoop/JobQueue.swift b/Sources/JavaScriptEventLoop/JobQueue.swift index 5ad71f0a0..c6eb48b79 100644 --- a/Sources/JavaScriptEventLoop/JobQueue.swift +++ b/Sources/JavaScriptEventLoop/JobQueue.swift @@ -9,7 +9,7 @@ import _CJavaScriptEventLoop @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) struct QueueState: Sendable { fileprivate var headJob: UnownedJob? = nil - fileprivate var isSpinning: Bool = false + var isSpinning: Bool = false } @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) diff --git a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift new file mode 100644 index 000000000..4b9b3215a --- /dev/null +++ b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift @@ -0,0 +1,455 @@ +#if compiler(>=6.0) && _runtime(_multithreaded) // @_expose and @_extern are only available in Swift 6.0+ + +import JavaScriptKit +import _CJavaScriptKit +import _CJavaScriptEventLoop + +import Synchronization +#if canImport(wasi_pthread) + import wasi_pthread + import WASILibc +#endif + +// MARK: - Web Worker Task Executor + +/// A task executor that runs tasks on Web Worker threads. +/// +/// ## Prerequisites +/// +/// This task executor is designed to work with [wasi-threads](https://github.com/WebAssembly/wasi-threads) +/// but it requires the following single extension: +/// The wasi-threads implementation should listen to the `message` event +/// from spawned Web Workers, and forward the message to the main thread +/// by calling `_swjs_enqueue_main_job_from_worker`. +/// +/// ## Usage +/// +/// ```swift +/// let executor = WebWorkerTaskExecutor(numberOfThreads: 4) +/// defer { executor.terminate() } +/// +/// await withTaskExecutorPreference(executor) { +/// // This block runs on the Web Worker thread. +/// await withTaskGroup(of: Int.self) { group in +/// for i in 0..<10 { +/// // Structured child works are executed on the Web Worker thread. +/// group.addTask { fibonacci(of: i) } +/// } +/// } +/// } +/// ```` +/// +/// ## Known limitations +/// +/// Currently, the Cooperative Global Executor of Swift runtime has a bug around +/// main executor detection. The issue leads to ignoring the `@MainActor` +/// attribute, which is supposed to run tasks on the main thread, when this web +/// worker executor is preferred. +/// +/// ```swift +/// func run(executor: WebWorkerTaskExecutor) async { +/// await withTaskExecutorPreference(executor) { +/// // This block runs on the Web Worker thread. +/// await MainActor.run { +/// // This block should run on the main thread, but it runs on +/// // the Web Worker thread. +/// } +/// } +/// // Back to the main thread. +/// } +/// ```` +/// +public final class WebWorkerTaskExecutor: TaskExecutor { + + /// A job worker dedicated to a single Web Worker thread. + /// + /// ## Lifetime + /// The worker instance in Swift world lives as long as the + /// `WebWorkerTaskExecutor` instance that spawned it lives. Thus, the worker + /// instance may outlive the underlying Web Worker thread. + fileprivate final class Worker: Sendable { + + /// The state of the worker. + /// + /// State transition: + /// + /// +---------+ +------------+ + /// +----->| Idle |--[terminate]-->| Terminated | + /// | +---+-----+ +------------+ + /// | | + /// | [enqueue] + /// | | + /// [no more job] | + /// | v + /// | +---------+ + /// +------| Running | + /// +---------+ + /// + enum State: UInt32, AtomicRepresentable { + /// The worker is idle and waiting for a new job. + case idle = 0 + /// The worker is processing a job. + case running = 1 + /// The worker is terminated. + case terminated = 2 + } + let state: Atomic = Atomic(.idle) + /// TODO: Rewrite it to use real queue :-) + let jobQueue: Mutex<[UnownedJob]> = Mutex([]) + /// The TaskExecutor that spawned this worker. + /// This variable must be set only once when the worker is started. + nonisolated(unsafe) weak var parentTaskExecutor: WebWorkerTaskExecutor.Executor? + /// The thread ID of this worker. + let tid: Atomic = Atomic(0) + + /// A trace statistics + struct TraceStats: CustomStringConvertible { + var enqueuedJobs: Int = 0 + var dequeuedJobs: Int = 0 + var processedJobs: Int = 0 + + var description: String { + "TraceStats(E: \(enqueuedJobs), D: \(dequeuedJobs), P: \(processedJobs))" + } + } + #if JAVASCRIPTKIT_STATS + private let traceStats = Mutex(TraceStats()) + private func statsIncrement(_ keyPath: WritableKeyPath) { + traceStats.withLock { stats in + stats[keyPath: keyPath] += 1 + } + } + #else + private func statsIncrement(_ keyPath: WritableKeyPath) {} + #endif + + /// The worker bound to the current thread. + /// Returns `nil` if the current thread is not a worker thread. + static var currentThread: Worker? { + guard let ptr = swjs_thread_local_task_executor_worker else { + return nil + } + return Unmanaged.fromOpaque(ptr).takeUnretainedValue() + } + + init() {} + + /// Enqueue a job to the worker. + func enqueue(_ job: UnownedJob) { + statsIncrement(\.enqueuedJobs) + jobQueue.withLock { queue in + queue.append(job) + + // Wake up the worker to process a job. + switch state.exchange(.running, ordering: .sequentiallyConsistent) { + case .idle: + if Self.currentThread === self { + // Enqueueing a new job to the current worker thread, but it's idle now. + // This is usually the case when a continuation is resumed by JS events + // like `setTimeout` or `addEventListener`. + // We can run the job and subsequently spawned jobs immediately. + // JSPromise.resolve(JSValue.undefined).then { _ in + _ = JSObject.global.queueMicrotask!(JSOneshotClosure { _ in + self.run() + return JSValue.undefined + }) + } else { + let tid = self.tid.load(ordering: .sequentiallyConsistent) + swjs_wake_up_worker_thread(tid) + } + case .running: + // The worker is already running, no need to wake up. + break + case .terminated: + // Will not wake up the worker because it's already terminated. + break + } + } + } + + func scheduleNextRun() { + _ = JSObject.global.queueMicrotask!(JSOneshotClosure { _ in + self.run() + return JSValue.undefined + }) + } + + /// Run the worker + /// + /// NOTE: This function must be called from the worker thread. + /// It will return when the worker is terminated. + func start(executor: WebWorkerTaskExecutor.Executor) { + // Get the thread ID of the current worker thread from the JS side. + // NOTE: Unfortunately even though `pthread_self` internally holds the thread ID, + // there is no public API to get it because it's a part of implementation details + // of wasi-libc. So we need to get it from the JS side. + let tid = swjs_get_worker_thread_id() + // Set the thread-local variable to the current worker. + // `self` outlives the worker thread because `Executor` retains the worker. + // Thus it's safe to store the reference without extra retain. + swjs_thread_local_task_executor_worker = Unmanaged.passUnretained(self).toOpaque() + // Start listening wake-up events from the main thread. + // This must be called after setting the swjs_thread_local_task_executor_worker + // because the event listener enqueues jobs to the TLS worker. + swjs_listen_wake_event_from_main_thread() + // Set the parent executor. + parentTaskExecutor = executor + // Store the thread ID to the worker. This notifies the main thread that the worker is started. + self.tid.store(tid, ordering: .sequentiallyConsistent) + } + + /// Process jobs in the queue. + /// + /// Return when the worker has no more jobs to run or terminated. + /// This method must be called from the worker thread after the worker + /// is started by `start(executor:)`. + func run() { + trace("Worker.run") + guard let executor = parentTaskExecutor else { + preconditionFailure("The worker must be started with a parent executor.") + } + assert(state.load(ordering: .sequentiallyConsistent) == .running, "Invalid state: not running") + while true { + // Pop a job from the queue. + let job = jobQueue.withLock { queue -> UnownedJob? in + if let job = queue.first { + queue.removeFirst() + return job + } + // No more jobs to run now. Wait for a new job to be enqueued. + let (exchanged, original) = state.compareExchange(expected: .running, desired: .idle, ordering: .sequentiallyConsistent) + + switch (exchanged, original) { + case (true, _): + trace("Worker.run exited \(original) -> idle") + return nil // Regular case + case (false, .idle): + preconditionFailure("unreachable: Worker/run running in multiple threads!?") + case (false, .running): + preconditionFailure("unreachable: running -> idle should return exchanged=true") + case (false, .terminated): + return nil // The worker is terminated, exit the loop. + } + } + guard let job else { return } + statsIncrement(\.dequeuedJobs) + job.runSynchronously( + on: executor.asUnownedTaskExecutor() + ) + statsIncrement(\.processedJobs) + // The job is done. Continue to the next job. + } + } + + /// Terminate the worker. + func terminate() { + trace("Worker.terminate") + state.store(.terminated, ordering: .sequentiallyConsistent) + } + } + + fileprivate final class Executor: TaskExecutor { + let numberOfThreads: Int + let workers: [Worker] + let roundRobinIndex: Mutex = Mutex(0) + + init(numberOfThreads: Int) { + self.numberOfThreads = numberOfThreads + var workers = [Worker]() + for _ in 0...fromOpaque(ptr!).takeRetainedValue() + context.worker.start(executor: context.executor) + // The worker is started. Throw JS exception to unwind the call stack without + // reaching the `pthread_exit`, which is called immediately after this block. + swjs_unsafe_event_loop_yield() + return nil + }, ptr) + precondition(ret == 0, "Failed to create a thread") + } + // Wait until all worker threads are started and wire up messaging channels + // between the main thread and workers to notify job enqueuing events each other. + for worker in workers { + var tid: pid_t + repeat { + tid = worker.tid.load(ordering: .sequentiallyConsistent) + } while tid == 0 + swjs_listen_main_job_from_worker_thread(tid) + } + } + + func terminate() { + for worker in workers { + worker.terminate() + } + } + + func enqueue(_ job: consuming ExecutorJob) { + precondition(!workers.isEmpty, "No worker threads are available") + + let job = UnownedJob(job) + // If the current thread is a worker thread, enqueue the job to the current worker. + if let worker = Worker.currentThread { + worker.enqueue(job) + return + } + // Otherwise (main thread), enqueue the job to the worker with round-robin scheduling. + // TODO: Use a more sophisticated scheduling algorithm with priority. + roundRobinIndex.withLock { index in + let worker = workers[index] + worker.enqueue(job) + index = (index + 1) % numberOfThreads + } + } + } + + private let executor: Executor + + /// Create a new Web Worker task executor. + /// + /// - Parameter numberOfThreads: The number of Web Worker threads to spawn. + public init(numberOfThreads: Int) { + self.executor = Executor(numberOfThreads: numberOfThreads) + self.executor.start() + } + + /// Terminate child Web Worker threads. + /// Jobs enqueued to the executor after calling this method will be ignored. + /// + /// NOTE: This method must be called after all tasks that prefer this executor are done. + /// Otherwise, the tasks may stuck forever. + public func terminate() { + executor.terminate() + } + + /// The number of Web Worker threads. + public var numberOfThreads: Int { + executor.numberOfThreads + } + + // MARK: TaskExecutor conformance + + /// Enqueue a job to the executor. + /// + /// NOTE: Called from the Swift Concurrency runtime. + public func enqueue(_ job: consuming ExecutorJob) { + Self.traceStatsIncrement(\.enqueueExecutor) + executor.enqueue(job) + } + + // MARK: Statistics + + /// Executor global statistics + internal struct ExecutorStats: CustomStringConvertible { + var sendJobToMainThread: Int = 0 + var recieveJobFromWorkerThread: Int = 0 + var enqueueGlobal: Int = 0 + var enqueueExecutor: Int = 0 + + var description: String { + "ExecutorStats(sendWtoM: \(sendJobToMainThread), recvWfromM: \(recieveJobFromWorkerThread)), enqueueGlobal: \(enqueueGlobal), enqueueExecutor: \(enqueueExecutor)" + } + } + #if JAVASCRIPTKIT_STATS + private static let stats = Mutex(ExecutorStats()) + fileprivate static func traceStatsIncrement(_ keyPath: WritableKeyPath) { + stats.withLock { stats in + stats[keyPath: keyPath] += 1 + } + } + internal func dumpStats() { + Self.stats.withLock { stats in + print("WebWorkerTaskExecutor stats: \(stats)") + } + } + #else + fileprivate static func traceStatsIncrement(_ keyPath: WritableKeyPath) {} + internal func dumpStats() {} + #endif + + // MARK: Global Executor hack + + private static var _mainThread: pthread_t? + private static var _swift_task_enqueueGlobal_hook_original: UnsafeMutableRawPointer? + private static var _swift_task_enqueueGlobalWithDelay_hook_original: UnsafeMutableRawPointer? + private static var _swift_task_enqueueGlobalWithDeadline_hook_original: UnsafeMutableRawPointer? + + /// Install a global executor that forwards jobs from Web Worker threads to the main thread. + /// + /// This function must be called once before using the Web Worker task executor. + public static func installGlobalExecutor() { + // Ensure this function is called only once. + guard _mainThread == nil else { return } + + _mainThread = pthread_self() + assert(swjs_get_worker_thread_id() == -1, "\(#function) must be called on the main thread") + + _swift_task_enqueueGlobal_hook_original = swift_task_enqueueGlobal_hook + + typealias swift_task_enqueueGlobal_hook_Fn = @convention(thin) (UnownedJob, swift_task_enqueueGlobal_original) -> Void + let swift_task_enqueueGlobal_hook_impl: swift_task_enqueueGlobal_hook_Fn = { job, base in + WebWorkerTaskExecutor.traceStatsIncrement(\.enqueueGlobal) + // Enter this block only if the current Task has no executor preference. + if pthread_equal(pthread_self(), WebWorkerTaskExecutor._mainThread) != 0 { + // If the current thread is the main thread, delegate the job + // execution to the original hook of JavaScriptEventLoop. + let original = unsafeBitCast(WebWorkerTaskExecutor._swift_task_enqueueGlobal_hook_original, to: swift_task_enqueueGlobal_hook_Fn.self) + original(job, base) + } else { + // Notify the main thread to execute the job when a job is + // enqueued from a Web Worker thread but without an executor preference. + // This is usually the case when hopping back to the main thread + // at the end of a task. + WebWorkerTaskExecutor.traceStatsIncrement(\.sendJobToMainThread) + let jobBitPattern = unsafeBitCast(job, to: UInt.self) + swjs_send_job_to_main_thread(jobBitPattern) + } + } + swift_task_enqueueGlobal_hook = unsafeBitCast(swift_task_enqueueGlobal_hook_impl, to: UnsafeMutableRawPointer?.self) + } +} + +/// Enqueue a job scheduled from a Web Worker thread to the main thread. +/// This function is called when a job is enqueued from a Web Worker thread. +@_expose(wasm, "swjs_enqueue_main_job_from_worker") +func _swjs_enqueue_main_job_from_worker(_ job: UnownedJob) { + WebWorkerTaskExecutor.traceStatsIncrement(\.recieveJobFromWorkerThread) + JavaScriptEventLoop.shared.enqueue(ExecutorJob(job)) +} + +@_expose(wasm, "swjs_wake_worker_thread") +func _swjs_wake_worker_thread() { + WebWorkerTaskExecutor.Worker.currentThread!.run() +} + +#endif + +fileprivate func trace(_ message: String) { +#if JAVASCRIPTKIT_TRACE + JSObject.global.process.stdout.write("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n") +#endif +} diff --git a/Sources/JavaScriptKit/FundamentalObjects/JSClosure.swift b/Sources/JavaScriptKit/FundamentalObjects/JSClosure.swift index 6decbc814..75a8398fa 100644 --- a/Sources/JavaScriptKit/FundamentalObjects/JSClosure.swift +++ b/Sources/JavaScriptKit/FundamentalObjects/JSClosure.swift @@ -63,8 +63,26 @@ public class JSOneshotClosure: JSObject, JSClosureProtocol { /// public class JSClosure: JSFunction, JSClosureProtocol { + class SharedJSClosure { + private var storage: [JavaScriptHostFuncRef: (object: JSObject, body: ([JSValue]) -> JSValue)] = [:] + init() {} + + subscript(_ key: JavaScriptHostFuncRef) -> (object: JSObject, body: ([JSValue]) -> JSValue)? { + get { storage[key] } + set { storage[key] = newValue } + } + } + // Note: Retain the closure object itself also to avoid funcRef conflicts - fileprivate static var sharedClosures: [JavaScriptHostFuncRef: (object: JSObject, body: ([JSValue]) -> JSValue)] = [:] + fileprivate static var sharedClosures: SharedJSClosure { + if let swjs_thread_local_closures { + return Unmanaged.fromOpaque(swjs_thread_local_closures).takeUnretainedValue() + } else { + let shared = SharedJSClosure() + swjs_thread_local_closures = Unmanaged.passRetained(shared).toOpaque() + return shared + } + } private var hostFuncRef: JavaScriptHostFuncRef = 0 diff --git a/Sources/_CJavaScriptEventLoop/_CJavaScriptEventLoop.c b/Sources/_CJavaScriptEventLoop/_CJavaScriptEventLoop.c index 009672933..ebb05e1db 100644 --- a/Sources/_CJavaScriptEventLoop/_CJavaScriptEventLoop.c +++ b/Sources/_CJavaScriptEventLoop/_CJavaScriptEventLoop.c @@ -1,3 +1,5 @@ #include "_CJavaScriptEventLoop.h" _Thread_local void *swjs_thread_local_event_loop; + +_Thread_local void *swjs_thread_local_task_executor_worker; diff --git a/Sources/_CJavaScriptEventLoop/include/_CJavaScriptEventLoop.h b/Sources/_CJavaScriptEventLoop/include/_CJavaScriptEventLoop.h index 890e26a01..4f1b9470c 100644 --- a/Sources/_CJavaScriptEventLoop/include/_CJavaScriptEventLoop.h +++ b/Sources/_CJavaScriptEventLoop/include/_CJavaScriptEventLoop.h @@ -66,4 +66,6 @@ extern void *_Nullable swift_task_asyncMainDrainQueue_hook; extern _Thread_local void * _Nullable swjs_thread_local_event_loop; +extern _Thread_local void * _Nullable swjs_thread_local_task_executor_worker; + #endif diff --git a/Sources/_CJavaScriptKit/_CJavaScriptKit.c b/Sources/_CJavaScriptKit/_CJavaScriptKit.c index 0bcc5eaca..3cc06af1c 100644 --- a/Sources/_CJavaScriptKit/_CJavaScriptKit.c +++ b/Sources/_CJavaScriptKit/_CJavaScriptKit.c @@ -47,4 +47,6 @@ int swjs_library_features(void) { return _library_features(); } +_Thread_local void *swjs_thread_local_closures; + #endif diff --git a/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h b/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h index 431b83615..dd7658649 100644 --- a/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h +++ b/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h @@ -295,4 +295,19 @@ IMPORT_JS_FUNCTION(swjs_release, void, (const JavaScriptObjectRef ref)) /// @note This function never returns IMPORT_JS_FUNCTION(swjs_unsafe_event_loop_yield, void, (void)) +IMPORT_JS_FUNCTION(swjs_send_job_to_main_thread, void, (uintptr_t job)) + +IMPORT_JS_FUNCTION(swjs_listen_wake_event_from_main_thread, void, (void)) + +IMPORT_JS_FUNCTION(swjs_wake_up_worker_thread, void, (int tid)) + +IMPORT_JS_FUNCTION(swjs_listen_main_job_from_worker_thread, void, (int tid)) + +IMPORT_JS_FUNCTION(swjs_get_worker_thread_id, int, (void)) + +/// MARK: - thread local storage + +// TODO: Rewrite closure system without global storage +extern _Thread_local void * _Nullable swjs_thread_local_closures; + #endif /* _CJavaScriptKit_h */ diff --git a/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift b/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift new file mode 100644 index 000000000..e4461620f --- /dev/null +++ b/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift @@ -0,0 +1,154 @@ +#if compiler(>=6.0) && _runtime(_multithreaded) +import XCTest +import JavaScriptKit +import _CJavaScriptKit // For swjs_get_worker_thread_id +@testable import JavaScriptEventLoop + +@_extern(wasm, module: "JavaScriptEventLoopTestSupportTests", name: "isMainThread") +func isMainThread() -> Bool + +final class WebWorkerTaskExecutorTests: XCTestCase { + override func setUp() { + WebWorkerTaskExecutor.installGlobalExecutor() + } + + func testTaskRunOnMainThread() async { + let executor = WebWorkerTaskExecutor(numberOfThreads: 1) + + XCTAssertTrue(isMainThread()) + + let task = Task(executorPreference: executor) { + return isMainThread() + } + let taskRunOnMainThread = await task.value + // The task should run on the worker thread + XCTAssertFalse(taskRunOnMainThread) + // After the task is done, back to the main thread + XCTAssertTrue(isMainThread()) + + executor.terminate() + } + + func testWithPreferenceBlock() async { + let executor = WebWorkerTaskExecutor(numberOfThreads: 1) + await withTaskExecutorPreference(executor) { + XCTAssertFalse(isMainThread()) + } + } + + func testAwaitInsideTask() async throws { + let executor = WebWorkerTaskExecutor(numberOfThreads: 1) + + let task = Task(executorPreference: executor) { + await Task.yield() + _ = try await JSPromise.resolve(1).value + return isMainThread() + } + let taskRunOnMainThread = try await task.value + XCTAssertFalse(taskRunOnMainThread) + + executor.terminate() + } + + func testSleepInsideTask() async throws { + let executor = WebWorkerTaskExecutor(numberOfThreads: 1) + + let task = Task(executorPreference: executor) { + XCTAssertFalse(isMainThread()) + try await Task.sleep(nanoseconds: 10) + XCTAssertFalse(isMainThread()) + try await Task.sleep(nanoseconds: 100) + XCTAssertFalse(isMainThread()) + let clock = ContinuousClock() + try await clock.sleep(for: .milliseconds(10)) + return isMainThread() + } + let taskRunOnMainThread = try await task.value + XCTAssertFalse(taskRunOnMainThread) + + executor.terminate() + } + + func testMainActorRun() async { + let executor = WebWorkerTaskExecutor(numberOfThreads: 1) + + let task = Task(executorPreference: executor) { + await MainActor.run { + return isMainThread() + } + } + let taskRunOnMainThread = await task.value + // FIXME: The block passed to `MainActor.run` should run on the main thread + // XCTAssertTrue(taskRunOnMainThread) + XCTAssertFalse(taskRunOnMainThread) + // After the task is done, back to the main thread + XCTAssertTrue(isMainThread()) + + executor.terminate() + } + + func testTaskGroupRunOnSameThread() async { + let executor = WebWorkerTaskExecutor(numberOfThreads: 3) + + let mainTid = swjs_get_worker_thread_id() + await withTaskExecutorPreference(executor) { + let tid = swjs_get_worker_thread_id() + await withTaskGroup(of: Int32.self) { group in + group.addTask { + return swjs_get_worker_thread_id() + } + + group.addTask { + return swjs_get_worker_thread_id() + } + + for await id in group { + XCTAssertEqual(id, tid) + XCTAssertNotEqual(id, mainTid) + } + } + } + + executor.terminate() + } + + func testTaskGroupRunOnDifferentThreads() async { + let executor = WebWorkerTaskExecutor(numberOfThreads: 2) + + struct Item: Hashable { + let type: String + let tid: Int32 + let value: Int + init(_ type: String, _ tid: Int32, _ value: Int) { + self.type = type + self.tid = tid + self.value = value + } + } + + await withTaskGroup(of: Item.self) { group in + group.addTask { + let tid = swjs_get_worker_thread_id() + return Item("main", tid, 0) + } + + let numberOffloadedTasks = 10 + for i in 0.. { process.exit(1); } +Error.stackTraceLimit = Infinity; + startWasiTask(process.argv[2]).catch(handleExitOrError); From 91258e20ac7277a052e3be86e15cb9c29afbb2e3 Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Sat, 6 Jul 2024 10:30:42 +0000 Subject: [PATCH 02/12] Revert double free detection --- Runtime/src/object-heap.ts | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/Runtime/src/object-heap.ts b/Runtime/src/object-heap.ts index 98281b5ca..d59f5101e 100644 --- a/Runtime/src/object-heap.ts +++ b/Runtime/src/object-heap.ts @@ -4,7 +4,6 @@ import { ref } from "./types.js"; type SwiftRuntimeHeapEntry = { id: number; rc: number; - released: boolean; }; export class SwiftRuntimeHeap { private _heapValueById: Map; @@ -16,11 +15,7 @@ export class SwiftRuntimeHeap { this._heapValueById.set(0, globalVariable); this._heapEntryByValue = new Map(); - this._heapEntryByValue.set(globalVariable, { - id: 0, - rc: 1, - released: false, - }); + this._heapEntryByValue.set(globalVariable, { id: 0, rc: 1 }); // Note: 0 is preserved for global this._heapNextKey = 1; @@ -34,22 +29,13 @@ export class SwiftRuntimeHeap { } const id = this._heapNextKey++; this._heapValueById.set(id, value); - this._heapEntryByValue.set(value, { id: id, rc: 1, released: false }); + this._heapEntryByValue.set(value, { id: id, rc: 1 }); return id; } release(ref: ref) { const value = this._heapValueById.get(ref); const entry = this._heapEntryByValue.get(value)!; - if (entry.released) { - console.error( - "Double release detected for reference " + ref, - entry - ); - throw new ReferenceError( - "Double release detected for reference " + ref - ); - } entry.rc--; if (entry.rc != 0) return; From e3181b089191801fe135a74e765287812b8e10ab Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Sat, 6 Jul 2024 10:58:21 +0000 Subject: [PATCH 03/12] Revert debug changes in JavaScriptEventLoop --- .../JavaScriptEventLoop.swift | 24 +++---------------- Sources/JavaScriptEventLoop/JobQueue.swift | 2 +- 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift b/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift index 4ba186df5..e1e023e7f 100644 --- a/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift +++ b/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift @@ -1,7 +1,6 @@ import JavaScriptKit import _CJavaScriptEventLoop import _CJavaScriptKit -import Synchronization // NOTE: `@available` annotations are semantically wrong, but they make it easier to develop applications targeting WebAssembly in Xcode. @@ -143,7 +142,6 @@ public final class JavaScriptEventLoop: SerialExecutor, @unchecked Sendable { typealias swift_task_enqueueMainExecutor_hook_Fn = @convention(thin) (UnownedJob, swift_task_enqueueMainExecutor_original) -> Void let swift_task_enqueueMainExecutor_hook_impl: swift_task_enqueueMainExecutor_hook_Fn = { job, original in - assert(false) JavaScriptEventLoop.shared.unsafeEnqueue(job) } swift_task_enqueueMainExecutor_hook = unsafeBitCast(swift_task_enqueueMainExecutor_hook_impl, to: UnsafeMutableRawPointer?.self) @@ -151,8 +149,9 @@ public final class JavaScriptEventLoop: SerialExecutor, @unchecked Sendable { didInstallGlobalExecutor = true } - func enqueue(_ job: UnownedJob, withDelay nanoseconds: UInt64) { - enqueue(withDelay: nanoseconds, job: { + private func enqueue(_ job: UnownedJob, withDelay nanoseconds: UInt64) { + let milliseconds = nanoseconds / 1_000_000 + setTimeout(Double(milliseconds), { #if compiler(>=5.9) job.runSynchronously(on: self.asUnownedSerialExecutor()) #else @@ -161,23 +160,6 @@ public final class JavaScriptEventLoop: SerialExecutor, @unchecked Sendable { }) } - func enqueue(withDelay nanoseconds: UInt64, job: @escaping () -> Void) { - let milliseconds = nanoseconds / 1_000_000 - setTimeout(Double(milliseconds), job) - } - - func enqueue( - withDelay seconds: Int64, _ nanoseconds: Int64, - _ toleranceSec: Int64, _ toleranceNSec: Int64, - _ clock: Int32, job: @escaping () -> Void - ) { - var nowSec: Int64 = 0 - var nowNSec: Int64 = 0 - swift_get_time(&nowSec, &nowNSec, clock) - let delayNanosec = (seconds - nowSec) * 1_000_000_000 + (nanoseconds - nowNSec) - enqueue(withDelay: delayNanosec <= 0 ? 0 : UInt64(delayNanosec), job: job) - } - private func unsafeEnqueue(_ job: UnownedJob) { insertJobQueue(job: job) } diff --git a/Sources/JavaScriptEventLoop/JobQueue.swift b/Sources/JavaScriptEventLoop/JobQueue.swift index c6eb48b79..5ad71f0a0 100644 --- a/Sources/JavaScriptEventLoop/JobQueue.swift +++ b/Sources/JavaScriptEventLoop/JobQueue.swift @@ -9,7 +9,7 @@ import _CJavaScriptEventLoop @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) struct QueueState: Sendable { fileprivate var headJob: UnownedJob? = nil - var isSpinning: Bool = false + fileprivate var isSpinning: Bool = false } @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) From 403511475c32023cf28274548c8592d374962fc7 Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Sat, 6 Jul 2024 11:03:20 +0000 Subject: [PATCH 04/12] make regenerate_swiftpm_resources --- Sources/JavaScriptKit/Runtime/index.js | 73 +++++++++++++++++++++++++ Sources/JavaScriptKit/Runtime/index.mjs | 73 +++++++++++++++++++++++++ 2 files changed, 146 insertions(+) diff --git a/Sources/JavaScriptKit/Runtime/index.js b/Sources/JavaScriptKit/Runtime/index.js index 2aaabce65..63667d300 100644 --- a/Sources/JavaScriptKit/Runtime/index.js +++ b/Sources/JavaScriptKit/Runtime/index.js @@ -205,6 +205,7 @@ this._instance = null; this._memory = null; this._closureDeallocator = null; + this.tid = null; this.options = options || {}; } setInstance(instance) { @@ -240,6 +241,31 @@ throw error; } } + /** + * Start a new thread with the given `tid` and `startArg`, which + * is forwarded to the `wasi_thread_start` function. + * This function is expected to be called from the spawned Web Worker thread. + */ + startThread(tid, startArg) { + this.tid = tid; + const instance = this.instance; + try { + if (typeof instance.exports.wasi_thread_start === "function") { + instance.exports.wasi_thread_start(tid, startArg); + } + else { + throw new Error(`The WebAssembly module is not built for wasm32-unknown-wasip1-threads target.`); + } + } + catch (error) { + if (error instanceof UnsafeEventLoopYield) { + // Ignore the error + return; + } + // Rethrow other errors + throw error; + } + } get instance() { if (!this._instance) throw new Error("WebAssembly instance is not set yet"); @@ -464,6 +490,53 @@ swjs_unsafe_event_loop_yield: () => { throw new UnsafeEventLoopYield(); }, + // This function is called by WebWorkerTaskExecutor on Web Worker thread. + swjs_send_job_to_main_thread: (unowned_job) => { + const threadChannel = this.options.threadChannel; + if (threadChannel && "wakeUpMainThread" in threadChannel) { + threadChannel.wakeUpMainThread(unowned_job); + } + else { + throw new Error("wakeUpMainThread is not set in options given to SwiftRuntime. Please set it to send jobs to the main thread."); + } + }, + swjs_listen_wake_event_from_main_thread: () => { + // After the thread is started, + const swjs_wake_worker_thread = this.exports.swjs_wake_worker_thread; + const threadChannel = this.options.threadChannel; + if (threadChannel && + "listenWakeEventFromMainThread" in threadChannel) { + threadChannel.listenWakeEventFromMainThread(() => { + swjs_wake_worker_thread(); + }); + } + else { + throw new Error("listenWakeEventFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread."); + } + }, + swjs_wake_up_worker_thread: (tid) => { + const threadChannel = this.options.threadChannel; + if (threadChannel && "wakeUpWorkerThread" in threadChannel) { + threadChannel.wakeUpWorkerThread(tid); + } + else { + throw new Error("wakeUpWorkerThread is not set in options given to SwiftRuntime. Please set it to wake up worker threads."); + } + }, + swjs_listen_main_job_from_worker_thread: (tid) => { + const threadChannel = this.options.threadChannel; + if (threadChannel && + "listenMainJobFromWorkerThread" in threadChannel) { + threadChannel.listenMainJobFromWorkerThread(tid, this.exports.swjs_enqueue_main_job_from_worker); + } + else { + throw new Error("listenMainJobFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); + } + }, + swjs_get_worker_thread_id: () => { + // Main thread's tid is always -1 + return this.tid || -1; + }, }; } } diff --git a/Sources/JavaScriptKit/Runtime/index.mjs b/Sources/JavaScriptKit/Runtime/index.mjs index 52de118b5..2f0558323 100644 --- a/Sources/JavaScriptKit/Runtime/index.mjs +++ b/Sources/JavaScriptKit/Runtime/index.mjs @@ -199,6 +199,7 @@ class SwiftRuntime { this._instance = null; this._memory = null; this._closureDeallocator = null; + this.tid = null; this.options = options || {}; } setInstance(instance) { @@ -234,6 +235,31 @@ class SwiftRuntime { throw error; } } + /** + * Start a new thread with the given `tid` and `startArg`, which + * is forwarded to the `wasi_thread_start` function. + * This function is expected to be called from the spawned Web Worker thread. + */ + startThread(tid, startArg) { + this.tid = tid; + const instance = this.instance; + try { + if (typeof instance.exports.wasi_thread_start === "function") { + instance.exports.wasi_thread_start(tid, startArg); + } + else { + throw new Error(`The WebAssembly module is not built for wasm32-unknown-wasip1-threads target.`); + } + } + catch (error) { + if (error instanceof UnsafeEventLoopYield) { + // Ignore the error + return; + } + // Rethrow other errors + throw error; + } + } get instance() { if (!this._instance) throw new Error("WebAssembly instance is not set yet"); @@ -458,6 +484,53 @@ class SwiftRuntime { swjs_unsafe_event_loop_yield: () => { throw new UnsafeEventLoopYield(); }, + // This function is called by WebWorkerTaskExecutor on Web Worker thread. + swjs_send_job_to_main_thread: (unowned_job) => { + const threadChannel = this.options.threadChannel; + if (threadChannel && "wakeUpMainThread" in threadChannel) { + threadChannel.wakeUpMainThread(unowned_job); + } + else { + throw new Error("wakeUpMainThread is not set in options given to SwiftRuntime. Please set it to send jobs to the main thread."); + } + }, + swjs_listen_wake_event_from_main_thread: () => { + // After the thread is started, + const swjs_wake_worker_thread = this.exports.swjs_wake_worker_thread; + const threadChannel = this.options.threadChannel; + if (threadChannel && + "listenWakeEventFromMainThread" in threadChannel) { + threadChannel.listenWakeEventFromMainThread(() => { + swjs_wake_worker_thread(); + }); + } + else { + throw new Error("listenWakeEventFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread."); + } + }, + swjs_wake_up_worker_thread: (tid) => { + const threadChannel = this.options.threadChannel; + if (threadChannel && "wakeUpWorkerThread" in threadChannel) { + threadChannel.wakeUpWorkerThread(tid); + } + else { + throw new Error("wakeUpWorkerThread is not set in options given to SwiftRuntime. Please set it to wake up worker threads."); + } + }, + swjs_listen_main_job_from_worker_thread: (tid) => { + const threadChannel = this.options.threadChannel; + if (threadChannel && + "listenMainJobFromWorkerThread" in threadChannel) { + threadChannel.listenMainJobFromWorkerThread(tid, this.exports.swjs_enqueue_main_job_from_worker); + } + else { + throw new Error("listenMainJobFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); + } + }, + swjs_get_worker_thread_id: () => { + // Main thread's tid is always -1 + return this.tid || -1; + }, }; } } From 56a81a4cf2fc20935624cadbdf819d63def4957a Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Sat, 6 Jul 2024 13:27:53 +0000 Subject: [PATCH 05/12] Add doc comment --- Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift index 4b9b3215a..d2b458cc4 100644 --- a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift +++ b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift @@ -441,6 +441,8 @@ func _swjs_enqueue_main_job_from_worker(_ job: UnownedJob) { JavaScriptEventLoop.shared.enqueue(ExecutorJob(job)) } +/// Wake up the worker thread. +/// This function is called when a job is enqueued from the main thread to a worker thread. @_expose(wasm, "swjs_wake_worker_thread") func _swjs_wake_worker_thread() { WebWorkerTaskExecutor.Worker.currentThread!.run() From 0ff82155e330a7dcd63c32f75c8fc22233b408e2 Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Sat, 6 Jul 2024 13:34:31 +0000 Subject: [PATCH 06/12] Transfer placeholder data to the worker thread For now, the data is not used in the worker thread, but it can be used in the future. --- Runtime/src/index.ts | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/Runtime/src/index.ts b/Runtime/src/index.ts index f5cfb1ba6..493a266d9 100644 --- a/Runtime/src/index.ts +++ b/Runtime/src/index.ts @@ -22,13 +22,11 @@ import { Memory } from "./memory.js"; * threadChannel: { * wakeUpMainThread: (unownedJob) => { * // Send the job to the main thread - * postMessage({ type: "job", unownedJob }); + * postMessage(unownedJob); * }, * listenWakeEventFromMainThread: (listener) => { * self.onmessage = (event) => { - * if (event.data.type === "wake") { - * listener(); - * } + * listener(event.data); * }; * } * } @@ -38,14 +36,12 @@ import { Memory } from "./memory.js"; * const worker = new Worker("worker.js"); * const runtime = new SwiftRuntime({ * threadChannel: { - * wakeUpWorkerThread: (tid) => { - * worker.postMessage({ type: "wake" }); + * wakeUpWorkerThread: (tid, data) => { + * worker.postMessage(data); * }, * listenMainJobFromWorkerThread: (tid, listener) => { * worker.onmessage = (event) => { - * if (event.data.type === "job") { - * listener(event.data.unownedJob); - * } + listener(event.data); * }; * } * } @@ -65,16 +61,17 @@ export type SwiftRuntimeThreadChannel = * to the wake event from the main thread sent by `wakeUpWorkerThread`. * The passed listener function awakes the Web Worker thread. */ - listenWakeEventFromMainThread: (listener: () => void) => void; + listenWakeEventFromMainThread: (listener: (data: unknown) => void) => void; } | { /** * This function is expected to be set in the main thread and called * when the main thread sends a wake event to the Web Worker thread. * The `tid` is the thread ID of the worker thread to be woken up. + * The `data` is the data to be sent to the worker thread. * The wake event is expected to be listened by `listenWakeEventFromMainThread`. */ - wakeUpWorkerThread: (tid: number) => void; + wakeUpWorkerThread: (tid: number, data: unknown) => void; /** * This function is expected to be set in the main thread and shuold listen * to the main job sent by `wakeUpMainThread` from the worker thread. @@ -607,7 +604,8 @@ export class SwiftRuntime { swjs_wake_up_worker_thread: (tid) => { const threadChannel = this.options.threadChannel; if (threadChannel && "wakeUpWorkerThread" in threadChannel) { - threadChannel.wakeUpWorkerThread(tid); + // Currently, the data is not used, but it can be used in the future. + threadChannel.wakeUpWorkerThread(tid, {}); } else { throw new Error( "wakeUpWorkerThread is not set in options given to SwiftRuntime. Please set it to wake up worker threads." From 6992c32de0455e2d7e55ce48fda26d493fde6ce8 Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Sat, 6 Jul 2024 13:37:08 +0000 Subject: [PATCH 07/12] Update nightly toolchain --- .github/workflows/test.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2a4625d3c..edbc1e7b8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,16 +23,16 @@ jobs: - { os: ubuntu-20.04, toolchain: wasm-5.9.1-RELEASE, wasi-backend: MicroWASI } - { os: ubuntu-20.04, toolchain: wasm-5.10.0-RELEASE, wasi-backend: MicroWASI } - os: ubuntu-22.04 - toolchain: DEVELOPMENT-SNAPSHOT-2024-05-01-a + toolchain: DEVELOPMENT-SNAPSHOT-2024-06-13-a swift-sdk: - id: DEVELOPMENT-SNAPSHOT-2024-05-25-a-wasm32-unknown-wasi - download-url: "https://github.com/swiftwasm/swift/releases/download/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-05-25-a/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-05-25-a-wasm32-unknown-wasi.artifactbundle.zip" + id: DEVELOPMENT-SNAPSHOT-2024-06-14-a-wasm32-unknown-wasi + download-url: "https://github.com/swiftwasm/swift/releases/download/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-06-14-a/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-06-14-a-wasm32-unknown-wasi.artifactbundle.zip" wasi-backend: Node - os: ubuntu-22.04 - toolchain: DEVELOPMENT-SNAPSHOT-2024-05-01-a + toolchain: DEVELOPMENT-SNAPSHOT-2024-06-13-a swift-sdk: - id: DEVELOPMENT-SNAPSHOT-2024-05-25-a-wasm32-unknown-wasip1-threads - download-url: "https://github.com/swiftwasm/swift/releases/download/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-05-25-a/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-05-25-a-wasm32-unknown-wasip1-threads.artifactbundle.zip" + id: DEVELOPMENT-SNAPSHOT-2024-06-14-a-wasm32-unknown-wasip1-threads + download-url: "https://github.com/swiftwasm/swift/releases/download/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-06-14-a/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-06-14-a-wasm32-unknown-wasip1-threads.artifactbundle.zip" wasi-backend: Node runs-on: ${{ matrix.entry.os }} From 8b9ed896b5f844a40da26b0a1ff532e2bf17893d Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Sat, 6 Jul 2024 13:41:15 +0000 Subject: [PATCH 08/12] make regenerate_swiftpm_resources --- Sources/JavaScriptKit/Runtime/index.js | 3 ++- Sources/JavaScriptKit/Runtime/index.mjs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Sources/JavaScriptKit/Runtime/index.js b/Sources/JavaScriptKit/Runtime/index.js index 63667d300..4498ee773 100644 --- a/Sources/JavaScriptKit/Runtime/index.js +++ b/Sources/JavaScriptKit/Runtime/index.js @@ -517,7 +517,8 @@ swjs_wake_up_worker_thread: (tid) => { const threadChannel = this.options.threadChannel; if (threadChannel && "wakeUpWorkerThread" in threadChannel) { - threadChannel.wakeUpWorkerThread(tid); + // Currently, the data is not used, but it can be used in the future. + threadChannel.wakeUpWorkerThread(tid, {}); } else { throw new Error("wakeUpWorkerThread is not set in options given to SwiftRuntime. Please set it to wake up worker threads."); diff --git a/Sources/JavaScriptKit/Runtime/index.mjs b/Sources/JavaScriptKit/Runtime/index.mjs index 2f0558323..7b470aaa0 100644 --- a/Sources/JavaScriptKit/Runtime/index.mjs +++ b/Sources/JavaScriptKit/Runtime/index.mjs @@ -511,7 +511,8 @@ class SwiftRuntime { swjs_wake_up_worker_thread: (tid) => { const threadChannel = this.options.threadChannel; if (threadChannel && "wakeUpWorkerThread" in threadChannel) { - threadChannel.wakeUpWorkerThread(tid); + // Currently, the data is not used, but it can be used in the future. + threadChannel.wakeUpWorkerThread(tid, {}); } else { throw new Error("wakeUpWorkerThread is not set in options given to SwiftRuntime. Please set it to wake up worker threads."); From 91611022e028c0fba2487ae67825ad439b734364 Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Sat, 6 Jul 2024 13:49:19 +0000 Subject: [PATCH 09/12] Fix internal compiler crash on CopyPropagation --- Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift index d2b458cc4..3b0acae9a 100644 --- a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift +++ b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift @@ -308,10 +308,9 @@ public final class WebWorkerTaskExecutor: TaskExecutor { } } - func enqueue(_ job: consuming ExecutorJob) { + func enqueue(_ job: UnownedJob) { precondition(!workers.isEmpty, "No worker threads are available") - let job = UnownedJob(job) // If the current thread is a worker thread, enqueue the job to the current worker. if let worker = Worker.currentThread { worker.enqueue(job) @@ -356,7 +355,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor { /// Enqueue a job to the executor. /// /// NOTE: Called from the Swift Concurrency runtime. - public func enqueue(_ job: consuming ExecutorJob) { + public func enqueue(_ job: UnownedJob) { Self.traceStatsIncrement(\.enqueueExecutor) executor.enqueue(job) } From 3a999510d11c349eeafddd82b8d9a71680927f02 Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Sat, 6 Jul 2024 15:30:09 +0000 Subject: [PATCH 10/12] Stop blocking the main thread when starting Web Worker threads Seems like blocking the main thread also blocks the Web Worker threads from starting on some browsers. --- .../WebWorkerTaskExecutor.swift | 17 +++++++++---- .../WebWorkerTaskExecutorTests.swift | 24 +++++++++---------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift index 3b0acae9a..21cee87c9 100644 --- a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift +++ b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift @@ -263,7 +263,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor { self.workers = workers } - func start() { + func start(timeout: Duration, checkInterval: Duration) async throws { class Context: @unchecked Sendable { let executor: WebWorkerTaskExecutor.Executor let worker: Worker @@ -293,10 +293,16 @@ public final class WebWorkerTaskExecutor: TaskExecutor { } // Wait until all worker threads are started and wire up messaging channels // between the main thread and workers to notify job enqueuing events each other. + let clock = ContinuousClock() + let workerInitStarted = clock.now for worker in workers { var tid: pid_t repeat { + if workerInitStarted.duration(to: .now) > timeout { + fatalError("Worker thread initialization timeout exceeded (\(timeout))") + } tid = worker.tid.load(ordering: .sequentiallyConsistent) + try await clock.sleep(for: checkInterval) } while tid == 0 swjs_listen_main_job_from_worker_thread(tid) } @@ -330,10 +336,13 @@ public final class WebWorkerTaskExecutor: TaskExecutor { /// Create a new Web Worker task executor. /// - /// - Parameter numberOfThreads: The number of Web Worker threads to spawn. - public init(numberOfThreads: Int) { + /// - Parameters: + /// - numberOfThreads: The number of Web Worker threads to spawn. + /// - timeout: The timeout to wait for all worker threads to be started. + /// - checkInterval: The interval to check if all worker threads are started. + public init(numberOfThreads: Int, timeout: Duration = .seconds(3), checkInterval: Duration = .microseconds(5)) async throws { self.executor = Executor(numberOfThreads: numberOfThreads) - self.executor.start() + try await self.executor.start(timeout: timeout, checkInterval: checkInterval) } /// Terminate child Web Worker threads. diff --git a/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift b/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift index e4461620f..94e7635e4 100644 --- a/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift +++ b/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift @@ -12,8 +12,8 @@ final class WebWorkerTaskExecutorTests: XCTestCase { WebWorkerTaskExecutor.installGlobalExecutor() } - func testTaskRunOnMainThread() async { - let executor = WebWorkerTaskExecutor(numberOfThreads: 1) + func testTaskRunOnMainThread() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) XCTAssertTrue(isMainThread()) @@ -29,15 +29,15 @@ final class WebWorkerTaskExecutorTests: XCTestCase { executor.terminate() } - func testWithPreferenceBlock() async { - let executor = WebWorkerTaskExecutor(numberOfThreads: 1) + func testWithPreferenceBlock() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) await withTaskExecutorPreference(executor) { XCTAssertFalse(isMainThread()) } } func testAwaitInsideTask() async throws { - let executor = WebWorkerTaskExecutor(numberOfThreads: 1) + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) let task = Task(executorPreference: executor) { await Task.yield() @@ -51,7 +51,7 @@ final class WebWorkerTaskExecutorTests: XCTestCase { } func testSleepInsideTask() async throws { - let executor = WebWorkerTaskExecutor(numberOfThreads: 1) + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) let task = Task(executorPreference: executor) { XCTAssertFalse(isMainThread()) @@ -69,8 +69,8 @@ final class WebWorkerTaskExecutorTests: XCTestCase { executor.terminate() } - func testMainActorRun() async { - let executor = WebWorkerTaskExecutor(numberOfThreads: 1) + func testMainActorRun() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) let task = Task(executorPreference: executor) { await MainActor.run { @@ -87,8 +87,8 @@ final class WebWorkerTaskExecutorTests: XCTestCase { executor.terminate() } - func testTaskGroupRunOnSameThread() async { - let executor = WebWorkerTaskExecutor(numberOfThreads: 3) + func testTaskGroupRunOnSameThread() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 3) let mainTid = swjs_get_worker_thread_id() await withTaskExecutorPreference(executor) { @@ -112,8 +112,8 @@ final class WebWorkerTaskExecutorTests: XCTestCase { executor.terminate() } - func testTaskGroupRunOnDifferentThreads() async { - let executor = WebWorkerTaskExecutor(numberOfThreads: 2) + func testTaskGroupRunOnDifferentThreads() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 2) struct Item: Hashable { let type: String From 358633c07f7833a2745587458845efa698c38ba8 Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Sat, 6 Jul 2024 15:37:34 +0000 Subject: [PATCH 11/12] Add termination callback for worker threads --- Runtime/src/index.ts | 12 ++++++++++++ Runtime/src/types.ts | 1 + .../JavaScriptEventLoop/WebWorkerTaskExecutor.swift | 6 ++++++ Sources/JavaScriptKit/Runtime/index.js | 7 +++++++ Sources/JavaScriptKit/Runtime/index.mjs | 7 +++++++ Sources/_CJavaScriptKit/include/_CJavaScriptKit.h | 2 ++ 6 files changed, 35 insertions(+) diff --git a/Runtime/src/index.ts b/Runtime/src/index.ts index 493a266d9..341b2156c 100644 --- a/Runtime/src/index.ts +++ b/Runtime/src/index.ts @@ -80,6 +80,12 @@ export type SwiftRuntimeThreadChannel = tid: number, listener: (unownedJob: number) => void ) => void; + + /** + * This function is expected to be set in the main thread and called + * when the worker thread is terminated. + */ + terminateWorkerThread?: (tid: number) => void; }; export type SwiftRuntimeOptions = { @@ -627,6 +633,12 @@ export class SwiftRuntime { ); } }, + swjs_terminate_worker_thread: (tid) => { + const threadChannel = this.options.threadChannel; + if (threadChannel && "terminateWorkerThread" in threadChannel) { + threadChannel.terminateWorkerThread?.(tid); + } // Otherwise, just ignore the termination request + }, swjs_get_worker_thread_id: () => { // Main thread's tid is always -1 return this.tid || -1; diff --git a/Runtime/src/types.ts b/Runtime/src/types.ts index ed61555a8..9aa36e96f 100644 --- a/Runtime/src/types.ts +++ b/Runtime/src/types.ts @@ -110,6 +110,7 @@ export interface ImportedFunctions { swjs_listen_wake_event_from_main_thread: () => void; swjs_wake_up_worker_thread: (tid: number) => void; swjs_listen_main_job_from_worker_thread: (tid: number) => void; + swjs_terminate_worker_thread: (tid: number) => void; swjs_get_worker_thread_id: () => number; } diff --git a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift index 21cee87c9..9dd52bbf3 100644 --- a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift +++ b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift @@ -245,6 +245,12 @@ public final class WebWorkerTaskExecutor: TaskExecutor { func terminate() { trace("Worker.terminate") state.store(.terminated, ordering: .sequentiallyConsistent) + let tid = self.tid.load(ordering: .sequentiallyConsistent) + guard tid != 0 else { + // The worker is not started yet. + return + } + swjs_terminate_worker_thread(tid) } } diff --git a/Sources/JavaScriptKit/Runtime/index.js b/Sources/JavaScriptKit/Runtime/index.js index 4498ee773..4a5e8431a 100644 --- a/Sources/JavaScriptKit/Runtime/index.js +++ b/Sources/JavaScriptKit/Runtime/index.js @@ -534,6 +534,13 @@ throw new Error("listenMainJobFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); } }, + swjs_terminate_worker_thread: (tid) => { + var _a; + const threadChannel = this.options.threadChannel; + if (threadChannel && "terminateWorkerThread" in threadChannel) { + (_a = threadChannel.terminateWorkerThread) === null || _a === void 0 ? void 0 : _a.call(threadChannel, tid); + } // Otherwise, just ignore the termination request + }, swjs_get_worker_thread_id: () => { // Main thread's tid is always -1 return this.tid || -1; diff --git a/Sources/JavaScriptKit/Runtime/index.mjs b/Sources/JavaScriptKit/Runtime/index.mjs index 7b470aaa0..1a1830795 100644 --- a/Sources/JavaScriptKit/Runtime/index.mjs +++ b/Sources/JavaScriptKit/Runtime/index.mjs @@ -528,6 +528,13 @@ class SwiftRuntime { throw new Error("listenMainJobFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); } }, + swjs_terminate_worker_thread: (tid) => { + var _a; + const threadChannel = this.options.threadChannel; + if (threadChannel && "terminateWorkerThread" in threadChannel) { + (_a = threadChannel.terminateWorkerThread) === null || _a === void 0 ? void 0 : _a.call(threadChannel, tid); + } // Otherwise, just ignore the termination request + }, swjs_get_worker_thread_id: () => { // Main thread's tid is always -1 return this.tid || -1; diff --git a/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h b/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h index dd7658649..188f6b5db 100644 --- a/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h +++ b/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h @@ -303,6 +303,8 @@ IMPORT_JS_FUNCTION(swjs_wake_up_worker_thread, void, (int tid)) IMPORT_JS_FUNCTION(swjs_listen_main_job_from_worker_thread, void, (int tid)) +IMPORT_JS_FUNCTION(swjs_terminate_worker_thread, void, (int tid)) + IMPORT_JS_FUNCTION(swjs_get_worker_thread_id, int, (void)) /// MARK: - thread local storage From a4daecdede926f532cd7edfddb068ede1b6e26f2 Mon Sep 17 00:00:00 2001 From: Yuta Saito Date: Sat, 6 Jul 2024 16:49:20 +0000 Subject: [PATCH 12/12] Generalize the thread channel functions not limited to wake-up events --- IntegrationTests/lib.js | 12 +- Runtime/src/index.ts | 151 ++++++++++-------- Runtime/src/types.ts | 4 +- .../WebWorkerTaskExecutor.swift | 6 +- Sources/JavaScriptKit/Runtime/index.js | 76 +++++---- Sources/JavaScriptKit/Runtime/index.mjs | 76 +++++---- .../_CJavaScriptKit/include/_CJavaScriptKit.h | 4 +- 7 files changed, 181 insertions(+), 148 deletions(-) diff --git a/IntegrationTests/lib.js b/IntegrationTests/lib.js index 6f6ea4139..ed66c7e86 100644 --- a/IntegrationTests/lib.js +++ b/IntegrationTests/lib.js @@ -79,8 +79,8 @@ export async function startWasiChildThread(event) { const swift = new SwiftRuntime({ sharedMemory: true, threadChannel: { - wakeUpMainThread: parentPort.postMessage.bind(parentPort), - listenWakeEventFromMainThread: (listener) => { + postMessageToMainThread: parentPort.postMessage.bind(parentPort), + listenMessageFromMainThread: (listener) => { parentPort.on("message", listener) } } @@ -138,9 +138,9 @@ class ThreadRegistry { return this.workers.get(tid); } - wakeUpWorkerThread(tid) { + wakeUpWorkerThread(tid, message) { const worker = this.workers.get(tid); - worker.postMessage(null); + worker.postMessage(message); } } @@ -159,8 +159,8 @@ export const startWasiTask = async (wasmPath, wasiConstructorKey = selectWASIBac const swift = new SwiftRuntime({ sharedMemory, threadChannel: { - wakeUpWorkerThread: threadRegistry.wakeUpWorkerThread.bind(threadRegistry), - listenMainJobFromWorkerThread: (tid, listener) => { + postMessageToWorkerThread: threadRegistry.wakeUpWorkerThread.bind(threadRegistry), + listenMessageFromWorkerThread: (tid, listener) => { const worker = threadRegistry.worker(tid); worker.on("message", listener); } diff --git a/Runtime/src/index.ts b/Runtime/src/index.ts index 341b2156c..4cf0ee65a 100644 --- a/Runtime/src/index.ts +++ b/Runtime/src/index.ts @@ -10,21 +10,27 @@ import { import * as JSValue from "./js-value.js"; import { Memory } from "./memory.js"; +type MainToWorkerMessage = { + type: "wake"; +}; + +type WorkerToMainMessage = { + type: "job"; + data: number; +}; + /** * A thread channel is a set of functions that are used to communicate between * the main thread and the worker thread. The main thread and the worker thread - * can send jobs to each other using these functions. + * can send messages to each other using these functions. * * @example * ```javascript * // worker.js * const runtime = new SwiftRuntime({ * threadChannel: { - * wakeUpMainThread: (unownedJob) => { - * // Send the job to the main thread - * postMessage(unownedJob); - * }, - * listenWakeEventFromMainThread: (listener) => { + * postMessageToMainThread: postMessage, + * listenMessageFromMainThread: (listener) => { * self.onmessage = (event) => { * listener(event.data); * }; @@ -36,10 +42,10 @@ import { Memory } from "./memory.js"; * const worker = new Worker("worker.js"); * const runtime = new SwiftRuntime({ * threadChannel: { - * wakeUpWorkerThread: (tid, data) => { + * postMessageToWorkerThread: (tid, data) => { * worker.postMessage(data); * }, - * listenMainJobFromWorkerThread: (tid, listener) => { + * listenMessageFromWorkerThread: (tid, listener) => { * worker.onmessage = (event) => { listener(event.data); * }; @@ -50,40 +56,42 @@ import { Memory } from "./memory.js"; */ export type SwiftRuntimeThreadChannel = | { - /** - * This function is called when the Web Worker thread sends a job to the main thread. - * The unownedJob is the pointer to the unowned job object in the Web Worker thread. - * The job submitted by this function expected to be listened by `listenMainJobFromWorkerThread`. - */ - wakeUpMainThread: (unownedJob: number) => void; + /** + * This function is used to send messages from the worker thread to the main thread. + * The message submitted by this function is expected to be listened by `listenMessageFromWorkerThread`. + * @param message The message to be sent to the main thread. + */ + postMessageToMainThread: (message: WorkerToMainMessage) => void; /** * This function is expected to be set in the worker thread and should listen - * to the wake event from the main thread sent by `wakeUpWorkerThread`. - * The passed listener function awakes the Web Worker thread. + * to messages from the main thread sent by `postMessageToWorkerThread`. + * @param listener The listener function to be called when a message is received from the main thread. */ - listenWakeEventFromMainThread: (listener: (data: unknown) => void) => void; + listenMessageFromMainThread: (listener: (message: MainToWorkerMessage) => void) => void; } | { /** - * This function is expected to be set in the main thread and called - * when the main thread sends a wake event to the Web Worker thread. - * The `tid` is the thread ID of the worker thread to be woken up. - * The `data` is the data to be sent to the worker thread. - * The wake event is expected to be listened by `listenWakeEventFromMainThread`. + * This function is expected to be set in the main thread. + * The message submitted by this function is expected to be listened by `listenMessageFromMainThread`. + * @param tid The thread ID of the worker thread. + * @param message The message to be sent to the worker thread. */ - wakeUpWorkerThread: (tid: number, data: unknown) => void; + postMessageToWorkerThread: (tid: number, message: MainToWorkerMessage) => void; /** * This function is expected to be set in the main thread and shuold listen - * to the main job sent by `wakeUpMainThread` from the worker thread. + * to messsages sent by `postMessageToMainThread` from the worker thread. + * @param tid The thread ID of the worker thread. + * @param listener The listener function to be called when a message is received from the worker thread. */ - listenMainJobFromWorkerThread: ( + listenMessageFromWorkerThread: ( tid: number, - listener: (unownedJob: number) => void + listener: (message: WorkerToMainMessage) => void ) => void; /** * This function is expected to be set in the main thread and called * when the worker thread is terminated. + * @param tid The thread ID of the worker thread. */ terminateWorkerThread?: (tid: number) => void; }; @@ -578,60 +586,49 @@ export class SwiftRuntime { swjs_unsafe_event_loop_yield: () => { throw new UnsafeEventLoopYield(); }, - // This function is called by WebWorkerTaskExecutor on Web Worker thread. swjs_send_job_to_main_thread: (unowned_job) => { - const threadChannel = this.options.threadChannel; - if (threadChannel && "wakeUpMainThread" in threadChannel) { - threadChannel.wakeUpMainThread(unowned_job); - } else { - throw new Error( - "wakeUpMainThread is not set in options given to SwiftRuntime. Please set it to send jobs to the main thread." - ); - } + this.postMessageToMainThread({ type: "job", data: unowned_job }); }, - swjs_listen_wake_event_from_main_thread: () => { - // After the thread is started, - const swjs_wake_worker_thread = - this.exports.swjs_wake_worker_thread; + swjs_listen_message_from_main_thread: () => { const threadChannel = this.options.threadChannel; - if ( - threadChannel && - "listenWakeEventFromMainThread" in threadChannel - ) { - threadChannel.listenWakeEventFromMainThread(() => { - swjs_wake_worker_thread(); - }); - } else { + if (!(threadChannel && "listenMessageFromMainThread" in threadChannel)) { throw new Error( - "listenWakeEventFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread." + "listenMessageFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread." ); } + threadChannel.listenMessageFromMainThread((message) => { + switch (message.type) { + case "wake": + this.exports.swjs_wake_worker_thread(); + break; + default: + const unknownMessage: never = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }); }, swjs_wake_up_worker_thread: (tid) => { - const threadChannel = this.options.threadChannel; - if (threadChannel && "wakeUpWorkerThread" in threadChannel) { - // Currently, the data is not used, but it can be used in the future. - threadChannel.wakeUpWorkerThread(tid, {}); - } else { - throw new Error( - "wakeUpWorkerThread is not set in options given to SwiftRuntime. Please set it to wake up worker threads." - ); - } + this.postMessageToWorkerThread(tid, { type: "wake" }); }, - swjs_listen_main_job_from_worker_thread: (tid) => { + swjs_listen_message_from_worker_thread: (tid) => { const threadChannel = this.options.threadChannel; - if ( - threadChannel && - "listenMainJobFromWorkerThread" in threadChannel - ) { - threadChannel.listenMainJobFromWorkerThread( - tid, this.exports.swjs_enqueue_main_job_from_worker, - ); - } else { + if (!(threadChannel && "listenMessageFromWorkerThread" in threadChannel)) { throw new Error( - "listenMainJobFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads." + "listenMessageFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads." ); } + threadChannel.listenMessageFromWorkerThread( + tid, (message) => { + switch (message.type) { + case "job": + this.exports.swjs_enqueue_main_job_from_worker(message.data); + break; + default: + const unknownMessage: never = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }, + ); }, swjs_terminate_worker_thread: (tid) => { const threadChannel = this.options.threadChannel; @@ -645,6 +642,26 @@ export class SwiftRuntime { }, }; } + + private postMessageToMainThread(message: WorkerToMainMessage) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToMainThread" in threadChannel)) { + throw new Error( + "postMessageToMainThread is not set in options given to SwiftRuntime. Please set it to send messages to the main thread." + ); + } + threadChannel.postMessageToMainThread(message); + } + + private postMessageToWorkerThread(tid: number, message: MainToWorkerMessage) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToWorkerThread" in threadChannel)) { + throw new Error( + "postMessageToWorkerThread is not set in options given to SwiftRuntime. Please set it to send messages to worker threads." + ); + } + threadChannel.postMessageToWorkerThread(tid, message); + } } /// This error is thrown when yielding event loop control from `swift_task_asyncMainDrainQueue` diff --git a/Runtime/src/types.ts b/Runtime/src/types.ts index 9aa36e96f..dd638acc5 100644 --- a/Runtime/src/types.ts +++ b/Runtime/src/types.ts @@ -107,9 +107,9 @@ export interface ImportedFunctions { swjs_i64_to_bigint_slow(lower: number, upper: number, signed: bool): ref; swjs_unsafe_event_loop_yield: () => void; swjs_send_job_to_main_thread: (unowned_job: number) => void; - swjs_listen_wake_event_from_main_thread: () => void; + swjs_listen_message_from_main_thread: () => void; swjs_wake_up_worker_thread: (tid: number) => void; - swjs_listen_main_job_from_worker_thread: (tid: number) => void; + swjs_listen_message_from_worker_thread: (tid: number) => void; swjs_terminate_worker_thread: (tid: number) => void; swjs_get_worker_thread_id: () => number; } diff --git a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift index 9dd52bbf3..d1f7f64e2 100644 --- a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift +++ b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift @@ -188,10 +188,10 @@ public final class WebWorkerTaskExecutor: TaskExecutor { // `self` outlives the worker thread because `Executor` retains the worker. // Thus it's safe to store the reference without extra retain. swjs_thread_local_task_executor_worker = Unmanaged.passUnretained(self).toOpaque() - // Start listening wake-up events from the main thread. + // Start listening events from the main thread. // This must be called after setting the swjs_thread_local_task_executor_worker // because the event listener enqueues jobs to the TLS worker. - swjs_listen_wake_event_from_main_thread() + swjs_listen_message_from_main_thread() // Set the parent executor. parentTaskExecutor = executor // Store the thread ID to the worker. This notifies the main thread that the worker is started. @@ -310,7 +310,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor { tid = worker.tid.load(ordering: .sequentiallyConsistent) try await clock.sleep(for: checkInterval) } while tid == 0 - swjs_listen_main_job_from_worker_thread(tid) + swjs_listen_message_from_worker_thread(tid) } } diff --git a/Sources/JavaScriptKit/Runtime/index.js b/Sources/JavaScriptKit/Runtime/index.js index 4a5e8431a..9d29b4329 100644 --- a/Sources/JavaScriptKit/Runtime/index.js +++ b/Sources/JavaScriptKit/Runtime/index.js @@ -490,49 +490,43 @@ swjs_unsafe_event_loop_yield: () => { throw new UnsafeEventLoopYield(); }, - // This function is called by WebWorkerTaskExecutor on Web Worker thread. swjs_send_job_to_main_thread: (unowned_job) => { - const threadChannel = this.options.threadChannel; - if (threadChannel && "wakeUpMainThread" in threadChannel) { - threadChannel.wakeUpMainThread(unowned_job); - } - else { - throw new Error("wakeUpMainThread is not set in options given to SwiftRuntime. Please set it to send jobs to the main thread."); - } + this.postMessageToMainThread({ type: "job", data: unowned_job }); }, - swjs_listen_wake_event_from_main_thread: () => { - // After the thread is started, - const swjs_wake_worker_thread = this.exports.swjs_wake_worker_thread; + swjs_listen_message_from_main_thread: () => { const threadChannel = this.options.threadChannel; - if (threadChannel && - "listenWakeEventFromMainThread" in threadChannel) { - threadChannel.listenWakeEventFromMainThread(() => { - swjs_wake_worker_thread(); - }); - } - else { - throw new Error("listenWakeEventFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread."); + if (!(threadChannel && "listenMessageFromMainThread" in threadChannel)) { + throw new Error("listenMessageFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread."); } + threadChannel.listenMessageFromMainThread((message) => { + switch (message.type) { + case "wake": + this.exports.swjs_wake_worker_thread(); + break; + default: + const unknownMessage = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }); }, swjs_wake_up_worker_thread: (tid) => { - const threadChannel = this.options.threadChannel; - if (threadChannel && "wakeUpWorkerThread" in threadChannel) { - // Currently, the data is not used, but it can be used in the future. - threadChannel.wakeUpWorkerThread(tid, {}); - } - else { - throw new Error("wakeUpWorkerThread is not set in options given to SwiftRuntime. Please set it to wake up worker threads."); - } + this.postMessageToWorkerThread(tid, { type: "wake" }); }, - swjs_listen_main_job_from_worker_thread: (tid) => { + swjs_listen_message_from_worker_thread: (tid) => { const threadChannel = this.options.threadChannel; - if (threadChannel && - "listenMainJobFromWorkerThread" in threadChannel) { - threadChannel.listenMainJobFromWorkerThread(tid, this.exports.swjs_enqueue_main_job_from_worker); - } - else { - throw new Error("listenMainJobFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); + if (!(threadChannel && "listenMessageFromWorkerThread" in threadChannel)) { + throw new Error("listenMessageFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); } + threadChannel.listenMessageFromWorkerThread(tid, (message) => { + switch (message.type) { + case "job": + this.exports.swjs_enqueue_main_job_from_worker(message.data); + break; + default: + const unknownMessage = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }); }, swjs_terminate_worker_thread: (tid) => { var _a; @@ -547,6 +541,20 @@ }, }; } + postMessageToMainThread(message) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToMainThread" in threadChannel)) { + throw new Error("postMessageToMainThread is not set in options given to SwiftRuntime. Please set it to send messages to the main thread."); + } + threadChannel.postMessageToMainThread(message); + } + postMessageToWorkerThread(tid, message) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToWorkerThread" in threadChannel)) { + throw new Error("postMessageToWorkerThread is not set in options given to SwiftRuntime. Please set it to send messages to worker threads."); + } + threadChannel.postMessageToWorkerThread(tid, message); + } } /// This error is thrown when yielding event loop control from `swift_task_asyncMainDrainQueue` /// to JavaScript. This is usually thrown when: diff --git a/Sources/JavaScriptKit/Runtime/index.mjs b/Sources/JavaScriptKit/Runtime/index.mjs index 1a1830795..9201b7712 100644 --- a/Sources/JavaScriptKit/Runtime/index.mjs +++ b/Sources/JavaScriptKit/Runtime/index.mjs @@ -484,49 +484,43 @@ class SwiftRuntime { swjs_unsafe_event_loop_yield: () => { throw new UnsafeEventLoopYield(); }, - // This function is called by WebWorkerTaskExecutor on Web Worker thread. swjs_send_job_to_main_thread: (unowned_job) => { - const threadChannel = this.options.threadChannel; - if (threadChannel && "wakeUpMainThread" in threadChannel) { - threadChannel.wakeUpMainThread(unowned_job); - } - else { - throw new Error("wakeUpMainThread is not set in options given to SwiftRuntime. Please set it to send jobs to the main thread."); - } + this.postMessageToMainThread({ type: "job", data: unowned_job }); }, - swjs_listen_wake_event_from_main_thread: () => { - // After the thread is started, - const swjs_wake_worker_thread = this.exports.swjs_wake_worker_thread; + swjs_listen_message_from_main_thread: () => { const threadChannel = this.options.threadChannel; - if (threadChannel && - "listenWakeEventFromMainThread" in threadChannel) { - threadChannel.listenWakeEventFromMainThread(() => { - swjs_wake_worker_thread(); - }); - } - else { - throw new Error("listenWakeEventFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread."); + if (!(threadChannel && "listenMessageFromMainThread" in threadChannel)) { + throw new Error("listenMessageFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread."); } + threadChannel.listenMessageFromMainThread((message) => { + switch (message.type) { + case "wake": + this.exports.swjs_wake_worker_thread(); + break; + default: + const unknownMessage = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }); }, swjs_wake_up_worker_thread: (tid) => { - const threadChannel = this.options.threadChannel; - if (threadChannel && "wakeUpWorkerThread" in threadChannel) { - // Currently, the data is not used, but it can be used in the future. - threadChannel.wakeUpWorkerThread(tid, {}); - } - else { - throw new Error("wakeUpWorkerThread is not set in options given to SwiftRuntime. Please set it to wake up worker threads."); - } + this.postMessageToWorkerThread(tid, { type: "wake" }); }, - swjs_listen_main_job_from_worker_thread: (tid) => { + swjs_listen_message_from_worker_thread: (tid) => { const threadChannel = this.options.threadChannel; - if (threadChannel && - "listenMainJobFromWorkerThread" in threadChannel) { - threadChannel.listenMainJobFromWorkerThread(tid, this.exports.swjs_enqueue_main_job_from_worker); - } - else { - throw new Error("listenMainJobFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); + if (!(threadChannel && "listenMessageFromWorkerThread" in threadChannel)) { + throw new Error("listenMessageFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); } + threadChannel.listenMessageFromWorkerThread(tid, (message) => { + switch (message.type) { + case "job": + this.exports.swjs_enqueue_main_job_from_worker(message.data); + break; + default: + const unknownMessage = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }); }, swjs_terminate_worker_thread: (tid) => { var _a; @@ -541,6 +535,20 @@ class SwiftRuntime { }, }; } + postMessageToMainThread(message) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToMainThread" in threadChannel)) { + throw new Error("postMessageToMainThread is not set in options given to SwiftRuntime. Please set it to send messages to the main thread."); + } + threadChannel.postMessageToMainThread(message); + } + postMessageToWorkerThread(tid, message) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToWorkerThread" in threadChannel)) { + throw new Error("postMessageToWorkerThread is not set in options given to SwiftRuntime. Please set it to send messages to worker threads."); + } + threadChannel.postMessageToWorkerThread(tid, message); + } } /// This error is thrown when yielding event loop control from `swift_task_asyncMainDrainQueue` /// to JavaScript. This is usually thrown when: diff --git a/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h b/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h index 188f6b5db..1e539fde1 100644 --- a/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h +++ b/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h @@ -297,11 +297,11 @@ IMPORT_JS_FUNCTION(swjs_unsafe_event_loop_yield, void, (void)) IMPORT_JS_FUNCTION(swjs_send_job_to_main_thread, void, (uintptr_t job)) -IMPORT_JS_FUNCTION(swjs_listen_wake_event_from_main_thread, void, (void)) +IMPORT_JS_FUNCTION(swjs_listen_message_from_main_thread, void, (void)) IMPORT_JS_FUNCTION(swjs_wake_up_worker_thread, void, (int tid)) -IMPORT_JS_FUNCTION(swjs_listen_main_job_from_worker_thread, void, (int tid)) +IMPORT_JS_FUNCTION(swjs_listen_message_from_worker_thread, void, (int tid)) IMPORT_JS_FUNCTION(swjs_terminate_worker_thread, void, (int tid))