Skip to content

PYTHON-5107 - Convert test.test_streaming_protocol to async #2126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 228 additions & 0 deletions test/asynchronous/test_streaming_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
# Copyright 2020-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Test the database module."""
from __future__ import annotations

import sys
import time

sys.path[0:0] = [""]

from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
from test.utils import (
HeartbeatEventListener,
ServerEventListener,
async_wait_until,
)

from pymongo import monitoring
from pymongo.hello import HelloCompat

_IS_SYNC = False


class TestStreamingProtocol(AsyncIntegrationTest):
@async_client_context.require_failCommand_appName
async def test_failCommand_streaming(self):
listener = ServerEventListener()
hb_listener = HeartbeatEventListener()
client = await self.async_rs_or_single_client(
event_listeners=[listener, hb_listener],
heartbeatFrequencyMS=500,
appName="failingHeartbeatTest",
)
# Force a connection.
await client.admin.command("ping")
address = await client.address
listener.reset()

fail_hello = {
"configureFailPoint": "failCommand",
"mode": {"times": 4},
"data": {
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
"closeConnection": False,
"errorCode": 10107,
"appName": "failingHeartbeatTest",
},
}
async with self.fail_point(fail_hello):

def _marked_unknown(event):
return (
event.server_address == address
and not event.new_description.is_server_type_known
)

def _discovered_node(event):
return (
event.server_address == address
and not event.previous_description.is_server_type_known
and event.new_description.is_server_type_known
)

def marked_unknown():
return len(listener.matching(_marked_unknown)) >= 1

def rediscovered():
return len(listener.matching(_discovered_node)) >= 1

# Topology events are not published synchronously
await async_wait_until(marked_unknown, "mark node unknown")
await async_wait_until(rediscovered, "rediscover node")

# Server should be selectable.
await client.admin.command("ping")

@async_client_context.require_failCommand_appName
async def test_streaming_rtt(self):
listener = ServerEventListener()
hb_listener = HeartbeatEventListener()
# On Windows, RTT can actually be 0.0 because time.time() only has
# 1-15 millisecond resolution. We need to delay the initial hello
# to ensure that RTT is never zero.
name = "streamingRttTest"
delay_hello: dict = {
"configureFailPoint": "failCommand",
"mode": {"times": 1000},
"data": {
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
"blockConnection": True,
"blockTimeMS": 20,
# This can be uncommented after SERVER-49220 is fixed.
# 'appName': name,
},
}
async with self.fail_point(delay_hello):
client = await self.async_rs_or_single_client(
event_listeners=[listener, hb_listener], heartbeatFrequencyMS=500, appName=name
)
# Force a connection.
await client.admin.command("ping")
address = await client.address

delay_hello["data"]["blockTimeMS"] = 500
delay_hello["data"]["appName"] = name
async with self.fail_point(delay_hello):

def rtt_exceeds_250_ms():
# XXX: Add a public TopologyDescription getter to MongoClient?
topology = client._topology
sd = topology.description.server_descriptions()[address]
assert sd.round_trip_time is not None
return sd.round_trip_time > 0.250

await async_wait_until(rtt_exceeds_250_ms, "exceed 250ms RTT")

# Server should be selectable.
await client.admin.command("ping")

def changed_event(event):
return event.server_address == address and isinstance(
event, monitoring.ServerDescriptionChangedEvent
)

# There should only be one event published, for the initial discovery.
events = listener.matching(changed_event)
self.assertEqual(1, len(events))
self.assertGreater(events[0].new_description.round_trip_time, 0)

@async_client_context.require_failCommand_appName
async def test_monitor_waits_after_server_check_error(self):
# This test implements:
# https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-tests.md#monitors-sleep-at-least-minheartbeatfreqencyms-between-checks
fail_hello = {
"mode": {"times": 5},
"data": {
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
"errorCode": 1234,
"appName": "SDAMMinHeartbeatFrequencyTest",
},
}
async with self.fail_point(fail_hello):
start = time.time()
client = await self.async_single_client(
appName="SDAMMinHeartbeatFrequencyTest", serverSelectionTimeoutMS=5000
)
# Force a connection.
await client.admin.command("ping")
duration = time.time() - start
# Explanation of the expected events:
# 0ms: run configureFailPoint
# 1ms: create MongoClient
# 2ms: failed monitor handshake, 1
# 502ms: failed monitor handshake, 2
# 1002ms: failed monitor handshake, 3
# 1502ms: failed monitor handshake, 4
# 2002ms: failed monitor handshake, 5
# 2502ms: monitor handshake succeeds
# 2503ms: run awaitable hello
# 2504ms: application handshake succeeds
# 2505ms: ping command succeeds
self.assertGreaterEqual(duration, 2)
self.assertLessEqual(duration, 3.5)

@async_client_context.require_failCommand_appName
async def test_heartbeat_awaited_flag(self):
hb_listener = HeartbeatEventListener()
client = await self.async_single_client(
event_listeners=[hb_listener],
heartbeatFrequencyMS=500,
appName="heartbeatEventAwaitedFlag",
)
# Force a connection.
await client.admin.command("ping")

def hb_succeeded(event):
return isinstance(event, monitoring.ServerHeartbeatSucceededEvent)

def hb_failed(event):
return isinstance(event, monitoring.ServerHeartbeatFailedEvent)

fail_heartbeat = {
"mode": {"times": 2},
"data": {
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
"closeConnection": True,
"appName": "heartbeatEventAwaitedFlag",
},
}
async with self.fail_point(fail_heartbeat):
await async_wait_until(
lambda: hb_listener.matching(hb_failed), "published failed event"
)
# Reconnect.
await client.admin.command("ping")

hb_succeeded_events = hb_listener.matching(hb_succeeded)
hb_failed_events = hb_listener.matching(hb_failed)
self.assertFalse(hb_succeeded_events[0].awaited)
self.assertTrue(hb_failed_events[0].awaited)
# Depending on thread scheduling, the failed heartbeat could occur on
# the second or third check.
events = [type(e) for e in hb_listener.events[:4]]
if events == [
monitoring.ServerHeartbeatStartedEvent,
monitoring.ServerHeartbeatSucceededEvent,
monitoring.ServerHeartbeatStartedEvent,
monitoring.ServerHeartbeatFailedEvent,
]:
self.assertFalse(hb_succeeded_events[1].awaited)
else:
self.assertTrue(hb_succeeded_events[1].awaited)


if __name__ == "__main__":
unittest.main()
8 changes: 3 additions & 5 deletions test/test_streaming_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from pymongo import monitoring
from pymongo.hello import HelloCompat

_IS_SYNC = True


class TestStreamingProtocol(IntegrationTest):
@client_context.require_failCommand_appName
Expand All @@ -41,7 +43,6 @@ def test_failCommand_streaming(self):
heartbeatFrequencyMS=500,
appName="failingHeartbeatTest",
)
self.addCleanup(client.close)
# Force a connection.
client.admin.command("ping")
address = client.address
Expand Down Expand Up @@ -78,7 +79,7 @@ def marked_unknown():
def rediscovered():
return len(listener.matching(_discovered_node)) >= 1

# Topology events are published asynchronously
# Topology events are not published synchronously
wait_until(marked_unknown, "mark node unknown")
wait_until(rediscovered, "rediscover node")

Expand Down Expand Up @@ -108,7 +109,6 @@ def test_streaming_rtt(self):
client = self.rs_or_single_client(
event_listeners=[listener, hb_listener], heartbeatFrequencyMS=500, appName=name
)
self.addCleanup(client.close)
# Force a connection.
client.admin.command("ping")
address = client.address
Expand Down Expand Up @@ -156,7 +156,6 @@ def test_monitor_waits_after_server_check_error(self):
client = self.single_client(
appName="SDAMMinHeartbeatFrequencyTest", serverSelectionTimeoutMS=5000
)
self.addCleanup(client.close)
# Force a connection.
client.admin.command("ping")
duration = time.time() - start
Expand All @@ -183,7 +182,6 @@ def test_heartbeat_awaited_flag(self):
heartbeatFrequencyMS=500,
appName="heartbeatEventAwaitedFlag",
)
self.addCleanup(client.close)
# Force a connection.
client.admin.command("ping")

Expand Down
1 change: 1 addition & 0 deletions tools/synchro.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def async_only_test(f: str) -> bool:
"test_session.py",
"test_srv_polling.py",
"test_ssl.py",
"test_streaming_protocol.py",
"test_transactions.py",
"test_unified_format.py",
"unified_format.py",
Expand Down
Loading