Skip to content

Commit 79e7c37

Browse files
committed
fix(cassandra/io/asyncioreactor.py): Using async and await inside @asyncio.coroutine
* Replace all places that use old async node (`@asyncio.coroutine` and `yield from`) and use a new Python syntax (Only for Python v3.5+) * Save the old code to allow user with Python v2+ to run our tests. * requirements.txt: * Install `futures` package only for Python v2 * Use the latest `sex` package version
1 parent 5fdb006 commit 79e7c37

File tree

2 files changed

+193
-96
lines changed

2 files changed

+193
-96
lines changed

cassandra/io/asyncioreactor.py

Lines changed: 191 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
import abc
2+
import sys
3+
4+
import six
5+
16
from cassandra.connection import Connection, ConnectionShutdown
27

38
import asyncio
@@ -9,24 +14,16 @@
914

1015

1116
log = logging.getLogger(__name__)
17+
is_min_python_version_3_5 = (sys.version_info.major, sys.version_info.micro) >= (3, 5)
1218

1319

14-
# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and
15-
# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the
16-
# managed coroutines are generator-based, not native coroutines. See PEP 492:
17-
# https://www.python.org/dev/peps/pep-0492/#coroutine-objects
18-
20+
if not hasattr(asyncio, "run_coroutine_threadsafe"):
21+
raise ImportError("Cannot use asyncioreactor without access to asyncio.run_coroutine_threadsafe"
22+
" (added in 3.4.6 and 3.5.1)")
1923

20-
try:
21-
asyncio.run_coroutine_threadsafe
22-
except AttributeError:
23-
raise ImportError(
24-
'Cannot use asyncioreactor without access to '
25-
'asyncio.run_coroutine_threadsafe (added in 3.4.6 and 3.5.1)'
26-
)
2724

28-
29-
class AsyncioTimer(object):
25+
@six.add_metaclass(abc.ABCMeta)
26+
class BaseAsyncioTimer(object):
3027
"""
3128
An ``asyncioreactor``-specific Timer. Similar to :class:`.connection.Timer,
3229
but with a slightly different API due to limitations in the underlying
@@ -45,12 +42,6 @@ def __init__(self, timeout, callback, loop):
4542
loop=loop)
4643
self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop)
4744

48-
@staticmethod
49-
@asyncio.coroutine
50-
def _call_delayed_coro(timeout, callback, loop):
51-
yield from asyncio.sleep(timeout, loop=loop)
52-
return callback()
53-
5445
def __lt__(self, other):
5546
try:
5647
return self._handle < other._handle
@@ -66,15 +57,21 @@ def finish(self):
6657
raise NotImplementedError('{} is not compatible with TimerManager and '
6758
'does not implement .finish()')
6859

60+
@staticmethod
61+
@abc.abstractmethod
62+
def _call_delayed_coro(timeout, callback, loop):
63+
pass
6964

70-
class AsyncioConnection(Connection):
71-
"""
72-
An experimental implementation of :class:`.Connection` that uses the
73-
``asyncio`` module in the Python standard library for its event loop.
7465

75-
Note that it requires ``asyncio`` features that were only introduced in the
76-
3.4 line in 3.4.6, and in the 3.5 line in 3.5.1.
66+
@six.add_metaclass(abc.ABCMeta)
67+
class BaseAsyncioConnection(Connection):
7768
"""
69+
An experimental implementation of :class:`.Connection` that uses the
70+
``asyncio`` module in the Python standard library for its event loop.
71+
72+
Note that it requires ``asyncio`` features that were only introduced in the
73+
3.4 line in 3.4.6, and in the 3.5 line in 3.5.1.
74+
"""
7875

7976
_loop = None
8077
_pid = os.getpid()
@@ -136,26 +133,6 @@ def close(self):
136133
self._close(), loop=self._loop
137134
)
138135

139-
@asyncio.coroutine
140-
def _close(self):
141-
log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
142-
if self._write_watcher:
143-
self._write_watcher.cancel()
144-
if self._read_watcher:
145-
self._read_watcher.cancel()
146-
if self._socket:
147-
self._loop.remove_writer(self._socket.fileno())
148-
self._loop.remove_reader(self._socket.fileno())
149-
self._socket.close()
150-
151-
log.debug("Closed socket to %s" % (self.endpoint,))
152-
153-
if not self.is_defunct:
154-
self.error_all_requests(
155-
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
156-
# don't leave in-progress operations hanging
157-
self.connected_event.set()
158-
159136
def push(self, data):
160137
buff_size = self.out_buffer_size
161138
if len(data) > buff_size:
@@ -174,52 +151,176 @@ def push(self, data):
174151
# avoid races/hangs by just scheduling this, not using threadsafe
175152
self._loop.create_task(self._push_msg(chunks))
176153

177-
@asyncio.coroutine
178-
def _push_msg(self, chunks):
179-
# This lock ensures all chunks of a message are sequential in the Queue
180-
with (yield from self._write_queue_lock):
181-
for chunk in chunks:
182-
self._write_queue.put_nowait(chunk)
154+
@abc.abstractmethod
155+
def _close(self):
156+
pass
183157

158+
@abc.abstractmethod
159+
def _push_msg(self, chunks):
160+
pass
184161

185-
@asyncio.coroutine
162+
@abc.abstractmethod
186163
def handle_write(self):
187-
while True:
188-
try:
189-
next_msg = yield from self._write_queue.get()
190-
if next_msg:
191-
yield from self._loop.sock_sendall(self._socket, next_msg)
192-
except socket.error as err:
193-
log.debug("Exception in send for %s: %s", self, err)
194-
self.defunct(err)
195-
return
196-
except asyncio.CancelledError:
197-
return
164+
pass
198165

199-
@asyncio.coroutine
166+
@abc.abstractmethod
200167
def handle_read(self):
201-
while True:
202-
try:
203-
buf = yield from self._loop.sock_recv(self._socket, self.in_buffer_size)
204-
self._iobuf.write(buf)
205-
# sock_recv expects EWOULDBLOCK if socket provides no data, but
206-
# nonblocking ssl sockets raise these instead, so we handle them
207-
# ourselves by yielding to the event loop, where the socket will
208-
# get the reading/writing it "wants" before retrying
209-
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
210-
yield
211-
continue
212-
except socket.error as err:
213-
log.debug("Exception during socket recv for %s: %s",
214-
self, err)
215-
self.defunct(err)
216-
return # leave the read loop
217-
except asyncio.CancelledError:
218-
return
219-
220-
if buf and self._iobuf.tell():
221-
self.process_io_buffer()
222-
else:
223-
log.debug("Connection %s closed by server", self)
224-
self.close()
225-
return
168+
pass
169+
170+
171+
if is_min_python_version_3_5:
172+
class AsyncioTimer(BaseAsyncioTimer):
173+
@staticmethod
174+
async def _call_delayed_coro(timeout, callback, loop):
175+
await asyncio.sleep(timeout, loop=loop)
176+
return callback()
177+
178+
class AsyncioConnection(BaseAsyncioConnection):
179+
async def _close(self):
180+
log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
181+
if self._write_watcher:
182+
self._write_watcher.cancel()
183+
if self._read_watcher:
184+
self._read_watcher.cancel()
185+
if self._socket:
186+
self._loop.remove_writer(self._socket.fileno())
187+
self._loop.remove_reader(self._socket.fileno())
188+
self._socket.close()
189+
190+
log.debug("Closed socket to %s" % (self.endpoint,))
191+
192+
if not self.is_defunct:
193+
self.error_all_requests(
194+
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
195+
# don't leave in-progress operations hanging
196+
self.connected_event.set()
197+
198+
async def _push_msg(self, chunks):
199+
# This lock ensures all chunks of a message are sequential in the Queue
200+
async with self._write_queue_lock:
201+
for chunk in chunks:
202+
self._write_queue.put_nowait(chunk)
203+
204+
async def handle_write(self):
205+
while True:
206+
try:
207+
next_msg = await self._write_queue.get()
208+
if next_msg:
209+
await self._loop.sock_sendall(self._socket, next_msg)
210+
except socket.error as err:
211+
log.debug("Exception in send for %s: %s", self, err)
212+
self.defunct(err)
213+
return
214+
except asyncio.CancelledError:
215+
return
216+
217+
async def handle_read(self):
218+
while True:
219+
try:
220+
buf = await self._loop.sock_recv(self._socket, self.in_buffer_size)
221+
self._iobuf.write(buf)
222+
# sock_recv expects EWOULDBLOCK if socket provides no data, but
223+
# nonblocking ssl sockets raise these instead, so we handle them
224+
# ourselves by yielding to the event loop, where the socket will
225+
# get the reading/writing it "wants" before retrying
226+
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
227+
pass
228+
except socket.error as err:
229+
log.debug("Exception during socket recv for %s: %s",
230+
self, err)
231+
self.defunct(err)
232+
return # leave the read loop
233+
except asyncio.CancelledError:
234+
return
235+
236+
if buf and self._iobuf.tell():
237+
self.process_io_buffer()
238+
else:
239+
log.debug("Connection %s closed by server", self)
240+
self.close()
241+
return
242+
else:
243+
class AsyncioTimer(BaseAsyncioTimer):
244+
@staticmethod
245+
@asyncio.coroutine
246+
def _call_delayed_coro(timeout, callback, loop):
247+
yield from asyncio.sleep(timeout, loop=loop)
248+
return callback()
249+
250+
class AsyncioConnection(BaseAsyncioConnection):
251+
@asyncio.coroutine
252+
def _close(self):
253+
log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
254+
if self._write_watcher:
255+
self._write_watcher.cancel()
256+
if self._read_watcher:
257+
self._read_watcher.cancel()
258+
if self._socket:
259+
self._loop.remove_writer(self._socket.fileno())
260+
self._loop.remove_reader(self._socket.fileno())
261+
self._socket.close()
262+
263+
log.debug("Closed socket to %s" % (self.endpoint,))
264+
265+
if not self.is_defunct:
266+
self.error_all_requests(
267+
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
268+
# don't leave in-progress operations hanging
269+
self.connected_event.set()
270+
271+
log.debug("Closed socket to %s" % (self.endpoint,))
272+
273+
if not self.is_defunct:
274+
self.error_all_requests(
275+
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
276+
# don't leave in-progress operations hanging
277+
self.connected_event.set()
278+
279+
@asyncio.coroutine
280+
def _push_msg(self, chunks):
281+
# This lock ensures all chunks of a message are sequential in the Queue
282+
with (yield from self._write_queue_lock):
283+
for chunk in chunks:
284+
self._write_queue.put_nowait(chunk)
285+
286+
@asyncio.coroutine
287+
def handle_write(self):
288+
while True:
289+
try:
290+
next_msg = yield from self._write_queue.get()
291+
if next_msg:
292+
yield from self._loop.sock_sendall(self._socket, next_msg)
293+
except socket.error as err:
294+
log.debug("Exception in send for %s: %s", self, err)
295+
self.defunct(err)
296+
return
297+
except asyncio.CancelledError:
298+
return
299+
300+
@asyncio.coroutine
301+
def handle_read(self):
302+
while True:
303+
try:
304+
buf = yield from self._loop.sock_recv(self._socket, self.in_buffer_size)
305+
self._iobuf.write(buf)
306+
# sock_recv expects EWOULDBLOCK if socket provides no data, but
307+
# nonblocking ssl sockets raise these instead, so we handle them
308+
# ourselves by yielding to the event loop, where the socket will
309+
# get the reading/writing it "wants" before retrying
310+
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
311+
yield
312+
continue
313+
except socket.error as err:
314+
log.debug("Exception during socket recv for %s: %s",
315+
self, err)
316+
self.defunct(err)
317+
return # leave the read loop
318+
except asyncio.CancelledError:
319+
return
320+
321+
if buf and self._iobuf.tell():
322+
self.process_io_buffer()
323+
else:
324+
log.debug("Connection %s closed by server", self)
325+
self.close()
326+
return

requirements.txt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
11
geomet>=0.1,<0.3
2-
six >=1.9
3-
futures <=2.2.0
4-
# Futures is not required for Python 3, but it works up through 2.2.0 (after which it introduced breaking syntax).
5-
# This is left here to make sure install -r works with any runtime. When installing via setup.py, futures is omitted
6-
# for Python 3, in favor of the standard library implementation.
7-
# see PYTHON-393
2+
six
3+
futures; python_version < '3.0'

0 commit comments

Comments
 (0)