1
1
# -*- coding: utf-8 -*-
2
2
3
3
import json
4
+ from enum import Enum
5
+ from itertools import cycle
4
6
5
7
from tarantool .connection import Connection , ConnectionInterface
6
8
from tarantool .const import (CLUSTER_DISCOVERY_DELAY , CONNECTION_TIMEOUT ,
7
9
RECONNECT_DELAY , RECONNECT_MAX_ATTEMPTS ,
8
10
SOCKET_TIMEOUT )
9
- from tarantool .error import ClusterConnectWarning , ConfigurationError , warn
11
+ from tarantool .error import (ClusterConnectWarning , ClusterTolopogyWarning ,
12
+ ConfigurationError , warn )
10
13
from tarantool .utils import ENCODING_DEFAULT
11
14
12
15
try :
@@ -69,6 +72,65 @@ def format_error(address, err):
69
72
return format_error (address , 'port must be an int or a string' )
70
73
71
74
75
+ class RoundRobinStrategy (object ):
76
+ """
77
+ Simple round-robin connection rotation
78
+ """
79
+ def __init__ (self , pool ):
80
+ self .RW_iter = None
81
+ self .RO_iter = None
82
+ self .default = None
83
+
84
+ if len (pool ) == 0 :
85
+ return
86
+ self .update (pool )
87
+
88
+ def update (self , new_pool ):
89
+ self .default = next (iter (new_pool .values ()))['conn' ]
90
+
91
+ RW_pool = []
92
+ RO_pool = []
93
+
94
+ for v in new_pool .values ():
95
+ if v ['ro' ] is None :
96
+ continue
97
+
98
+ if v ['ro' ]:
99
+ RO_pool .append (v ['conn' ])
100
+ else :
101
+ RW_pool .append (v ['conn' ])
102
+
103
+ if len (RW_pool ) > 0 :
104
+ self .RW_iter = cycle (RW_pool )
105
+
106
+ if len (RO_pool ) > 0 :
107
+ self .RO_iter = cycle (RO_pool )
108
+
109
+ def getnext (self , mode ):
110
+ if mode == Mode .RW :
111
+ if self .RW_iter is not None :
112
+ return next (self .RW_iter )
113
+ else :
114
+ warn ("can't find rw instance in pool, route request to any connection" , ClusterTolopogyWarning )
115
+ return self .default
116
+ elif mode == Mode .PREFER_RO :
117
+ if self .RO_iter is not None :
118
+ return next (self .RO_iter )
119
+ elif self .RW_iter is not None :
120
+ return next (self .RW_iter )
121
+ else :
122
+ warn ("can't find rw/ro instance in pool, route request to any connection" , ClusterTolopogyWarning )
123
+ return self .default
124
+ elif mode == Mode .PREFER_RW :
125
+ if self .RW_iter is not None :
126
+ return next (self .RW_iter )
127
+ elif self .RO_iter is not None :
128
+ return next (self .RO_iter )
129
+ else :
130
+ warn ("can't find rw/ro instance in pool, route request to any connection" , ClusterTolopogyWarning )
131
+ return self .default
132
+
133
+
72
134
class ConnectionPool (ConnectionInterface ):
73
135
def __init__ (self ,
74
136
addrs ,
@@ -80,10 +142,10 @@ def __init__(self,
80
142
connect_now = True ,
81
143
encoding = ENCODING_DEFAULT ,
82
144
call_16 = False ,
83
- connection_timeout = CONNECTION_TIMEOUT ,):
145
+ connection_timeout = CONNECTION_TIMEOUT ,
146
+ strategy_class = RoundRobinStrategy ,):
84
147
if not isinstance (addrs , list ) or len (addrs ) == 0 :
85
148
raise ConfigurationError ("addrs must be non-empty list" )
86
- addrs = []
87
149
88
150
# Verify addresses.
89
151
for addr in addrs :
@@ -94,21 +156,24 @@ def __init__(self,
94
156
95
157
# Create connections
96
158
self .pool = {}
159
+ self .strategy = strategy_class (self .pool )
97
160
98
161
for addr in self .addrs :
99
162
key = json .dumps (addr )
100
- self .pool [key ] = Connection (
101
- host = addr ['host' ],
102
- port = addr ['port' ],
103
- user = user ,
104
- password = password ,
105
- socket_timeout = socket_timeout ,
106
- reconnect_max_attempts = reconnect_max_attempts ,
107
- reconnect_delay = reconnect_delay ,
108
- connect_now = False ,
109
- encoding = encoding ,
110
- call_16 = call_16 ,
111
- connection_timeout = connection_timeout )
163
+ self .pool [key ] = {
164
+ 'conn' : Connection (
165
+ host = addr ['host' ],
166
+ port = addr ['port' ],
167
+ user = user ,
168
+ password = password ,
169
+ socket_timeout = socket_timeout ,
170
+ reconnect_max_attempts = reconnect_max_attempts ,
171
+ reconnect_delay = reconnect_delay ,
172
+ connect_now = False , # Connect in ConnectionPool.connect()
173
+ encoding = encoding ,
174
+ call_16 = call_16 ,
175
+ connection_timeout = connection_timeout )
176
+ }
112
177
113
178
if connect_now :
114
179
self .connect ()
@@ -122,14 +187,27 @@ def is_closed(self):
122
187
def connect (self ):
123
188
for addr in self .addrs :
124
189
key = json .dumps (addr )
125
- self .pool [key ].connect ()
190
+ self .pool [key ][ 'conn' ] .connect ()
126
191
127
- if not self .pool [key ].connected :
192
+ if not self .pool [key ][ 'conn' ] .connected :
128
193
msg = 'Failed to connect to %s:%s' % str (addr ['host' ], addr ['port' ])
129
194
warn (msg , ClusterConnectWarning )
195
+ continue
130
196
131
- def call (self , func_name , * args ):
132
- raise NotImplementedError
197
+ resp = self .pool [key ]['conn' ].call ('box.info' )
198
+ if not resp .data or type (resp .data [0 ]) != dict or resp .data [0 ]['ro' ] is None :
199
+ warn ("got incorrect response for rw/ro state, skipping instance" , ClusterTolopogyWarning )
200
+ continue
201
+
202
+ self .pool [key ]['ro' ] = resp .data [0 ]['ro' ]
203
+
204
+ self .strategy .update (self .pool )
205
+
206
+ def call (self , func_name , * args , ** kwargs ):
207
+ mode = kwargs .get ('mode' , Mode .PREFER_RW )
208
+ conn = self .strategy .getnext (mode )
209
+
210
+ return conn .call (func_name , args )
133
211
134
212
def eval (self , expr , * args ):
135
213
raise NotImplementedError
0 commit comments