Skip to content

Commit 53aeaf7

Browse files
Implement connect
1 parent 122d6c1 commit 53aeaf7

File tree

2 files changed

+167
-1
lines changed

2 files changed

+167
-1
lines changed

tarantool/connection_pool.py

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
# -*- coding: utf-8 -*-
2+
3+
import json
4+
5+
from tarantool.connection import Connection, ConnectionInterface
6+
from tarantool.const import (CLUSTER_DISCOVERY_DELAY, CONNECTION_TIMEOUT,
7+
RECONNECT_DELAY, RECONNECT_MAX_ATTEMPTS,
8+
SOCKET_TIMEOUT)
9+
from tarantool.error import ClusterConnectWarning, ConfigurationError, warn
10+
from tarantool.utils import ENCODING_DEFAULT
11+
12+
try:
13+
string_types = basestring
14+
except NameError:
15+
string_types = str
16+
17+
class Mode(Enum):
18+
RW = 1
19+
PREFER_RW = 2
20+
PREFER_RO = 3
21+
22+
def validate_address(address):
23+
def format_error(address, err):
24+
return None, 'Address %s: %s' % (str(address), err)
25+
26+
if not isinstance(address, dict):
27+
return format_error(address, 'address must be a dict')
28+
29+
if 'port' not in address or address['port'] is None:
30+
return format_error(address, 'port is not set or None')
31+
32+
if isinstance(address['port'], int):
33+
# Looks like an inet address.
34+
35+
# Validate host.
36+
if 'host' not in address or address['host'] is None:
37+
return format_error(address,
38+
'host is mandatory for an inet address')
39+
if not isinstance(address['host'], string_types):
40+
return format_error(address,
41+
'host must be a string for an inet address')
42+
43+
# Validate port.
44+
if not isinstance(address['port'], int):
45+
return format_error(address,
46+
'port must be an int for an inet address')
47+
if address['port'] < 1 or address['port'] > 65535:
48+
return format_error(address, 'port must be in range [1, 65535] '
49+
'for an inet address')
50+
51+
# Looks okay.
52+
return True, None
53+
elif isinstance(address['port'], string_types):
54+
# Looks like a unix address.
55+
56+
# Expect no host.
57+
if 'host' in address and address['host'] is not None:
58+
return format_error(
59+
address, 'host must be unset or None for a unix address')
60+
61+
# Validate port.
62+
if not isinstance(address['port'], string_types):
63+
return format_error(address,
64+
'port must be a string for a unix address')
65+
66+
# Looks okay.
67+
return True, None
68+
69+
return format_error(address, 'port must be an int or a string')
70+
71+
72+
class ConnectionPool(ConnectionInterface):
73+
def __init__(self,
74+
addrs,
75+
user=None,
76+
password=None,
77+
socket_timeout=SOCKET_TIMEOUT,
78+
reconnect_max_attempts=RECONNECT_MAX_ATTEMPTS,
79+
reconnect_delay=RECONNECT_DELAY,
80+
connect_now=True,
81+
encoding=ENCODING_DEFAULT,
82+
call_16=False,
83+
connection_timeout=CONNECTION_TIMEOUT,):
84+
if not isinstance(addrs, list) or len(addrs) == 0:
85+
raise ConfigurationError("addrs must be non-empty list")
86+
addrs = []
87+
88+
# Verify addresses.
89+
for addr in addrs:
90+
ok, msg = validate_address(addr)
91+
if not ok:
92+
raise ConfigurationError(msg)
93+
self.addrs = addrs
94+
95+
# Create connections
96+
self.pool = {}
97+
98+
for addr in self.addrs:
99+
key = json.dumps(addr)
100+
self.pool[key] = Connection(
101+
host=addr['host'],
102+
port=addr['port'],
103+
user=user,
104+
password=password,
105+
socket_timeout=socket_timeout,
106+
reconnect_max_attempts=reconnect_max_attempts,
107+
reconnect_delay=reconnect_delay,
108+
connect_now=False,
109+
encoding=encoding,
110+
call_16=call_16,
111+
connection_timeout=connection_timeout)
112+
113+
if connect_now:
114+
self.connect()
115+
116+
def close(self):
117+
raise NotImplementedError
118+
119+
def is_closed(self):
120+
raise NotImplementedError
121+
122+
def connect(self):
123+
for addr in self.addrs:
124+
key = json.dumps(addr)
125+
self.pool[key].connect()
126+
127+
if not self.pool[key].connected:
128+
msg = 'Failed to connect to %s:%s' % str(addr['host'], addr['port'])
129+
warn(msg, ClusterConnectWarning)
130+
131+
def call(self, func_name, *args):
132+
raise NotImplementedError
133+
134+
def eval(self, expr, *args):
135+
raise NotImplementedError
136+
137+
def replace(self, space_name, values):
138+
raise NotImplementedError
139+
140+
def authenticate(self, user, password):
141+
raise NotImplementedError
142+
143+
def insert(self, space_name, values):
144+
raise NotImplementedError
145+
146+
def delete(self, space_name, key, **kwargs):
147+
raise NotImplementedError
148+
149+
def upsert(self, space_name, tuple_value, op_list, **kwargs):
150+
raise NotImplementedError
151+
152+
def update(self, space_name, key, op_list, **kwargs):
153+
raise NotImplementedError
154+
155+
def ping(self, notime):
156+
raise NotImplementedError
157+
158+
def select(self, space_name, key, **kwargs):
159+
raise NotImplementedError
160+
161+
def execute(self, query, params):
162+
raise NotImplementedError

tarantool/error.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import sys
2525
import warnings
2626

27-
2827
try:
2928
class Warning(StandardError):
3029
'''Exception raised for important warnings
@@ -223,6 +222,11 @@ class ClusterDiscoveryWarning(UserWarning):
223222
pass
224223

225224

225+
class ClusterConnectWarning(UserWarning):
226+
'''Warning related to cluster pool connection'''
227+
pass
228+
229+
226230
# always print this warnings
227231
warnings.filterwarnings("always", category=NetworkWarning)
228232

0 commit comments

Comments
 (0)