Skip to content

PYTHON-4864 - Create async version of SpecRunnerThread #2094

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions test/asynchronous/unified_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
client_knobs,
unittest,
)
from test.asynchronous.utils_spec_runner import SpecRunnerTask
from test.unified_format_shared import (
KMS_TLS_OPTS,
PLACEHOLDER_MAP,
Expand All @@ -58,7 +59,6 @@
snake_to_camel,
wait_until,
)
from test.utils_spec_runner import SpecRunnerThread
from test.version import Version
from typing import Any, Dict, List, Mapping, Optional

Expand Down Expand Up @@ -382,8 +382,8 @@ async def drop(self: AsyncGridFSBucket, *args: Any, **kwargs: Any) -> None:
return
elif entity_type == "thread":
name = spec["id"]
thread = SpecRunnerThread(name)
thread.start()
thread = SpecRunnerTask(name)
await thread.start()
self[name] = thread
return

Expand Down Expand Up @@ -1177,16 +1177,23 @@ def primary_changed() -> bool:

wait_until(primary_changed, "change primary", timeout=timeout)

def _testOperation_runOnThread(self, spec):
async def _testOperation_runOnThread(self, spec):
"""Run the 'runOnThread' operation."""
thread = self.entity_map[spec["thread"]]
thread.schedule(lambda: self.run_entity_operation(spec["operation"]))
if _IS_SYNC:
await thread.schedule(lambda: self.run_entity_operation(spec["operation"]))
else:

async def op():
await self.run_entity_operation(spec["operation"])

await thread.schedule(op)

def _testOperation_waitForThread(self, spec):
async def _testOperation_waitForThread(self, spec):
"""Run the 'waitForThread' operation."""
thread = self.entity_map[spec["thread"]]
thread.stop()
thread.join(10)
await thread.stop()
await thread.join(10)
if thread.exc:
raise thread.exc
self.assertFalse(thread.is_alive(), "Thread {} is still running".format(spec["thread"]))
Expand Down
110 changes: 77 additions & 33 deletions test/asynchronous/utils_spec_runner.py
Copy link
Contributor

@sleepyStick sleepyStick Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not loving the duplicated code between sync and async but i'm guessing its because the async version needs some more methods? If so, then i understand and can live with it >.<

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The async version doesn't implement threading.Thread, but it still needs to match the same API as the synchronous version. Let me see if I can reduce some of the duplication though.

Copy link
Contributor Author

@NoahStapp NoahStapp Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did some refactoring, much less duplication now. Great call-out!

Original file line number Diff line number Diff line change
Expand Up @@ -54,39 +54,83 @@

_IS_SYNC = False


class SpecRunnerThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name
self.exc = None
self.daemon = True
self.cond = threading.Condition()
self.ops = []
self.stopped = False

def schedule(self, work):
self.ops.append(work)
with self.cond:
self.cond.notify()

def stop(self):
self.stopped = True
with self.cond:
self.cond.notify()

def run(self):
while not self.stopped or self.ops:
if not self.ops:
with self.cond:
self.cond.wait(10)
if self.ops:
try:
work = self.ops.pop(0)
work()
except Exception as exc:
self.exc = exc
self.stop()
if _IS_SYNC:

class SpecRunnerThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name
self.exc = None
self.daemon = True
self.cond = threading.Condition()
self.ops = []
self.stopped = False

def schedule(self, work):
self.ops.append(work)
with self.cond:
self.cond.notify()

def stop(self):
self.stopped = True
with self.cond:
self.cond.notify()

def run(self):
while not self.stopped or self.ops:
if not self.ops:
with self.cond:
self.cond.wait(10)
if self.ops:
try:
work = self.ops.pop(0)
work()
except Exception as exc:
self.exc = exc
self.stop()
else:

class SpecRunnerTask:
def __init__(self, name):
self.name = name
self.exc = None
self.cond = asyncio.Condition()
self.ops = []
self.stopped = False
self.task = None

async def schedule(self, work):
self.ops.append(work)
async with self.cond:
self.cond.notify()

async def stop(self):
self.stopped = True
async with self.cond:
self.cond.notify()

async def start(self):
self.task = asyncio.create_task(self.run(), name=self.name)

async def join(self, timeout: int = 0):
if self.task is not None:
await asyncio.wait([self.task], timeout=timeout)

def is_alive(self):
return not self.stopped

async def run(self):
while not self.stopped or self.ops:
if not self.ops:
async with self.cond:
await asyncio.wait_for(self.cond.wait(), timeout=10)
if self.ops:
try:
work = self.ops.pop(0)
await work()
except Exception as exc:
self.exc = exc
await self.stop()


class AsyncSpecTestCreator:
Expand Down
9 changes: 8 additions & 1 deletion test/unified_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,14 @@ def primary_changed() -> bool:
def _testOperation_runOnThread(self, spec):
"""Run the 'runOnThread' operation."""
thread = self.entity_map[spec["thread"]]
thread.schedule(lambda: self.run_entity_operation(spec["operation"]))
if _IS_SYNC:
thread.schedule(lambda: self.run_entity_operation(spec["operation"]))
else:

def op():
self.run_entity_operation(spec["operation"])

thread.schedule(op)

def _testOperation_waitForThread(self, spec):
"""Run the 'waitForThread' operation."""
Expand Down
110 changes: 77 additions & 33 deletions test/utils_spec_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,39 +54,83 @@

_IS_SYNC = True


class SpecRunnerThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name
self.exc = None
self.daemon = True
self.cond = threading.Condition()
self.ops = []
self.stopped = False

def schedule(self, work):
self.ops.append(work)
with self.cond:
self.cond.notify()

def stop(self):
self.stopped = True
with self.cond:
self.cond.notify()

def run(self):
while not self.stopped or self.ops:
if not self.ops:
with self.cond:
self.cond.wait(10)
if self.ops:
try:
work = self.ops.pop(0)
work()
except Exception as exc:
self.exc = exc
self.stop()
if _IS_SYNC:

class SpecRunnerThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name
self.exc = None
self.daemon = True
self.cond = threading.Condition()
self.ops = []
self.stopped = False

def schedule(self, work):
self.ops.append(work)
with self.cond:
self.cond.notify()

def stop(self):
self.stopped = True
with self.cond:
self.cond.notify()

def run(self):
while not self.stopped or self.ops:
if not self.ops:
with self.cond:
self.cond.wait(10)
if self.ops:
try:
work = self.ops.pop(0)
work()
except Exception as exc:
self.exc = exc
self.stop()
else:

class SpecRunnerThread:
def __init__(self, name):
self.name = name
self.exc = None
self.cond = asyncio.Condition()
self.ops = []
self.stopped = False
self.task = None

def schedule(self, work):
self.ops.append(work)
with self.cond:
self.cond.notify()

def stop(self):
self.stopped = True
with self.cond:
self.cond.notify()

def start(self):
self.task = asyncio.create_task(self.run(), name=self.name)

def join(self, timeout: int = 0):
if self.task is not None:
asyncio.wait([self.task], timeout=timeout)

def is_alive(self):
return not self.stopped

def run(self):
while not self.stopped or self.ops:
if not self.ops:
with self.cond:
asyncio.wait_for(self.cond.wait(), timeout=10)
if self.ops:
try:
work = self.ops.pop(0)
work()
except Exception as exc:
self.exc = exc
self.stop()


class SpecTestCreator:
Expand Down
2 changes: 2 additions & 0 deletions tools/synchro.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
"_async_create_lock": "_create_lock",
"_async_create_condition": "_create_condition",
"_async_cond_wait": "_cond_wait",
"AsyncDummyMonitor": "DummyMonitor",
"SpecRunnerTask": "SpecRunnerThread",
}

docstring_replacements: dict[tuple[str, str], str] = {
Expand Down
Loading