Skip to content

Commit b1a0d7a

Browse files
absurdfarcedkropachev
authored andcommitted
PYTHON-1366 Handle removal of asyncore in Python 3.12 (datastax#1187)
1 parent c44e264 commit b1a0d7a

13 files changed

+141
-120
lines changed

benchmarks/base.py

Lines changed: 15 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@
2828
sys.path.append(os.path.join(dirname, '..'))
2929

3030
import cassandra
31-
from cassandra.cluster import Cluster
32-
from cassandra.io.asyncorereactor import AsyncoreConnection
31+
from cassandra.cluster import Cluster, get_all_supported_connections_classes
3332

3433
log = logging.getLogger()
3534
handler = logging.StreamHandler()
@@ -48,31 +47,7 @@
4847
'NOTSET': logging.NOTSET,
4948
}
5049

51-
have_libev = False
52-
supported_reactors = [AsyncoreConnection]
53-
try:
54-
from cassandra.io.libevreactor import LibevConnection
55-
have_libev = True
56-
supported_reactors.append(LibevConnection)
57-
except ImportError as exc:
58-
pass
59-
60-
have_asyncio = False
61-
try:
62-
from cassandra.io.asyncioreactor import AsyncioConnection
63-
have_asyncio = True
64-
supported_reactors.append(AsyncioConnection)
65-
except (ImportError, SyntaxError):
66-
pass
67-
68-
have_twisted = False
69-
try:
70-
from cassandra.io.twistedreactor import TwistedConnection
71-
have_twisted = True
72-
supported_reactors.append(TwistedConnection)
73-
except ImportError as exc:
74-
log.exception("Error importing twisted")
75-
pass
50+
supported_reactors = get_all_supported_connections_classes()
7651

7752
KEYSPACE = "testkeyspace" + str(int(time.time()))
7853
TABLE = "testtable"
@@ -214,6 +189,15 @@ def benchmark(thread_class):
214189
log.info(" 99.9th: %0.4fs", request_timer['999percentile'])
215190

216191

192+
def get_connection_class(class_name):
193+
for cls in supported_reactors:
194+
if cls.__name__ == class_name:
195+
return cls
196+
else:
197+
log.error("unavailable reactor class: %s", class_name)
198+
sys.exit(f"{class_name} is not available")
199+
200+
217201
def parse_options():
218202
parser = OptionParser()
219203
parser.add_option('-H', '--hosts', default='127.0.0.1',
@@ -261,23 +245,15 @@ def parse_options():
261245
log.warning("Unknown log level specified: %s; specify one of %s", options.log_level, _log_levels.keys())
262246

263247
if options.asyncore_only:
264-
options.supported_reactors = [AsyncoreConnection]
248+
options.supported_reactors = [get_connection_class("AsyncoreConnection")]
265249
elif options.asyncio_only:
266-
options.supported_reactors = [AsyncioConnection]
250+
options.supported_reactors = [get_connection_class("AsyncioConnection")]
267251
elif options.libev_only:
268-
if not have_libev:
269-
log.error("libev is not available")
270-
sys.exit(1)
271-
options.supported_reactors = [LibevConnection]
252+
options.supported_reactors = [get_connection_class("LibevConnection")]
272253
elif options.twisted_only:
273-
if not have_twisted:
274-
log.error("Twisted is not available")
275-
sys.exit(1)
276-
options.supported_reactors = [TwistedConnection]
254+
options.supported_reactors = [get_connection_class("TwistedConnection")]
277255
else:
278256
options.supported_reactors = supported_reactors
279-
if not have_libev:
280-
log.warning("Not benchmarking libev reactor because libev is not available")
281257

282258
return options, args
283259

cassandra/cluster.py

Lines changed: 65 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from itertools import groupby, count, chain
3030
import json
3131
import logging
32+
from typing import NamedTuple, Type, Optional
3233
from warnings import warn
3334
from random import random
3435
import re
@@ -50,7 +51,7 @@
5051
from cassandra.connection import (ConnectionException, ConnectionShutdown,
5152
ConnectionHeartbeat, ProtocolVersionUnsupported,
5253
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
53-
ContinuousPagingState, SniEndPointFactory, ConnectionBusy)
54+
ContinuousPagingState, SniEndPointFactory, ConnectionBusy, Connection)
5455
from cassandra.cqltypes import UserType
5556
import cassandra.cqltypes as types
5657
from cassandra.encoder import Encoder
@@ -98,12 +99,12 @@
9899

99100
try:
100101
from cassandra.io.twistedreactor import TwistedConnection
101-
except ImportError:
102+
except DependencyException:
102103
TwistedConnection = None
103104

104105
try:
105106
from cassandra.io.eventletreactor import EventletConnection
106-
except (ImportError, AttributeError):
107+
except DependencyException:
107108
# AttributeError was add for handling python 3.12 https://github.com/eventlet/eventlet/issues/812
108109
# TODO: remove it when eventlet issue would be fixed
109110
EventletConnection = None
@@ -113,18 +114,31 @@
113114
except ImportError:
114115
from cassandra.util import WeakSet # NOQA
115116

117+
118+
class ClassImportResult(NamedTuple):
119+
name: str
120+
exception: Optional[Exception]
121+
connection_class: Optional[Type[Connection]]
122+
123+
116124
def _is_gevent_monkey_patched():
117125
if 'gevent.monkey' not in sys.modules:
118126
return False
119-
import gevent.socket
120-
return socket.socket is gevent.socket.socket
127+
try:
128+
import gevent.socket
129+
return socket.socket is gevent.socket.socket
130+
except (ModuleNotFoundError, ImportError, AttributeError):
131+
return False
121132

122133
def _try_gevent_import():
123134
if _is_gevent_monkey_patched():
124-
from cassandra.io.geventreactor import GeventConnection
125-
return (GeventConnection,None)
135+
try:
136+
from cassandra.io.geventreactor import GeventConnection
137+
return ClassImportResult(name="GeventConnection", connection_class=GeventConnection, exception=None)
138+
except DependencyException as e:
139+
return ClassImportResult(name="GeventConnection", connection_class=None, exception=e)
126140
else:
127-
return (None,None)
141+
return ClassImportResult(name="GeventConnection", connection_class=None, exception=DependencyException("gevent is not patched"))
128142

129143
def _is_eventlet_monkey_patched():
130144
if 'eventlet.patcher' not in sys.modules:
@@ -138,40 +152,61 @@ def _is_eventlet_monkey_patched():
138152
return False
139153

140154
def _try_eventlet_import():
141-
if _is_eventlet_monkey_patched():
155+
try:
142156
from cassandra.io.eventletreactor import EventletConnection
143-
return (EventletConnection,None)
144-
else:
145-
return (None,None)
157+
except DependencyException as e:
158+
return ClassImportResult(name="EventletConnection", connection_class=None, exception=e)
159+
if _is_eventlet_monkey_patched():
160+
return ClassImportResult(name="EventletConnection", connection_class=EventletConnection, exception=None)
161+
return ClassImportResult(name="EventletConnection", connection_class=None, exception=DependencyException("eventlet is not patched"))
146162

147163
def _try_libev_import():
148164
try:
149165
from cassandra.io.libevreactor import LibevConnection
150-
return (LibevConnection,None)
166+
return ClassImportResult(name="LibevConnection", connection_class=LibevConnection, exception=None)
151167
except DependencyException as e:
152-
return (None, e)
168+
return ClassImportResult(name="LibevConnection", connection_class=None, exception=e)
153169

154170
def _try_asyncore_import():
155171
try:
156172
from cassandra.io.asyncorereactor import AsyncoreConnection
157-
return (AsyncoreConnection,None)
173+
return ClassImportResult(name="AsyncoreConnection", connection_class=AsyncoreConnection, exception=None)
158174
except DependencyException as e:
159-
return (None, e)
160-
161-
def _connection_reduce_fn(val,import_fn):
162-
(rv, excs) = val
163-
# If we've already found a workable Connection class return immediately
164-
if rv:
165-
return val
166-
(import_result, exc) = import_fn()
167-
if exc:
168-
excs.append(exc)
169-
return (rv or import_result, excs)
175+
return ClassImportResult(name="AsyncoreConnection", connection_class=None, exception=e)
176+
177+
def _try_twisted_import():
178+
try:
179+
from cassandra.io.twistedreactor import TwistedConnection
180+
return ClassImportResult(name="TwistedConnection", connection_class=TwistedConnection, exception=None)
181+
except DependencyException as e:
182+
return ClassImportResult(name="TwistedConnection", connection_class=None, exception=e)
183+
184+
185+
log = logging.getLogger(__name__)
186+
187+
188+
def load_all_connections_classes():
189+
results = []
190+
for try_fn in (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import, _try_twisted_import):
191+
results.append(try_fn())
192+
return tuple(results)
193+
194+
def get_all_supported_connections_classes():
195+
return [res.connection_class for res in load_all_connections_classes() if res.connection_class]
196+
197+
def get_default_connection_class():
198+
excs = []
199+
for try_fn in (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import, _try_twisted_import):
200+
res = try_fn()
201+
if res.connection_class:
202+
return res.connection_class, excs
203+
excs.append(res.exception)
204+
return None, tuple(excs)
205+
170206

171-
conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import)
172-
(conn_class, excs) = reduce(_connection_reduce_fn, conn_fns, (None,[]))
173-
if excs:
174-
raise DependencyException("Exception loading connection class dependencies", excs)
207+
(conn_class, excs) = get_default_connection_class()
208+
if not conn_class:
209+
raise DependencyException("Unable to load a default connection class", excs)
175210
DefaultConnection = conn_class
176211

177212
# Forces load of utf8 encoding module to avoid deadlock that occurs

cassandra/io/eventletreactor.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,20 @@
1515

1616
# Originally derived from MagnetoDB source:
1717
# https://github.com/stackforge/magnetodb/blob/2015.1.0b1/magnetodb/common/cassandra/io/eventletreactor.py
18-
import eventlet
19-
from eventlet.green import socket
20-
from eventlet.queue import Queue
21-
from greenlet import GreenletExit
18+
from cassandra import DependencyException
19+
20+
try:
21+
import eventlet
22+
from eventlet.green import socket
23+
from eventlet.queue import Queue
24+
except (ModuleNotFoundError, ImportError, AttributeError):
25+
raise DependencyException("Unable to import eventlet module. Try to install it via `pip install eventlet`")
26+
27+
try:
28+
from greenlet import GreenletExit
29+
except (ModuleNotFoundError, ImportError, AttributeError):
30+
raise DependencyException("Unable to import greenlet module. Try to install it via `pip install greenlet`")
31+
2232
import logging
2333
from threading import Event
2434
import time

cassandra/io/geventreactor.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,19 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import gevent
15-
import gevent.event
16-
from gevent.queue import Queue
17-
from gevent import socket
18-
import gevent.ssl
14+
from cassandra import DependencyException
15+
16+
try:
17+
import gevent
18+
import gevent.event
19+
from gevent.queue import Queue
20+
from gevent import socket
21+
import gevent.ssl
22+
except (ImportError, ModuleNotFoundError, AttributeError):
23+
raise DependencyException(
24+
"Unable to import gevent module. This module is optional, but if you want to use GeventConnection you need to "
25+
"install it. Try to install it via `pip install gevent`."
26+
)
1927

2028
import logging
2129
import time

cassandra/io/twistedreactor.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,19 @@
2222
from threading import Thread, Lock
2323
import weakref
2424

25-
from twisted.internet import reactor, protocol
26-
from twisted.internet.endpoints import connectProtocol, TCP4ClientEndpoint, SSL4ClientEndpoint
27-
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
28-
from twisted.python.failure import Failure
29-
from zope.interface import implementer
25+
from cassandra import DependencyException
26+
try:
27+
from twisted.internet import reactor, protocol
28+
from twisted.internet.endpoints import connectProtocol, TCP4ClientEndpoint, SSL4ClientEndpoint
29+
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
30+
from twisted.python.failure import Failure
31+
except (ModuleNotFoundError, ImportError):
32+
raise DependencyException("Unable to import twisted module. Try to install it via `pip install twisted[tls]`")
33+
34+
try:
35+
from zope.interface import implementer
36+
except (ModuleNotFoundError, ImportError):
37+
raise DependencyException("Unable to import zope module. Try to install it via `pip install zope`")
3038

3139
from cassandra.connection import Connection, ConnectionShutdown, Timer, TimerManager, ConnectionException
3240

tests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def is_monkey_patched():
6666
gevent.monkey.patch_all()
6767
from cassandra.io.geventreactor import GeventConnection
6868
connection_class = GeventConnection
69-
except ImportError:
69+
except DependencyException:
7070
connection_class = None
7171
elif "eventlet" in EVENT_LOOP_MANAGER:
7272
from eventlet import monkey_patch

tests/integration/long/test_ipv6.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,20 @@
1515
import os, socket, errno
1616
from ccmlib import common
1717

18+
from cassandra import DependencyException
1819
from cassandra.cluster import NoHostAvailable
1920

2021
try:
2122
from cassandra.io.asyncorereactor import AsyncoreConnection
22-
except ImportError:
23+
except DependencyException:
2324
AsyncoreConnection = None
2425

2526
from tests import is_monkey_patched
2627
from tests.integration import use_cluster, remove_cluster, TestCluster
2728

2829
try:
2930
from cassandra.io.libevreactor import LibevConnection
30-
except ImportError:
31+
except DependencyException:
3132
LibevConnection = None
3233

3334

tests/integration/standard/test_scylla_cloud.py

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import logging
32
import os.path
43
from unittest import TestCase
@@ -7,30 +6,10 @@
76

87
from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, ConstantReconnectionPolicy
98
from tests.integration import use_cluster, PROTOCOL_VERSION
10-
from cassandra.cluster import Cluster, TwistedConnection
9+
from cassandra.cluster import Cluster, get_all_supported_connections_classes
1110

11+
supported_connection_classes = get_all_supported_connections_classes()
1212

13-
supported_connection_classes = [TwistedConnection]
14-
15-
try:
16-
from cassandra.io.libevreactor import LibevConnection
17-
supported_connection_classes += [LibevConnection]
18-
except ImportError:
19-
pass
20-
21-
22-
try:
23-
from cassandra.io.asyncorereactor import AsyncoreConnection
24-
supported_connection_classes += [AsyncoreConnection]
25-
except ImportError:
26-
pass
27-
28-
#from cassandra.io.geventreactor import GeventConnection
29-
#from cassandra.io.eventletreactor import EventletConnection
30-
#from cassandra.io.asyncioreactor import AsyncioConnection
31-
32-
# need to run them with specific configuration like `gevent.monkey.patch_all()` or under async functions
33-
# unsupported_connection_classes = [GeventConnection, AsyncioConnection, EventletConnection]
3413
LOGGER = logging.getLogger(__name__)
3514

3615

0 commit comments

Comments
 (0)