Skip to content

Commit 9663a63

Browse files
committed
iproto: support feature push
Adds support for receiving out-of-band messages from a server that uses box.session.push call. Data obtaining is possible for methods: `call`, `eval`, `select`, `insert`, `replace`, `update`, `upsert`, `delete`. To do this, an optional argument `on_push_ctx` is used in the form of a python list, where the received data from out-of-band messages will be added. Closes #201
1 parent a94f97c commit 9663a63

File tree

3 files changed

+109
-23
lines changed

3 files changed

+109
-23
lines changed

tarantool/connection.py

Lines changed: 107 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@
5555
REQUEST_TYPE_ERROR,
5656
IPROTO_GREETING_SIZE,
5757
ITERATOR_EQ,
58-
ITERATOR_ALL
58+
ITERATOR_ALL,
59+
IPROTO_CHUNK
5960
)
6061
from tarantool.error import (
6162
Error,
@@ -748,7 +749,7 @@ def _read_response(self):
748749
# Read the packet
749750
return self._recv(length)
750751

751-
def _send_request_wo_reconnect(self, request):
752+
def _send_request_wo_reconnect(self, request, pushed_data=[], on_push=None, on_push_res=[]):
752753
"""
753754
Send request without trying to reconnect.
754755
Reload schema, if required.
@@ -767,12 +768,30 @@ def _send_request_wo_reconnect(self, request):
767768

768769
assert isinstance(request, Request)
769770

771+
# Flag for detecting the last message as out-of-band.
772+
iproto_chunk_detected = False
773+
770774
response = None
771775
while True:
772776
try:
773-
self._socket.sendall(bytes(request))
777+
if not iproto_chunk_detected:
778+
# If the last received message is out-of-band,
779+
# the request will not be sent.
780+
self._socket.sendall(bytes(request))
774781
response = request.response_class(self, self._read_response())
775-
break
782+
if response._code == IPROTO_CHUNK:
783+
# Сase of receiving an out-of-band message.
784+
pushed_data.append(response._data.copy())
785+
if callable(on_push):
786+
# Callback function with data from out-of-band
787+
# message is being called.
788+
on_push_res.append(on_push(response._data.copy()))
789+
iproto_chunk_detected = True
790+
# Receiving the next message.
791+
continue
792+
else:
793+
# Сase of receiving main message.
794+
break
776795
except SchemaReloadException as e:
777796
self.update_schema(e.schema_version)
778797
continue
@@ -851,7 +870,7 @@ def check(): # Check that connection is alive
851870
self.wrap_socket_ssl()
852871
self.handshake()
853872

854-
def _send_request(self, request):
873+
def _send_request(self, request, pushed_data=[], on_push=None, on_push_res=[]):
855874
"""
856875
Send a request to the server through the socket.
857876
@@ -872,7 +891,7 @@ def _send_request(self, request):
872891

873892
self._opt_reconnect()
874893

875-
return self._send_request_wo_reconnect(request)
894+
return self._send_request_wo_reconnect(request, pushed_data, on_push, on_push_res)
876895

877896
def load_schema(self):
878897
"""
@@ -914,7 +933,7 @@ def flush_schema(self):
914933
self.schema.flush()
915934
self.load_schema()
916935

917-
def call(self, func_name, *args):
936+
def call(self, func_name, *args, **kwargs):
918937
"""
919938
Execute a CALL request: call a stored Lua function.
920939
@@ -930,19 +949,30 @@ def call(self, func_name, *args):
930949
:exc:`~tarantool.error.SchemaError`,
931950
:exc:`~tarantool.error.NetworkError`,
932951
:exc:`~tarantool.error.SslError`
952+
953+
!!!
954+
TODO: write docs
955+
!!!
933956
"""
934957

935958
assert isinstance(func_name, str)
936959

937960
# This allows to use a tuple or list as an argument
938961
if len(args) == 1 and isinstance(args[0], (list, tuple)):
939962
args = args[0]
963+
# Case for absence of optional arg for accepting out-of-band msg data
964+
if not 'pushed_data' in kwargs:
965+
kwargs['pushed_data'] = []
966+
if not 'on_push' in kwargs:
967+
kwargs['on_push'] = None
968+
if not 'on_push_res' in kwargs:
969+
kwargs['on_push_res'] = []
940970

941971
request = RequestCall(self, func_name, args, self.call_16)
942-
response = self._send_request(request)
972+
response = self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
943973
return response
944974

945-
def eval(self, expr, *args):
975+
def eval(self, expr, *args, **kwargs):
946976
"""
947977
Execute an EVAL request: evaluate a Lua expression.
948978
@@ -966,12 +996,19 @@ def eval(self, expr, *args):
966996
# This allows to use a tuple or list as an argument
967997
if len(args) == 1 and isinstance(args[0], (list, tuple)):
968998
args = args[0]
999+
# Case for absence of optional arg for accepting out-of-band msg data
1000+
if not 'pushed_data' in kwargs:
1001+
kwargs['pushed_data'] = []
1002+
if not 'on_push' in kwargs:
1003+
kwargs['on_push'] = None
1004+
if not 'on_push_res' in kwargs:
1005+
kwargs['on_push_res'] = []
9691006

9701007
request = RequestEval(self, expr, args)
971-
response = self._send_request(request)
1008+
response = self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
9721009
return response
9731010

974-
def replace(self, space_name, values):
1011+
def replace(self, space_name, values, **kwargs):
9751012
"""
9761013
Execute a REPLACE request: `replace`_ a tuple in the space.
9771014
Doesn't throw an error if there is no tuple with the specified
@@ -994,10 +1031,18 @@ def replace(self, space_name, values):
9941031
.. _replace: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/replace/
9951032
"""
9961033

1034+
# Case for absence of optional arg for accepting out-of-band msg data
1035+
if not 'pushed_data' in kwargs:
1036+
kwargs['pushed_data'] = []
1037+
if not 'on_push' in kwargs:
1038+
kwargs['on_push'] = None
1039+
if not 'on_push_res' in kwargs:
1040+
kwargs['on_push_res'] = []
1041+
9971042
if isinstance(space_name, str):
9981043
space_name = self.schema.get_space(space_name).sid
9991044
request = RequestReplace(self, space_name, values)
1000-
return self._send_request(request)
1045+
return self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
10011046

10021047
def authenticate(self, user, password):
10031048
"""
@@ -1149,7 +1194,7 @@ def subscribe(self, cluster_uuid, server_uuid, vclock=None):
11491194
return
11501195
self.close() # close connection after SUBSCRIBE
11511196

1152-
def insert(self, space_name, values):
1197+
def insert(self, space_name, values, **kwargs):
11531198
"""
11541199
Execute an INSERT request: `insert`_ a tuple to the space.
11551200
Throws an error if there is already a tuple with the same
@@ -1172,12 +1217,20 @@ def insert(self, space_name, values):
11721217
.. _insert: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/insert/
11731218
"""
11741219

1220+
# Case for absence of optional arg for accepting out-of-band msg data
1221+
if not 'pushed_data' in kwargs:
1222+
kwargs['pushed_data'] = []
1223+
if not 'on_push' in kwargs:
1224+
kwargs['on_push'] = None
1225+
if not 'on_push_res' in kwargs:
1226+
kwargs['on_push_res'] = []
1227+
11751228
if isinstance(space_name, str):
11761229
space_name = self.schema.get_space(space_name).sid
11771230
request = RequestInsert(self, space_name, values)
1178-
return self._send_request(request)
1231+
return self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
11791232

1180-
def delete(self, space_name, key, *, index=0):
1233+
def delete(self, space_name, key, *, index=0, **kwargs):
11811234
"""
11821235
Execute a DELETE request: `delete`_ a tuple in the space.
11831236
@@ -1202,15 +1255,23 @@ def delete(self, space_name, key, *, index=0):
12021255
.. _delete: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/delete/
12031256
"""
12041257

1258+
# Case for absence of optional arg for accepting out-of-band msg data
1259+
if not 'pushed_data' in kwargs:
1260+
kwargs['pushed_data'] = []
1261+
if not 'on_push' in kwargs:
1262+
kwargs['on_push'] = None
1263+
if not 'on_push_res' in kwargs:
1264+
kwargs['on_push_res'] = []
1265+
12051266
key = check_key(key)
12061267
if isinstance(space_name, str):
12071268
space_name = self.schema.get_space(space_name).sid
12081269
if isinstance(index, str):
12091270
index = self.schema.get_index(space_name, index).iid
12101271
request = RequestDelete(self, space_name, index, key)
1211-
return self._send_request(request)
1272+
return self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
12121273

1213-
def upsert(self, space_name, tuple_value, op_list, *, index=0):
1274+
def upsert(self, space_name, tuple_value, op_list, *, index=0, **kwargs):
12141275
"""
12151276
Execute an UPSERT request: `upsert`_ a tuple to the space.
12161277
@@ -1252,16 +1313,24 @@ def upsert(self, space_name, tuple_value, op_list, *, index=0):
12521313
.. _upsert: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/upsert/
12531314
"""
12541315

1316+
# Case for absence of optional arg for accepting out-of-band msg data
1317+
if not 'pushed_data' in kwargs:
1318+
kwargs['pushed_data'] = []
1319+
if not 'on_push' in kwargs:
1320+
kwargs['on_push'] = None
1321+
if not 'on_push_res' in kwargs:
1322+
kwargs['on_push_res'] = []
1323+
12551324
if isinstance(space_name, str):
12561325
space_name = self.schema.get_space(space_name).sid
12571326
if isinstance(index, str):
12581327
index = self.schema.get_index(space_name, index).iid
12591328
op_list = self._ops_process(space_name, op_list)
12601329
request = RequestUpsert(self, space_name, index, tuple_value,
12611330
op_list)
1262-
return self._send_request(request)
1331+
return self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
12631332

1264-
def update(self, space_name, key, op_list, *, index=0):
1333+
def update(self, space_name, key, op_list, *, index=0, **kwargs):
12651334
"""
12661335
Execute an UPDATE request: `update`_ a tuple in the space.
12671336
@@ -1331,14 +1400,22 @@ def update(self, space_name, key, op_list, *, index=0):
13311400
.. _update: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/update/
13321401
"""
13331402

1403+
# Case for absence of optional arg for accepting out-of-band msg data
1404+
if not 'pushed_data' in kwargs:
1405+
kwargs['pushed_data'] = []
1406+
if not 'on_push' in kwargs:
1407+
kwargs['on_push'] = None
1408+
if not 'on_push_res' in kwargs:
1409+
kwargs['on_push_res'] = []
1410+
13341411
key = check_key(key)
13351412
if isinstance(space_name, str):
13361413
space_name = self.schema.get_space(space_name).sid
13371414
if isinstance(index, str):
13381415
index = self.schema.get_index(space_name, index).iid
13391416
op_list = self._ops_process(space_name, op_list)
13401417
request = RequestUpdate(self, space_name, index, key, op_list)
1341-
return self._send_request(request)
1418+
return self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
13421419

13431420
def ping(self, notime=False):
13441421
"""
@@ -1368,7 +1445,7 @@ def ping(self, notime=False):
13681445
return "Success"
13691446
return t1 - t0
13701447

1371-
def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, iterator=None):
1448+
def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, iterator=None, **kwargs):
13721449
"""
13731450
Execute a SELECT request: `select`_ a tuple from the space.
13741451
@@ -1518,13 +1595,21 @@ def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, i
15181595
# tuples)
15191596
key = check_key(key, select=True)
15201597

1598+
# Case for absence of optional arg for accepting out-of-band msg data
1599+
if not 'pushed_data' in kwargs:
1600+
kwargs['pushed_data'] = []
1601+
if not 'on_push' in kwargs:
1602+
kwargs['on_push'] = None
1603+
if not 'on_push_res' in kwargs:
1604+
kwargs['on_push_res'] = []
1605+
15211606
if isinstance(space_name, str):
15221607
space_name = self.schema.get_space(space_name).sid
15231608
if isinstance(index, str):
15241609
index = self.schema.get_index(space_name, index).iid
15251610
request = RequestSelect(self, space_name, index, key, offset,
15261611
limit, iterator)
1527-
response = self._send_request(request)
1612+
response = self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
15281613
return response
15291614

15301615
def space(self, space_name):

tarantool/const.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
IPROTO_GREETING_SIZE = 128
4040
IPROTO_BODY_MAX_LEN = 2147483648
41+
IPROTO_CHUNK=0x80
4142

4243
REQUEST_TYPE_OK = 0
4344
REQUEST_TYPE_SELECT = 1

tarantool/mesh_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ def _opt_refresh_instances(self):
581581
update_connection(self, addr)
582582
self._opt_reconnect()
583583

584-
def _send_request(self, request):
584+
def _send_request(self, request, pushed_data=[], on_push=None, on_push_res=[]):
585585
"""
586586
Send a request to a Tarantool server. If required, refresh
587587
addresses list before sending a request.

0 commit comments

Comments
 (0)