Skip to content

conn: create from socket fd #316

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ jobs:
- '2.8'
- '2.10'
- '2.11'
- 'master'
python:
- '3.6'
- '3.7'
Expand Down Expand Up @@ -57,11 +58,45 @@ jobs:
- name: Clone the connector
uses: actions/checkout@v3

- name: Setup tt
run: |
curl -L https://tarantool.io/release/2/installer.sh | sudo bash
sudo apt install -y tt
tt version
tt init

- name: Install tarantool ${{ matrix.tarantool }}
if: matrix.tarantool != 'master'
uses: tarantool/setup-tarantool@v2
with:
tarantool-version: ${{ matrix.tarantool }}

- name: Get Tarantool master latest commit
if: matrix.tarantool == 'master'
run: |
commit_hash=$(git ls-remote https://github.com/tarantool/tarantool.git --branch master | head -c 8)
echo "LATEST_COMMIT=${commit_hash}" >> $GITHUB_ENV
shell: bash

- name: Cache Tarantool master
if: matrix.tarantool == 'master'
id: cache-latest
uses: actions/cache@v3
with:
path: |
${{ github.workspace }}/bin
${{ github.workspace }}/include
key: cache-latest-${{ env.LATEST_COMMIT }}

- name: Setup Tarantool master
if: matrix.tarantool == 'master' && steps.cache-latest.outputs.cache-hit != 'true'
run: |
tt install tarantool master

- name: Add Tarantool master to PATH
if: matrix.tarantool == 'master'
run: echo "${GITHUB_WORKSPACE}/bin" >> $GITHUB_PATH

- name: Setup Python for tests
uses: actions/setup-python@v4
with:
Expand All @@ -86,8 +121,6 @@ jobs:

- name: Install the crud module for testing purposes
run: |
curl -L https://tarantool.io/release/2/installer.sh | bash
sudo apt install -y tt
tt rocks install crud

- name: Run tests
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Added
- The ability to connect to the Tarantool using an existing socket fd (#304).

## 1.1.2 - 2023-09-20

### Fixed
Expand Down
5 changes: 4 additions & 1 deletion tarantool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
__version__ = '0.0.0-dev'


def connect(host="localhost", port=33013, user=None, password=None,
def connect(host="localhost", port=33013, socket_fd=None, user=None, password=None,
encoding=ENCODING_DEFAULT, transport=DEFAULT_TRANSPORT,
ssl_key_file=DEFAULT_SSL_KEY_FILE,
ssl_cert_file=DEFAULT_SSL_CERT_FILE,
Expand All @@ -64,6 +64,8 @@ def connect(host="localhost", port=33013, user=None, password=None,

:param port: Refer to :paramref:`~tarantool.Connection.params.port`.

:param socket_fd: Refer to :paramref:`~tarantool.Connection.params.socket_fd`.

:param user: Refer to :paramref:`~tarantool.Connection.params.user`.

:param password: Refer to
Expand Down Expand Up @@ -93,6 +95,7 @@ def connect(host="localhost", port=33013, user=None, password=None,
"""

return Connection(host, port,
socket_fd=socket_fd,
user=user,
password=password,
socket_timeout=SOCKET_TIMEOUT,
Expand Down
104 changes: 97 additions & 7 deletions tarantool/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# pylint: disable=too-many-lines,duplicate-code

import os
import select
import time
import errno
from enum import Enum
Expand Down Expand Up @@ -51,6 +52,9 @@
RECONNECT_DELAY,
DEFAULT_TRANSPORT,
SSL_TRANSPORT,
DEFAULT_HOST,
DEFAULT_PORT,
DEFAULT_SOCKET_FD,
DEFAULT_SSL_KEY_FILE,
DEFAULT_SSL_CERT_FILE,
DEFAULT_SSL_CA_FILE,
Expand Down Expand Up @@ -594,7 +598,10 @@ class Connection(ConnectionInterface):
:value: :exc:`~tarantool.error.CrudModuleError`
"""

def __init__(self, host, port,
def __init__(self,
host=DEFAULT_HOST,
port=DEFAULT_PORT,
socket_fd=DEFAULT_SOCKET_FD,
user=None,
password=None,
socket_timeout=SOCKET_TIMEOUT,
Expand Down Expand Up @@ -623,8 +630,11 @@ def __init__(self, host, port,
Unix sockets.
:type host: :obj:`str` or :obj:`None`

:param port: Server port or Unix socket path.
:type port: :obj:`int` or :obj:`str`
:param port: Server port, or Unix socket path.
:type port: :obj:`int` or :obj:`str` or :obj:`None`

:param socket_fd: socket fd number.
:type socket_fd: :obj:`int` or :obj:`None`

:param user: User name for authentication on the Tarantool
server.
Expand Down Expand Up @@ -804,6 +814,18 @@ def __init__(self, host, port,
"""
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements

if host is None and port is None and socket_fd is None:
raise ConfigurationError("need to specify host/port, "
"port (in case of Unix sockets) "
"or socket_fd")

if socket_fd is not None and (host is not None or port is not None):
raise ConfigurationError("specifying both socket_fd and host/port is not allowed")

if host is not None and port is None:
raise ConfigurationError("when specifying host, "
"it is also necessary to specify port")

if msgpack.version >= (1, 0, 0) and encoding not in (None, 'utf-8'):
raise ConfigurationError("msgpack>=1.0.0 only supports None and "
+ "'utf-8' encoding option values")
Expand All @@ -820,6 +842,7 @@ def __init__(self, host, port,
recv.restype = ctypes.c_int
self.host = host
self.port = port
self.socket_fd = socket_fd
self.user = user
self.password = password
self.socket_timeout = socket_timeout
Expand Down Expand Up @@ -897,10 +920,37 @@ def connect_basic(self):
:meta private:
"""

if self.host is None:
self.connect_unix()
else:
if self.socket_fd is not None:
self.connect_socket_fd()
elif self.host is not None:
self.connect_tcp()
else:
self.connect_unix()

def connect_socket_fd(self):
"""
Establish a connection using an existing socket fd.

+---------------------+--------------------------+-------------------------+
| socket_fd / timeout | >= 0 | `None` |
+=====================+==========================+=========================+
| blocking | Set non-blocking socket | Don't change, `select` |
| | lib call `select` | isn't needed |
+---------------------+--------------------------+-------------------------+
| non-blocking | Don't change, socket lib | Don't change, call |
| | call `select` | `select` ourselves |
+---------------------+--------------------------+-------------------------+

:meta private:
"""

self.connected = True
if self._socket:
self._socket.close()

self._socket = socket.socket(fileno=self.socket_fd)
if self.socket_timeout is not None:
self._socket.settimeout(self.socket_timeout)

def connect_tcp(self):
"""
Expand Down Expand Up @@ -1124,6 +1174,11 @@ def _recv(self, to_read):
while to_read > 0:
try:
tmp = self._socket.recv(to_read)
except BlockingIOError:
ready, _, _ = select.select([self._socket.fileno()], [], [], self.socket_timeout)
if not ready:
raise NetworkError(TimeoutError()) # pylint: disable=raise-missing-from
continue
except OverflowError as exc:
self._socket.close()
err = socket.error(
Expand Down Expand Up @@ -1163,6 +1218,41 @@ def _read_response(self):
# Read the packet
return self._recv(length)

def _sendall(self, bytes_to_send):
"""
Sends bytes to the transport (socket).

:param bytes_to_send: Message to send.
:type bytes_to_send: :obj:`bytes`

:raise: :exc:`~tarantool.error.NetworkError`

:meta private:
"""

total_sent = 0
while total_sent < len(bytes_to_send):
try:
sent = self._socket.send(bytes_to_send[total_sent:])
if sent == 0:
err = socket.error(
errno.ECONNRESET,
"Lost connection to server during query"
)
raise NetworkError(err)
total_sent += sent
except BlockingIOError as exc:
total_sent += exc.characters_written
_, ready, _ = select.select([], [self._socket.fileno()], [], self.socket_timeout)
if not ready:
raise NetworkError(TimeoutError()) # pylint: disable=raise-missing-from
except socket.error as exc:
err = socket.error(
errno.ECONNRESET,
"Lost connection to server during query"
)
raise NetworkError(err) from exc

def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
"""
Send request without trying to reconnect.
Expand Down Expand Up @@ -1191,7 +1281,7 @@ def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
response = None
while True:
try:
self._socket.sendall(bytes(request))
self._sendall(bytes(request))
response = request.response_class(self, self._read_response())
break
except SchemaReloadException as exc:
Expand Down
29 changes: 20 additions & 9 deletions tarantool/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class PoolUnit():

addr: dict
"""
``{"host": host, "port": port}`` info.
``{"host": host, "port": port, "socket_fd": socket_fd}`` info.

:type: :obj:`dict`
"""
Expand Down Expand Up @@ -161,6 +161,14 @@ class PoolUnit():
:type: :obj:`bool`
"""

def get_address(self):
"""
Get an address string representation.
"""
if self.addr['socket_fd'] is not None:
return f'fd://{self.addr["socket_fd"]}'
return f'{self.addr["host"]}:{self.addr["port"]}'


# Based on https://realpython.com/python-interface/
class StrategyInterface(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -398,6 +406,7 @@ def __init__(self,
{
"host': "str" or None, # mandatory
"port": int or "str", # mandatory
"socket_fd": int, # optional
"transport": "str", # optional
"ssl_key_file": "str", # optional
"ssl_cert_file": "str", # optional
Expand Down Expand Up @@ -499,6 +508,7 @@ def __init__(self,
conn=Connection(
host=addr['host'],
port=addr['port'],
socket_fd=addr['socket_fd'],
user=user,
password=password,
socket_timeout=socket_timeout,
Expand Down Expand Up @@ -529,15 +539,16 @@ def _make_key(self, addr):
"""
Make a unique key for a server based on its address.

:param addr: `{"host": host, "port": port}` dictionary.
:param addr: `{"host": host, "port": port, "socket_fd": socket_fd}` dictionary.
:type addr: :obj:`dict`

:rtype: :obj:`str`

:meta private:
"""

return f"{addr['host']}:{addr['port']}"
if addr['socket_fd'] is None:
return f"{addr['host']}:{addr['port']}"
return addr['socket_fd']

def _get_new_state(self, unit):
"""
Expand All @@ -557,23 +568,23 @@ def _get_new_state(self, unit):
try:
conn.connect()
except NetworkError as exc:
msg = (f"Failed to connect to {unit.addr['host']}:{unit.addr['port']}, "
msg = (f"Failed to connect to {unit.get_address()}, "
f"reason: {repr(exc)}")
warn(msg, ClusterConnectWarning)
return InstanceState(Status.UNHEALTHY)

try:
resp = conn.call('box.info')
except NetworkError as exc:
msg = (f"Failed to get box.info for {unit.addr['host']}:{unit.addr['port']}, "
msg = (f"Failed to get box.info for {unit.get_address()}, "
f"reason: {repr(exc)}")
warn(msg, PoolTolopogyWarning)
return InstanceState(Status.UNHEALTHY)

try:
read_only = resp.data[0]['ro']
except (IndexError, KeyError) as exc:
msg = (f"Incorrect box.info response from {unit.addr['host']}:{unit.addr['port']}"
msg = (f"Incorrect box.info response from {unit.get_address()}"
f"reason: {repr(exc)}")
warn(msg, PoolTolopogyWarning)
return InstanceState(Status.UNHEALTHY)
Expand All @@ -582,11 +593,11 @@ def _get_new_state(self, unit):
status = resp.data[0]['status']

if status != 'running':
msg = f"{unit.addr['host']}:{unit.addr['port']} instance status is not 'running'"
msg = f"{unit.get_address()} instance status is not 'running'"
warn(msg, PoolTolopogyWarning)
return InstanceState(Status.UNHEALTHY)
except (IndexError, KeyError) as exc:
msg = (f"Incorrect box.info response from {unit.addr['host']}:{unit.addr['port']}"
msg = (f"Incorrect box.info response from {unit.get_address()}"
f"reason: {repr(exc)}")
warn(msg, PoolTolopogyWarning)
return InstanceState(Status.UNHEALTHY)
Expand Down
6 changes: 6 additions & 0 deletions tarantool/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@
IPROTO_FEATURE_SPACE_AND_INDEX_NAMES = 5
IPROTO_FEATURE_WATCH_ONCE = 6

# Default value for host.
DEFAULT_HOST = None
# Default value for port.
DEFAULT_PORT = None
# Default value for socket_fd.
DEFAULT_SOCKET_FD = None
# Default value for connection timeout (seconds)
CONNECTION_TIMEOUT = None
# Default value for socket timeout (seconds)
Expand Down
Loading