Skip to content

Commit 1f31378

Browse files
absurdfarcedkropachev
authored andcommitted
Cleanup after PYTHON-1366
1 parent c44e264 commit 1f31378

15 files changed

+189
-183
lines changed

benchmarks/base.py

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323

2424
from greplin import scales
2525

26+
from cassandra.io.reactorloader import get_all_supported_connections_classes
27+
2628
dirname = os.path.dirname(os.path.abspath(__file__))
2729
sys.path.append(dirname)
2830
sys.path.append(os.path.join(dirname, '..'))
2931

3032
import cassandra
3133
from cassandra.cluster import Cluster
32-
from cassandra.io.asyncorereactor import AsyncoreConnection
3334

3435
log = logging.getLogger()
3536
handler = logging.StreamHandler()
@@ -48,31 +49,7 @@
4849
'NOTSET': logging.NOTSET,
4950
}
5051

51-
have_libev = False
52-
supported_reactors = [AsyncoreConnection]
53-
try:
54-
from cassandra.io.libevreactor import LibevConnection
55-
have_libev = True
56-
supported_reactors.append(LibevConnection)
57-
except ImportError as exc:
58-
pass
59-
60-
have_asyncio = False
61-
try:
62-
from cassandra.io.asyncioreactor import AsyncioConnection
63-
have_asyncio = True
64-
supported_reactors.append(AsyncioConnection)
65-
except (ImportError, SyntaxError):
66-
pass
67-
68-
have_twisted = False
69-
try:
70-
from cassandra.io.twistedreactor import TwistedConnection
71-
have_twisted = True
72-
supported_reactors.append(TwistedConnection)
73-
except ImportError as exc:
74-
log.exception("Error importing twisted")
75-
pass
52+
supported_reactors = get_all_supported_connections_classes()
7653

7754
KEYSPACE = "testkeyspace" + str(int(time.time()))
7855
TABLE = "testtable"
@@ -214,6 +191,15 @@ def benchmark(thread_class):
214191
log.info(" 99.9th: %0.4fs", request_timer['999percentile'])
215192

216193

194+
def get_connection_class(class_name):
195+
for cls in supported_reactors:
196+
if cls.__name__ == class_name:
197+
return cls
198+
else:
199+
log.error("unavailable reactor class: %s", class_name)
200+
sys.exit(f"{class_name} is not available")
201+
202+
217203
def parse_options():
218204
parser = OptionParser()
219205
parser.add_option('-H', '--hosts', default='127.0.0.1',
@@ -261,23 +247,15 @@ def parse_options():
261247
log.warning("Unknown log level specified: %s; specify one of %s", options.log_level, _log_levels.keys())
262248

263249
if options.asyncore_only:
264-
options.supported_reactors = [AsyncoreConnection]
250+
options.supported_reactors = [get_connection_class("AsyncoreConnection")]
265251
elif options.asyncio_only:
266-
options.supported_reactors = [AsyncioConnection]
252+
options.supported_reactors = [get_connection_class("AsyncioConnection")]
267253
elif options.libev_only:
268-
if not have_libev:
269-
log.error("libev is not available")
270-
sys.exit(1)
271-
options.supported_reactors = [LibevConnection]
254+
options.supported_reactors = [get_connection_class("LibevConnection")]
272255
elif options.twisted_only:
273-
if not have_twisted:
274-
log.error("Twisted is not available")
275-
sys.exit(1)
276-
options.supported_reactors = [TwistedConnection]
256+
options.supported_reactors = [get_connection_class("TwistedConnection")]
277257
else:
278258
options.supported_reactors = supported_reactors
279-
if not have_libev:
280-
log.warning("Not benchmarking libev reactor because libev is not available")
281259

282260
return options, args
283261

cassandra/cluster.py

Lines changed: 9 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from collections.abc import Mapping
2626
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures
2727
from copy import copy
28-
from functools import partial, reduce, wraps
28+
from functools import partial, wraps
2929
from itertools import groupby, count, chain
3030
import json
3131
import logging
@@ -50,10 +50,11 @@
5050
from cassandra.connection import (ConnectionException, ConnectionShutdown,
5151
ConnectionHeartbeat, ProtocolVersionUnsupported,
5252
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
53-
ContinuousPagingState, SniEndPointFactory, ConnectionBusy)
53+
ContinuousPagingState, SniEndPointFactory, ConnectionBusy, Connection)
5454
from cassandra.cqltypes import UserType
5555
import cassandra.cqltypes as types
5656
from cassandra.encoder import Encoder
57+
from cassandra.io.reactorloader import get_default_connection_class, try_twisted_connection
5758
from cassandra.protocol import (QueryMessage, ResultMessage,
5859
ErrorMessage, ReadTimeoutErrorMessage,
5960
WriteTimeoutErrorMessage,
@@ -96,82 +97,19 @@
9697
from cassandra.datastax import cloud as dscloud
9798
from cassandra.scylla.cloud import CloudConfiguration
9899

99-
try:
100-
from cassandra.io.twistedreactor import TwistedConnection
101-
except ImportError:
102-
TwistedConnection = None
103-
104-
try:
105-
from cassandra.io.eventletreactor import EventletConnection
106-
except (ImportError, AttributeError):
107-
# AttributeError was add for handling python 3.12 https://github.com/eventlet/eventlet/issues/812
108-
# TODO: remove it when eventlet issue would be fixed
109-
EventletConnection = None
100+
TwistedConnection = try_twisted_connection().connection_class
101+
EventletConnection = try_twisted_connection().connection_class
110102

111103
try:
112104
from weakref import WeakSet
113105
except ImportError:
114106
from cassandra.util import WeakSet # NOQA
115107

116-
def _is_gevent_monkey_patched():
117-
if 'gevent.monkey' not in sys.modules:
118-
return False
119-
import gevent.socket
120-
return socket.socket is gevent.socket.socket
121-
122-
def _try_gevent_import():
123-
if _is_gevent_monkey_patched():
124-
from cassandra.io.geventreactor import GeventConnection
125-
return (GeventConnection,None)
126-
else:
127-
return (None,None)
128-
129-
def _is_eventlet_monkey_patched():
130-
if 'eventlet.patcher' not in sys.modules:
131-
return False
132-
try:
133-
import eventlet.patcher
134-
return eventlet.patcher.is_monkey_patched('socket')
135-
except (ImportError, AttributeError):
136-
# AttributeError was add for handling python 3.12 https://github.com/eventlet/eventlet/issues/812
137-
# TODO: remove it when eventlet issue would be fixed
138-
return False
139-
140-
def _try_eventlet_import():
141-
if _is_eventlet_monkey_patched():
142-
from cassandra.io.eventletreactor import EventletConnection
143-
return (EventletConnection,None)
144-
else:
145-
return (None,None)
146-
147-
def _try_libev_import():
148-
try:
149-
from cassandra.io.libevreactor import LibevConnection
150-
return (LibevConnection,None)
151-
except DependencyException as e:
152-
return (None, e)
153-
154-
def _try_asyncore_import():
155-
try:
156-
from cassandra.io.asyncorereactor import AsyncoreConnection
157-
return (AsyncoreConnection,None)
158-
except DependencyException as e:
159-
return (None, e)
160-
161-
def _connection_reduce_fn(val,import_fn):
162-
(rv, excs) = val
163-
# If we've already found a workable Connection class return immediately
164-
if rv:
165-
return val
166-
(import_result, exc) = import_fn()
167-
if exc:
168-
excs.append(exc)
169-
return (rv or import_result, excs)
108+
log = logging.getLogger(__name__)
170109

171-
conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import)
172-
(conn_class, excs) = reduce(_connection_reduce_fn, conn_fns, (None,[]))
173-
if excs:
174-
raise DependencyException("Exception loading connection class dependencies", excs)
110+
(conn_class, excs) = get_default_connection_class()
111+
if not conn_class:
112+
raise DependencyException("Unable to load a default connection class", excs)
175113
DefaultConnection = conn_class
176114

177115
# Forces load of utf8 encoding module to avoid deadlock that occurs

cassandra/io/eventletreactor.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,20 @@
1515

1616
# Originally derived from MagnetoDB source:
1717
# https://github.com/stackforge/magnetodb/blob/2015.1.0b1/magnetodb/common/cassandra/io/eventletreactor.py
18-
import eventlet
19-
from eventlet.green import socket
20-
from eventlet.queue import Queue
21-
from greenlet import GreenletExit
18+
from cassandra import DependencyException
19+
20+
try:
21+
import eventlet
22+
from eventlet.green import socket
23+
from eventlet.queue import Queue
24+
except (ModuleNotFoundError, ImportError, AttributeError):
25+
raise DependencyException("Unable to import eventlet module. Try to install it via `pip install eventlet`")
26+
27+
try:
28+
from greenlet import GreenletExit
29+
except (ModuleNotFoundError, ImportError, AttributeError):
30+
raise DependencyException("Unable to import greenlet module. Try to install it via `pip install greenlet`")
31+
2232
import logging
2333
from threading import Event
2434
import time

cassandra/io/geventreactor.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,19 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import gevent
15-
import gevent.event
16-
from gevent.queue import Queue
17-
from gevent import socket
18-
import gevent.ssl
14+
from cassandra import DependencyException
15+
16+
try:
17+
import gevent
18+
import gevent.event
19+
from gevent.queue import Queue
20+
from gevent import socket
21+
import gevent.ssl
22+
except (ImportError, ModuleNotFoundError, AttributeError):
23+
raise DependencyException(
24+
"Unable to import gevent module. This module is optional, but if you want to use GeventConnection you need to "
25+
"install it. Try to install it via `pip install gevent`."
26+
)
1927

2028
import logging
2129
import time

cassandra/io/reactorloader.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import sys
2+
from socket import socket
3+
from typing import NamedTuple, Optional, Type
4+
5+
from cassandra import DependencyException
6+
from cassandra.connection import Connection
7+
8+
9+
class ClassImportResult(NamedTuple):
10+
name: str
11+
exception: Optional[Exception]
12+
connection_class: Optional[Type[Connection]]
13+
14+
15+
def is_gevent_monkey_patched():
16+
if 'gevent.monkey' not in sys.modules:
17+
return False
18+
try:
19+
import gevent.socket
20+
return socket.socket is gevent.socket.socket
21+
except (ModuleNotFoundError, ImportError, AttributeError):
22+
return False
23+
24+
25+
def try_gevent_connection():
26+
if is_gevent_monkey_patched():
27+
try:
28+
from cassandra.io.geventreactor import GeventConnection
29+
return ClassImportResult(name="GeventConnection", connection_class=GeventConnection, exception=None)
30+
except DependencyException as e:
31+
return ClassImportResult(name="GeventConnection", connection_class=None, exception=e)
32+
else:
33+
return ClassImportResult(name="GeventConnection", connection_class=None, exception=DependencyException("gevent is not patched"))
34+
35+
36+
def is_eventlet_monkey_patched():
37+
if 'eventlet.patcher' not in sys.modules:
38+
return False
39+
try:
40+
import eventlet.patcher
41+
return eventlet.patcher.is_monkey_patched('socket')
42+
except (ImportError, AttributeError):
43+
# AttributeError was add for handling python 3.12 https://github.com/eventlet/eventlet/issues/812
44+
# TODO: remove it when eventlet issue would be fixed
45+
return False
46+
47+
48+
def try_eventlet_connection():
49+
try:
50+
from cassandra.io.eventletreactor import EventletConnection
51+
except DependencyException as e:
52+
return ClassImportResult(name="EventletConnection", connection_class=None, exception=e)
53+
if is_eventlet_monkey_patched():
54+
return ClassImportResult(name="EventletConnection", connection_class=EventletConnection, exception=None)
55+
return ClassImportResult(name="EventletConnection", connection_class=None, exception=DependencyException("eventlet is not patched"))
56+
57+
58+
def try_libev_import():
59+
try:
60+
from cassandra.io.libevreactor import LibevConnection
61+
return ClassImportResult(name="LibevConnection", connection_class=LibevConnection, exception=None)
62+
except DependencyException as e:
63+
return ClassImportResult(name="LibevConnection", connection_class=None, exception=e)
64+
65+
66+
def try_asyncore_connection():
67+
try:
68+
from cassandra.io.asyncorereactor import AsyncoreConnection
69+
return ClassImportResult(name="AsyncoreConnection", connection_class=AsyncoreConnection, exception=None)
70+
except DependencyException as e:
71+
return ClassImportResult(name="AsyncoreConnection", connection_class=None, exception=e)
72+
73+
74+
def try_twisted_connection():
75+
try:
76+
from cassandra.io.twistedreactor import TwistedConnection
77+
return ClassImportResult(name="TwistedConnection", connection_class=TwistedConnection, exception=None)
78+
except DependencyException as e:
79+
return ClassImportResult(name="TwistedConnection", connection_class=None, exception=e)
80+
81+
82+
def load_all_connections_classes():
83+
results = []
84+
for try_fn in (try_gevent_connection, try_eventlet_connection, try_libev_import, try_asyncore_connection, try_twisted_connection):
85+
results.append(try_fn())
86+
return tuple(results)
87+
88+
89+
def get_all_supported_connections_classes():
90+
return [res.connection_class for res in load_all_connections_classes() if res.connection_class]
91+
92+
93+
def get_default_connection_class():
94+
excs = []
95+
for try_fn in (try_gevent_connection, try_eventlet_connection, try_libev_import, try_asyncore_connection, try_twisted_connection):
96+
res = try_fn()
97+
if res.connection_class:
98+
return res.connection_class, excs
99+
excs.append(res.exception)
100+
return None, tuple(excs)

cassandra/io/twistedreactor.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,19 @@
2222
from threading import Thread, Lock
2323
import weakref
2424

25-
from twisted.internet import reactor, protocol
26-
from twisted.internet.endpoints import connectProtocol, TCP4ClientEndpoint, SSL4ClientEndpoint
27-
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
28-
from twisted.python.failure import Failure
29-
from zope.interface import implementer
25+
from cassandra import DependencyException
26+
try:
27+
from twisted.internet import reactor, protocol
28+
from twisted.internet.endpoints import connectProtocol, TCP4ClientEndpoint, SSL4ClientEndpoint
29+
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
30+
from twisted.python.failure import Failure
31+
except (ModuleNotFoundError, ImportError):
32+
raise DependencyException("Unable to import twisted module. Try to install it via `pip install twisted[tls]`")
33+
34+
try:
35+
from zope.interface import implementer
36+
except (ModuleNotFoundError, ImportError):
37+
raise DependencyException("Unable to import zope module. Try to install it via `pip install zope`")
3038

3139
from cassandra.connection import Connection, ConnectionShutdown, Timer, TimerManager, ConnectionException
3240

tests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def is_monkey_patched():
6666
gevent.monkey.patch_all()
6767
from cassandra.io.geventreactor import GeventConnection
6868
connection_class = GeventConnection
69-
except ImportError:
69+
except DependencyException:
7070
connection_class = None
7171
elif "eventlet" in EVENT_LOOP_MANAGER:
7272
from eventlet import monkey_patch

0 commit comments

Comments
 (0)