Skip to content

Commit 1c10bc4

Browse files
committed
Add support for ACKs in join()/subscribe()
tarantool/tarantool#1147
1 parent 85cbd86 commit 1c10bc4

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

tarantool/connection.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from tarantool.response import Response
2222
from tarantool.request import (
2323
Request,
24+
RequestOK,
2425
RequestCall,
2526
RequestDelete,
2627
RequestEval,
@@ -52,7 +53,7 @@
5253
NetworkWarning)
5354

5455
from .schema import Schema
55-
from .utils import check_key, greeting_decode
56+
from .utils import check_key, greeting_decode, version_id
5657

5758

5859
class Connection(object):
@@ -357,8 +358,13 @@ def authenticate(self, user, password):
357358

358359
def join(self, server_uuid):
359360
request = RequestJoin(self, server_uuid)
361+
sync = request._sync
360362
resp = self._send_request(request)
361363
while True:
364+
if self.version_id >= version_id(1, 7, 0):
365+
# Send acknowledgement
366+
ack = RequestOK(self, sync)
367+
self._socket.sendall(bytes(ack))
362368
yield resp
363369
if resp.code == REQUEST_TYPE_OK or resp.code >= REQUEST_TYPE_ERROR:
364370
return
@@ -368,8 +374,13 @@ def join(self, server_uuid):
368374
def subscribe(self, cluster_uuid, server_uuid, vclock={}):
369375
# FIXME rudnyh: ^ 'vclock={}'? really? sure?
370376
request = RequestSubscribe(self, cluster_uuid, server_uuid, vclock)
377+
sync = request._sync
371378
resp = self._send_request(request)
372379
while True:
380+
if self.version_id >= version_id(1, 7, 0):
381+
# Send acknowledgement
382+
ack = RequestOK(self, sync)
383+
self._socket.sendall(bytes(ack))
373384
yield resp
374385
if resp.code >= REQUEST_TYPE_ERROR:
375386
return

tarantool/request.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
IPROTO_CLUSTER_UUID,
2525
IPROTO_VCLOCK,
2626
IPROTO_EXPR,
27+
REQUEST_TYPE_OK,
2728
REQUEST_TYPE_PING,
2829
REQUEST_TYPE_SELECT,
2930
REQUEST_TYPE_INSERT,
@@ -278,3 +279,16 @@ def __init__(self, conn, cluster_uuid, server_uuid, vclock):
278279
IPROTO_VCLOCK: vclock
279280
})
280281
self._bytes = self.header(len(request_body)) + request_body
282+
283+
class RequestOK(Request):
284+
'''
285+
Represents OK acknowledgement
286+
'''
287+
request_type = REQUEST_TYPE_OK
288+
289+
# pylint: disable=W0231
290+
def __init__(self, conn, sync):
291+
super(RequestOK, self).__init__(conn)
292+
header = msgpack.dumps({IPROTO_CODE: self.request_type,
293+
IPROTO_SYNC: sync})
294+
self._bytes = msgpack.dumps(len(header)) + header

0 commit comments

Comments
 (0)