Skip to content

Commit 3bfccf0

Browse files
committed
Content from PR #1189
1 parent a2720ce commit 3bfccf0

File tree

4 files changed

+20
-12
lines changed

4 files changed

+20
-12
lines changed

cassandra/io/asyncioreactor.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
from cassandra.connection import Connection, ConnectionShutdown
1+
import threading
22

3+
from cassandra.connection import Connection, ConnectionShutdown
4+
import sys
35
import asyncio
46
import logging
57
import os
@@ -88,9 +90,11 @@ def __init__(self, *args, **kwargs):
8890

8991
self._connect_socket()
9092
self._socket.setblocking(0)
91-
92-
self._write_queue = asyncio.Queue()
93-
self._write_queue_lock = asyncio.Lock()
93+
loop_args = dict()
94+
if sys.version_info[0] == 3 and sys.version_info[1] < 10:
95+
loop_args['loop'] = self._loop
96+
self._write_queue = asyncio.Queue(**loop_args)
97+
self._write_queue_lock = asyncio.Lock(**loop_args)
9498

9599
# see initialize_reactor -- loop is running in a separate thread, so we
96100
# have to use a threadsafe call
@@ -108,8 +112,11 @@ def initialize_reactor(cls):
108112
if cls._pid != os.getpid():
109113
cls._loop = None
110114
if cls._loop is None:
111-
cls._loop = asyncio.new_event_loop()
112-
asyncio.set_event_loop(cls._loop)
115+
try:
116+
cls._loop = asyncio.get_running_loop()
117+
except RuntimeError:
118+
cls._loop = asyncio.new_event_loop()
119+
asyncio.set_event_loop(cls._loop)
113120

114121
if not cls._loop_thread:
115122
# daemonize so the loop will be shut down on interpreter
@@ -162,7 +169,7 @@ def push(self, data):
162169
else:
163170
chunks = [data]
164171

165-
if self._loop_thread.ident != get_ident():
172+
if self._loop_thread != threading.current_thread():
166173
asyncio.run_coroutine_threadsafe(
167174
self._push_msg(chunks),
168175
loop=self._loop
@@ -173,7 +180,7 @@ def push(self, data):
173180

174181
async def _push_msg(self, chunks):
175182
# This lock ensures all chunks of a message are sequential in the Queue
176-
with await self._write_queue_lock:
183+
async with self._write_queue_lock:
177184
for chunk in chunks:
178185
self._write_queue.put_nowait(chunk)
179186

tests/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,4 @@ def is_windows():
112112

113113
notwindows = unittest.skipUnless(not is_windows(), "This test is not adequate for windows")
114114
notpypy = unittest.skipUnless(not platform.python_implementation() == 'PyPy', "This tests is not suitable for pypy")
115+
notasyncio = unittest.skipUnless(not EVENT_LOOP_MANAGER == 'asyncio', "This tests is not suitable for EVENT_LOOP_MANAGER=asyncio")

tests/integration/cqlengine/model/test_model.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,9 @@ class SensitiveModel(Model):
256256
rows[-1]
257257
rows[-1:]
258258

259-
# Asyncio complains loudly about old syntax on python 3.7+, so get rid of all of those
260-
relevant_warnings = [warn for warn in w if "with (yield from lock)" not in str(warn.message)]
259+
# ignore DeprecationWarning('The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.')
260+
relevant_warnings = [warn for warn in w if "The loop argument is deprecated" not in str(warn.message)]
261261

262-
self.assertEqual(len(relevant_warnings), 4)
263262
self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[0].message))
264263
self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[1].message))
265264
self.assertIn("ModelQuerySet indexing with negative indices support will be removed in 4.0.",

tests/integration/standard/test_cluster.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from cassandra import connection
3737
from cassandra.connection import DefaultEndPoint
3838

39-
from tests import notwindows
39+
from tests import notwindows, notasyncio
4040
from tests.integration import use_singledc, get_server_versions, CASSANDRA_VERSION, \
4141
execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \
4242
get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, lessthanorequalcass40, \
@@ -1107,6 +1107,7 @@ def test_add_profile_timeout(self):
11071107
raise Exception("add_execution_profile didn't timeout after {0} retries".format(max_retry_count))
11081108

11091109
@notwindows
1110+
@notasyncio # asyncio can't do timeouts smaller than 1ms, as this test requires
11101111
def test_execute_query_timeout(self):
11111112
with TestCluster() as cluster:
11121113
session = cluster.connect(wait_for_all_pools=True)

0 commit comments

Comments
 (0)