7
7
import time
8
8
import errno
9
9
import socket
10
+ import copy
10
11
try :
11
12
import ssl
12
13
is_ssl_supported = True
55
56
REQUEST_TYPE_ERROR ,
56
57
IPROTO_GREETING_SIZE ,
57
58
ITERATOR_EQ ,
58
- ITERATOR_ALL
59
+ ITERATOR_ALL ,
60
+ IPROTO_CHUNK
59
61
)
60
62
from tarantool .error import (
61
63
Error ,
@@ -151,55 +153,56 @@ def connect(self):
151
153
raise NotImplementedError
152
154
153
155
@abc .abstractmethod
154
- def call (self , func_name , * args ):
156
+ def call (self , func_name , * args , on_push = None , on_push_ctx = None ):
155
157
"""
156
158
Reference implementation: :meth:`~tarantool.Connection.call`.
157
159
"""
158
160
159
161
raise NotImplementedError
160
162
161
163
@abc .abstractmethod
162
- def eval (self , expr , * args ):
164
+ def eval (self , expr , * args , on_push = None , on_push_ctx = None ):
163
165
"""
164
166
Reference implementation: :meth:`~tarantool.Connection.eval`.
165
167
"""
166
168
167
169
raise NotImplementedError
168
170
169
171
@abc .abstractmethod
170
- def replace (self , space_name , values ):
172
+ def replace (self , space_name , values , on_push = None , on_push_ctx = None ):
171
173
"""
172
174
Reference implementation: :meth:`~tarantool.Connection.replace`.
173
175
"""
174
176
175
177
raise NotImplementedError
176
178
177
179
@abc .abstractmethod
178
- def insert (self , space_name , values ):
180
+ def insert (self , space_name , values , on_push = None , on_push_ctx = None ):
179
181
"""
180
182
Reference implementation: :meth:`~tarantool.Connection.insert`.
181
183
"""
182
184
183
185
raise NotImplementedError
184
186
185
187
@abc .abstractmethod
186
- def delete (self , space_name , key , * , index = None ):
188
+ def delete (self , space_name , key , * , index = None , on_push = None , on_push_ctx = None ):
187
189
"""
188
190
Reference implementation: :meth:`~tarantool.Connection.delete`.
189
191
"""
190
192
191
193
raise NotImplementedError
192
194
193
195
@abc .abstractmethod
194
- def upsert (self , space_name , tuple_value , op_list , * , index = None ):
196
+ def upsert (self , space_name , tuple_value , op_list , * , index = None ,
197
+ on_push = None , on_push_ctx = None ):
195
198
"""
196
199
Reference implementation: :meth:`~tarantool.Connection.upsert`.
197
200
"""
198
201
199
202
raise NotImplementedError
200
203
201
204
@abc .abstractmethod
202
- def update (self , space_name , key , op_list , * , index = None ):
205
+ def update (self , space_name , key , op_list , * , index = None , on_push = None , on_push_ctx = None ):
203
206
"""
204
207
Reference implementation: :meth:`~tarantool.Connection.update`.
205
208
"""
@@ -216,7 +219,7 @@ def ping(self, notime):
216
219
217
220
@abc .abstractmethod
218
221
def select (self , space_name , key , * , offset = None , limit = None ,
219
- index = None , iterator = None ):
222
+ index = None , iterator = None , on_push = None , on_push_ctx = None ):
220
223
"""
221
224
Reference implementation: :meth:`~tarantool.Connection.select`.
222
225
"""
@@ -748,7 +751,7 @@ def _read_response(self):
748
751
# Read the packet
749
752
return self ._recv (length )
750
753
751
- def _send_request_wo_reconnect (self , request ):
754
+ def _send_request_wo_reconnect (self , request , on_push = None , on_push_ctx = None ):
752
755
"""
753
756
Send request without trying to reconnect.
754
757
Reload schema, if required.
@@ -767,12 +770,33 @@ def _send_request_wo_reconnect(self, request):
767
770
768
771
assert isinstance (request , Request )
769
772
773
+ # Flag for detecting the last message as out-of-band.
774
+ iproto_chunk_detected = False
775
+
770
776
response = None
771
777
while True :
772
778
try :
773
- self ._socket .sendall (bytes (request ))
779
+ if not iproto_chunk_detected :
780
+ # If the last received message is out-of-band,
781
+ # the request will not be sent.
782
+ self ._socket .sendall (bytes (request ))
774
783
response = request .response_class (self , self ._read_response ())
775
- break
784
+ if response ._code == IPROTO_CHUNK :
785
+ # Сase of receiving an out-of-band message.
786
+ if on_push is not None :
787
+ if callable (on_push ):
788
+ # Callback function with data from out-of-band
789
+ # message is being called.
790
+ # The callback result can be written to on_push_ctx.
791
+ on_push (response ._data , on_push_ctx )
792
+ else :
793
+ raise TypeError ('The on_push callback must be callable' )
794
+ iproto_chunk_detected = True
795
+ # Receiving the next message.
796
+ continue
797
+ else :
798
+ # Сase of receiving main message.
799
+ break
776
800
except SchemaReloadException as e :
777
801
self .update_schema (e .schema_version )
778
802
continue
@@ -851,7 +875,7 @@ def check(): # Check that connection is alive
851
875
self .wrap_socket_ssl ()
852
876
self .handshake ()
853
877
854
- def _send_request (self , request ):
878
+ def _send_request (self , request , on_push = None , on_push_ctx = None ):
855
879
"""
856
880
Send a request to the server through the socket.
857
881
@@ -872,7 +896,7 @@ def _send_request(self, request):
872
896
873
897
self ._opt_reconnect ()
874
898
875
- return self ._send_request_wo_reconnect (request )
899
+ return self ._send_request_wo_reconnect (request , on_push , on_push_ctx )
876
900
877
901
def load_schema (self ):
878
902
"""
@@ -914,7 +938,7 @@ def flush_schema(self):
914
938
self .schema .flush ()
915
939
self .load_schema ()
916
940
917
- def call (self , func_name , * args ):
941
+ def call (self , func_name , * args , on_push = None , on_push_ctx = None ):
918
942
"""
919
943
Execute a CALL request: call a stored Lua function.
920
944
@@ -930,6 +954,10 @@ def call(self, func_name, *args):
930
954
:exc:`~tarantool.error.SchemaError`,
931
955
:exc:`~tarantool.error.NetworkError`,
932
956
:exc:`~tarantool.error.SslError`
957
+
958
+ !!!
959
+ TODO: write docs
960
+ !!!
933
961
"""
934
962
935
963
assert isinstance (func_name , str )
@@ -939,10 +967,10 @@ def call(self, func_name, *args):
939
967
args = args [0 ]
940
968
941
969
request = RequestCall (self , func_name , args , self .call_16 )
942
- response = self ._send_request (request )
970
+ response = self ._send_request (request , on_push , on_push_ctx )
943
971
return response
944
972
945
- def eval (self , expr , * args ):
973
+ def eval (self , expr , * args , on_push = None , on_push_ctx = None ):
946
974
"""
947
975
Execute an EVAL request: evaluate a Lua expression.
948
976
@@ -968,10 +996,10 @@ def eval(self, expr, *args):
968
996
args = args [0 ]
969
997
970
998
request = RequestEval (self , expr , args )
971
- response = self ._send_request (request )
999
+ response = self ._send_request (request , on_push , on_push_ctx )
972
1000
return response
973
1001
974
- def replace (self , space_name , values ):
1002
+ def replace (self , space_name , values , on_push = None , on_push_ctx = None ):
975
1003
"""
976
1004
Execute a REPLACE request: `replace`_ a tuple in the space.
977
1005
Doesn't throw an error if there is no tuple with the specified
@@ -997,7 +1025,7 @@ def replace(self, space_name, values):
997
1025
if isinstance (space_name , str ):
998
1026
space_name = self .schema .get_space (space_name ).sid
999
1027
request = RequestReplace (self , space_name , values )
1000
- return self ._send_request (request )
1028
+ return self ._send_request (request , on_push , on_push_ctx )
1001
1029
1002
1030
def authenticate (self , user , password ):
1003
1031
"""
@@ -1149,7 +1177,7 @@ def subscribe(self, cluster_uuid, server_uuid, vclock=None):
1149
1177
return
1150
1178
self .close () # close connection after SUBSCRIBE
1151
1179
1152
- def insert (self , space_name , values ):
1180
+ def insert (self , space_name , values , on_push = None , on_push_ctx = None ):
1153
1181
"""
1154
1182
Execute an INSERT request: `insert`_ a tuple to the space.
1155
1183
Throws an error if there is already a tuple with the same
@@ -1175,9 +1203,9 @@ def insert(self, space_name, values):
1175
1203
if isinstance (space_name , str ):
1176
1204
space_name = self .schema .get_space (space_name ).sid
1177
1205
request = RequestInsert (self , space_name , values )
1178
- return self ._send_request (request )
1206
+ return self ._send_request (request , on_push , on_push_ctx )
1179
1207
1180
- def delete (self , space_name , key , * , index = 0 ):
1208
+ def delete (self , space_name , key , * , index = 0 , on_push = None , on_push_ctx = None ):
1181
1209
"""
1182
1210
Execute a DELETE request: `delete`_ a tuple in the space.
1183
1211
@@ -1208,9 +1236,9 @@ def delete(self, space_name, key, *, index=0):
1208
1236
if isinstance (index , str ):
1209
1237
index = self .schema .get_index (space_name , index ).iid
1210
1238
request = RequestDelete (self , space_name , index , key )
1211
- return self ._send_request (request )
1239
+ return self ._send_request (request , on_push , on_push_ctx )
1212
1240
1213
- def upsert (self , space_name , tuple_value , op_list , * , index = 0 ):
1241
+ def upsert (self , space_name , tuple_value , op_list , * , index = 0 , on_push = None , on_push_ctx = None ):
1214
1242
"""
1215
1243
Execute an UPSERT request: `upsert`_ a tuple to the space.
1216
1244
@@ -1259,9 +1287,9 @@ def upsert(self, space_name, tuple_value, op_list, *, index=0):
1259
1287
op_list = self ._ops_process (space_name , op_list )
1260
1288
request = RequestUpsert (self , space_name , index , tuple_value ,
1261
1289
op_list )
1262
- return self ._send_request (request )
1290
+ return self ._send_request (request , on_push , on_push_ctx )
1263
1291
1264
- def update (self , space_name , key , op_list , * , index = 0 ):
1292
+ def update (self , space_name , key , op_list , * , index = 0 , on_push = None , on_push_ctx = None ):
1265
1293
"""
1266
1294
Execute an UPDATE request: `update`_ a tuple in the space.
1267
1295
@@ -1338,7 +1366,7 @@ def update(self, space_name, key, op_list, *, index=0):
1338
1366
index = self .schema .get_index (space_name , index ).iid
1339
1367
op_list = self ._ops_process (space_name , op_list )
1340
1368
request = RequestUpdate (self , space_name , index , key , op_list )
1341
- return self ._send_request (request )
1369
+ return self ._send_request (request , on_push , on_push_ctx )
1342
1370
1343
1371
def ping (self , notime = False ):
1344
1372
"""
@@ -1368,7 +1396,7 @@ def ping(self, notime=False):
1368
1396
return "Success"
1369
1397
return t1 - t0
1370
1398
1371
- def select (self , space_name , key = None , * , offset = 0 , limit = 0xffffffff , index = 0 , iterator = None ):
1399
+ def select (self , space_name , key = None , * , offset = 0 , limit = 0xffffffff , index = 0 , iterator = None , on_push = None , on_push_ctx = None ):
1372
1400
"""
1373
1401
Execute a SELECT request: `select`_ a tuple from the space.
1374
1402
@@ -1524,7 +1552,7 @@ def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, i
1524
1552
index = self .schema .get_index (space_name , index ).iid
1525
1553
request = RequestSelect (self , space_name , index , key , offset ,
1526
1554
limit , iterator )
1527
- response = self ._send_request (request )
1555
+ response = self ._send_request (request , on_push , on_push_ctx )
1528
1556
return response
1529
1557
1530
1558
def space (self , space_name ):
0 commit comments