From 3ae3fbd5e48537a4e47450b7324928038f0b004b Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Mon, 31 May 2021 11:54:04 +0200 Subject: [PATCH] Migrate stub tests to testkit - remove all migrated stub tests - some stub tests (or parts of them) got turned into unit tests - implement testkit messages ForcedRoutingTableUpdate and GetRoutingTable - send debug log output to testkit to easy test debugging for people running the backend in docker or similar --- testkitbackend/backend.py | 18 +- testkitbackend/requests.py | 44 ++- testkitbackend/skipped_tests.json | 28 -- testkitbackend/test_config.json | 37 ++ tests/performance/test_results.py | 4 +- tests/performance/tools.py | 11 +- tests/requirements.txt | 1 + tests/stub/test_accesslevel.py | 275 ------------- tests/stub/test_bookmarking.py | 129 ------- tests/stub/test_directdriver.py | 509 +----------------------- tests/stub/test_multi_database.py | 54 --- tests/stub/test_routingdriver.py | 616 +----------------------------- tests/stub/test_transactions.py | 102 ----- tests/unit/test_driver.py | 129 +++++++ tests/unit/work/__init__.py | 0 tests/unit/work/test_result.py | 200 ++++++++++ tests/unit/work/test_session.py | 195 ++++++++++ 17 files changed, 633 insertions(+), 1719 deletions(-) delete mode 100644 testkitbackend/skipped_tests.json create mode 100644 testkitbackend/test_config.json delete mode 100644 tests/stub/test_accesslevel.py delete mode 100644 tests/stub/test_bookmarking.py delete mode 100644 tests/stub/test_multi_database.py delete mode 100644 tests/stub/test_transactions.py create mode 100644 tests/unit/test_driver.py create mode 100644 tests/unit/work/__init__.py create mode 100644 tests/unit/work/test_result.py create mode 100644 tests/unit/work/test_session.py diff --git a/testkitbackend/backend.py b/testkitbackend/backend.py index dc1da13e..beed90b7 100644 --- a/testkitbackend/backend.py +++ b/testkitbackend/backend.py @@ -14,7 +14,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from inspect import getmembers, isfunction +from inspect import ( + getmembers, + isfunction, +) +import io from json import loads, dumps import logging import sys @@ -28,9 +32,13 @@ import testkitbackend.requests as requests +buffer_handler = logging.StreamHandler(io.StringIO()) +buffer_handler.setLevel(logging.DEBUG) + handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.DEBUG) logging.getLogger("neo4j").addHandler(handler) +logging.getLogger("neo4j").addHandler(buffer_handler) logging.getLogger("neo4j").setLevel(logging.DEBUG) log = logging.getLogger("testkitbackend") @@ -165,6 +173,14 @@ def _process(self, request): def send_response(self, name, data): """ Sends a response to backend. """ + buffer_handler.acquire() + log_output = buffer_handler.stream.getvalue() + buffer_handler.stream.truncate(0) + buffer_handler.stream.seek(0) + buffer_handler.release() + if not log_output.endswith("\n"): + log_output += "\n" + self._wr.write(log_output.encode("utf-8")) response = {"name": name, "data": data} response = dumps(response) log.info(">>> " + name + dumps(data)) diff --git a/testkitbackend/requests.py b/testkitbackend/requests.py index c9e01cf8..4fa97676 100644 --- a/testkitbackend/requests.py +++ b/testkitbackend/requests.py @@ -23,8 +23,14 @@ from testkitbackend.fromtestkit import to_meta_and_timeout -with open(path.join(path.dirname(__file__), "skipped_tests.json"), "r") as fd: - SKIPPED_TESTS = json.load(fd) +def load_config(): + with open(path.join(path.dirname(__file__), "test_config.json"), "r") as fd: + config = json.load(fd) + return (config["skips"], + [k for k, v in config["features"].items() if v is True]) + + +SKIPPED_TESTS, FEATURES = load_config() def StartTest(backend, data): @@ -35,6 +41,10 @@ def StartTest(backend, data): backend.send_response("RunTest", {}) +def GetFeatures(backend, data): + backend.send_response("FeatureList", {"features": FEATURES}) + + def NewDriver(backend, data): auth_token = data["authorizationToken"]["data"] data["authorizationToken"].mark_item_as_read_if_equals( @@ -294,7 +304,8 @@ def ResultConsume(backend, data): "serverInfo": { "protocolVersion": ".".join(map(str, summary.server.protocol_version)), - "agent": summary.server.agent + "agent": summary.server.agent, + # "address": ":".join(map(str, summary.server.address)) } }) @@ -310,3 +321,30 @@ def RetryableNegative(backend, data): session_tracker = backend.sessions[key] session_tracker.state = '-' session_tracker.error_id = data.get('errorId', '') + + +def ForcedRoutingTableUpdate(backend, data): + driver_id = data["driverId"] + driver = backend.drivers[driver_id] + database = data["database"] + bookmarks = data["bookmarks"] + with driver._pool.refresh_lock: + driver._pool.create_routing_table(database) + driver._pool.update_routing_table(database=database, + bookmarks=bookmarks) + backend.send_response("Driver", {"id": driver_id}) + + +def GetRoutingTable(backend, data): + driver_id = data["driverId"] + database = data["database"] + driver = backend.drivers[driver_id] + routing_table = driver._pool.routing_tables[database] + response_data = { + "database": routing_table.database, + "ttl": routing_table.ttl, + } + for role in ("routers", "readers", "writers"): + addresses = routing_table.__getattribute__(role) + response_data[role] = list(map(str, addresses)) + backend.send_response("RoutingTable", response_data) diff --git a/testkitbackend/skipped_tests.json b/testkitbackend/skipped_tests.json deleted file mode 100644 index aaab83f2..00000000 --- a/testkitbackend/skipped_tests.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "stub.routing.Routing.test_should_retry_write_until_success_with_leader_change_using_tx_function": - "Driver closes connection to router if DNS resolved name not in routing table", - "stub.routing.RoutingV3.test_should_retry_write_until_success_with_leader_change_using_tx_function": - "Driver closes connection to router if DNS resolved name not in routing table", - "stub.routing.RoutingV4.test_should_retry_write_until_success_with_leader_change_using_tx_function": - "Driver closes connection to router if DNS resolved name not in routing table", - "stub.routing.Routing.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function": - "Driver closes connection to router if DNS resolved name not in routing table", - "stub.routing.RoutingV3.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function": - "Driver closes connection to router if DNS resolved name not in routing table", - "stub.routing.RoutingV4.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function": - "Driver closes connection to router if DNS resolved name not in routing table", - "stub.routing.Routing.test_should_successfully_acquire_rt_when_router_ip_changes": - "Test makes assumptions about how verify_connectivity is implemented", - "stub.routing.RoutingV3.test_should_successfully_acquire_rt_when_router_ip_changes": - "Test makes assumptions about how verify_connectivity is implemented", - "stub.routing.RoutingV4.test_should_successfully_acquire_rt_when_router_ip_changes": - "Test makes assumptions about how verify_connectivity is implemented", - "stub.retry.TestRetryClustering.test_retry_ForbiddenOnReadOnlyDatabase_ChangingWriter": - "Test makes assumptions about how verify_connectivity is implemented", - "stub.authorization.AuthorizationTests.test_should_retry_on_auth_expired_on_begin_using_tx_function": - "Flaky: test requires the driver to contact servers in a specific order", - "stub.authorization.AuthorizationTestsV3.test_should_retry_on_auth_expired_on_begin_using_tx_function": - "Flaky: test requires the driver to contact servers in a specific order", - "stub.authorization.AuthorizationTestsV4.test_should_retry_on_auth_expired_on_begin_using_tx_function": - "Flaky: test requires the driver to contact servers in a specific order" -} diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json new file mode 100644 index 00000000..5f6703fd --- /dev/null +++ b/testkitbackend/test_config.json @@ -0,0 +1,37 @@ +{ + "skips": { + "stub.routing.Routing.test_should_retry_write_until_success_with_leader_change_using_tx_function": + "Driver closes connection to router if DNS resolved name not in routing table", + "stub.routing.RoutingV3.test_should_retry_write_until_success_with_leader_change_using_tx_function": + "Driver closes connection to router if DNS resolved name not in routing table", + "stub.routing.RoutingV4.test_should_retry_write_until_success_with_leader_change_using_tx_function": + "Driver closes connection to router if DNS resolved name not in routing table", + "stub.routing.Routing.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function": + "Driver closes connection to router if DNS resolved name not in routing table", + "stub.routing.RoutingV3.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function": + "Driver closes connection to router if DNS resolved name not in routing table", + "stub.routing.RoutingV4.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function": + "Driver closes connection to router if DNS resolved name not in routing table", + "stub.routing.Routing.test_should_successfully_acquire_rt_when_router_ip_changes": + "Test makes assumptions about how verify_connectivity is implemented", + "stub.routing.RoutingV3.test_should_successfully_acquire_rt_when_router_ip_changes": + "Test makes assumptions about how verify_connectivity is implemented", + "stub.routing.RoutingV4.test_should_successfully_acquire_rt_when_router_ip_changes": + "Test makes assumptions about how verify_connectivity is implemented", + "stub.retry.TestRetryClustering.test_retry_ForbiddenOnReadOnlyDatabase_ChangingWriter": + "Test makes assumptions about how verify_connectivity is implemented", + "stub.authorization.AuthorizationTests.test_should_retry_on_auth_expired_on_begin_using_tx_function": + "Flaky: test requires the driver to contact servers in a specific order", + "stub.authorization.AuthorizationTestsV3.test_should_retry_on_auth_expired_on_begin_using_tx_function": + "Flaky: test requires the driver to contact servers in a specific order", + "stub.authorization.AuthorizationTestsV4.test_should_retry_on_auth_expired_on_begin_using_tx_function": + "Flaky: test requires the driver to contact servers in a specific order" + }, + "features": { + "AuthorizationExpiredTreatment": true, + "Optimization:ImplicitDefaultArguments": true, + "Optimization:MinimalResets": "Driver resets some clean connections when put back into pool", + "Optimization:ConnectionReuse": true, + "Optimization:PullPipelining": true + } +} diff --git a/tests/performance/test_results.py b/tests/performance/test_results.py index 1f0a6655..15971faa 100644 --- a/tests/performance/test_results.py +++ b/tests/performance/test_results.py @@ -36,7 +36,9 @@ class ReadWorkload(object): def setup_class(cls): cls.server = server = RemoteGraphDatabaseServer() server.start() - cls.driver = GraphDatabase.driver(server.server_uri, auth=server.auth_token, encrypted=server.encrypted) + cls.driver = GraphDatabase.driver(server.server_uri, + auth=server.auth_token, + encrypted=server.encrypted) @classmethod def teardown_class(cls): diff --git a/tests/performance/tools.py b/tests/performance/tools.py index 743f0813..3c51a4fe 100644 --- a/tests/performance/tools.py +++ b/tests/performance/tools.py @@ -19,26 +19,17 @@ # limitations under the License. -from test.integration.tools import IntegrationTestCase - -from os import makedirs, remove -from os.path import basename, dirname, join as path_join, realpath, isfile, expanduser -import platform from unittest import TestCase, SkipTest -from shutil import copyfile -from sys import exit, stderr try: from urllib.request import urlretrieve except ImportError: from urllib import urlretrieve -from boltkit.controller import WindowsController, UnixController - from neo4j import GraphDatabase from neo4j.exceptions import AuthError -from test.env import NEO4J_USER, NEO4J_PASSWORD, NEO4J_SERVER_URI +from tests.env import NEO4J_USER, NEO4J_PASSWORD, NEO4J_SERVER_URI def is_listening(address): diff --git a/tests/requirements.txt b/tests/requirements.txt index 61d4447a..b9405952 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -3,4 +3,5 @@ coverage pytest pytest-benchmark pytest-cov +pytest-mock teamcity-messages diff --git a/tests/stub/test_accesslevel.py b/tests/stub/test_accesslevel.py deleted file mode 100644 index 12f46513..00000000 --- a/tests/stub/test_accesslevel.py +++ /dev/null @@ -1,275 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) "Neo4j" -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import pytest - -from neo4j import GraphDatabase -from neo4j.exceptions import ( - Neo4jError, - TransientError, -) - -from tests.stub.conftest import StubCluster - -# python -m pytest tests/stub/test_accesslevel.py -s -v - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/return_1_in_read_tx.script"), - ("v4x0/router.script", "v4x0/tx_return_1_port_9004.script"), - ] -) -def test_read_transaction(driver_info, test_scripts): - # python -m pytest tests/stub/test_accesslevel.py -s -v -k test_read_transaction - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - - def unit_of_work(tx): - total = 0 - for record in tx.run("RETURN 1"): - total += record[0] - return total - - value = session.read_transaction(unit_of_work) - assert value == 1 - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/return_1_in_write_tx.script"), - ("v4x0/router.script", "v4x0/tx_return_1_port_9006.script"), - ] -) -def test_write_transaction(driver_info, test_scripts): - # python -m pytest tests/stub/test_accesslevel.py -s -v -k test_write_transaction - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - - def unit_of_work(tx): - total = 0 - for record in tx.run("RETURN 1"): - total += record[0] - return total - - value = session.write_transaction(unit_of_work) - assert value == 1 - - -@pytest.mark.skip(reason="BROKEN, This test is broken and should be an integration test.") -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/error_in_read_tx.script"), - ("v4x0/router.script", "v4x0/tx_run_with_failure_syntax_error_port_9004.script"), - ] -) -def test_read_transaction_with_error(driver_info, test_scripts): - # python -m pytest tests/stub/test_accesslevel.py -s -v -k test_read_transaction_with_error - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - - def unit_of_work(tx): - tx.run("X") - - with pytest.raises(Neo4jError): - _ = session.read_transaction(unit_of_work) - - -@pytest.mark.skip(reason="BROKEN, This test is broken and should be an integration test.") -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/error_in_write_tx.script"), - ("v4x0/router.script", "v4x0/tx_run_with_failure_syntax_error_port_9006.script"), - ] -) -def test_write_transaction_with_error(driver_info, test_scripts): - # python -m pytest tests/stub/test_accesslevel.py -s -v -k test_write_transaction_with_error - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - - def unit_of_work(tx): - tx.run("X") - - with pytest.raises(Neo4jError): - _ = session.write_transaction(unit_of_work) - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/return_1_in_read_tx_twice.script"), - ("v4x0/router.script", "v4x0/tx_two_subsequent_return_1_port_9004.script"), - ] -) -def test_two_subsequent_read_transactions(driver_info, test_scripts): - # python -m pytest tests/stub/test_accesslevel.py -s -v -k test_two_subsequent_read_transactions - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - - def unit_of_work(tx): - total = 0 - for record in tx.run("RETURN 1"): - total += record[0] - return total - - value = session.read_transaction(unit_of_work) - assert value == 1 - value = session.read_transaction(unit_of_work) - assert value == 1 - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/return_1_in_write_tx_twice.script"), - ("v4x0/router.script", "v4x0/tx_two_subsequent_return_1_port_9006.script"), - ] -) -def test_two_subsequent_write_transactions(driver_info, test_scripts): - # python -m pytest tests/stub/test_accesslevel.py -s -v -k test_two_subsequent_write_transactions - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - - def unit_of_work(tx): - total = 0 - for record in tx.run("RETURN 1"): - total += record[0] - return total - - value = session.write_transaction(unit_of_work) - assert value == 1 - value = session.write_transaction(unit_of_work) - assert value == 1 - - -@pytest.mark.skip(reason="BOOKMARK, AttributeError: 'Session' object has no attribute 'last_bookmark'") -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/return_1_in_read_tx.script", "v3/return_2_in_write_tx.script"), - ("v4x0/router.script", "v4x0/tx_return_1_port_9004.script", "v4x0/tx_return_2_with_bookmark_port_9006.script"), - ] -) -def test_read_tx_then_write_tx(driver_info, test_scripts): - # python -m pytest tests/stub/test_accesslevel.py -s -v -k test_read_tx_then_write_tx - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - - def unit_of_work_1(tx): - total = 0 - for record in tx.run("RETURN 1"): - total += record[0] - return total - - def unit_of_work_2(tx): - total = 0 - for record in tx.run("RETURN 2"): - total += record[0] - return total - - value = session.read_transaction(unit_of_work_1) - assert session.last_bookmark() == "bookmark:1" - assert value == 1 - value = session.write_transaction(unit_of_work_2) - assert session.last_bookmark() == "bookmark:2" - assert value == 2 - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/return_1_in_write_tx.script", "v3/return_2_in_read_tx.script"), - ("v4x0/router.script", "v4x0/tx_return_1_port_9006.script", "v4x0/tx_return_2_with_bookmark_port_9004.script"), - ] -) -def test_write_tx_then_read_tx(driver_info, test_scripts): - # python -m pytest tests/stub/test_accesslevel.py -s -v -k test_write_tx_then_read_tx - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - - def unit_of_work_1(tx): - total = 0 - for record in tx.run("RETURN 1"): - total += record[0] - return total - - def unit_of_work_2(tx): - total = 0 - for record in tx.run("RETURN 2"): - total += record[0] - return total - - value = session.write_transaction(unit_of_work_1) - assert value == 1 - value = session.read_transaction(unit_of_work_2) - assert value == 2 - - -@pytest.mark.skip(reason="BROKEN") -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/user_canceled_read.script"), - ("v4x0/router.script", "v4x0/tx_return_1_reset_port_9004.script"), - ] -) -def test_no_retry_read_on_user_canceled_tx(driver_info, test_scripts): - # python -m pytest tests/stub/test_accesslevel.py -s -v -k test_no_retry_read_on_user_canceled_tx - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - def unit_of_work(tx): - tx.run("RETURN 1") - - with pytest.raises(TransientError): - _ = session.read_transaction(unit_of_work) - - -@pytest.mark.skip(reason="BROKEN") -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/user_canceled_write.script"), - ("v4x0/router.script", "v4x0/tx_return_1_reset_port_9006.script"), - ] -) -def test_no_retry_write_on_user_canceled_tx(driver_info, test_scripts): - # python -m pytest tests/stub/test_accesslevel.py -s -v -k test_no_retry_write_on_user_canceled_tx - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - def unit_of_work(tx): - tx.run("RETURN 1") - - with pytest.raises(TransientError): - _ = session.write_transaction(unit_of_work) diff --git a/tests/stub/test_bookmarking.py b/tests/stub/test_bookmarking.py deleted file mode 100644 index ed87daa6..00000000 --- a/tests/stub/test_bookmarking.py +++ /dev/null @@ -1,129 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) "Neo4j" -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import pytest - -from neo4j import ( - GraphDatabase, -) -from neo4j.api import READ_ACCESS - -from tests.stub.conftest import StubCluster - -# python -m pytest tests/stub/test_bookmarking.py -s -v - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/router.script", - "v4x0/router.script", - ] -) -def test_should_be_no_bookmark_in_new_session(driver_info, test_script): - # python -m pytest tests/stub/test_bookmarking.py -s -v -k test_should_be_no_bookmark_in_new_session - with StubCluster(test_script): - uri = "neo4j://localhost:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - assert session.last_bookmark() is None - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/router.script", - "v4x0/router.script", - ] -) -def test_should_be_able_to_set_bookmark(driver_info, test_script): - # python -m pytest tests/stub/test_bookmarking.py -s -v -k test_should_be_able_to_set_bookmark - with StubCluster(test_script): - uri = "neo4j://localhost:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"]) as driver: - with driver.session(bookmarks=["X"], fetch_size=-1) as session: - assert session._bookmarks == ("X",) - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/router.script", - "v4x0/router.script", - ] -) -def test_should_be_able_to_set_multiple_bookmarks(driver_info, test_script): - # python -m pytest tests/stub/test_bookmarking.py -s -v -k test_should_be_able_to_set_multiple_bookmarks - with StubCluster(test_script): - uri = "neo4j://localhost:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"]) as driver: - with driver.session(bookmarks=[":1", ":2"], fetch_size=-1) as session: - assert session._bookmarks == (":1", ":2") - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/bookmark_chain.script"), - ( - "v4x0/router_with_two_bookmarks.script", - "v4x0/tx_bookmark_chain.script" - ), - ] -) -def test_should_automatically_chain_bookmarks(driver_info, test_scripts): - # python -m pytest tests/stub/test_bookmarking.py -s -v -k test_should_automatically_chain_bookmarks - with StubCluster(*test_scripts): - uri = "neo4j://localhost:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, bookmarks=["bookmark:0", "bookmark:1"], fetch_size=-1) as session: - with session.begin_transaction(): - pass - assert session.last_bookmark() == "bookmark:2" - with session.begin_transaction(): - pass - assert session.last_bookmark() == "bookmark:3" - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/bookmark_chain_with_autocommit.script"), - ( - "v4x0/router_with_one_bookmark.script", - "v4x0/tx_bookmark_chain_with_autocommit.script" - ), - ] -) -def test_autocommit_transaction_included_in_chain(driver_info, test_scripts): - # python -m pytest tests/stub/test_bookmarking.py -s -v -k test_autocommit_transaction_included_in_chain - with StubCluster(*test_scripts): - uri = "neo4j://localhost:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, bookmarks=["bookmark:1"], fetch_size=-1) as session: - with session.begin_transaction(): - pass - assert session.last_bookmark() == "bookmark:2" - session.run("RETURN 1").consume() - assert session.last_bookmark() == "bookmark:3" - with session.begin_transaction(): - pass - assert session.last_bookmark() == "bookmark:4" diff --git a/tests/stub/test_directdriver.py b/tests/stub/test_directdriver.py index 8ebbc19d..2e7ae7f6 100644 --- a/tests/stub/test_directdriver.py +++ b/tests/stub/test_directdriver.py @@ -73,67 +73,8 @@ } -@pytest.mark.parametrize( - "test_script", - [ - "v3/empty.script", - "v4x0/empty.script", - ] -) -def test_bolt_uri_constructs_bolt_driver(driver_info, test_script): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_uri_constructs_bolt_driver - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"]) as driver: - assert isinstance(driver, BoltDriver) - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/empty_explicit_hello_goodbye.script", - "v4x0/empty_explicit_hello_goodbye.script", - "v4x1/empty_explicit_hello_goodbye.script", - "v4x2/empty_explicit_hello_goodbye.script", - ] -) -def test_direct_driver_handshake_negotiation(driver_info, test_script): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_direct_driver_handshake_negotiation - with StubCluster(test_script): - uri = "bolt://localhost:9001" - driver = GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) - assert isinstance(driver, BoltDriver) - driver.close() - - -@pytest.mark.parametrize( - "test_script, test_expected", - [ - ("v3/return_1_port_9001.script", "Neo4j/3.0.0"), - ("v4x0/return_1_port_9001.script", "Neo4j/4.0.0"), - ("v4x1/return_1_port_9001_bogus_server.script", UnsupportedServerProduct), - ("v4x2/return_1_port_9001_bogus_server.script", UnsupportedServerProduct), - ] -) -def test_return_1_as_x(driver_info, test_script, test_expected): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_return_1_as_x - with StubCluster(test_script): - uri = "bolt://localhost:9001" - try: - driver = GraphDatabase.driver(uri, auth=driver_info["auth_token"], user_agent="test") - assert isinstance(driver, BoltDriver) - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - result = session.run("RETURN 1 AS x") - value = result.single().value() - assert value == 1 - summary = result.consume() - assert summary.server.agent == test_expected - assert summary.server.agent.startswith("Neo4j") - driver.close() - except UnsupportedServerProduct as error: - assert isinstance(error, test_expected) - - +# TODO: those tests will stay until a uniform behavior across the drivers has +# been specified and tests are created in testkit def test_direct_driver_with_wrong_port(driver_info): # python -m pytest tests/stub/test_directdriver.py -s -v -k test_direct_driver_with_wrong_port uri = "bolt://127.0.0.1:9002" @@ -172,449 +113,3 @@ def test_direct_verify_connectivity_disconnect_on_run(driver_info, test_script): with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: with pytest.raises(ServiceUnavailable): driver.verify_connectivity(default_access_mode=READ_ACCESS) - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/disconnect_on_run.script", - "v4x0/disconnect_on_run.script", - ] -) -def test_direct_disconnect_on_run(driver_info, test_script): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_direct_disconnect_on_run - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: - with pytest.raises(ServiceUnavailable): - with driver.session(**session_config) as session: - session.run("RETURN 1 AS x").consume() - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/disconnect_on_pull_all.script", - "v4x0/disconnect_on_pull.script", - ] -) -def test_direct_disconnect_on_pull_all(driver_info, test_script): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_direct_disconnect_on_pull_all - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: - with pytest.raises(ServiceUnavailable): - with driver.session(**session_config) as session: - session.run("RETURN $x", {"x": 1}).consume() - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/disconnect_after_init.script", - "v4x0/disconnect_after_init.script", - ] -) -def test_direct_session_close_after_server_close(driver_info, test_script): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_direct_session_close_after_server_close - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: - with driver.session(**session_config) as session: - with pytest.raises(ServiceUnavailable): - session.write_transaction(lambda tx: tx.run(Query("CREATE (a:Item)", timeout=1))) - - -@pytest.mark.skip(reason="Cant close the Stub Server gracefully") -@pytest.mark.parametrize( - "test_script", - [ - "v3/empty.script", - "v4x0/empty.script", - ] -) -def test_bolt_uri_scheme_self_signed_certificate_constructs_bolt_driver(driver_info, test_script): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_uri_scheme_self_signed_certificate_constructs_bolt_driver - - test_config = { - "user_agent": "test", - "max_connection_lifetime": 1000, - "max_connection_pool_size": 10, - "keep_alive": False, - "max_transaction_retry_time": 1, - "resolver": None, - } - - with StubCluster(test_script): - uri = "bolt+ssc://127.0.0.1:9001" - try: - driver = GraphDatabase.driver(uri, auth=driver_info["auth_token"], **test_config) - assert isinstance(driver, BoltDriver) - driver.close() - except ServiceUnavailable as error: - assert isinstance(error.__cause__, BoltSecurityError) - pytest.skip("Failed to establish encrypted connection") - - -@pytest.mark.skip(reason="Cant close the Stub Server gracefully") -@pytest.mark.parametrize( - "test_script", - [ - "v3/empty.script", - "v4x0/empty.script", - ] -) -def test_bolt_uri_scheme_secure_constructs_bolt_driver(driver_info, test_script): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_uri_scheme_secure_constructs_bolt_driver - - test_config = { - "user_agent": "test", - "max_connection_lifetime": 1000, - "max_connection_pool_size": 10, - "keep_alive": False, - "max_transaction_retry_time": 1, - "resolver": None, - } - - with StubCluster(test_script): - uri = "bolt+s://127.0.0.1:9001" - try: - driver = GraphDatabase.driver(uri, auth=driver_info["auth_token"], **test_config) - assert isinstance(driver, BoltDriver) - driver.close() - except ServiceUnavailable as error: - assert isinstance(error.__cause__, BoltSecurityError) - pytest.skip("Failed to establish encrypted connection") - - -@pytest.mark.parametrize( - "test_uri", - [ - "bolt+ssc://127.0.0.1:9001", - "bolt+s://127.0.0.1:9001", - ] -) -@pytest.mark.parametrize( - "test_config, expected_failure, expected_failure_message", - [ - ({"encrypted": False}, ConfigurationError, "The config settings"), - ({"encrypted": True}, ConfigurationError, "The config settings"), - ({"encrypted": True, "trust": TRUST_ALL_CERTIFICATES}, ConfigurationError, "The config settings"), - ({"trust": TRUST_ALL_CERTIFICATES}, ConfigurationError, "The config settings"), - ({"trust": TRUST_SYSTEM_CA_SIGNED_CERTIFICATES}, ConfigurationError, "The config settings"), - ] -) -def test_bolt_uri_scheme_secure_constructs_driver_config_error(driver_info, test_uri, test_config, expected_failure, expected_failure_message): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_uri_scheme_secure_constructs_driver_config_error - with pytest.raises(expected_failure) as error: - driver = GraphDatabase.driver(test_uri, auth=driver_info["auth_token"], **test_config) - - assert error.match(expected_failure_message) - - -@pytest.mark.parametrize( - "test_config, expected_failure, expected_failure_message", - [ - ({"trust": 1}, ConfigurationError, "The config setting `trust`"), - ({"trust": True}, ConfigurationError, "The config setting `trust`"), - ({"trust": None}, ConfigurationError, "The config setting `trust`"), - ] -) -def test_driver_trust_config_error(driver_info, test_config, expected_failure, expected_failure_message): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_driver_trust_config_error - uri = "bolt://127.0.0.1:9001" - with pytest.raises(expected_failure) as error: - driver = GraphDatabase.driver(uri, auth=driver_info["auth_token"], **test_config) - - assert error.match(expected_failure_message) - - -@pytest.mark.parametrize( - "test_script, database", - [ - ("v3/pull_all_port_9001.script", DEFAULT_DATABASE), - ("v4x0/pull_n_port_9001.script", "test"), - ("v4x0/pull_n_port_9001_slow_network.script", "test"), - ] -) -def test_bolt_driver_fetch_size_config_normal_case(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_fetch_size_config_normal_case - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], user_agent="test") as driver: - assert isinstance(driver, BoltDriver) - with driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) as session: - expected = [] - result = session.run("UNWIND [1,2,3,4] AS x RETURN x") - assert isinstance(result, Result) - for record in result: - expected.append(record["x"]) - - assert expected == [1, 2, 3, 4] - - -@pytest.mark.parametrize( - "test_script, database", - [ - ("v3/pull_all_port_9001.script", DEFAULT_DATABASE), - ("v4x0/pull_n_port_9001.script", "test"), - ] -) -def test_bolt_driver_fetch_size_config_result_implements_iterator_protocol(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_fetch_size_config_result_implements_iterator_protocol - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], user_agent="test") as driver: - assert isinstance(driver, BoltDriver) - with driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) as session: - expected = [] - result = iter(session.run("UNWIND [1,2,3,4] AS x RETURN x")) - expected.append(next(result)["x"]) - expected.append(next(result)["x"]) - expected.append(next(result)["x"]) - expected.append(next(result)["x"]) - with pytest.raises(StopIteration): - expected.append(next(result)["x"]) - - assert expected == [1, 2, 3, 4] - - -@pytest.mark.parametrize( - "test_script, database", - [ - ("v3/pull_all_port_9001.script", DEFAULT_DATABASE), - ("v4x0/pull_2_discard_all_port_9001.script", "test"), - ] -) -def test_bolt_driver_fetch_size_config_consume_case(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_fetch_size_config_consume_case - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: - with driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) as session: - result = session.run("UNWIND [1,2,3,4] AS x RETURN x") - result_summary = result.consume() - - assert result_summary.server.address == ('127.0.0.1', 9001) - - -@pytest.mark.parametrize( - "test_script, database", - [ - ("v3/pull_all_port_9001.script", DEFAULT_DATABASE), - ("v4x0/pull_2_discard_all_port_9001.script", "test"), - ] -) -def test_bolt_driver_on_exit_consume_remaining_result(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_on_exit_consume_remaining_result - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: - with driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) as session: - result = session.run("UNWIND [1,2,3,4] AS x RETURN x") - - -@pytest.mark.parametrize( - "test_script, database", - [ - ("v3/pull_all_port_9001.script", DEFAULT_DATABASE), - ("v4x0/pull_2_discard_all_port_9001.script", "test"), - ] -) -def test_bolt_driver_on_close_result_consume(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_on_close_result_consume - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: - session = driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) - result = session.run("UNWIND [1,2,3,4] AS x RETURN x") - session.close() - - -@pytest.mark.parametrize( - "test_script, database", - [ - ("v3/pull_all_port_9001.script", DEFAULT_DATABASE), - ("v4x0/pull_2_discard_all_port_9001.script", "test"), - ] -) -def test_bolt_driver_on_exit_result_consume(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_on_exit_result_consume - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: - with driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) as session: - result = session.run("UNWIND [1,2,3,4] AS x RETURN x") - - -@pytest.mark.parametrize( - "test_script, database", - [ - ("v3/pull_all_port_9001.script", DEFAULT_DATABASE), - ("v4x0/pull_n_port_9001.script", "test"), - ] -) -def test_bolt_driver_fetch_size_config_case_result_consume(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_fetch_size_config_case_result_consume - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], user_agent="test") as driver: - assert isinstance(driver, BoltDriver) - with driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) as session: - expected = [] - result = session.run("UNWIND [1,2,3,4] AS x RETURN x") - for record in result: - expected.append(record["x"]) - - result_summary = result.consume() - assert result_summary.server.address == ('127.0.0.1', 9001) - - result_summary = result.consume() # It will just return the result summary - - assert expected == [1, 2, 3, 4] - assert result_summary.server.address == ('127.0.0.1', 9001) - - -@pytest.mark.skip(reason="Bolt Stub Server cant handle this script correctly, see tests/integration/test_bolt_driver.py test_bolt_driver_fetch_size_config_run_consume_run") -@pytest.mark.parametrize( - "test_script, database", - [ - ("v4x0/pull_2_discard_all_pull_n_port_9001.script", "test"), - ] -) -def test_bolt_driver_fetch_size_config_run_consume_run(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_fetch_size_config_run_consume_run - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], user_agent="test") as driver: - assert isinstance(driver, BoltDriver) - with driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) as session: - expected = [] - result1 = session.run("UNWIND [1,2,3,4] AS x RETURN x") - result1.consume() - result2 = session.run("UNWIND [5,6,7,8] AS x RETURN x") - - for record in result2: - expected.append(record["x"]) - - result_summary = result2.consume() - assert result_summary.server.address == ('127.0.0.1', 9001) - - assert expected == [5, 6, 7, 8] - assert result_summary.server.address == ('127.0.0.1', 9001) - - -@pytest.mark.skip(reason="Bolt Stub Server cant handle this script correctly, see tests/integration/test_bolt_driver.py test_bolt_driver_fetch_size_config_run_run") -@pytest.mark.parametrize( - "test_script, database", - [ - ("v4x0/pull_2_discard_all_pull_n_port_9001.script", "test"), - ] -) -def test_bolt_driver_fetch_size_config_run_run(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_fetch_size_config_run_run - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], user_agent="test") as driver: - assert isinstance(driver, BoltDriver) - with driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) as session: - expected = [] - result1 = session.run("UNWIND [1,2,3,4] AS x RETURN x") # The session should discard all if in streaming mode and a new run is invoked. - result2 = session.run("UNWIND [5,6,7,8] AS x RETURN x") - - for record in result2: - expected.append(record["x"]) - - result_summary = result2.consume() - assert result_summary.server.address == ('127.0.0.1', 9001) - - assert expected == [5, 6, 7, 8] - assert result_summary.server.address == ('127.0.0.1', 9001) - - -@pytest.mark.parametrize( - "test_script, database", - [ - ("v3/pull_all_port_9001_transaction_function.script", DEFAULT_DATABASE), - ("v4x0/tx_pull_n_port_9001.script", "test"), - ("v4x0/tx_pull_n_port_9001_slow_network.script", "test"), - # TODO: Test retry mechanism - ] -) -def test_bolt_driver_read_transaction_fetch_size_config_normal_case(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_read_transaction_fetch_size_config_normal_case - @unit_of_work(timeout=3, metadata={"foo": "bar"}) - def unwind(transaction): - assert isinstance(transaction, Transaction) - values = [] - result = transaction.run("UNWIND [1,2,3,4] AS x RETURN x") - assert isinstance(result, Result) - for record in result: - values.append(record["x"]) - return values - - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], user_agent="test") as driver: - assert isinstance(driver, BoltDriver) - with driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) as session: - expected = session.read_transaction(unwind) - - assert expected == [1, 2, 3, 4] - - -@pytest.mark.parametrize( - "test_script, database", - [ - ("v4x0/tx_pull_2_discard_all_port_9001.script", "test"), - ] -) -def test_bolt_driver_explicit_transaction_consume_result_case_a(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_explicit_transaction_consume_result_case_a - # This test is to check that the implicit consume that is triggered on transaction.commit() is working properly. - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], user_agent="test") as driver: - with driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) as session: - transaction = session.begin_transaction(timeout=3, metadata={"foo": "bar"}) - result = transaction.run("UNWIND [1,2,3,4] AS x RETURN x") - transaction.commit() - - -@pytest.mark.parametrize( - "test_script, database", - [ - ("v4x0/tx_pull_2_discard_all_port_9001.script", "test"), - ] -) -def test_bolt_driver_explicit_transaction_consume_result_case_b(driver_info, test_script, database): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_bolt_driver_explicit_transaction_consume_result_case_b - # This test is to check that the implicit consume that is triggered on transaction.commit() is working properly. - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], user_agent="test") as driver: - with driver.session(database=database, fetch_size=2, default_access_mode=READ_ACCESS) as session: - transaction = session.begin_transaction(timeout=3, metadata={"foo": "bar"}) - result = transaction.run("UNWIND [1,2,3,4] AS x RETURN x") - result.consume() - transaction.commit() - - -@pytest.mark.parametrize( - "test_script", - [ - "v4x1/return_1_noop_port_9001.script", - "v4x2/return_1_noop_port_9001.script", - ] -) -def test_direct_can_handle_noop(driver_info, test_script): - # python -m pytest tests/stub/test_directdriver.py -s -v -k test_direct_can_handle_noop - with StubCluster(test_script): - uri = "bolt://localhost:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: - assert isinstance(driver, BoltDriver) - with driver.session(fetch_size=2, default_access_mode=READ_ACCESS) as session: - result = session.run("RETURN 1 AS x") - value = result.single().value() - assert value == 1 diff --git a/tests/stub/test_multi_database.py b/tests/stub/test_multi_database.py deleted file mode 100644 index 489291aa..00000000 --- a/tests/stub/test_multi_database.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) "Neo4j" -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import pytest - -from neo4j import ( - GraphDatabase, - Neo4jDriver, - DEFAULT_DATABASE, -) -from tests.stub.conftest import StubCluster - -# python -m pytest tests/stub/test_multi_database.py -s -v - - -@pytest.mark.parametrize( - "test_script, test_database", - [ - ("v3/dbms_cluster_routing_get_routing_table_system.script", DEFAULT_DATABASE), - ("v4x0/dbms_routing_get_routing_table_system_default.script", DEFAULT_DATABASE), - ("v4x0/dbms_routing_get_routing_table_system_neo4j.script", "neo4j"), - ] -) -def test_dbms_cluster_routing_get_routing_table(driver_info, test_script, test_database): - # python -m pytest tests/stub/test_multi_database.py -s -v -k test_dbms_cluster_routing_get_routing_table - - test_config = { - "user_agent": "test", - "database": test_database, - } - - with StubCluster(test_script): - uri = "neo4j://localhost:9001" - driver = GraphDatabase.driver(uri, auth=driver_info["auth_token"], **test_config) - assert isinstance(driver, Neo4jDriver) - driver.close() diff --git a/tests/stub/test_routingdriver.py b/tests/stub/test_routingdriver.py index d1319d8c..139c225d 100644 --- a/tests/stub/test_routingdriver.py +++ b/tests/stub/test_routingdriver.py @@ -44,110 +44,8 @@ from tests.stub.conftest import StubCluster # python -m pytest tests/stub/test_routingdriver.py -s -v - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/router.script", - "v4x0/router.script", - ] -) -def test_neo4j_uri_scheme_constructs_neo4j_driver(driver_info, test_script): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_neo4j_uri_scheme_constructs_neo4j_driver - with StubCluster(test_script): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - assert isinstance(driver, Neo4jDriver) - - -@pytest.mark.skip(reason="Cant close the Stub Server gracefully") -@pytest.mark.parametrize( - "test_script", - [ - "v3/router.script", - "v4x0/router.script", - ] -) -def test_neo4j_uri_scheme_self_signed_certificate_constructs_neo4j_driver(driver_info, test_script): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_neo4j_uri_scheme_self_signed_certificate_constructs_neo4j_driver - with StubCluster(test_script): - uri = "neo4j+ssc://localhost:9001" - - test_config = { - "user_agent": "test", - "max_connection_lifetime": 1000, - "max_connection_pool_size": 10, - "keep_alive": False, - "max_transaction_retry_time": 1, - "resolver": None, - } - - try: - driver = GraphDatabase.driver(uri, auth=driver_info["auth_token"], **test_config) - assert isinstance(driver, Neo4jDriver) - driver.close() - except ServiceUnavailable as error: - assert isinstance(error.__cause__, BoltSecurityError) - pytest.skip("Failed to establish encrypted connection") - - -@pytest.mark.skip(reason="Cant close the Stub Server gracefully") -@pytest.mark.parametrize( - "test_script", - [ - "v3/router.script", - "v4x0/router.script", - ] -) -def test_neo4j_uri_scheme_secure_constructs_neo4j_driver(driver_info, test_script): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_neo4j_uri_scheme_secure_constructs_neo4j_driver - with StubCluster(test_script): - uri = "neo4j+s://localhost:9001" - - test_config = { - "user_agent": "test", - "max_connection_lifetime": 1000, - "max_connection_pool_size": 10, - "keep_alive": False, - "max_transaction_retry_time": 1, - "resolver": None, - } - - try: - driver = GraphDatabase.driver(uri, auth=driver_info["auth_token"], **test_config) - assert isinstance(driver, Neo4jDriver) - driver.close() - except ServiceUnavailable as error: - assert isinstance(error.__cause__, BoltSecurityError) - pytest.skip("Failed to establish encrypted connection") - - -@pytest.mark.parametrize( - "test_uri", - [ - "neo4j+ssc://localhost:9001", - "neo4j+s://localhost:9001", - ] -) -@pytest.mark.parametrize( - "test_config, expected_failure, expected_failure_message", - [ - ({"encrypted": False}, ConfigurationError, "The config settings"), - ({"encrypted": True}, ConfigurationError, "The config settings"), - ({"encrypted": True, "trust": TRUST_ALL_CERTIFICATES}, ConfigurationError, "The config settings"), - ({"trust": TRUST_ALL_CERTIFICATES}, ConfigurationError, "The config settings"), - ({"trust": TRUST_SYSTEM_CA_SIGNED_CERTIFICATES}, ConfigurationError, "The config settings"), - ] -) -def test_neo4j_uri_scheme_secure_constructs_neo4j_driver_config_error(driver_info, test_uri, test_config, expected_failure, expected_failure_message): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_neo4j_uri_scheme_secure_constructs_neo4j_driver_config_error - with pytest.raises(expected_failure) as error: - driver = GraphDatabase.driver(test_uri, auth=driver_info["auth_token"], **test_config) - - assert error.match(expected_failure_message) - - -@pytest.mark.skip(reason="Flaky") +# TODO: those tests will stay until a uniform behavior across the drivers has +# been specified and tests are created in testkit @pytest.mark.parametrize( "test_script", [ @@ -158,15 +56,12 @@ def test_neo4j_uri_scheme_secure_constructs_neo4j_driver_config_error(driver_inf def test_neo4j_driver_verify_connectivity(driver_info, test_script): # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_neo4j_driver_verify_connectivity with StubCluster(test_script): - driver = GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"], user_agent="test") - assert isinstance(driver, Neo4jDriver) - - with StubCluster(test_script): - assert driver.verify_connectivity() is not None - driver.close() + with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"], user_agent="test") as driver: + assert isinstance(driver, Neo4jDriver) + assert driver.verify_connectivity() is not None -@pytest.mark.skip(reason="Flaky") +# @pytest.mark.skip(reason="Flaky") @pytest.mark.parametrize( "test_script", [ @@ -176,505 +71,8 @@ def test_neo4j_driver_verify_connectivity(driver_info, test_script): ) def test_neo4j_driver_verify_connectivity_server_down(driver_info, test_script): # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_neo4j_driver_verify_connectivity_server_down - with StubCluster(test_script): - driver = GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"], user_agent="test") + with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"], user_agent="test") as driver: assert isinstance(driver, Neo4jDriver) with pytest.raises(ServiceUnavailable): driver.verify_connectivity() - - driver.close() - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/non_router.script", - "v4x0/routing_table_failure_not_a_router.script", - ] -) -def test_cannot_discover_servers_on_non_router(driver_info, test_script): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_cannot_discover_servers_on_non_router - with StubCluster(test_script): - with pytest.raises(ServiceUnavailable): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - assert isinstance(driver, Neo4jDriver) - driver._pool.update_routing_table(database=None, bookmarks=None) - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/silent_router.script", - "v4x0/routing_table_silent_router.script", - ] -) -def test_cannot_discover_servers_on_silent_router(driver_info, test_script): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_cannot_discover_servers_on_silent_router - with StubCluster(test_script): - with pytest.raises(ServiceUnavailable, match="routing"): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - assert isinstance(driver, Neo4jDriver) - driver._pool.update_routing_table(database=None, bookmarks=None) - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/router.script", - "v4x0/router.script", - ] -) -def test_should_discover_servers_on_driver_construction(driver_info, test_script): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_discover_servers_on_driver_construction - with StubCluster(test_script): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - assert isinstance(driver, Neo4jDriver) - driver._pool.update_routing_table(database=None, bookmarks=None) - table = driver._pool.routing_tables[DEFAULT_DATABASE] - assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), - ('127.0.0.1', 9003)} - assert table.readers == {('127.0.0.1', 9004), ('127.0.0.1', 9005)} - assert table.writers == {('127.0.0.1', 9006)} - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/return_1.script"), - ("v4x0/router.script", "v4x0/return_1_port_9004.script"), - ] -) -def test_should_be_able_to_read(driver_info, test_scripts): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_be_able_to_read - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - result = session.run("RETURN $x", {"x": 1}) - for record in result: - assert record["x"] == 1 - assert result.consume().server.address == ('127.0.0.1', 9004) - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/create_a.script"), - ("v4x0/router.script", "v4x0/create_test_node_port_9006.script"), - ] -) -def test_should_be_able_to_write(driver_info, test_scripts): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_be_able_to_write - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=WRITE_ACCESS, fetch_size=-1) as session: - result = session.run("CREATE (a $x)", {"x": {"name": "Alice"}}) - assert not list(result) - assert result.consume().server.address == ('127.0.0.1', 9006) - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/create_a.script"), - ("v4x0/router.script", "v4x0/create_test_node_port_9006.script"), - ] -) -def test_should_be_able_to_write_as_default(driver_info, test_scripts): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_be_able_to_write_as_default - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(fetch_size=-1) as session: - result = session.run("CREATE (a $x)", {"x": {"name": "Alice"}}) - assert not list(result) - assert result.consume().server.address == ('127.0.0.1', 9006) - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/disconnect_on_run_9004.script"), - ("v4x0/router.script", "v4x0/disconnect_on_run_port_9004.script"), - ] -) -def test_routing_disconnect_on_run(driver_info, test_scripts): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_routing_disconnect_on_run - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with pytest.raises(SessionExpired): - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - session.run("RETURN $x", {"x": 1}).consume() - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/disconnect_on_pull_all_9004.script"), - ("v4x0/router.script", "v4x0/disconnect_on_pull_port_9004.script"), - ] -) -def test_routing_disconnect_on_pull_all(driver_info, test_scripts): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_routing_disconnect_on_pull_all - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with pytest.raises(SessionExpired): - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - session.run("RETURN $x", {"x": 1}).consume() - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v3/router.script", "v3/return_1.script"), - ("v4x0/router.script", "v4x0/return_1_port_9004.script"), - ] -) -def test_should_disconnect_after_fetching_autocommit_result(driver_info, test_scripts): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_disconnect_after_fetching_autocommit_result - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - result = session.run("RETURN $x", {"x": 1}) - assert session._connection is not None - result.consume() - assert session._connection is None - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/router.script", "v3/return_1_twice_in_read_tx.script"), ("RETURN $x", {"x": 1})), - (("v4x0/router.script", "v4x0/tx_return_1_twice_port_9004.script"), ("RETURN 1", )), - ] -) -def test_should_disconnect_after_explicit_commit(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_disconnect_after_explicit_commit - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - with session.begin_transaction() as tx: - result = tx.run(*test_run_args) - assert session._connection is not None - result.consume() - assert session._connection is not None - result = tx.run(*test_run_args) - assert session._connection is not None - result.consume() - assert session._connection is not None - assert session._connection is None - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/router.script", "v3/return_1_twice_in_read_tx.script"), ("RETURN $x", {"x": 1})), - (("v4x0/router.script", "v4x0/tx_return_1_twice_port_9004.script"), ("RETURN 1", )), - ] -) -def test_default_access_mode_defined_at_session_level(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_default_access_mode_defined_at_session_level - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - with session.begin_transaction() as tx: - result = tx.run(*test_run_args) - assert session._connection is not None - result.consume() - assert session._connection is not None - result = tx.run(*test_run_args) - assert session._connection is not None - result.consume() - assert session._connection is not None - assert session._connection is None - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/router.script", "v3/return_1_twice.script"), ("RETURN $x", {"x": 1})), - (("v4x0/router.script", "v4x0/return_1_twice_port_9004.script"), ("RETURN 1", )), - ] -) -def test_should_reconnect_for_new_query(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_reconnect_for_new_query - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - result_1 = session.run(*test_run_args) - assert session._connection is not None - result_1.consume() - assert session._connection is None - result_2 = session.run(*test_run_args) - assert session._connection is not None - result_2.consume() - assert session._connection is None - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/router.script", "v3/return_1_twice.script"), ("RETURN $x", {"x": 1})), - (("v4x0/router.script", "v4x0/return_1_twice_port_9004.script"), ("RETURN 1", )), - ] -) -def test_should_retain_connection_if_fetching_multiple_results(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_retain_connection_if_fetching_multiple_results - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - result_1 = session.run(*test_run_args) - result_2 = session.run(*test_run_args) - assert session._connection is not None - result_1.consume() - assert session._connection is not None - result_2.consume() - assert session._connection is None - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/router.script", "v3/return_1_four_times.script"), ("RETURN $x", {"x": 1})), - (("v4x0/router.script", "v4x0/return_1_four_times_port_9004.script"), ("RETURN 1", )), - ] -) -def test_two_sessions_can_share_a_connection(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_two_sessions_can_share_a_connection - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - session_1 = driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) - session_2 = driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) - - result_1a = session_1.run(*test_run_args) - c = session_1._connection - result_1a.consume() - - result_2a = session_2.run(*test_run_args) - assert session_2._connection is c - result_2a.consume() - - result_1b = session_1.run(*test_run_args) - assert session_1._connection is c - result_1b.consume() - - result_2b = session_2.run(*test_run_args) - assert session_2._connection is c - result_2b.consume() - - session_2.close() - session_1.close() - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/get_routing_table.script", "v3/return_1_on_9002.script"), ("RETURN $x", {"x": 1})), - (("v4x0/router_role_route_share_port_with_role_read_and_role_write.script", "v4x0/return_1_port_9002.script"), ("RETURN 1 AS x", )), - ] -) -def test_should_call_get_routing_table_procedure(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_call_get_routing_table_procedure - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - result = session.run(*test_run_args) - for record in result: - assert record["x"] == 1 - assert result.consume().server.address == ('127.0.0.1', 9002) - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/get_routing_table_with_context.script", "v3/return_1_on_9002.script"), ("RETURN $x", {"x": 1})), - (("v4x0/router_get_routing_table_with_context.script", "v4x0/return_1_port_9002.script"), ("RETURN 1 AS x", )), - ] -) -def test_should_call_get_routing_table_with_context(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_call_get_routing_table_with_context - with StubCluster(*test_scripts): - uri = "neo4j://localhost:9001/?name=molly&age=1" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - result = session.run(*test_run_args) - for record in result: - assert record["x"] == 1 - assert result.consume().server.address == ('127.0.0.1', 9002) - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/router_no_writers.script", "v3/return_1_on_9005.script"), ("RETURN $x", {"x": 1})), - (("v4x0/router_with_no_role_write.script", "v4x0/return_1_port_9005.script"), ("RETURN 1 AS x", )), - ] -) -def test_should_serve_read_when_missing_writer(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_serve_read_when_missing_writer - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - result = session.run(*test_run_args) - for record in result: - assert record["x"] == 1 - assert result.consume().server.address == ('127.0.0.1', 9005) - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/router_no_readers.script", - "v4x0/router_with_no_role_read.script", - ] -) -def test_should_error_when_missing_reader(driver_info, test_script): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_should_error_when_missing_reader - with StubCluster(test_script): - with pytest.raises(ServiceUnavailable, match="routing"): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - assert isinstance(driver, Neo4jDriver) - driver._pool.update_routing_table(database=None, bookmarks=None) - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/router.script", "v3/not_a_leader.script"), ("CREATE (n {name:'Bob'})", )), - (("v4x0/router.script", "v4x0/run_with_failure_cluster_not_a_leader.script"), ("CREATE (n:TEST {name:'test'})", )), - ] -) -def test_forgets_address_on_not_a_leader_error(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_forgets_address_on_not_a_leader_error - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=WRITE_ACCESS, fetch_size=-1) as session: - with pytest.raises(TransientError): - _ = session.run(*test_run_args) - - pool = driver._pool - table = driver._pool.routing_tables[DEFAULT_DATABASE] - - # address might still have connections in the pool, failed instance just can't serve writes - assert ('127.0.0.1', 9006) in pool.connections - assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)} - assert table.readers == {('127.0.0.1', 9004), ('127.0.0.1', 9005)} - # writer 127.0.0.1:9006 should've been forgotten because of an error - assert len(table.writers) == 0 - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/router.script", "v3/forbidden_on_read_only_database.script"), ("CREATE (n {name:'Bob'})", )), - (("v4x0/router.script", "v4x0/run_with_failure_forbidden_on_read_only_database.script"), ("CREATE (n:TEST {name:'test'})", )), - ] -) -def test_forgets_address_on_forbidden_on_read_only_database_error(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_forgets_address_on_forbidden_on_read_only_database_error - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=WRITE_ACCESS, fetch_size=-1) as session: - with pytest.raises(TransientError): - _ = session.run(*test_run_args) - - pool = driver._pool - table = driver._pool.routing_tables[DEFAULT_DATABASE] - - # address might still have connections in the pool, failed instance just can't serve writes - assert ('127.0.0.1', 9006) in pool.connections - assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)} - assert table.readers == {('127.0.0.1', 9004), ('127.0.0.1', 9005)} - # writer 127.0.0.1:9006 should've been forgotten because of an error - assert len(table.writers) == 0 - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/router.script", "v3/rude_reader.script"), ("RETURN 1", )), - (("v4x0/router.script", "v4x0/disconnect_on_pull_port_9004.script"), ("RETURN $x", {"x": 1})), - ] -) -def test_forgets_address_on_service_unavailable_error(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_forgets_address_on_service_unavailable_error - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - assert isinstance(driver, Neo4jDriver) - driver._pool.update_routing_table(database=None, bookmarks=None) - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - - pool = driver._pool - table = driver._pool.routing_tables[DEFAULT_DATABASE] - table.readers.remove(('127.0.0.1', 9005)) - - with pytest.raises(SessionExpired): - _ = session.run(*test_run_args) - - # address should have connections in the pool but be inactive, it has failed - assert ('127.0.0.1', 9004) in pool.connections - conns = pool.connections[('127.0.0.1', 9004)] - conn = conns[0] - assert conn._closed is True - assert conn.in_use is False - assert session._connection is None - assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)} - # reader 127.0.0.1:9004 should've been forgotten because of an error - assert not table.readers - assert table.writers == {('127.0.0.1', 9006)} - - assert conn.in_use is False - - -@pytest.mark.parametrize( - "test_scripts, test_run_args", - [ - (("v3/router.script", "v3/database_unavailable.script"), ("RETURN 1", )), - (("v4x0/router.script", "v4x0/run_with_failure_database_unavailable.script"), ("RETURN 1", )), - ] -) -def test_forgets_address_on_database_unavailable_error(driver_info, test_scripts, test_run_args): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_forgets_address_on_database_unavailable_error - with StubCluster(*test_scripts): - with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: - assert isinstance(driver, Neo4jDriver) - driver._pool.update_routing_table(database=None, bookmarks=None) - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - - pool = driver._pool - table = driver._pool.routing_tables[DEFAULT_DATABASE] - table.readers.remove(('127.0.0.1', 9005)) - - with pytest.raises(TransientError) as raised: - _ = session.run(*test_run_args) - assert raised.exception.title == "DatabaseUnavailable" - - pool = driver._pool - table = driver._pool.routing_tables[DEFAULT_DATABASE] - - # address should not have connections in the pool, it has failed - assert ('127.0.0.1', 9004) not in pool.connections - assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)} - # reader 127.0.0.1:9004 should've been forgotten because of an raised - assert not table.readers - assert table.writers == {('127.0.0.1', 9006)} - - -@pytest.mark.parametrize( - "test_scripts", - [ - ("v4x1/router_get_routing_table_with_context.script", "v4x2/hello_with_routing_context_return_1_port_9002.script",), - ("v4x1/router_get_routing_table_with_context.script", "v4x2/hello_with_routing_context_return_1_port_9002.script",), - ] -) -def test_hello_routing(driver_info, test_scripts): - # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_hello_routing - with StubCluster(*test_scripts): - uri = "neo4j://localhost:9001/?region=china&policy=my_policy" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], user_agent="test") as driver: - with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: - result = session.run("RETURN 1 AS x") - for record in result: - assert record["x"] == 1 - address = result.consume().server.address - assert address.host == "127.0.0.1" - assert address.port == 9002 diff --git a/tests/stub/test_transactions.py b/tests/stub/test_transactions.py deleted file mode 100644 index b1e705d6..00000000 --- a/tests/stub/test_transactions.py +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) "Neo4j" -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import pytest - -from neo4j import ( - GraphDatabase, - WRITE_ACCESS, - TRUST_SYSTEM_CA_SIGNED_CERTIFICATES, -) -from neo4j.exceptions import ServiceUnavailable - -from tests.stub.conftest import StubCluster - -# python -m pytest tests/stub/test_transactions.py -s -v - - -driver_config = { - "encrypted": False, - "trust": TRUST_SYSTEM_CA_SIGNED_CERTIFICATES, - "user_agent": "test", - "max_connection_lifetime": 1000, - "max_connection_pool_size": 10, - "keep_alive": False, - "max_transaction_retry_time": 1, - "resolver": None, -} - -session_config = { - # "fetch_size": 100, - # "database": "default", - # "bookmarks": ["bookmark-1", ], - "default_access_mode": WRITE_ACCESS, - "connection_acquisition_timeout": 1.0, - "max_transaction_retry_time": 1.0, - "initial_retry_delay": 1.0, - "retry_delay_multiplier": 1.0, - "retry_delay_jitter_factor": 0.1, - "fetch_size": -1, -} - - - - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/connection_error_on_commit.script", - "v4x0/tx_connection_error_on_commit.script", - ] -) -def test_connection_error_on_explicit_commit(driver_info, test_script): - # python -m pytest tests/stub/test_transactions.py -s -v -k test_connection_error_on_explicit_commit - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: - with driver.session(**session_config) as session: - tx = session.begin_transaction() - result = tx.run("CREATE (n {name:'Bob'})") - _ = list(result) - with pytest.raises(ServiceUnavailable): - tx.commit() - - -@pytest.mark.parametrize( - "test_script", - [ - "v3/connection_error_on_commit.script", - "v4x0/tx_connection_error_on_commit.script", - ] -) -def test_connection_error_on_commit(driver_info, test_script): - # python -m pytest tests/stub/test_transactions.py -s -v -k test_connection_error_on_commit - def create_bob(tx): - result = tx.run("CREATE (n {name:'Bob'})") - return list(result) - - with StubCluster(test_script): - uri = "bolt://127.0.0.1:9001" - with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: - with driver.session(**session_config) as session: - with pytest.raises(ServiceUnavailable): - session.write_transaction(create_bob) diff --git a/tests/unit/test_driver.py b/tests/unit/test_driver.py new file mode 100644 index 00000000..588fb9df --- /dev/null +++ b/tests/unit/test_driver.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [http://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from neo4j import ( + BoltDriver, + GraphDatabase, + Neo4jDriver, + TRUST_SYSTEM_CA_SIGNED_CERTIFICATES, + TRUST_ALL_CERTIFICATES, +) +from neo4j.api import WRITE_ACCESS +from neo4j.exceptions import ConfigurationError + + +@pytest.mark.parametrize("protocol", ("bolt://", "bolt+s://", "bolt+ssc://")) +@pytest.mark.parametrize("host", ("localhost", "127.0.0.1", + "[::1]", "[0:0:0:0:0:0:0:1]")) +@pytest.mark.parametrize("port", (":1234", "", ":7687")) +@pytest.mark.parametrize("auth_token", (("test", "test"), None)) +def test_direct_driver_constructor(protocol, host, port, auth_token): + uri = protocol + host + port + driver = GraphDatabase.driver(uri, auth=auth_token) + assert isinstance(driver, BoltDriver) + + +@pytest.mark.parametrize("protocol", ("neo4j://", "neo4j+s://", "neo4j+ssc://")) +@pytest.mark.parametrize("host", ("localhost", "127.0.0.1", + "[::1]", "[0:0:0:0:0:0:0:1]")) +@pytest.mark.parametrize("port", (":1234", "", ":7687")) +@pytest.mark.parametrize("auth_token", (("test", "test"), None)) +def test_routing_driver_constructor(protocol, host, port, auth_token): + uri = protocol + host + port + driver = GraphDatabase.driver(uri, auth=auth_token) + assert isinstance(driver, Neo4jDriver) + + +@pytest.mark.parametrize("test_uri", ( + "bolt+ssc://127.0.0.1:9001", + "bolt+s://127.0.0.1:9001", + "neo4j+ssc://127.0.0.1:9001", + "neo4j+s://127.0.0.1:9001", +)) +@pytest.mark.parametrize( + ("test_config", "expected_failure", "expected_failure_message"), + ( + ({"encrypted": False}, ConfigurationError, "The config settings"), + ({"encrypted": True}, ConfigurationError, "The config settings"), + ( + {"encrypted": True, "trust": TRUST_ALL_CERTIFICATES}, + ConfigurationError, "The config settings" + ), + ( + {"trust": TRUST_ALL_CERTIFICATES}, + ConfigurationError, "The config settings" + ), + ( + {"trust": TRUST_SYSTEM_CA_SIGNED_CERTIFICATES}, + ConfigurationError, "The config settings" + ), + ) +) +def test_driver_config_error( + test_uri, test_config, expected_failure, expected_failure_message +): + with pytest.raises(expected_failure, match=expected_failure_message): + GraphDatabase.driver(test_uri, **test_config) + + +@pytest.mark.parametrize( + ("test_config", "expected_failure", "expected_failure_message"), + ( + ({"trust": 1}, ConfigurationError, "The config setting `trust`"), + ({"trust": True}, ConfigurationError, "The config setting `trust`"), + ({"trust": None}, ConfigurationError, "The config setting `trust`"), + ) +) +def test_driver_trust_config_error( + test_config, expected_failure, expected_failure_message +): + with pytest.raises(expected_failure, match=expected_failure_message): + GraphDatabase.driver("bolt://127.0.0.1:9001", **test_config) + + +@pytest.mark.parametrize("uri", ( + "bolt://127.0.0.1:9000", + "neo4j://127.0.0.1:9000", +)) +def test_driver_opens_write_session_by_default(uri, mocker): + driver = GraphDatabase.driver(uri) + from neo4j.work.transaction import Transaction + with driver.session() as session: + acquire_mock = mocker.patch.object(session._pool, "acquire", + autospec=True) + tx_begin_mock = mocker.patch.object(Transaction, "_begin", + autospec=True) + tx = session.begin_transaction() + acquire_mock.assert_called_once_with( + access_mode=WRITE_ACCESS, + timeout=mocker.ANY, + database=mocker.ANY, + bookmarks=mocker.ANY + ) + tx_begin_mock.assert_called_once_with( + tx, + mocker.ANY, + mocker.ANY, + WRITE_ACCESS, + mocker.ANY, + mocker.ANY + ) diff --git a/tests/unit/work/__init__.py b/tests/unit/work/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/work/test_result.py b/tests/unit/work/test_result.py new file mode 100644 index 00000000..3136f50f --- /dev/null +++ b/tests/unit/work/test_result.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [http://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pytest + +from neo4j import Record +from neo4j.data import DataHydrator +from neo4j.work.result import Result + + +class Records: + def __init__(self, fields, records): + assert all(len(fields) == len(r) for r in records) + self.fields = fields + # self.records = [{"record_values": r} for r in records] + self.records = records + + def __len__(self): + return self.records.__len__() + + def __iter__(self): + return self.records.__iter__() + + def __getitem__(self, item): + return self.records.__getitem__(item) + + +class ConnectionStub: + class Message: + def __init__(self, message, *args, **kwargs): + self.message = message + self.args = args + self.kwargs = kwargs + + def _cb(self, cb_name, *args, **kwargs): + # print(self.message, cb_name.upper(), args, kwargs) + cb = self.kwargs.get(cb_name) + if callable(self.kwargs.get(cb_name)): + cb(*args, **kwargs) + + def on_success(self, metadata): + self._cb("on_success", metadata) + + def on_summary(self): + self._cb("on_summary") + + def on_records(self, records): + self._cb("on_records", records) + + def __eq__(self, other): + return self.message == other + + def __repr__(self): + return "Message(%s)" % self.message + + def __init__(self, records=None): + self._records = records + self.fetch_idx = 0 + self.record_idx = 0 + self.to_pull = None + self.queued = [] + self.sent = [] + + def send_all(self): + self.sent += self.queued + self.queued = [] + + def fetch_message(self): + if self.fetch_idx >= len(self.sent): + pytest.fail("Waits for reply to never sent message") + msg = self.sent[self.fetch_idx] + if msg == "RUN": + self.fetch_idx += 1 + msg.on_success({"fields": self._records.fields}) + elif msg == "DISCARD": + self.fetch_idx += 1 + self.record_idx = len(self._records) + msg.on_success() + elif msg == "PULL": + if self.to_pull is None: + n = msg.kwargs.get("n", -1) + if n < 0: + n = len(self._records) + self.to_pull = min(n, len(self._records) - self.record_idx) + # if to == len(self._records): + # self.fetch_idx += 1 + if self.to_pull > 0: + record = self._records[self.record_idx] + self.record_idx += 1 + self.to_pull -= 1 + msg.on_records([record]) + elif self.to_pull == 0: + self.to_pull = None + self.fetch_idx += 1 + if self.record_idx < len(self._records): + msg.on_success({"has_more": True}) + else: + msg.on_success({"bookmark": "foo"}) + msg.on_summary() + + def run(self, *args, **kwargs): + self.queued.append(ConnectionStub.Message("RUN", *args, **kwargs)) + + def discard(self, *args, **kwargs): + self.queued.append(ConnectionStub.Message("DISCARD", *args, **kwargs)) + + def pull(self, *args, **kwargs): + self.queued.append(ConnectionStub.Message("PULL", *args, **kwargs)) + + server_info = "ServerInfo" + + def defunct(self): + return False + + +class HydratorStub(DataHydrator): + def hydrate(self, values): + return values + + +def noop(*_, **__): + pass + + +def test_result_iteration(): + records = [[1], [2], [3], [4], [5]] + connection = ConnectionStub(records=Records("x", records)) + result = Result(connection, HydratorStub(), 2, noop, noop) + result._run("CYPHER", {}, None, "r", None) + received = [] + for i, record in enumerate(result): + assert isinstance(record, Record) + received.append([record.data().get("x", None)]) + assert received == records + + +def test_result_next(): + records = [[1], [2], [3], [4], [5]] + connection = ConnectionStub(records=Records("x", records)) + result = Result(connection, HydratorStub(), 2, noop, noop) + result._run("CYPHER", {}, None, "r", None) + iter_ = iter(result) + received = [] + for _ in range(len(records)): + received.append([next(iter_).get("x", None)]) + with pytest.raises(StopIteration): + received.append([next(iter_).get("x", None)]) + assert received == records + + +@pytest.mark.parametrize("records", ([[1], [2]], [[1]], [])) +@pytest.mark.parametrize("fetch_size", (1, 2)) +def test_result_peek(records, fetch_size): + connection = ConnectionStub(records=Records("x", records)) + result = Result(connection, HydratorStub(), fetch_size, noop, noop) + result._run("CYPHER", {}, None, "r", None) + record = result.peek() + if not records: + assert record is None + else: + assert isinstance(record, Record) + assert record.get("x") == records[0][0] + + +@pytest.mark.parametrize("records", ([[1], [2]], [[1]], [])) +@pytest.mark.parametrize("fetch_size", (1, 2)) +def test_result_single(records, fetch_size): + connection = ConnectionStub(records=Records("x", records)) + result = Result(connection, HydratorStub(), fetch_size, noop, noop) + result._run("CYPHER", {}, None, "r", None) + with pytest.warns(None) as warning_record: + record = result.single() + if not records: + assert not warning_record + assert record is None + else: + if len(records) > 1: + assert len(warning_record) == 1 + else: + assert not warning_record + assert isinstance(record, Record) + assert record.get("x") == records[0][0] diff --git a/tests/unit/work/test_session.py b/tests/unit/work/test_session.py new file mode 100644 index 00000000..ff6c0ffa --- /dev/null +++ b/tests/unit/work/test_session.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [http://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import inspect +import pytest +from unittest.mock import NonCallableMagicMock + +from neo4j import ( + Session, + SessionConfig, ServerInfo, +) + + +class FakeConnection(NonCallableMagicMock): + callbacks = [] + server_info = ServerInfo("127.0.0.1", (4, 3)) + + def fetch_message(self, *args, **kwargs): + if self.callbacks: + cb = self.callbacks.pop(0) + cb() + return super().__getattr__("fetch_message")(*args, **kwargs) + + def fetch_all(self, *args, **kwargs): + while self.callbacks: + cb = self.callbacks.pop(0) + cb() + return super().__getattr__("fetch_all")(*args, **kwargs) + + def __getattr__(self, name): + parent = super() + + def build_message_handler(name): + def func(*args, **kwargs): + def callback(): + for cb_name, param_count in ( + ("on_success", 1), + ("on_summary", 0) + ): + cb = kwargs.get(cb_name, None) + if callable(cb): + try: + param_count = \ + len(inspect.signature(cb).parameters) + except ValueError: + # e.g. built-in method as cb + pass + if param_count == 1: + cb({}) + else: + cb() + self.callbacks.append(callback) + return parent.__getattr__(name)(*args, **kwargs) + + return func + + if name in ("run", "commit", "pull", "rollback", "discard"): + return build_message_handler(name) + return parent.__getattr__(name) + + def defunct(self): + return False + + +@pytest.fixture() +def pool(mocker): + pool = mocker.MagicMock() + pool.acquire = mocker.MagicMock(side_effect=iter(FakeConnection, 0)) + return pool + + +def test_session_context_calls_close(mocker): + s = Session(None, SessionConfig()) + mock_close = mocker.patch.object(s, 'close', autospec=True) + with s: + pass + mock_close.assert_called_once_with() + + +@pytest.mark.parametrize("test_run_args", ( + ("RETURN $x", {"x": 1}), ("RETURN 1",) +)) +@pytest.mark.parametrize(("repetitions", "consume"), ( + (1, False), (2, False), (2, True) +)) +def test_opens_connection_on_run(pool, test_run_args, repetitions, consume): + with Session(pool, SessionConfig()) as session: + assert session._connection is None + result = session.run(*test_run_args) + assert session._connection is not None + if consume: + result.consume() + + +@pytest.mark.parametrize("test_run_args", ( + ("RETURN $x", {"x": 1}), ("RETURN 1",) +)) +@pytest.mark.parametrize("repetitions", range(1, 3)) +def test_closes_connection_after_consume(pool, test_run_args, repetitions): + with Session(pool, SessionConfig()) as session: + result = session.run(*test_run_args) + result.consume() + assert session._connection is None + assert session._connection is None + + +@pytest.mark.parametrize("test_run_args", ( + ("RETURN $x", {"x": 1}), ("RETURN 1",) +)) +def test_keeps_connection_until_last_result_consumed(pool, test_run_args): + with Session(pool, SessionConfig()) as session: + result1 = session.run(*test_run_args) + result2 = session.run(*test_run_args) + assert session._connection is not None + result1.consume() + assert session._connection is not None + result2.consume() + assert session._connection is None + + +def test_opens_connection_on_tx_begin(pool): + with Session(pool, SessionConfig()) as session: + assert session._connection is None + with session.begin_transaction() as _: + assert session._connection is not None + + +@pytest.mark.parametrize("test_run_args", ( + ("RETURN $x", {"x": 1}), ("RETURN 1",) +)) +@pytest.mark.parametrize("repetitions", range(1, 3)) +def test_keeps_connection_on_tx_run(pool, test_run_args, repetitions): + with Session(pool, SessionConfig()) as session: + with session.begin_transaction() as tx: + for _ in range(repetitions): + tx.run(*test_run_args) + assert session._connection is not None + + +@pytest.mark.parametrize("test_run_args", ( + ("RETURN $x", {"x": 1}), ("RETURN 1",) +)) +@pytest.mark.parametrize("repetitions", range(1, 3)) +def test_keeps_connection_on_tx_consume(pool, test_run_args, repetitions): + with Session(pool, SessionConfig()) as session: + with session.begin_transaction() as tx: + for _ in range(repetitions): + result = tx.run(*test_run_args) + result.consume() + assert session._connection is not None + + +@pytest.mark.parametrize("test_run_args", ( + ("RETURN $x", {"x": 1}), ("RETURN 1",) +)) +def test_closes_connection_after_tx_close(pool, test_run_args): + with Session(pool, SessionConfig()) as session: + with session.begin_transaction() as tx: + for _ in range(2): + result = tx.run(*test_run_args) + result.consume() + tx.close() + assert session._connection is None + assert session._connection is None + + +@pytest.mark.parametrize("test_run_args", ( + ("RETURN $x", {"x": 1}), ("RETURN 1",) +)) +def test_closes_connection_after_tx_commit(pool, test_run_args): + with Session(pool, SessionConfig()) as session: + with session.begin_transaction() as tx: + for _ in range(2): + result = tx.run(*test_run_args) + result.consume() + tx.commit() + assert session._connection is None + assert session._connection is None