Skip to content

Commit 1687553

Browse files
committed
Consolidate SpecRunnerTask classes
1 parent 05bb77c commit 1687553

File tree

5 files changed

+86
-145
lines changed

5 files changed

+86
-145
lines changed

test/asynchronous/unified_format.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,14 +1180,7 @@ def primary_changed() -> bool:
11801180
async def _testOperation_runOnThread(self, spec):
11811181
"""Run the 'runOnThread' operation."""
11821182
thread = self.entity_map[spec["thread"]]
1183-
if _IS_SYNC:
1184-
await thread.schedule(lambda: self.run_entity_operation(spec["operation"]))
1185-
else:
1186-
1187-
async def op():
1188-
await self.run_entity_operation(spec["operation"])
1189-
1190-
await thread.schedule(op)
1183+
await thread.schedule(functools.partial(self.run_entity_operation, spec["operation"]))
11911184

11921185
async def _testOperation_waitForThread(self, spec):
11931186
"""Run the 'waitForThread' operation."""

test/asynchronous/utils_spec_runner.py

Lines changed: 42 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
4848
from pymongo.asynchronous.cursor import AsyncCursor
4949
from pymongo.errors import AutoReconnect, BulkWriteError, OperationFailure, PyMongoError
50+
from pymongo.lock import _async_create_condition, _async_create_lock
5051
from pymongo.read_concern import ReadConcern
5152
from pymongo.read_preferences import ReadPreference
5253
from pymongo.results import BulkWriteResult, _WriteResult
@@ -55,82 +56,59 @@
5556
_IS_SYNC = False
5657

5758
if _IS_SYNC:
58-
59-
class SpecRunnerThread(threading.Thread):
60-
def __init__(self, name):
61-
super().__init__()
62-
self.name = name
63-
self.exc = None
64-
self.daemon = True
65-
self.cond = threading.Condition()
66-
self.ops = []
67-
self.stopped = False
68-
69-
def schedule(self, work):
70-
self.ops.append(work)
71-
with self.cond:
72-
self.cond.notify()
73-
74-
def stop(self):
75-
self.stopped = True
76-
with self.cond:
77-
self.cond.notify()
78-
79-
def run(self):
80-
while not self.stopped or self.ops:
81-
if not self.ops:
82-
with self.cond:
83-
self.cond.wait(10)
84-
if self.ops:
85-
try:
86-
work = self.ops.pop(0)
87-
work()
88-
except Exception as exc:
89-
self.exc = exc
90-
self.stop()
59+
PARENT = threading.Thread
9160
else:
61+
PARENT = object
62+
9263

93-
class SpecRunnerTask:
94-
def __init__(self, name):
95-
self.name = name
96-
self.exc = None
97-
self.cond = asyncio.Condition()
98-
self.ops = []
99-
self.stopped = False
100-
self.task = None
101-
102-
async def schedule(self, work):
103-
self.ops.append(work)
104-
async with self.cond:
105-
self.cond.notify()
106-
107-
async def stop(self):
108-
self.stopped = True
109-
async with self.cond:
110-
self.cond.notify()
64+
class SpecRunnerTask(PARENT):
65+
def __init__(self, name):
66+
super().__init__()
67+
self.name = name
68+
self.exc = None
69+
self.daemon = True
70+
self.cond = _async_create_condition(_async_create_lock())
71+
self.ops = []
72+
self.stopped = False
73+
self.task = None
74+
75+
if not _IS_SYNC:
11176

11277
async def start(self):
11378
self.task = asyncio.create_task(self.run(), name=self.name)
11479

115-
async def join(self, timeout: int = 0):
80+
async def join(self, timeout: float | None = 0): # type: ignore[override]
11681
if self.task is not None:
11782
await asyncio.wait([self.task], timeout=timeout)
11883

11984
def is_alive(self):
12085
return not self.stopped
12186

122-
async def run(self):
123-
while not self.stopped or self.ops:
124-
if not self.ops:
125-
async with self.cond:
126-
await asyncio.wait_for(self.cond.wait(), timeout=10)
127-
if self.ops:
128-
try:
129-
work = self.ops.pop(0)
130-
await work()
131-
except Exception as exc:
132-
self.exc = exc
133-
await self.stop()
87+
async def schedule(self, work):
88+
self.ops.append(work)
89+
async with self.cond:
90+
self.cond.notify()
91+
92+
async def stop(self):
93+
self.stopped = True
94+
async with self.cond:
95+
self.cond.notify()
96+
97+
async def run(self):
98+
while not self.stopped or self.ops:
99+
if not self.ops:
100+
async with self.cond:
101+
if _IS_SYNC:
102+
await self.cond.wait(10) # type: ignore[call-arg]
103+
else:
104+
await asyncio.wait_for(self.cond.wait(), timeout=10) # type: ignore[arg-type]
105+
if self.ops:
106+
try:
107+
work = self.ops.pop(0)
108+
await work()
109+
except Exception as exc:
110+
self.exc = exc
111+
await self.stop()
134112

135113

136114
class AsyncSpecTestCreator:

test/unified_format.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,14 +1167,7 @@ def primary_changed() -> bool:
11671167
def _testOperation_runOnThread(self, spec):
11681168
"""Run the 'runOnThread' operation."""
11691169
thread = self.entity_map[spec["thread"]]
1170-
if _IS_SYNC:
1171-
thread.schedule(lambda: self.run_entity_operation(spec["operation"]))
1172-
else:
1173-
1174-
def op():
1175-
self.run_entity_operation(spec["operation"])
1176-
1177-
thread.schedule(op)
1170+
thread.schedule(functools.partial(self.run_entity_operation, spec["operation"]))
11781171

11791172
def _testOperation_waitForThread(self, spec):
11801173
"""Run the 'waitForThread' operation."""

test/utils_spec_runner.py

Lines changed: 42 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from gridfs import GridFSBucket
4545
from gridfs.synchronous.grid_file import GridFSBucket
4646
from pymongo.errors import AutoReconnect, BulkWriteError, OperationFailure, PyMongoError
47+
from pymongo.lock import _create_condition, _create_lock
4748
from pymongo.read_concern import ReadConcern
4849
from pymongo.read_preferences import ReadPreference
4950
from pymongo.results import BulkWriteResult, _WriteResult
@@ -55,82 +56,59 @@
5556
_IS_SYNC = True
5657

5758
if _IS_SYNC:
58-
59-
class SpecRunnerThread(threading.Thread):
60-
def __init__(self, name):
61-
super().__init__()
62-
self.name = name
63-
self.exc = None
64-
self.daemon = True
65-
self.cond = threading.Condition()
66-
self.ops = []
67-
self.stopped = False
68-
69-
def schedule(self, work):
70-
self.ops.append(work)
71-
with self.cond:
72-
self.cond.notify()
73-
74-
def stop(self):
75-
self.stopped = True
76-
with self.cond:
77-
self.cond.notify()
78-
79-
def run(self):
80-
while not self.stopped or self.ops:
81-
if not self.ops:
82-
with self.cond:
83-
self.cond.wait(10)
84-
if self.ops:
85-
try:
86-
work = self.ops.pop(0)
87-
work()
88-
except Exception as exc:
89-
self.exc = exc
90-
self.stop()
59+
PARENT = threading.Thread
9160
else:
61+
PARENT = object
62+
9263

93-
class SpecRunnerThread:
94-
def __init__(self, name):
95-
self.name = name
96-
self.exc = None
97-
self.cond = asyncio.Condition()
98-
self.ops = []
99-
self.stopped = False
100-
self.task = None
101-
102-
def schedule(self, work):
103-
self.ops.append(work)
104-
with self.cond:
105-
self.cond.notify()
106-
107-
def stop(self):
108-
self.stopped = True
109-
with self.cond:
110-
self.cond.notify()
64+
class SpecRunnerThread(PARENT):
65+
def __init__(self, name):
66+
super().__init__()
67+
self.name = name
68+
self.exc = None
69+
self.daemon = True
70+
self.cond = _create_condition(_create_lock())
71+
self.ops = []
72+
self.stopped = False
73+
self.task = None
74+
75+
if not _IS_SYNC:
11176

11277
def start(self):
11378
self.task = asyncio.create_task(self.run(), name=self.name)
11479

115-
def join(self, timeout: int = 0):
80+
def join(self, timeout: float | None = 0): # type: ignore[override]
11681
if self.task is not None:
11782
asyncio.wait([self.task], timeout=timeout)
11883

11984
def is_alive(self):
12085
return not self.stopped
12186

122-
def run(self):
123-
while not self.stopped or self.ops:
124-
if not self.ops:
125-
with self.cond:
126-
asyncio.wait_for(self.cond.wait(), timeout=10)
127-
if self.ops:
128-
try:
129-
work = self.ops.pop(0)
130-
work()
131-
except Exception as exc:
132-
self.exc = exc
133-
self.stop()
87+
def schedule(self, work):
88+
self.ops.append(work)
89+
with self.cond:
90+
self.cond.notify()
91+
92+
def stop(self):
93+
self.stopped = True
94+
with self.cond:
95+
self.cond.notify()
96+
97+
def run(self):
98+
while not self.stopped or self.ops:
99+
if not self.ops:
100+
with self.cond:
101+
if _IS_SYNC:
102+
self.cond.wait(10) # type: ignore[call-arg]
103+
else:
104+
asyncio.wait_for(self.cond.wait(), timeout=10) # type: ignore[arg-type]
105+
if self.ops:
106+
try:
107+
work = self.ops.pop(0)
108+
work()
109+
except Exception as exc:
110+
self.exc = exc
111+
self.stop()
134112

135113

136114
class SpecTestCreator:

tools/synchro.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@
119119
"_async_create_lock": "_create_lock",
120120
"_async_create_condition": "_create_condition",
121121
"_async_cond_wait": "_cond_wait",
122-
"AsyncDummyMonitor": "DummyMonitor",
123122
"SpecRunnerTask": "SpecRunnerThread",
124123
}
125124

0 commit comments

Comments
 (0)