Skip to content

Fix driver stuck on RecursionError on COMMIT SUCCESS #1192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ select = [
# allow async functions without await to enable type checking, pretending to be async, matching type signatures
"RUF029",
]
"tests/**" = [
"B011", # allow `assert False` in tests, they won't be run with -O anyway
]
"bin/**" = [
"T20", # print statements are ok in our helper scripts
]
Expand Down
16 changes: 16 additions & 0 deletions src/neo4j/_async/io/_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from ..._exceptions import (
BoltError,
BoltHandshakeError,
SocketDeadlineExceededError,
)
from ..._io import BoltProtocolVersion
from ..._meta import USER_AGENT
Expand Down Expand Up @@ -887,6 +888,15 @@ async def _set_defunct_write(self, error=None, silent=False):
async def _set_defunct(self, message, error=None, silent=False):
direct_driver = getattr(self.pool, "is_direct_pool", False)
user_cancelled = isinstance(error, asyncio.CancelledError)
connection_failed = isinstance(
error,
(
ServiceUnavailable,
SessionExpired,
OSError,
SocketDeadlineExceededError,
),
)

if not (user_cancelled or self._closing):
log_call = log.error
Expand All @@ -913,6 +923,12 @@ async def _set_defunct(self, message, error=None, silent=False):
if user_cancelled:
self.kill()
raise error # cancellation error should not be re-written
if not connection_failed:
# Something else but the connection failed
# => we're not sure which state we're in
# => ditch the connection and raise the error for user-awareness
await self.close()
raise error
if not self._closing:
# If we fail while closing the connection, there is no need to
# remove the connection from the pool, nor to try to close the
Expand Down
9 changes: 9 additions & 0 deletions src/neo4j/_async/io/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ async def pop(self, hydration_hooks):
self._unpacker.unpack(hydration_hooks) for _ in range(size)
]
return tag, fields
except Exception as error:
log.debug(
"[#%04X] _: Failed to unpack response: %r",
self._local_port,
error,
)
self._broken = True
await AsyncUtil.callback(self.on_error, error)
raise
finally:
# Reset for new message
self._unpacker.reset()
Expand Down
16 changes: 16 additions & 0 deletions src/neo4j/_sync/io/_bolt.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions src/neo4j/_sync/io/_common.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions tests/unit/async_/io/_common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) "Neo4j"
# Neo4j Sweden AB [https://neo4j.com]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
140 changes: 140 additions & 0 deletions tests/unit/async_/io/_common/test_inbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Copyright (c) "Neo4j"
# Neo4j Sweden AB [https://neo4j.com]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import asyncio

import pytest

from neo4j._async.io._common import AsyncInbox
from neo4j._codec.packstream.v1 import Unpacker
from neo4j._exceptions import SocketDeadlineExceededError

from ....._async_compat import mark_async_test


class InboxMockHolder:
def __init__(self, mocker):
self.socket_mock = mocker.Mock()
self.socket_mock.getsockname.return_value = ("host", 1234)
self.on_error = mocker.AsyncMock()
self.inbox = AsyncInbox(self.socket_mock, self.on_error, Unpacker)
self.unpacker = mocker.Mock(wraps=self.inbox._unpacker)
self.inbox._unpacker = self.unpacker
# plenty of nonsense messages to read
self.mock_set_data(b"\x00\x01\xff\x00\x00" * 1000)

def mock_set_data(self, data):
async def side_effect(buffer, n):
nonlocal data

if not data:
pytest.fail("Read more data than mocked")

n = min(len(data), len(buffer), n)
buffer[:n] = data[:n]
data = data[n:]
return n

self.socket_mock.recv_into.side_effect = side_effect

def assert_no_error(self):
self.on_error.assert_not_called()
assert not self.inbox._broken

def mock_receive_failure(self, exception):
self.socket_mock.recv_into.side_effect = exception

def mock_unpack_failure(self, exception):
self.unpacker.unpack_structure_header.side_effect = exception


@pytest.mark.parametrize(
("data", "result"),
(
(
bytes((0, 2, 10, 11, 0, 2, 12, 13, 0, 1, 14, 0, 0)),
bytes(range(10, 15)),
),
(
bytes((0, 2, 10, 11, 0, 2, 12, 13, 0, 0)),
bytes(range(10, 14)),
),
(
bytes((0, 1, 5, 0, 0)),
bytes((5,)),
),
),
)
@mark_async_test
async def test_inbox_dechunking(data, result, mocker):
# Given
mocks = InboxMockHolder(mocker)
mocks.mock_set_data(data)
inbox = mocks.inbox
buffer = inbox._buffer

# When
await inbox._buffer_one_chunk()

# Then
mocks.assert_no_error()
assert buffer.used == len(result)
assert buffer.data[: len(result)] == result


@pytest.mark.parametrize(
"error",
(
asyncio.CancelledError("test"),
SocketDeadlineExceededError("test"),
OSError("test"),
),
)
@mark_async_test
async def test_inbox_receive_failure_error_handler(mocker, error):
mocks = InboxMockHolder(mocker)
mocks.mock_receive_failure(error)
inbox = mocks.inbox

with pytest.raises(type(error)) as exc:
await inbox.pop({})

assert exc.value is error
mocks.on_error.assert_awaited_once_with(error)
assert inbox._broken


@pytest.mark.parametrize(
"error",
(
SocketDeadlineExceededError("test"),
OSError("test"),
RecursionError("2deep4u"),
RuntimeError("something funny happened"),
),
)
@mark_async_test
async def test_inbox_unpack_failure(mocker, error):
mocks = InboxMockHolder(mocker)
mocks.mock_unpack_failure(error)
inbox = mocks.inbox

with pytest.raises(type(error)) as exc:
await inbox.pop({})

assert exc.value is error
mocks.on_error.assert_awaited_once_with(error)
assert inbox._broken
Loading