Skip to content

Commit d470f82

Browse files
committed
conn: create from socket fd
This patch adds the ability to create Tarantool connection using an existing socket fd. To achieve this, several changes have been made to work with non-blocking sockets, as `socket.socketpair` creates such [1]. The authentication [2] might have already occured when we establish such a connection. If that's the case, there is no need to pass 'user' argument. On success, connect takes ownership of the `socket_fd`. 1. tarantool/tarantool#8944 2. https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/authentication/ Closes #304
1 parent d57cad6 commit d470f82

11 files changed

+353
-23
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## Unreleased
8+
9+
### Added
10+
- The ability to connect to the Tarantool using an existing socket fd (#304).
11+
712
## 1.1.2 - 2023-09-20
813

914
### Fixed

tarantool/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
__version__ = '0.0.0-dev'
5252

5353

54-
def connect(host="localhost", port=33013, user=None, password=None,
54+
def connect(host="localhost", port=33013, socket_fd=None, user=None, password=None,
5555
encoding=ENCODING_DEFAULT, transport=DEFAULT_TRANSPORT,
5656
ssl_key_file=DEFAULT_SSL_KEY_FILE,
5757
ssl_cert_file=DEFAULT_SSL_CERT_FILE,
@@ -64,6 +64,8 @@ def connect(host="localhost", port=33013, user=None, password=None,
6464
6565
:param port: Refer to :paramref:`~tarantool.Connection.params.port`.
6666
67+
:param socket_fd: Refer to :paramref:`~tarantool.Connection.params.socket_fd`.
68+
6769
:param user: Refer to :paramref:`~tarantool.Connection.params.user`.
6870
6971
:param password: Refer to
@@ -93,6 +95,7 @@ def connect(host="localhost", port=33013, user=None, password=None,
9395
"""
9496

9597
return Connection(host, port,
98+
socket_fd=socket_fd,
9699
user=user,
97100
password=password,
98101
socket_timeout=SOCKET_TIMEOUT,

tarantool/connection.py

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# pylint: disable=too-many-lines,duplicate-code
55

66
import os
7+
import select
78
import time
89
import errno
910
from enum import Enum
@@ -594,7 +595,10 @@ class Connection(ConnectionInterface):
594595
:value: :exc:`~tarantool.error.CrudModuleError`
595596
"""
596597

597-
def __init__(self, host, port,
598+
def __init__(self,
599+
host=None,
600+
port=None,
601+
socket_fd=None,
598602
user=None,
599603
password=None,
600604
socket_timeout=SOCKET_TIMEOUT,
@@ -623,8 +627,11 @@ def __init__(self, host, port,
623627
Unix sockets.
624628
:type host: :obj:`str` or :obj:`None`
625629
626-
:param port: Server port or Unix socket path.
627-
:type port: :obj:`int` or :obj:`str`
630+
:param port: Server port, or Unix socket path.
631+
:type port: :obj:`int` or :obj:`str` or :obj:`None`
632+
633+
:param socket_fd: socket fd number.
634+
:type socket_fd: :obj:`int` or :obj:`None`
628635
629636
:param user: User name for authentication on the Tarantool
630637
server.
@@ -803,6 +810,8 @@ def __init__(self, host, port,
803810
.. _mp_array: https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
804811
"""
805812
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements
813+
if socket_fd is not None and (host is not None or port is not None):
814+
raise ConfigurationError("specifying both socket_fd and host/port is not allowed")
806815

807816
if msgpack.version >= (1, 0, 0) and encoding not in (None, 'utf-8'):
808817
raise ConfigurationError("msgpack>=1.0.0 only supports None and "
@@ -820,6 +829,7 @@ def __init__(self, host, port,
820829
recv.restype = ctypes.c_int
821830
self.host = host
822831
self.port = port
832+
self.socket_fd = socket_fd
823833
self.user = user
824834
self.password = password
825835
self.socket_timeout = socket_timeout
@@ -897,10 +907,35 @@ def connect_basic(self):
897907
:meta private:
898908
"""
899909

900-
if self.host is None:
901-
self.connect_unix()
902-
else:
910+
if self.socket_fd is not None:
911+
self.connect_socket_fd()
912+
elif self.host is not None:
903913
self.connect_tcp()
914+
else:
915+
self.connect_unix()
916+
917+
def connect_socket_fd(self):
918+
"""
919+
Establish a connection using an existing socket fd.
920+
921+
socket_fd / timeout| >= 0 | None
922+
-------------------|-------------------------|-------------------------
923+
blocking | set non-blocking | don't change
924+
| socket lib call `select`| `select` isn't needed
925+
-------------------|-------------------------|-------------------------
926+
non-blocking | don't change | don't change
927+
| socket lib call `select`| call `select` ourselves
928+
-------------------|-------------------------|-------------------------
929+
930+
:meta private:
931+
"""
932+
self.connected = True
933+
if self._socket:
934+
self._socket.close()
935+
936+
self._socket = socket.socket(fileno=self.socket_fd)
937+
if self.socket_timeout is not None:
938+
self._socket.settimeout(self.socket_timeout)
904939

905940
def connect_tcp(self):
906941
"""
@@ -1124,6 +1159,11 @@ def _recv(self, to_read):
11241159
while to_read > 0:
11251160
try:
11261161
tmp = self._socket.recv(to_read)
1162+
except BlockingIOError:
1163+
ready, _, _ = select.select([self._socket.fileno()], [], [], self.socket_timeout)
1164+
if not ready:
1165+
raise NetworkError(TimeoutError()) # pylint: disable=raise-missing-from
1166+
continue
11271167
except OverflowError as exc:
11281168
self._socket.close()
11291169
err = socket.error(
@@ -1163,6 +1203,40 @@ def _read_response(self):
11631203
# Read the packet
11641204
return self._recv(length)
11651205

1206+
def _sendall(self, bytes_to_send):
1207+
"""
1208+
Sends bytes to the transport (socket).
1209+
1210+
:param bytes_to_send: message to send.
1211+
:type bytes_to_send: :obj:`bytes`
1212+
1213+
:raise: :exc:`~tarantool.error.NetworkError`
1214+
1215+
:meta: private:
1216+
"""
1217+
total_sent = 0
1218+
while total_sent < len(bytes_to_send):
1219+
try:
1220+
sent = self._socket.send(bytes_to_send[total_sent:])
1221+
if sent == 0:
1222+
err = socket.error(
1223+
errno.ECONNRESET,
1224+
"Lost connection to server during query"
1225+
)
1226+
raise NetworkError(err)
1227+
total_sent += sent
1228+
except BlockingIOError as exc:
1229+
total_sent += exc.characters_written
1230+
_, ready, _ = select.select([], [self._socket.fileno()], [], self.socket_timeout)
1231+
if not ready:
1232+
raise NetworkError(TimeoutError()) # pylint: disable=raise-missing-from
1233+
except socket.error as exc:
1234+
err = socket.error(
1235+
errno.ECONNRESET,
1236+
"Lost connection to server during query"
1237+
)
1238+
raise NetworkError(err) from exc
1239+
11661240
def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
11671241
"""
11681242
Send request without trying to reconnect.
@@ -1191,7 +1265,7 @@ def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
11911265
response = None
11921266
while True:
11931267
try:
1194-
self._socket.sendall(bytes(request))
1268+
self._sendall(bytes(request))
11951269
response = request.response_class(self, self._read_response())
11961270
break
11971271
except SchemaReloadException as exc:

tarantool/connection_pool.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class PoolUnit():
115115

116116
addr: dict
117117
"""
118-
``{"host": host, "port": port}`` info.
118+
``{"host": host, "port": port, "socket_fd": socket_fd}`` info.
119119
120120
:type: :obj:`dict`
121121
"""
@@ -161,6 +161,14 @@ class PoolUnit():
161161
:type: :obj:`bool`
162162
"""
163163

164+
def get_address(self):
165+
"""
166+
Get an address string representation.
167+
"""
168+
if self.addr['socket_fd'] is not None:
169+
return f'fd://{self.addr["socket_fd"]}'
170+
return f'{self.addr["host"]}:{self.addr["port"]}'
171+
164172

165173
# Based on https://realpython.com/python-interface/
166174
class StrategyInterface(metaclass=abc.ABCMeta):
@@ -398,6 +406,7 @@ def __init__(self,
398406
{
399407
"host': "str" or None, # mandatory
400408
"port": int or "str", # mandatory
409+
"socket_fd": int, # optional
401410
"transport": "str", # optional
402411
"ssl_key_file": "str", # optional
403412
"ssl_cert_file": "str", # optional
@@ -499,6 +508,7 @@ def __init__(self,
499508
conn=Connection(
500509
host=addr['host'],
501510
port=addr['port'],
511+
socket_fd=addr['socket_fd'],
502512
user=user,
503513
password=password,
504514
socket_timeout=socket_timeout,
@@ -529,15 +539,16 @@ def _make_key(self, addr):
529539
"""
530540
Make a unique key for a server based on its address.
531541
532-
:param addr: `{"host": host, "port": port}` dictionary.
542+
:param addr: `{"host": host, "port": port, "socket_fd": socket_fd}` dictionary.
533543
:type addr: :obj:`dict`
534544
535545
:rtype: :obj:`str`
536546
537547
:meta private:
538548
"""
539-
540-
return f"{addr['host']}:{addr['port']}"
549+
if addr['socket_fd'] is None:
550+
return f"{addr['host']}:{addr['port']}"
551+
return addr['socket_fd']
541552

542553
def _get_new_state(self, unit):
543554
"""
@@ -557,23 +568,23 @@ def _get_new_state(self, unit):
557568
try:
558569
conn.connect()
559570
except NetworkError as exc:
560-
msg = (f"Failed to connect to {unit.addr['host']}:{unit.addr['port']}, "
571+
msg = (f"Failed to connect to {unit.get_address()}, "
561572
f"reason: {repr(exc)}")
562573
warn(msg, ClusterConnectWarning)
563574
return InstanceState(Status.UNHEALTHY)
564575

565576
try:
566577
resp = conn.call('box.info')
567578
except NetworkError as exc:
568-
msg = (f"Failed to get box.info for {unit.addr['host']}:{unit.addr['port']}, "
579+
msg = (f"Failed to get box.info for {unit.get_address()}, "
569580
f"reason: {repr(exc)}")
570581
warn(msg, PoolTolopogyWarning)
571582
return InstanceState(Status.UNHEALTHY)
572583

573584
try:
574585
read_only = resp.data[0]['ro']
575586
except (IndexError, KeyError) as exc:
576-
msg = (f"Incorrect box.info response from {unit.addr['host']}:{unit.addr['port']}"
587+
msg = (f"Incorrect box.info response from {unit.get_address()}"
577588
f"reason: {repr(exc)}")
578589
warn(msg, PoolTolopogyWarning)
579590
return InstanceState(Status.UNHEALTHY)
@@ -582,11 +593,11 @@ def _get_new_state(self, unit):
582593
status = resp.data[0]['status']
583594

584595
if status != 'running':
585-
msg = f"{unit.addr['host']}:{unit.addr['port']} instance status is not 'running'"
596+
msg = f"{unit.get_address()} instance status is not 'running'"
586597
warn(msg, PoolTolopogyWarning)
587598
return InstanceState(Status.UNHEALTHY)
588599
except (IndexError, KeyError) as exc:
589-
msg = (f"Incorrect box.info response from {unit.addr['host']}:{unit.addr['port']}"
600+
msg = (f"Incorrect box.info response from {unit.get_address()}"
590601
f"reason: {repr(exc)}")
591602
warn(msg, PoolTolopogyWarning)
592603
return InstanceState(Status.UNHEALTHY)

tarantool/const.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@
103103
IPROTO_FEATURE_SPACE_AND_INDEX_NAMES = 5
104104
IPROTO_FEATURE_WATCH_ONCE = 6
105105

106+
# Default value for host.
107+
DEFAULT_HOST = None
108+
# Default value for port.
109+
DEFAULT_PORT = None
110+
# Default value for socket_fd.
111+
DEFAULT_SOCKET_FD = None
106112
# Default value for connection timeout (seconds)
107113
CONNECTION_TIMEOUT = None
108114
# Default value for socket timeout (seconds)

0 commit comments

Comments
 (0)