Skip to content

Commit 5712143

Browse files
committed
conn: create from socket fd
This patch adds the ability to create Tarantool connection using an existing socket fd. To achieve this, several changes have been made to work with non-blocking sockets, as `socket.socketpair` creates such [1]. The authentication [2] might have already occured when we establish such a connection. If that's the case, there is no need to pass 'user' argument. On success, connect takes ownership of the `fd`. 1. tarantool/tarantool#8944 2. https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/authentication/ Closes #304
1 parent 02818fb commit 5712143

File tree

8 files changed

+293
-19
lines changed

8 files changed

+293
-19
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## Unreleased
8+
9+
### Added
10+
- The ability to connect to the Tarantool using an existing socket fd (#304).
11+
712
## 1.1.2 - 2023-09-20
813

914
### Fixed

tarantool/connection.py

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# pylint: disable=too-many-lines,duplicate-code
55

66
import os
7+
import select
78
import time
89
import errno
910
from enum import Enum
@@ -623,7 +624,7 @@ def __init__(self, host, port,
623624
Unix sockets.
624625
:type host: :obj:`str` or :obj:`None`
625626
626-
:param port: Server port or Unix socket path.
627+
:param port: Server port, socket fd or Unix socket path.
627628
:type port: :obj:`int` or :obj:`str`
628629
629630
:param user: User name for authentication on the Tarantool
@@ -897,10 +898,36 @@ def connect_basic(self):
897898
:meta private:
898899
"""
899900

900-
if self.host is None:
901-
self.connect_unix()
902-
else:
901+
if self.host is not None:
903902
self.connect_tcp()
903+
elif isinstance(self.port, int):
904+
self.connect_socket_fd()
905+
else:
906+
self.connect_unix()
907+
908+
def connect_socket_fd(self):
909+
"""
910+
Establish a connection using an existing socket fd.
911+
912+
socket_fd / timeout| >= 0 | None
913+
-------------------|-------------------------|-------------------------
914+
blocking | set non-blocking | don't change
915+
| socket lib call `select`| `select` isn't needed
916+
-------------------|-------------------------|-------------------------
917+
non-blocking | don't change | don't change
918+
| socket lib call `select`| call `select` ourselves
919+
-------------------|-------------------------|-------------------------
920+
921+
:meta private:
922+
"""
923+
self.connected = True
924+
if self._socket:
925+
self._socket.close()
926+
927+
socket_fd = self.port
928+
self._socket = socket.socket(fileno=socket_fd)
929+
if self.socket_timeout is not None:
930+
self._socket.settimeout(self.socket_timeout)
904931

905932
def connect_tcp(self):
906933
"""
@@ -1124,6 +1151,11 @@ def _recv(self, to_read):
11241151
while to_read > 0:
11251152
try:
11261153
tmp = self._socket.recv(to_read)
1154+
except BlockingIOError:
1155+
ready, _, _ = select.select([self._socket.fileno()], [], [], self.socket_timeout)
1156+
if not ready:
1157+
raise NetworkError(socket.timeout()) # pylint: disable=raise-missing-from
1158+
continue
11271159
except OverflowError as exc:
11281160
self._socket.close()
11291161
err = socket.error(
@@ -1163,6 +1195,40 @@ def _read_response(self):
11631195
# Read the packet
11641196
return self._recv(length)
11651197

1198+
def _sendall(self, bytes_to_send):
1199+
"""
1200+
Sends bytes to the transport (socket).
1201+
1202+
:param bytes_to_send: message to send.
1203+
:type bytes_to_send: :obj:`bytes`
1204+
1205+
:raise: :exc:`~tarantool.error.NetworkError`
1206+
1207+
:meta: private:
1208+
"""
1209+
total_sent = 0
1210+
while total_sent < len(bytes_to_send):
1211+
try:
1212+
sent = self._socket.send(bytes_to_send[total_sent:])
1213+
if sent == 0:
1214+
err = socket.error(
1215+
errno.ECONNRESET,
1216+
"Lost connection to server during query"
1217+
)
1218+
raise NetworkError(err)
1219+
total_sent += sent
1220+
except BlockingIOError as exc:
1221+
total_sent += exc.characters_written
1222+
_, ready, _ = select.select([], [self._socket.fileno()], [], self.socket_timeout)
1223+
if not ready:
1224+
raise NetworkError(socket.timeout()) # pylint: disable=raise-missing-from
1225+
except socket.error as exc:
1226+
err = socket.error(
1227+
errno.ECONNRESET,
1228+
"Lost connection to server during query"
1229+
)
1230+
raise NetworkError(err) from exc
1231+
11661232
def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
11671233
"""
11681234
Send request without trying to reconnect.
@@ -1191,7 +1257,7 @@ def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
11911257
response = None
11921258
while True:
11931259
try:
1194-
self._socket.sendall(bytes(request))
1260+
self._sendall(bytes(request))
11951261
response = request.response_class(self, self._read_response())
11961262
break
11971263
except SchemaReloadException as exc:

tarantool/mesh_connection.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -139,23 +139,23 @@ def format_error(address, err):
139139
result[key] = val
140140

141141
if isinstance(result['port'], int):
142-
# Looks like an inet address.
142+
# Looks like an inet address or socket fd.
143143

144144
# Validate host.
145-
if 'host' not in result or result['host'] is None:
146-
return format_error(result,
147-
'host is mandatory for an inet result')
148-
if not isinstance(result['host'], str):
149-
return format_error(result,
150-
'host must be a string for an inet result')
145+
if ('host' in result and result['host'] is not None
146+
and not isinstance(result['host'], str)):
147+
return format_error(result, 'host must be a string for an inet result, '
148+
'or None for a socket fd result')
151149

152150
# Validate port.
153151
if not isinstance(result['port'], int):
154152
return format_error(result,
155-
'port must be an int for an inet result')
156-
if result['port'] < 1 or result['port'] > 65535:
157-
return format_error(result, 'port must be in range [1, 65535] '
158-
'for an inet result')
153+
'port must be an int for an inet/socket fd result')
154+
if 'host' in result and isinstance(result['host'], str):
155+
# Check that the port is correct.
156+
if result['port'] < 1 or result['port'] > 65535:
157+
return format_error(result, 'port must be in range [1, 65535] '
158+
'for an inet result')
159159

160160
# Looks okay.
161161
return result, None
@@ -447,7 +447,8 @@ def __init__(self, host=None, port=None,
447447
# Don't change user provided arguments.
448448
addrs = addrs[:]
449449

450-
if host and port:
450+
if port:
451+
# host can be None in the case of socket fd or Unix socket.
451452
addrs.insert(0, {'host': host,
452453
'port': port,
453454
'transport': transport,

test/suites/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from .test_execute import TestSuiteExecute
1616
from .test_dbapi import TestSuiteDBAPI
1717
from .test_encoding import TestSuiteEncoding
18+
from .test_socket_fd import TestSuiteSocketFD
1819
from .test_ssl import TestSuiteSsl
1920
from .test_decimal import TestSuiteDecimal
2021
from .test_uuid import TestSuiteUUID
@@ -33,7 +34,7 @@
3334
TestSuiteEncoding, TestSuitePool, TestSuiteSsl,
3435
TestSuiteDecimal, TestSuiteUUID, TestSuiteDatetime,
3536
TestSuiteInterval, TestSuitePackage, TestSuiteErrorExt,
36-
TestSuitePush, TestSuiteConnection, TestSuiteCrud,)
37+
TestSuitePush, TestSuiteConnection, TestSuiteCrud, TestSuiteSocketFD)
3738

3839

3940
def load_tests(loader, tests, pattern):

test/suites/lib/skip.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,3 +306,14 @@ def skip_or_run_iproto_basic_features_test(func):
306306

307307
return skip_or_run_test_tarantool(func, '2.10.0',
308308
'does not support iproto ID and iproto basic features')
309+
310+
311+
def skip_or_run_box_session_new_tests(func):
312+
"""
313+
Decorator to skip or run tests that use box.session.new.
314+
315+
Tarantool supports box.session.new only in current master since
316+
commit 324872a.
317+
See https://github.com/tarantool/tarantool/issues/8801.
318+
"""
319+
return skip_or_run_test_tarantool(func, '3.0.0', 'does not support box.session.new')

test/suites/sidecar.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# pylint: disable=missing-module-docstring
2+
import os
3+
4+
import tarantool
5+
6+
socket_fd = int(os.environ["SOCKET_FD"])
7+
8+
conn = tarantool.connect(host=None, port=socket_fd)
9+
10+
# Check user.
11+
assert conn.eval("return box.session.user()").data[0] == "test"
12+
13+
# Check db operations.
14+
conn.insert("test", [1])
15+
conn.insert("test", [2])
16+
assert conn.select("test").data == [[1], [2]]

test/suites/test_mesh.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ def test_01_contructor(self):
135135
# Verify that a bad address given at initialization leads
136136
# to an error.
137137
bad_addrs = [
138-
{"port": 1234}, # no host
139138
{"host": "localhost"}, # no port
140139
{"host": "localhost", "port": "1234"}, # port is str
141140
]

0 commit comments

Comments
 (0)