Skip to content

Commit b293c94

Browse files
committed
Refactor
1 parent 5a1f434 commit b293c94

File tree

6 files changed

+116
-206
lines changed

6 files changed

+116
-206
lines changed

test/asynchronous/helpers.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -381,14 +381,14 @@ def disable(self):
381381

382382

383383
class ConcurrentRunner(PARENT):
384-
def __init__(self, name, *args, **kwargs):
384+
def __init__(self, **kwargs):
385385
if _IS_SYNC:
386-
super().__init__(*args, **kwargs)
387-
self.name = name
386+
super().__init__(**kwargs)
387+
self.name = kwargs.get("name", "ConcurrentRunner")
388388
self.stopped = False
389389
self.task = None
390-
if "target" in kwargs:
391-
self.target = kwargs["target"]
390+
self.target = kwargs.get("target", None)
391+
self.args = kwargs.get("args", [])
392392

393393
if not _IS_SYNC:
394394

@@ -404,5 +404,8 @@ def is_alive(self):
404404

405405
async def run(self):
406406
if self.target:
407-
await self.target()
407+
if self.args:
408+
await self.target(*self.args)
409+
else:
410+
await self.target()
408411
self.stopped = True

test/asynchronous/test_examples.py

Lines changed: 48 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import functools
2121
import sys
2222
import threading
23+
from test.asynchronous.helpers import ConcurrentRunner
2324

2425
sys.path[0:0] = [""]
2526

@@ -741,102 +742,53 @@ async def test_delete(self):
741742

742743
self.assertEqual(await db.inventory.count_documents({}), 0)
743744

744-
if _IS_SYNC:
745-
746-
@async_client_context.require_change_streams
747-
async def test_change_streams(self):
748-
db = self.db
749-
done = False
750-
751-
async def insert_docs():
752-
nonlocal done
753-
while not done:
754-
await db.inventory.insert_one({"username": "alice"})
755-
await db.inventory.delete_one({"username": "alice"})
756-
757-
t = threading.Thread(target=insert_docs)
758-
t.start()
759-
760-
try:
761-
# 1. The database for reactive, real-time applications
762-
# Start Changestream Example 1
763-
cursor = await db.inventory.watch()
764-
await anext(cursor)
765-
# End Changestream Example 1
766-
await cursor.close()
767-
768-
# Start Changestream Example 2
769-
cursor = await db.inventory.watch(full_document="updateLookup")
770-
await anext(cursor)
771-
# End Changestream Example 2
772-
await cursor.close()
773-
774-
# Start Changestream Example 3
775-
resume_token = cursor.resume_token
776-
cursor = await db.inventory.watch(resume_after=resume_token)
777-
await anext(cursor)
778-
# End Changestream Example 3
779-
await cursor.close()
780-
781-
# Start Changestream Example 4
782-
pipeline = [
783-
{"$match": {"fullDocument.username": "alice"}},
784-
{"$addFields": {"newField": "this is an added field!"}},
785-
]
786-
cursor = await db.inventory.watch(pipeline=pipeline)
787-
await anext(cursor)
788-
# End Changestream Example 4
789-
await cursor.close()
790-
finally:
791-
done = True
792-
t.join()
793-
else:
794-
795-
@async_client_context.require_change_streams
796-
async def test_change_streams(self):
797-
db = self.db
798-
done = False
799-
800-
async def insert_docs():
801-
nonlocal done
802-
while not done:
803-
await db.inventory.insert_one({"username": "alice"})
804-
await db.inventory.delete_one({"username": "alice"})
805-
806-
t = asyncio.create_task(insert_docs())
807-
try:
808-
# 1. The database for reactive, real-time applications
809-
# Start Changestream Example 1
810-
cursor = await db.inventory.watch()
811-
await anext(cursor)
812-
# End Changestream Example 1
813-
await cursor.close()
814-
815-
# Start Changestream Example 2
816-
cursor = await db.inventory.watch(full_document="updateLookup")
817-
await anext(cursor)
818-
# End Changestream Example 2
819-
await cursor.close()
820-
821-
# Start Changestream Example 3
822-
resume_token = cursor.resume_token
823-
cursor = await db.inventory.watch(resume_after=resume_token)
824-
await anext(cursor)
825-
# End Changestream Example 3
826-
await cursor.close()
827-
828-
# Start Changestream Example 4
829-
pipeline = [
830-
{"$match": {"fullDocument.username": "alice"}},
831-
{"$addFields": {"newField": "this is an added field!"}},
832-
]
833-
cursor = await db.inventory.watch(pipeline=pipeline)
834-
await anext(cursor)
835-
# End Changestream Example 4
836-
await cursor.close()
837-
finally:
838-
done = True
839-
await t
745+
@async_client_context.require_change_streams
746+
async def test_change_streams(self):
747+
db = self.db
748+
done = False
749+
750+
async def insert_docs():
751+
nonlocal done
752+
while not done:
753+
await db.inventory.insert_one({"username": "alice"})
754+
await db.inventory.delete_one({"username": "alice"})
755+
756+
t = ConcurrentRunner(target=insert_docs)
757+
await t.start()
758+
759+
try:
760+
# 1. The database for reactive, real-time applications
761+
# Start Changestream Example 1
762+
cursor = await db.inventory.watch()
763+
await anext(cursor)
764+
# End Changestream Example 1
765+
await cursor.close()
766+
767+
# Start Changestream Example 2
768+
cursor = await db.inventory.watch(full_document="updateLookup")
769+
await anext(cursor)
770+
# End Changestream Example 2
771+
await cursor.close()
772+
773+
# Start Changestream Example 3
774+
resume_token = cursor.resume_token
775+
cursor = await db.inventory.watch(resume_after=resume_token)
776+
await anext(cursor)
777+
# End Changestream Example 3
778+
await cursor.close()
779+
780+
# Start Changestream Example 4
781+
pipeline = [
782+
{"$match": {"fullDocument.username": "alice"}},
783+
{"$addFields": {"newField": "this is an added field!"}},
784+
]
785+
cursor = await db.inventory.watch(pipeline=pipeline)
786+
await anext(cursor)
787+
# End Changestream Example 4
788+
await cursor.close()
789+
finally:
790+
done = True
791+
await t.join()
840792

841793
async def test_aggregate_examples(self):
842794
db = self.db

test/asynchronous/utils_spec_runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858

5959
class SpecRunnerTask(ConcurrentRunner):
6060
def __init__(self, name):
61-
super().__init__(name)
61+
super().__init__(name=name)
6262
self.exc = None
6363
self.daemon = True
6464
self.cond = _async_create_condition(_async_create_lock())

test/helpers.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -381,14 +381,14 @@ def disable(self):
381381

382382

383383
class ConcurrentRunner(PARENT):
384-
def __init__(self, name, *args, **kwargs):
384+
def __init__(self, **kwargs):
385385
if _IS_SYNC:
386-
super().__init__(*args, **kwargs)
387-
self.name = name
386+
super().__init__(**kwargs)
387+
self.name = kwargs.get("name", "ConcurrentRunner")
388388
self.stopped = False
389389
self.task = None
390-
if "target" in kwargs:
391-
self.target = kwargs["target"]
390+
self.target = kwargs.get("target", None)
391+
self.args = kwargs.get("args", [])
392392

393393
if not _IS_SYNC:
394394

@@ -404,5 +404,8 @@ def is_alive(self):
404404

405405
def run(self):
406406
if self.target:
407-
self.target()
407+
if self.args:
408+
self.target(*self.args)
409+
else:
410+
self.target()
408411
self.stopped = True

test/test_examples.py

Lines changed: 48 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import functools
2121
import sys
2222
import threading
23+
from test.helpers import ConcurrentRunner
2324

2425
sys.path[0:0] = [""]
2526

@@ -741,102 +742,53 @@ def test_delete(self):
741742

742743
self.assertEqual(db.inventory.count_documents({}), 0)
743744

744-
if _IS_SYNC:
745-
746-
@client_context.require_change_streams
747-
def test_change_streams(self):
748-
db = self.db
749-
done = False
750-
751-
def insert_docs():
752-
nonlocal done
753-
while not done:
754-
db.inventory.insert_one({"username": "alice"})
755-
db.inventory.delete_one({"username": "alice"})
756-
757-
t = threading.Thread(target=insert_docs)
758-
t.start()
759-
760-
try:
761-
# 1. The database for reactive, real-time applications
762-
# Start Changestream Example 1
763-
cursor = db.inventory.watch()
764-
next(cursor)
765-
# End Changestream Example 1
766-
cursor.close()
767-
768-
# Start Changestream Example 2
769-
cursor = db.inventory.watch(full_document="updateLookup")
770-
next(cursor)
771-
# End Changestream Example 2
772-
cursor.close()
773-
774-
# Start Changestream Example 3
775-
resume_token = cursor.resume_token
776-
cursor = db.inventory.watch(resume_after=resume_token)
777-
next(cursor)
778-
# End Changestream Example 3
779-
cursor.close()
780-
781-
# Start Changestream Example 4
782-
pipeline = [
783-
{"$match": {"fullDocument.username": "alice"}},
784-
{"$addFields": {"newField": "this is an added field!"}},
785-
]
786-
cursor = db.inventory.watch(pipeline=pipeline)
787-
next(cursor)
788-
# End Changestream Example 4
789-
cursor.close()
790-
finally:
791-
done = True
792-
t.join()
793-
else:
794-
795-
@client_context.require_change_streams
796-
def test_change_streams(self):
797-
db = self.db
798-
done = False
799-
800-
def insert_docs():
801-
nonlocal done
802-
while not done:
803-
db.inventory.insert_one({"username": "alice"})
804-
db.inventory.delete_one({"username": "alice"})
805-
806-
t = asyncio.create_task(insert_docs())
807-
try:
808-
# 1. The database for reactive, real-time applications
809-
# Start Changestream Example 1
810-
cursor = db.inventory.watch()
811-
next(cursor)
812-
# End Changestream Example 1
813-
cursor.close()
814-
815-
# Start Changestream Example 2
816-
cursor = db.inventory.watch(full_document="updateLookup")
817-
next(cursor)
818-
# End Changestream Example 2
819-
cursor.close()
820-
821-
# Start Changestream Example 3
822-
resume_token = cursor.resume_token
823-
cursor = db.inventory.watch(resume_after=resume_token)
824-
next(cursor)
825-
# End Changestream Example 3
826-
cursor.close()
827-
828-
# Start Changestream Example 4
829-
pipeline = [
830-
{"$match": {"fullDocument.username": "alice"}},
831-
{"$addFields": {"newField": "this is an added field!"}},
832-
]
833-
cursor = db.inventory.watch(pipeline=pipeline)
834-
next(cursor)
835-
# End Changestream Example 4
836-
cursor.close()
837-
finally:
838-
done = True
839-
t
745+
@client_context.require_change_streams
746+
def test_change_streams(self):
747+
db = self.db
748+
done = False
749+
750+
def insert_docs():
751+
nonlocal done
752+
while not done:
753+
db.inventory.insert_one({"username": "alice"})
754+
db.inventory.delete_one({"username": "alice"})
755+
756+
t = ConcurrentRunner(target=insert_docs)
757+
t.start()
758+
759+
try:
760+
# 1. The database for reactive, real-time applications
761+
# Start Changestream Example 1
762+
cursor = db.inventory.watch()
763+
next(cursor)
764+
# End Changestream Example 1
765+
cursor.close()
766+
767+
# Start Changestream Example 2
768+
cursor = db.inventory.watch(full_document="updateLookup")
769+
next(cursor)
770+
# End Changestream Example 2
771+
cursor.close()
772+
773+
# Start Changestream Example 3
774+
resume_token = cursor.resume_token
775+
cursor = db.inventory.watch(resume_after=resume_token)
776+
next(cursor)
777+
# End Changestream Example 3
778+
cursor.close()
779+
780+
# Start Changestream Example 4
781+
pipeline = [
782+
{"$match": {"fullDocument.username": "alice"}},
783+
{"$addFields": {"newField": "this is an added field!"}},
784+
]
785+
cursor = db.inventory.watch(pipeline=pipeline)
786+
next(cursor)
787+
# End Changestream Example 4
788+
cursor.close()
789+
finally:
790+
done = True
791+
t.join()
840792

841793
def test_aggregate_examples(self):
842794
db = self.db

test/utils_spec_runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858

5959
class SpecRunnerThread(ConcurrentRunner):
6060
def __init__(self, name):
61-
super().__init__(name)
61+
super().__init__(name=name)
6262
self.exc = None
6363
self.daemon = True
6464
self.cond = _create_condition(_create_lock())

0 commit comments

Comments
 (0)