Skip to content

Commit 7277cae

Browse files
committed
Merge branch 'master' into PYTHON-5081
2 parents 662b7e1 + c8d3afd commit 7277cae

File tree

8 files changed

+236
-39
lines changed

8 files changed

+236
-39
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# Copyright 2016-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test the monitoring of the server heartbeats."""
16+
from __future__ import annotations
17+
18+
import sys
19+
20+
sys.path[0:0] = [""]
21+
22+
from test.asynchronous import AsyncIntegrationTest, client_knobs, unittest
23+
from test.utils import AsyncMockPool, HeartbeatEventListener, async_wait_until
24+
25+
from pymongo.asynchronous.monitor import Monitor
26+
from pymongo.errors import ConnectionFailure
27+
from pymongo.hello import Hello, HelloCompat
28+
29+
_IS_SYNC = False
30+
31+
32+
class TestHeartbeatMonitoring(AsyncIntegrationTest):
33+
async def create_mock_monitor(self, responses, uri, expected_results):
34+
listener = HeartbeatEventListener()
35+
with client_knobs(
36+
heartbeat_frequency=0.1, min_heartbeat_interval=0.1, events_queue_frequency=0.1
37+
):
38+
39+
class MockMonitor(Monitor):
40+
async def _check_with_socket(self, *args, **kwargs):
41+
if isinstance(responses[1], Exception):
42+
raise responses[1]
43+
return Hello(responses[1]), 99
44+
45+
_ = await self.async_single_client(
46+
h=uri,
47+
event_listeners=(listener,),
48+
_monitor_class=MockMonitor,
49+
_pool_class=AsyncMockPool,
50+
connect=True,
51+
)
52+
53+
expected_len = len(expected_results)
54+
# Wait for *at least* expected_len number of results. The
55+
# monitor thread may run multiple times during the execution
56+
# of this test.
57+
await async_wait_until(
58+
lambda: len(listener.events) >= expected_len, "publish all events"
59+
)
60+
61+
# zip gives us len(expected_results) pairs.
62+
for expected, actual in zip(expected_results, listener.events):
63+
self.assertEqual(expected, actual.__class__.__name__)
64+
self.assertEqual(actual.connection_id, responses[0])
65+
if expected != "ServerHeartbeatStartedEvent":
66+
if isinstance(actual.reply, Hello):
67+
self.assertEqual(actual.duration, 99)
68+
self.assertEqual(actual.reply._doc, responses[1])
69+
else:
70+
self.assertEqual(actual.reply, responses[1])
71+
72+
async def test_standalone(self):
73+
responses = (
74+
("a", 27017),
75+
{HelloCompat.LEGACY_CMD: True, "maxWireVersion": 4, "minWireVersion": 0, "ok": 1},
76+
)
77+
uri = "mongodb://a:27017"
78+
expected_results = ["ServerHeartbeatStartedEvent", "ServerHeartbeatSucceededEvent"]
79+
80+
await self.create_mock_monitor(responses, uri, expected_results)
81+
82+
async def test_standalone_error(self):
83+
responses = (("a", 27017), ConnectionFailure("SPECIAL MESSAGE"))
84+
uri = "mongodb://a:27017"
85+
# _check_with_socket failing results in a second attempt.
86+
expected_results = [
87+
"ServerHeartbeatStartedEvent",
88+
"ServerHeartbeatFailedEvent",
89+
"ServerHeartbeatStartedEvent",
90+
"ServerHeartbeatFailedEvent",
91+
]
92+
93+
await self.create_mock_monitor(responses, uri, expected_results)
94+
95+
96+
if __name__ == "__main__":
97+
unittest.main()
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from __future__ import annotations
2+
3+
from test.asynchronous import AsyncIntegrationTest
4+
from typing import Any, List, MutableMapping
5+
6+
from bson import Binary, Code, DBRef, ObjectId, json_util
7+
from bson.binary import USER_DEFINED_SUBTYPE
8+
9+
_IS_SYNC = False
10+
11+
12+
class TestJsonUtilRoundtrip(AsyncIntegrationTest):
13+
async def test_cursor(self):
14+
db = self.db
15+
16+
await db.drop_collection("test")
17+
docs: List[MutableMapping[str, Any]] = [
18+
{"foo": [1, 2]},
19+
{"bar": {"hello": "world"}},
20+
{"code": Code("function x() { return 1; }")},
21+
{"bin": Binary(b"\x00\x01\x02\x03\x04", USER_DEFINED_SUBTYPE)},
22+
{"dbref": {"_ref": DBRef("simple", ObjectId("509b8db456c02c5ab7e63c34"))}},
23+
]
24+
25+
await db.test.insert_many(docs)
26+
reloaded_docs = json_util.loads(json_util.dumps(await (db.test.find()).to_list()))
27+
for doc in docs:
28+
self.assertTrue(doc in reloaded_docs)

test/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2399,7 +2399,7 @@ def test_reconnect(self):
23992399

24002400
# MongoClient discovers it's alone. The first attempt raises either
24012401
# ServerSelectionTimeoutError or AutoReconnect (from
2402-
# AsyncMockPool.get_socket).
2402+
# MockPool.get_socket).
24032403
with self.assertRaises(AutoReconnect):
24042404
c.db.collection.find_one()
24052405

test/test_heartbeat_monitoring.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
from pymongo.hello import Hello, HelloCompat
2727
from pymongo.synchronous.monitor import Monitor
2828

29+
_IS_SYNC = True
30+
2931

3032
class TestHeartbeatMonitoring(IntegrationTest):
3133
def create_mock_monitor(self, responses, uri, expected_results):
@@ -40,8 +42,12 @@ def _check_with_socket(self, *args, **kwargs):
4042
raise responses[1]
4143
return Hello(responses[1]), 99
4244

43-
m = self.single_client(
44-
h=uri, event_listeners=(listener,), _monitor_class=MockMonitor, _pool_class=MockPool
45+
_ = self.single_client(
46+
h=uri,
47+
event_listeners=(listener,),
48+
_monitor_class=MockMonitor,
49+
_pool_class=MockPool,
50+
connect=True,
4551
)
4652

4753
expected_len = len(expected_results)
@@ -50,20 +56,16 @@ def _check_with_socket(self, *args, **kwargs):
5056
# of this test.
5157
wait_until(lambda: len(listener.events) >= expected_len, "publish all events")
5258

53-
try:
54-
# zip gives us len(expected_results) pairs.
55-
for expected, actual in zip(expected_results, listener.events):
56-
self.assertEqual(expected, actual.__class__.__name__)
57-
self.assertEqual(actual.connection_id, responses[0])
58-
if expected != "ServerHeartbeatStartedEvent":
59-
if isinstance(actual.reply, Hello):
60-
self.assertEqual(actual.duration, 99)
61-
self.assertEqual(actual.reply._doc, responses[1])
62-
else:
63-
self.assertEqual(actual.reply, responses[1])
64-
65-
finally:
66-
m.close()
59+
# zip gives us len(expected_results) pairs.
60+
for expected, actual in zip(expected_results, listener.events):
61+
self.assertEqual(expected, actual.__class__.__name__)
62+
self.assertEqual(actual.connection_id, responses[0])
63+
if expected != "ServerHeartbeatStartedEvent":
64+
if isinstance(actual.reply, Hello):
65+
self.assertEqual(actual.duration, 99)
66+
self.assertEqual(actual.reply._doc, responses[1])
67+
else:
68+
self.assertEqual(actual.reply, responses[1])
6769

6870
def test_standalone(self):
6971
responses = (

test/test_json_util.py

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import sys
2222
import uuid
2323
from collections import OrderedDict
24-
from typing import Any, List, MutableMapping, Tuple, Type
24+
from typing import Any, Tuple, Type
2525

2626
from bson.codec_options import CodecOptions, DatetimeConversion
2727

2828
sys.path[0:0] = [""]
2929

30-
from test import IntegrationTest, unittest
30+
from test import unittest
3131

3232
from bson import EPOCH_AWARE, EPOCH_NAIVE, SON, DatetimeMS, json_util
3333
from bson.binary import (
@@ -636,24 +636,5 @@ class MyBinary(Binary):
636636
self.assertEqual(json_util.dumps(MyBinary(b"bin", USER_DEFINED_SUBTYPE)), expected_json)
637637

638638

639-
class TestJsonUtilRoundtrip(IntegrationTest):
640-
def test_cursor(self):
641-
db = self.db
642-
643-
db.drop_collection("test")
644-
docs: List[MutableMapping[str, Any]] = [
645-
{"foo": [1, 2]},
646-
{"bar": {"hello": "world"}},
647-
{"code": Code("function x() { return 1; }")},
648-
{"bin": Binary(b"\x00\x01\x02\x03\x04", USER_DEFINED_SUBTYPE)},
649-
{"dbref": {"_ref": DBRef("simple", ObjectId("509b8db456c02c5ab7e63c34"))}},
650-
]
651-
652-
db.test.insert_many(docs)
653-
reloaded_docs = json_util.loads(json_util.dumps(db.test.find()))
654-
for doc in docs:
655-
self.assertTrue(doc in reloaded_docs)
656-
657-
658639
if __name__ == "__main__":
659640
unittest.main()

test/test_json_util_integration.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from __future__ import annotations
2+
3+
from test import IntegrationTest
4+
from typing import Any, List, MutableMapping
5+
6+
from bson import Binary, Code, DBRef, ObjectId, json_util
7+
from bson.binary import USER_DEFINED_SUBTYPE
8+
9+
_IS_SYNC = True
10+
11+
12+
class TestJsonUtilRoundtrip(IntegrationTest):
13+
def test_cursor(self):
14+
db = self.db
15+
16+
db.drop_collection("test")
17+
docs: List[MutableMapping[str, Any]] = [
18+
{"foo": [1, 2]},
19+
{"bar": {"hello": "world"}},
20+
{"code": Code("function x() { return 1; }")},
21+
{"bin": Binary(b"\x00\x01\x02\x03\x04", USER_DEFINED_SUBTYPE)},
22+
{"dbref": {"_ref": DBRef("simple", ObjectId("509b8db456c02c5ab7e63c34"))}},
23+
]
24+
25+
db.test.insert_many(docs)
26+
reloaded_docs = json_util.loads(json_util.dumps((db.test.find()).to_list()))
27+
for doc in docs:
28+
self.assertTrue(doc in reloaded_docs)

test/utils.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
from pymongo.errors import ConfigurationError, OperationFailure
4444
from pymongo.hello import HelloCompat
4545
from pymongo.helpers_shared import _SENSITIVE_COMMANDS
46-
from pymongo.lock import _create_lock
46+
from pymongo.lock import _async_create_lock, _create_lock
4747
from pymongo.monitoring import (
4848
ConnectionCheckedInEvent,
4949
ConnectionCheckedOutEvent,
@@ -312,6 +312,22 @@ def failed(self, event):
312312
self.event_list.append("serverHeartbeatFailedEvent")
313313

314314

315+
class AsyncMockConnection:
316+
def __init__(self):
317+
self.cancel_context = _CancellationContext()
318+
self.more_to_come = False
319+
self.id = random.randint(0, 100)
320+
321+
def close_conn(self, reason):
322+
pass
323+
324+
def __aenter__(self):
325+
return self
326+
327+
def __aexit__(self, exc_type, exc_val, exc_tb):
328+
pass
329+
330+
315331
class MockConnection:
316332
def __init__(self):
317333
self.cancel_context = _CancellationContext()
@@ -328,6 +344,47 @@ def __exit__(self, exc_type, exc_val, exc_tb):
328344
pass
329345

330346

347+
class AsyncMockPool:
348+
def __init__(self, address, options, handshake=True, client_id=None):
349+
self.gen = _PoolGeneration()
350+
self._lock = _async_create_lock()
351+
self.opts = options
352+
self.operation_count = 0
353+
self.conns = []
354+
355+
def stale_generation(self, gen, service_id):
356+
return self.gen.stale(gen, service_id)
357+
358+
@contextlib.asynccontextmanager
359+
async def checkout(self, handler=None):
360+
yield AsyncMockConnection()
361+
362+
async def checkin(self, *args, **kwargs):
363+
pass
364+
365+
async def _reset(self, service_id=None):
366+
async with self._lock:
367+
self.gen.inc(service_id)
368+
369+
async def ready(self):
370+
pass
371+
372+
async def reset(self, service_id=None, interrupt_connections=False):
373+
await self._reset()
374+
375+
async def reset_without_pause(self):
376+
await self._reset()
377+
378+
async def close(self):
379+
await self._reset()
380+
381+
async def update_is_writable(self, is_writable):
382+
pass
383+
384+
async def remove_stale_sockets(self, *args, **kwargs):
385+
pass
386+
387+
331388
class MockPool:
332389
def __init__(self, address, options, handshake=True, client_id=None):
333390
self.gen = _PoolGeneration()

tools/synchro.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@
119119
"_async_create_lock": "_create_lock",
120120
"_async_create_condition": "_create_condition",
121121
"_async_cond_wait": "_cond_wait",
122+
"AsyncMockConnection": "MockConnection",
123+
"AsyncMockPool": "MockPool",
122124
"StopAsyncIteration": "StopIteration",
123125
"asyncjoinall": "joinall",
124126
}
@@ -208,10 +210,12 @@ def async_only_test(f: str) -> bool:
208210
"test_database.py",
209211
"test_data_lake.py",
210212
"test_encryption.py",
213+
"test_heartbeat_monitoring.py",
211214
"test_index_management.py",
212215
"test_grid_file.py",
213216
"test_gridfs.py",
214217
"test_gridfs_spec.py",
218+
"test_json_util_integration.py",
215219
"test_logger.py",
216220
"test_monitoring.py",
217221
"test_raw_bson.py",

0 commit comments

Comments
 (0)