diff --git a/neo4j/_async/io/_bolt.py b/neo4j/_async/io/_bolt.py index 2ea01062..c566c62b 100644 --- a/neo4j/_async/io/_bolt.py +++ b/neo4j/_async/io/_bolt.py @@ -182,20 +182,21 @@ def protocol_handlers(cls, protocol_version=None): # Carry out Bolt subclass imports locally to avoid circular dependency issues. from ._bolt3 import AsyncBolt3 from ._bolt4 import ( - AsyncBolt4x0, AsyncBolt4x1, AsyncBolt4x2, AsyncBolt4x3, AsyncBolt4x4, ) + from ._bolt5 import AsyncBolt5x0 handlers = { AsyncBolt3.PROTOCOL_VERSION: AsyncBolt3, - AsyncBolt4x0.PROTOCOL_VERSION: AsyncBolt4x0, + # 4.0 unsupported because no space left in the handshake AsyncBolt4x1.PROTOCOL_VERSION: AsyncBolt4x1, AsyncBolt4x2.PROTOCOL_VERSION: AsyncBolt4x2, AsyncBolt4x3.PROTOCOL_VERSION: AsyncBolt4x3, AsyncBolt4x4.PROTOCOL_VERSION: AsyncBolt4x4, + AsyncBolt5x0.PROTOCOL_VERSION: AsyncBolt5x0, } if protocol_version is None: @@ -215,9 +216,9 @@ def version_list(cls, versions, limit=4): preference. The number of protocol versions (or ranges) returned is limited to four. """ - # In fact, 4.3 is the fist version to support ranges. However, the range - # support got backported to 4.2. But even if the server is too old to - # have the backport, negotiating BOLT 4.1 is no problem as it's + # In fact, 4.3 is the fist version to support ranges. However, the + # range support got backported to 4.2. But even if the server is too + # old to have the backport, negotiating BOLT 4.1 is no problem as it's # equivalent to 4.2 first_with_range_support = Version(4, 2) result = [] @@ -313,9 +314,12 @@ def time_remaining(): if pool_config.protocol_version == (3, 0): from ._bolt3 import AsyncBolt3 bolt_cls = AsyncBolt3 - elif pool_config.protocol_version == (4, 0): - from ._bolt4 import AsyncBolt4x0 - bolt_cls = AsyncBolt4x0 + # Implementation for 4.0 exists, but there was no space left in the + # handshake to offer this version to the server. Hence, the server + # should never request us to speak bolt 4.0. + # elif pool_config.protocol_version == (4, 0): + # from ._bolt4 import AsyncBolt4x0 + # bolt_cls = AsyncBolt4x0 elif pool_config.protocol_version == (4, 1): from ._bolt4 import AsyncBolt4x1 bolt_cls = AsyncBolt4x1 @@ -328,6 +332,9 @@ def time_remaining(): elif pool_config.protocol_version == (4, 4): from ._bolt4 import AsyncBolt4x4 bolt_cls = AsyncBolt4x4 + elif pool_config.protocol_version == (5, 0): + from ._bolt5 import AsyncBolt5x0 + bolt_cls = AsyncBolt5x0 else: log.debug("[#%04X] S: ", s.getsockname()[1]) AsyncBoltSocket.close_socket(s) @@ -335,8 +342,8 @@ def time_remaining(): supported_versions = cls.protocol_handlers().keys() raise BoltHandshakeError( "The Neo4J server does not support communication with this " - "driver. This driver have support for Bolt Protocols {}" - "".format(supported_versions), + "driver. This driver has support for Bolt protocols " + "{}".format(tuple(map(str, supported_versions))), address=address, request_data=handshake, response_data=data ) @@ -670,13 +677,11 @@ async def close_non_blocking(self): self.socket.settimeout(0) await self.close() - @abc.abstractmethod def closed(self): - pass + return self._closed - @abc.abstractmethod def defunct(self): - pass + return self._defunct def is_idle_for(self, timeout): """Check if connection has been idle for at least the given timeout. diff --git a/neo4j/_async/io/_bolt3.py b/neo4j/_async/io/_bolt3.py index 77d12375..ed8f1f95 100644 --- a/neo4j/_async/io/_bolt3.py +++ b/neo4j/_async/io/_bolt3.py @@ -363,9 +363,3 @@ async def _process_message(self, details, summary_signature, raise BoltProtocolError("Unexpected response message with signature %02X" % summary_signature, address=self.unresolved_address) return len(details), 1 - - def closed(self): - return self._closed - - def defunct(self): - return self._defunct diff --git a/neo4j/_async/io/_bolt4.py b/neo4j/_async/io/_bolt4.py index 81fc765f..c719bc09 100644 --- a/neo4j/_async/io/_bolt4.py +++ b/neo4j/_async/io/_bolt4.py @@ -53,7 +53,7 @@ class AsyncBolt4x0(AsyncBolt): """ Protocol handler for Bolt 4.0. - This is supported by Neo4j versions 4.0, 4.1 and 4.2. + This is supported by Neo4j versions 4.0-4.4. """ PROTOCOL_VERSION = Version(4, 0) @@ -312,12 +312,6 @@ async def _process_message(self, details, summary_signature, return len(details), 1 - def closed(self): - return self._closed - - def defunct(self): - return self._defunct - class AsyncBolt4x1(AsyncBolt4x0): """ Protocol handler for Bolt 4.1. diff --git a/neo4j/_async/io/_bolt5.py b/neo4j/_async/io/_bolt5.py new file mode 100644 index 00000000..001ce7be --- /dev/null +++ b/neo4j/_async/io/_bolt5.py @@ -0,0 +1,317 @@ +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [http://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from logging import getLogger +from ssl import SSLSocket + +from ..._async_compat.util import AsyncUtil +from ..._exceptions import ( + BoltError, + BoltProtocolError, +) +from ...api import ( + READ_ACCESS, + Version, +) +from ...exceptions import ( + DatabaseUnavailable, + DriverError, + ForbiddenOnReadOnlyDatabase, + Neo4jError, + NotALeader, + ServiceUnavailable, +) +from ._bolt3 import ( + ServerStateManager, + ServerStates, +) +from ._bolt import AsyncBolt +from ._common import ( + check_supported_server_product, + CommitResponse, + InitResponse, + Response, +) + + +log = getLogger("neo4j") + + +class AsyncBolt5x0(AsyncBolt): + """Protocol handler for Bolt 5.0. """ + + PROTOCOL_VERSION = Version(5, 0) + + supports_multiple_results = True + + supports_multiple_databases = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._server_state_manager = ServerStateManager( + ServerStates.CONNECTED, on_change=self._on_server_state_change + ) + + def _on_server_state_change(self, old_state, new_state): + log.debug("[#%04X] State: %s > %s", self.local_port, + old_state.name, new_state.name) + + @property + def is_reset(self): + # We can't be sure of the server's state if there are still pending + # responses. Unless the last message we sent was RESET. In that case + # the server state will always be READY when we're done. + if (self.responses and self.responses[-1] + and self.responses[-1].message == "reset"): + return True + return self._server_state_manager.state == ServerStates.READY + + @property + def encrypted(self): + return isinstance(self.socket, SSLSocket) + + @property + def der_encoded_server_certificate(self): + return self.socket.getpeercert(binary_form=True) + + @property + def local_port(self): + try: + return self.socket.getsockname()[1] + except OSError: + return 0 + + def get_base_headers(self): + headers = {"user_agent": self.user_agent} + if self.routing_context is not None: + headers["routing"] = self.routing_context + return headers + + async def hello(self): + def on_success(metadata): + self.configuration_hints.update(metadata.pop("hints", {})) + self.server_info.update(metadata) + if "connection.recv_timeout_seconds" in self.configuration_hints: + recv_timeout = self.configuration_hints[ + "connection.recv_timeout_seconds" + ] + if isinstance(recv_timeout, int) and recv_timeout > 0: + self.socket.settimeout(recv_timeout) + else: + log.info("[#%04X] Server supplied an invalid value for " + "connection.recv_timeout_seconds (%r). Make sure " + "the server and network is set up correctly.", + self.local_port, recv_timeout) + + headers = self.get_base_headers() + headers.update(self.auth_dict) + logged_headers = dict(headers) + if "credentials" in logged_headers: + logged_headers["credentials"] = "*******" + log.debug("[#%04X] C: HELLO %r", self.local_port, logged_headers) + self._append(b"\x01", (headers,), + response=InitResponse(self, "hello", + on_success=on_success)) + await self.send_all() + await self.fetch_all() + check_supported_server_product(self.server_info.agent) + + async def route(self, database=None, imp_user=None, bookmarks=None): + routing_context = self.routing_context or {} + db_context = {} + if database is not None: + db_context.update(db=database) + if imp_user is not None: + db_context.update(imp_user=imp_user) + log.debug("[#%04X] C: ROUTE %r %r %r", self.local_port, + routing_context, bookmarks, db_context) + metadata = {} + if bookmarks is None: + bookmarks = [] + else: + bookmarks = list(bookmarks) + self._append(b"\x66", (routing_context, bookmarks, db_context), + response=Response(self, "route", + on_success=metadata.update)) + await self.send_all() + await self.fetch_all() + return [metadata.get("rt")] + + def run(self, query, parameters=None, mode=None, bookmarks=None, + metadata=None, timeout=None, db=None, imp_user=None, **handlers): + if not parameters: + parameters = {} + extra = {} + if mode in (READ_ACCESS, "r"): + # It will default to mode "w" if nothing is specified + extra["mode"] = "r" + if db: + extra["db"] = db + if imp_user: + extra["imp_user"] = imp_user + if bookmarks: + try: + extra["bookmarks"] = list(bookmarks) + except TypeError: + raise TypeError("Bookmarks must be provided as iterable") + if metadata: + try: + extra["tx_metadata"] = dict(metadata) + except TypeError: + raise TypeError("Metadata must be coercible to a dict") + if timeout is not None: + try: + extra["tx_timeout"] = int(1000 * float(timeout)) + except TypeError: + raise TypeError("Timeout must be a number (in seconds)") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a number <= 0") + fields = (query, parameters, extra) + log.debug("[#%04X] C: RUN %s", self.local_port, + " ".join(map(repr, fields))) + if query.upper() == u"COMMIT": + self._append(b"\x10", fields, CommitResponse(self, "run", + **handlers)) + else: + self._append(b"\x10", fields, Response(self, "run", **handlers)) + + def discard(self, n=-1, qid=-1, **handlers): + extra = {"n": n} + if qid != -1: + extra["qid"] = qid + log.debug("[#%04X] C: DISCARD %r", self.local_port, extra) + self._append(b"\x2F", (extra,), Response(self, "discard", **handlers)) + + def pull(self, n=-1, qid=-1, **handlers): + extra = {"n": n} + if qid != -1: + extra["qid"] = qid + log.debug("[#%04X] C: PULL %r", self.local_port, extra) + self._append(b"\x3F", (extra,), Response(self, "pull", **handlers)) + + def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, + db=None, imp_user=None, **handlers): + extra = {} + if mode in (READ_ACCESS, "r"): + # It will default to mode "w" if nothing is specified + extra["mode"] = "r" + if db: + extra["db"] = db + if imp_user: + extra["imp_user"] = imp_user + if bookmarks: + try: + extra["bookmarks"] = list(bookmarks) + except TypeError: + raise TypeError("Bookmarks must be provided as iterable") + if metadata: + try: + extra["tx_metadata"] = dict(metadata) + except TypeError: + raise TypeError("Metadata must be coercible to a dict") + if timeout is not None: + try: + extra["tx_timeout"] = int(1000 * float(timeout)) + except TypeError: + raise TypeError("Timeout must be a number (in seconds)") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a number <= 0") + log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) + self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) + + def commit(self, **handlers): + log.debug("[#%04X] C: COMMIT", self.local_port) + self._append(b"\x12", (), CommitResponse(self, "commit", **handlers)) + + def rollback(self, **handlers): + log.debug("[#%04X] C: ROLLBACK", self.local_port) + self._append(b"\x13", (), Response(self, "rollback", **handlers)) + + async def reset(self): + """Reset the connection. + + Add a RESET message to the outgoing queue, send it and consume all + remaining messages. + """ + + def fail(metadata): + raise BoltProtocolError("RESET failed %r" % metadata, + self.unresolved_address) + + log.debug("[#%04X] C: RESET", self.local_port) + self._append(b"\x0F", response=Response(self, "reset", + on_failure=fail)) + await self.send_all() + await self.fetch_all() + + def goodbye(self): + log.debug("[#%04X] C: GOODBYE", self.local_port) + self._append(b"\x02", ()) + + async def _process_message(self, details, summary_signature, + summary_metadata): + """Process at most one message from the server, if available. + + :return: 2-tuple of number of detail messages and number of summary + messages fetched + """ + if details: + # Do not log any data + log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details)) + await self.responses[0].on_records(details) + + if summary_signature is None: + return len(details), 0 + + response = self.responses.popleft() + response.complete = True + if summary_signature == b"\x70": + log.debug("[#%04X] S: SUCCESS %r", self.local_port, + summary_metadata) + self._server_state_manager.transition(response.message, + summary_metadata) + await response.on_success(summary_metadata or {}) + elif summary_signature == b"\x7E": + log.debug("[#%04X] S: IGNORED", self.local_port) + await response.on_ignored(summary_metadata or {}) + elif summary_signature == b"\x7F": + log.debug("[#%04X] S: FAILURE %r", self.local_port, + summary_metadata) + self._server_state_manager.state = ServerStates.FAILED + try: + await response.on_failure(summary_metadata or {}) + except (ServiceUnavailable, DatabaseUnavailable): + if self.pool: + await self.pool.deactivate(address=self.unresolved_address) + raise + except (NotALeader, ForbiddenOnReadOnlyDatabase): + if self.pool: + self.pool.on_write_failure(address=self.unresolved_address) + raise + except Neo4jError as e: + if self.pool and e.invalidates_all_connections(): + await self.pool.mark_all_stale() + raise + else: + raise BoltProtocolError( + "Unexpected response message with signature %02X" % ord( + summary_signature + ), self.unresolved_address + ) + + return len(details), 1 diff --git a/neo4j/_sync/io/_bolt.py b/neo4j/_sync/io/_bolt.py index 671a29d8..7ebf9bd8 100644 --- a/neo4j/_sync/io/_bolt.py +++ b/neo4j/_sync/io/_bolt.py @@ -182,20 +182,21 @@ def protocol_handlers(cls, protocol_version=None): # Carry out Bolt subclass imports locally to avoid circular dependency issues. from ._bolt3 import Bolt3 from ._bolt4 import ( - Bolt4x0, Bolt4x1, Bolt4x2, Bolt4x3, Bolt4x4, ) + from ._bolt5 import Bolt5x0 handlers = { Bolt3.PROTOCOL_VERSION: Bolt3, - Bolt4x0.PROTOCOL_VERSION: Bolt4x0, + # 4.0 unsupported because no space left in the handshake Bolt4x1.PROTOCOL_VERSION: Bolt4x1, Bolt4x2.PROTOCOL_VERSION: Bolt4x2, Bolt4x3.PROTOCOL_VERSION: Bolt4x3, Bolt4x4.PROTOCOL_VERSION: Bolt4x4, + Bolt5x0.PROTOCOL_VERSION: Bolt5x0, } if protocol_version is None: @@ -215,9 +216,9 @@ def version_list(cls, versions, limit=4): preference. The number of protocol versions (or ranges) returned is limited to four. """ - # In fact, 4.3 is the fist version to support ranges. However, the range - # support got backported to 4.2. But even if the server is too old to - # have the backport, negotiating BOLT 4.1 is no problem as it's + # In fact, 4.3 is the fist version to support ranges. However, the + # range support got backported to 4.2. But even if the server is too + # old to have the backport, negotiating BOLT 4.1 is no problem as it's # equivalent to 4.2 first_with_range_support = Version(4, 2) result = [] @@ -313,9 +314,12 @@ def time_remaining(): if pool_config.protocol_version == (3, 0): from ._bolt3 import Bolt3 bolt_cls = Bolt3 - elif pool_config.protocol_version == (4, 0): - from ._bolt4 import Bolt4x0 - bolt_cls = Bolt4x0 + # Implementation for 4.0 exists, but there was no space left in the + # handshake to offer this version to the server. Hence, the server + # should never request us to speak bolt 4.0. + # elif pool_config.protocol_version == (4, 0): + # from ._bolt4 import AsyncBolt4x0 + # bolt_cls = AsyncBolt4x0 elif pool_config.protocol_version == (4, 1): from ._bolt4 import Bolt4x1 bolt_cls = Bolt4x1 @@ -328,6 +332,9 @@ def time_remaining(): elif pool_config.protocol_version == (4, 4): from ._bolt4 import Bolt4x4 bolt_cls = Bolt4x4 + elif pool_config.protocol_version == (5, 0): + from ._bolt5 import Bolt5x0 + bolt_cls = Bolt5x0 else: log.debug("[#%04X] S: ", s.getsockname()[1]) BoltSocket.close_socket(s) @@ -335,8 +342,8 @@ def time_remaining(): supported_versions = cls.protocol_handlers().keys() raise BoltHandshakeError( "The Neo4J server does not support communication with this " - "driver. This driver have support for Bolt Protocols {}" - "".format(supported_versions), + "driver. This driver has support for Bolt protocols " + "{}".format(tuple(map(str, supported_versions))), address=address, request_data=handshake, response_data=data ) @@ -670,13 +677,11 @@ def close_non_blocking(self): self.socket.settimeout(0) self.close() - @abc.abstractmethod def closed(self): - pass + return self._closed - @abc.abstractmethod def defunct(self): - pass + return self._defunct def is_idle_for(self, timeout): """Check if connection has been idle for at least the given timeout. diff --git a/neo4j/_sync/io/_bolt3.py b/neo4j/_sync/io/_bolt3.py index 333bef79..034aa2d0 100644 --- a/neo4j/_sync/io/_bolt3.py +++ b/neo4j/_sync/io/_bolt3.py @@ -363,9 +363,3 @@ def _process_message(self, details, summary_signature, raise BoltProtocolError("Unexpected response message with signature %02X" % summary_signature, address=self.unresolved_address) return len(details), 1 - - def closed(self): - return self._closed - - def defunct(self): - return self._defunct diff --git a/neo4j/_sync/io/_bolt4.py b/neo4j/_sync/io/_bolt4.py index e5e3fe05..ccd6d977 100644 --- a/neo4j/_sync/io/_bolt4.py +++ b/neo4j/_sync/io/_bolt4.py @@ -53,7 +53,7 @@ class Bolt4x0(Bolt): """ Protocol handler for Bolt 4.0. - This is supported by Neo4j versions 4.0, 4.1 and 4.2. + This is supported by Neo4j versions 4.0-4.4. """ PROTOCOL_VERSION = Version(4, 0) @@ -312,12 +312,6 @@ def _process_message(self, details, summary_signature, return len(details), 1 - def closed(self): - return self._closed - - def defunct(self): - return self._defunct - class Bolt4x1(Bolt4x0): """ Protocol handler for Bolt 4.1. diff --git a/neo4j/_sync/io/_bolt5.py b/neo4j/_sync/io/_bolt5.py new file mode 100644 index 00000000..d1c51703 --- /dev/null +++ b/neo4j/_sync/io/_bolt5.py @@ -0,0 +1,317 @@ +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [http://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from logging import getLogger +from ssl import SSLSocket + +from ..._async_compat.util import Util +from ..._exceptions import ( + BoltError, + BoltProtocolError, +) +from ...api import ( + READ_ACCESS, + Version, +) +from ...exceptions import ( + DatabaseUnavailable, + DriverError, + ForbiddenOnReadOnlyDatabase, + Neo4jError, + NotALeader, + ServiceUnavailable, +) +from ._bolt3 import ( + ServerStateManager, + ServerStates, +) +from ._bolt import Bolt +from ._common import ( + check_supported_server_product, + CommitResponse, + InitResponse, + Response, +) + + +log = getLogger("neo4j") + + +class Bolt5x0(Bolt): + """Protocol handler for Bolt 5.0. """ + + PROTOCOL_VERSION = Version(5, 0) + + supports_multiple_results = True + + supports_multiple_databases = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._server_state_manager = ServerStateManager( + ServerStates.CONNECTED, on_change=self._on_server_state_change + ) + + def _on_server_state_change(self, old_state, new_state): + log.debug("[#%04X] State: %s > %s", self.local_port, + old_state.name, new_state.name) + + @property + def is_reset(self): + # We can't be sure of the server's state if there are still pending + # responses. Unless the last message we sent was RESET. In that case + # the server state will always be READY when we're done. + if (self.responses and self.responses[-1] + and self.responses[-1].message == "reset"): + return True + return self._server_state_manager.state == ServerStates.READY + + @property + def encrypted(self): + return isinstance(self.socket, SSLSocket) + + @property + def der_encoded_server_certificate(self): + return self.socket.getpeercert(binary_form=True) + + @property + def local_port(self): + try: + return self.socket.getsockname()[1] + except OSError: + return 0 + + def get_base_headers(self): + headers = {"user_agent": self.user_agent} + if self.routing_context is not None: + headers["routing"] = self.routing_context + return headers + + def hello(self): + def on_success(metadata): + self.configuration_hints.update(metadata.pop("hints", {})) + self.server_info.update(metadata) + if "connection.recv_timeout_seconds" in self.configuration_hints: + recv_timeout = self.configuration_hints[ + "connection.recv_timeout_seconds" + ] + if isinstance(recv_timeout, int) and recv_timeout > 0: + self.socket.settimeout(recv_timeout) + else: + log.info("[#%04X] Server supplied an invalid value for " + "connection.recv_timeout_seconds (%r). Make sure " + "the server and network is set up correctly.", + self.local_port, recv_timeout) + + headers = self.get_base_headers() + headers.update(self.auth_dict) + logged_headers = dict(headers) + if "credentials" in logged_headers: + logged_headers["credentials"] = "*******" + log.debug("[#%04X] C: HELLO %r", self.local_port, logged_headers) + self._append(b"\x01", (headers,), + response=InitResponse(self, "hello", + on_success=on_success)) + self.send_all() + self.fetch_all() + check_supported_server_product(self.server_info.agent) + + def route(self, database=None, imp_user=None, bookmarks=None): + routing_context = self.routing_context or {} + db_context = {} + if database is not None: + db_context.update(db=database) + if imp_user is not None: + db_context.update(imp_user=imp_user) + log.debug("[#%04X] C: ROUTE %r %r %r", self.local_port, + routing_context, bookmarks, db_context) + metadata = {} + if bookmarks is None: + bookmarks = [] + else: + bookmarks = list(bookmarks) + self._append(b"\x66", (routing_context, bookmarks, db_context), + response=Response(self, "route", + on_success=metadata.update)) + self.send_all() + self.fetch_all() + return [metadata.get("rt")] + + def run(self, query, parameters=None, mode=None, bookmarks=None, + metadata=None, timeout=None, db=None, imp_user=None, **handlers): + if not parameters: + parameters = {} + extra = {} + if mode in (READ_ACCESS, "r"): + # It will default to mode "w" if nothing is specified + extra["mode"] = "r" + if db: + extra["db"] = db + if imp_user: + extra["imp_user"] = imp_user + if bookmarks: + try: + extra["bookmarks"] = list(bookmarks) + except TypeError: + raise TypeError("Bookmarks must be provided as iterable") + if metadata: + try: + extra["tx_metadata"] = dict(metadata) + except TypeError: + raise TypeError("Metadata must be coercible to a dict") + if timeout is not None: + try: + extra["tx_timeout"] = int(1000 * float(timeout)) + except TypeError: + raise TypeError("Timeout must be a number (in seconds)") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a number <= 0") + fields = (query, parameters, extra) + log.debug("[#%04X] C: RUN %s", self.local_port, + " ".join(map(repr, fields))) + if query.upper() == u"COMMIT": + self._append(b"\x10", fields, CommitResponse(self, "run", + **handlers)) + else: + self._append(b"\x10", fields, Response(self, "run", **handlers)) + + def discard(self, n=-1, qid=-1, **handlers): + extra = {"n": n} + if qid != -1: + extra["qid"] = qid + log.debug("[#%04X] C: DISCARD %r", self.local_port, extra) + self._append(b"\x2F", (extra,), Response(self, "discard", **handlers)) + + def pull(self, n=-1, qid=-1, **handlers): + extra = {"n": n} + if qid != -1: + extra["qid"] = qid + log.debug("[#%04X] C: PULL %r", self.local_port, extra) + self._append(b"\x3F", (extra,), Response(self, "pull", **handlers)) + + def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, + db=None, imp_user=None, **handlers): + extra = {} + if mode in (READ_ACCESS, "r"): + # It will default to mode "w" if nothing is specified + extra["mode"] = "r" + if db: + extra["db"] = db + if imp_user: + extra["imp_user"] = imp_user + if bookmarks: + try: + extra["bookmarks"] = list(bookmarks) + except TypeError: + raise TypeError("Bookmarks must be provided as iterable") + if metadata: + try: + extra["tx_metadata"] = dict(metadata) + except TypeError: + raise TypeError("Metadata must be coercible to a dict") + if timeout is not None: + try: + extra["tx_timeout"] = int(1000 * float(timeout)) + except TypeError: + raise TypeError("Timeout must be a number (in seconds)") + if extra["tx_timeout"] < 0: + raise ValueError("Timeout must be a number <= 0") + log.debug("[#%04X] C: BEGIN %r", self.local_port, extra) + self._append(b"\x11", (extra,), Response(self, "begin", **handlers)) + + def commit(self, **handlers): + log.debug("[#%04X] C: COMMIT", self.local_port) + self._append(b"\x12", (), CommitResponse(self, "commit", **handlers)) + + def rollback(self, **handlers): + log.debug("[#%04X] C: ROLLBACK", self.local_port) + self._append(b"\x13", (), Response(self, "rollback", **handlers)) + + def reset(self): + """Reset the connection. + + Add a RESET message to the outgoing queue, send it and consume all + remaining messages. + """ + + def fail(metadata): + raise BoltProtocolError("RESET failed %r" % metadata, + self.unresolved_address) + + log.debug("[#%04X] C: RESET", self.local_port) + self._append(b"\x0F", response=Response(self, "reset", + on_failure=fail)) + self.send_all() + self.fetch_all() + + def goodbye(self): + log.debug("[#%04X] C: GOODBYE", self.local_port) + self._append(b"\x02", ()) + + def _process_message(self, details, summary_signature, + summary_metadata): + """Process at most one message from the server, if available. + + :return: 2-tuple of number of detail messages and number of summary + messages fetched + """ + if details: + # Do not log any data + log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details)) + self.responses[0].on_records(details) + + if summary_signature is None: + return len(details), 0 + + response = self.responses.popleft() + response.complete = True + if summary_signature == b"\x70": + log.debug("[#%04X] S: SUCCESS %r", self.local_port, + summary_metadata) + self._server_state_manager.transition(response.message, + summary_metadata) + response.on_success(summary_metadata or {}) + elif summary_signature == b"\x7E": + log.debug("[#%04X] S: IGNORED", self.local_port) + response.on_ignored(summary_metadata or {}) + elif summary_signature == b"\x7F": + log.debug("[#%04X] S: FAILURE %r", self.local_port, + summary_metadata) + self._server_state_manager.state = ServerStates.FAILED + try: + response.on_failure(summary_metadata or {}) + except (ServiceUnavailable, DatabaseUnavailable): + if self.pool: + self.pool.deactivate(address=self.unresolved_address) + raise + except (NotALeader, ForbiddenOnReadOnlyDatabase): + if self.pool: + self.pool.on_write_failure(address=self.unresolved_address) + raise + except Neo4jError as e: + if self.pool and e.invalidates_all_connections(): + self.pool.mark_all_stale() + raise + else: + raise BoltProtocolError( + "Unexpected response message with signature %02X" % ord( + summary_signature + ), self.unresolved_address + ) + + return len(details), 1 diff --git a/neo4j/graph/__init__.py b/neo4j/graph/__init__.py index bc1e8a52..cc0d97ed 100644 --- a/neo4j/graph/__init__.py +++ b/neo4j/graph/__init__.py @@ -31,6 +31,8 @@ from collections.abc import Mapping +from ..meta import deprecated + class Graph: """ Local, self-contained graph object that acts as a container for @@ -71,35 +73,61 @@ class Hydrator: def __init__(self, graph): self.graph = graph - def hydrate_node(self, n_id, n_labels=None, properties=None): + def hydrate_node(self, id_, labels=None, + properties=None, element_id=None): assert isinstance(self.graph, Graph) + # backwards compatibility with Neo4j < 5.0 + if element_id is None: + element_id = str(id_) + try: - inst = self.graph._nodes[n_id] + inst = self.graph._nodes[element_id] except KeyError: - inst = self.graph._nodes[n_id] = Node(self.graph, n_id, n_labels, properties) + inst = self.graph._nodes[element_id] = Node( + self.graph, element_id, id_, labels, properties + ) else: # If we have already hydrated this node as the endpoint of # a relationship, it won't have any labels or properties. # Therefore, we need to add the ones we have here. - if n_labels: - inst._labels = inst._labels.union(n_labels) # frozen_set + if labels: + inst._labels = inst._labels.union(labels) # frozen_set if properties: inst._properties.update(properties) return inst - def hydrate_relationship(self, r_id, n0_id, n1_id, r_type, properties=None): - inst = self.hydrate_unbound_relationship(r_id, r_type, properties) - inst._start_node = self.hydrate_node(n0_id) - inst._end_node = self.hydrate_node(n1_id) + def hydrate_relationship(self, id_, n0_id, n1_id, type_, + properties=None, element_id=None, + n0_element_id=None, n1_element_id=None): + # backwards compatibility with Neo4j < 5.0 + if element_id is None: + element_id = str(id_) + if n0_element_id is None: + n0_element_id = str(n0_id) + if n1_element_id is None: + n1_element_id = str(n1_id) + + inst = self.hydrate_unbound_relationship(id_, type_, properties, + element_id) + inst._start_node = self.hydrate_node(n0_id, + element_id=n0_element_id) + inst._end_node = self.hydrate_node(n1_id, element_id=n1_element_id) return inst - def hydrate_unbound_relationship(self, r_id, r_type, properties=None): + def hydrate_unbound_relationship(self, id_, type_, properties=None, + element_id=None): assert isinstance(self.graph, Graph) + # backwards compatibility with Neo4j < 5.0 + if element_id is None: + element_id = str(id_) + try: - inst = self.graph._relationships[r_id] + inst = self.graph._relationships[element_id] except KeyError: - r = self.graph.relationship_type(r_type) - inst = self.graph._relationships[r_id] = r(self.graph, r_id, properties) + r = self.graph.relationship_type(type_) + inst = self.graph._relationships[element_id] = r( + self.graph, element_id, id_, properties + ) return inst def hydrate_path(self, nodes, relationships, sequence): @@ -131,14 +159,19 @@ class Entity(Mapping): functionality. """ - def __init__(self, graph, id, properties): + def __init__(self, graph, element_id, id_, properties): self._graph = graph - self._id = id - self._properties = dict((k, v) for k, v in (properties or {}).items() if v is not None) + self._element_id = element_id + self._id = id_ + self._properties = { + k: v for k, v in (properties or {}).items() if v is not None + } def __eq__(self, other): try: - return type(self) == type(other) and self.graph == other.graph and self.id == other.id + return (type(self) == type(other) + and self.graph == other.graph + and self.element_id == other.element_id) except AttributeError: return False @@ -146,7 +179,7 @@ def __ne__(self, other): return not self.__eq__(other) def __hash__(self): - return hash(self.id) + return hash(self._element_id) def __len__(self): return len(self._properties) @@ -167,11 +200,30 @@ def graph(self): return self._graph @property + @deprecated("`id` is deprecated, use `element_id` instead") def id(self): - """ The identity of this entity in its container :class:`.Graph`. + """The legacy identity of this entity in its container :class:`.Graph`. + + Depending on the version of the server this entity was retrieved from, + this may be empty (None). + + .. deprecated:: 5.0 + Use :attr:`.element_id` instead. + + :rtype: int """ return self._id + @property + def element_id(self): + """The identity of this entity in its container :class:`.Graph`. + + .. added:: 5.0 + + :rtype: str + """ + return self._element_id + def get(self, name, default=None): """ Get a property value by name, optionally with a default. """ @@ -214,12 +266,14 @@ class Node(Entity): """ Self-contained graph node. """ - def __init__(self, graph, n_id, n_labels=None, properties=None): - Entity.__init__(self, graph, n_id, properties) + def __init__(self, graph, element_id, id_, n_labels=None, + properties=None): + Entity.__init__(self, graph, element_id, id_, properties) self._labels = frozenset(n_labels or ()) def __repr__(self): - return "" % (self._id, self._labels, self._properties) + return (f"") @property def labels(self): @@ -232,14 +286,15 @@ class Relationship(Entity): """ Self-contained graph relationship. """ - def __init__(self, graph, r_id, properties): - Entity.__init__(self, graph, r_id, properties) + def __init__(self, graph, element_id, id_, properties): + Entity.__init__(self, graph, element_id, id_, properties) self._start_node = None self._end_node = None def __repr__(self): - return "" % ( - self._id, self._start_node, self._end_node, self.type, self._properties) + return (f"") @property def nodes(self): diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json index 3ee223ae..283f936d 100644 --- a/testkitbackend/test_config.json +++ b/testkitbackend/test_config.json @@ -2,12 +2,16 @@ "skips": { "stub.retry.test_retry_clustering.TestRetryClustering.test_retry_ForbiddenOnReadOnlyDatabase_ChangingWriter": "Test makes assumptions about how verify_connectivity is implemented", + "stub.authorization.test_authorization.TestAuthorizationV5x0.test_should_retry_on_auth_expired_on_begin_using_tx_function": + "Flaky: test requires the driver to contact servers in a specific order", "stub.authorization.test_authorization.TestAuthorizationV4x3.test_should_retry_on_auth_expired_on_begin_using_tx_function": "Flaky: test requires the driver to contact servers in a specific order", "stub.authorization.test_authorization.TestAuthorizationV3.test_should_retry_on_auth_expired_on_begin_using_tx_function": "Flaky: test requires the driver to contact servers in a specific order", "stub.authorization.test_authorization.TestAuthorizationV4x1.test_should_retry_on_auth_expired_on_begin_using_tx_function": "Flaky: test requires the driver to contact servers in a specific order", + "stub.authorization.test_authorization.TestAuthorizationV5x0.test_should_fail_on_token_expired_on_begin_using_tx_function": + "Flaky: test requires the driver to contact servers in a specific order", "stub.authorization.test_authorization.TestAuthorizationV4x3.test_should_fail_on_token_expired_on_begin_using_tx_function": "Flaky: test requires the driver to contact servers in a specific order", "stub.authorization.test_authorization.TestAuthorizationV3.test_should_fail_on_token_expired_on_begin_using_tx_function": @@ -38,6 +42,7 @@ "Feature:Bolt:4.2": true, "Feature:Bolt:4.3": true, "Feature:Bolt:4.4": true, + "Feature:Bolt:5.0": true, "Feature:Impersonation": true, "Feature:TLS:1.1": "Driver blocks TLS 1.1 for security reasons.", "Feature:TLS:1.2": true, @@ -51,6 +56,8 @@ "Optimization:PullPipelining": true, "Optimization:ResultListFetchAll": "The idiomatic way to cast to list is indistinguishable from iterating over the result.", + "Detail:NullOnMissingId": true, + "ConfHint:connection.recv_timeout_seconds": true, "Backend:RTFetch": true, diff --git a/testkitbackend/totestkit.py b/testkitbackend/totestkit.py index 2e48861e..d4f5a084 100644 --- a/testkitbackend/totestkit.py +++ b/testkitbackend/totestkit.py @@ -67,6 +67,7 @@ def to(name, val): "id": field(v.id), "labels": field(v.labels), "props": field(v._properties), + "elementId": field(v.element_id), } return {"name": "Node", "data": node} if isinstance(v, Relationship): @@ -76,6 +77,9 @@ def to(name, val): "endNodeId": field(v.end_node.id), "type": field(v.type), "props": field(v._properties), + "elementId": field(v.element_id), + "startNodeElementId": field(v.start_node.element_id), + "endNodeElementId": field(v.end_node.element_id), } return {"name": "Relationship", "data": rel} if isinstance(v, Path): diff --git a/tests/unit/async_/io/test_class_bolt.py b/tests/unit/async_/io/test_class_bolt.py index 50706d75..8fcb9158 100644 --- a/tests/unit/async_/io/test_class_bolt.py +++ b/tests/unit/async_/io/test_class_bolt.py @@ -25,38 +25,52 @@ def test_class_method_protocol_handlers(): - # python -m pytest tests/unit/io/test_class_bolt.py -s -v -k test_class_method_protocol_handlers protocol_handlers = AsyncBolt.protocol_handlers() assert len(protocol_handlers) == 6 + assert protocol_handlers.keys() == { + (3, 0), + (4, 1), (4, 2), (4, 3), (4, 4), + (5, 0), + } @pytest.mark.parametrize( "test_input, expected", [ ((0, 0), 0), - ((4, 0), 1), + ((1, 0), 0), + ((2, 0), 0), + ((3, 0), 1), + ((4, 0), 0), + ((4, 1), 1), + ((4, 2), 1), + ((4, 3), 1), + ((4, 4), 1), + ((5, 0), 1), + ((5, 1), 0), + ((6, 0), 0), ] ) -def test_class_method_protocol_handlers_with_protocol_version(test_input, expected): - # python -m pytest tests/unit/io/test_class_bolt.py -s -v -k test_class_method_protocol_handlers_with_protocol_version - protocol_handlers = AsyncBolt.protocol_handlers(protocol_version=test_input) +def test_class_method_protocol_handlers_with_protocol_version(test_input, + expected): + protocol_handlers = AsyncBolt.protocol_handlers( + protocol_version=test_input + ) assert len(protocol_handlers) == expected def test_class_method_protocol_handlers_with_invalid_protocol_version(): - # python -m pytest tests/unit/io/test_class_bolt.py -s -v -k test_class_method_protocol_handlers_with_invalid_protocol_version with pytest.raises(TypeError): AsyncBolt.protocol_handlers(protocol_version=2) def test_class_method_get_handshake(): - # python -m pytest tests/unit/io/test_class_bolt.py -s -v -k test_class_method_get_handshake handshake = AsyncBolt.get_handshake() - assert handshake == b"\x00\x02\x04\x04\x00\x00\x01\x04\x00\x00\x00\x04\x00\x00\x00\x03" + assert (b"\x00\x00\x00\x05\x00\x02\x04\x04\x00\x00\x01\x04\x00\x00\x00\x03" + == handshake) def test_magic_preamble(): - # python -m pytest tests/unit/io/test_class_bolt.py -s -v -k test_magic_preamble preamble = 0x6060B017 preamble_bytes = preamble.to_bytes(4, byteorder="big") assert AsyncBolt.MAGIC_PREAMBLE == preamble_bytes diff --git a/tests/unit/common/test_data.py b/tests/unit/common/test_data.py index 14577efd..7a799a6c 100644 --- a/tests/unit/common/test_data.py +++ b/tests/unit/common/test_data.py @@ -16,6 +16,8 @@ # limitations under the License. +import pytest + from neo4j.data import DataHydrator from neo4j.packstream import Structure @@ -23,18 +25,87 @@ # python -m pytest -s -v tests/unit/test_data.py -def test_can_hydrate_node_structure(): +def test_can_hydrate_v1_node_structure(): hydrant = DataHydrator() struct = Structure(b'N', 123, ["Person"], {"name": "Alice"}) alice, = hydrant.hydrate([struct]) - assert alice.id == 123 + with pytest.warns(DeprecationWarning, match="element_id"): + assert alice.id == 123 + # for backwards compatibility, the driver should compy the element_id + assert alice.element_id == "123" + assert alice.labels == {"Person"} + assert set(alice.keys()) == {"name"} + assert alice.get("name") == "Alice" + + +@pytest.mark.parametrize("with_id", (True, False)) +def test_can_hydrate_v2_node_structure(with_id): + hydrant = DataHydrator() + + id_ = 123 if with_id else None + + struct = Structure(b'N', id_, ["Person"], {"name": "Alice"}, "abc") + alice, = hydrant.hydrate([struct]) + + with pytest.warns(DeprecationWarning, match="element_id"): + assert alice.id == id_ + assert alice.element_id == "abc" assert alice.labels == {"Person"} assert set(alice.keys()) == {"name"} assert alice.get("name") == "Alice" +def test_can_hydrate_v1_relationship_structure(): + hydrant = DataHydrator() + + struct = Structure(b'R', 123, 456, 789, "KNOWS", {"since": 1999}) + rel, = hydrant.hydrate([struct]) + + with pytest.warns(DeprecationWarning, match="element_id"): + assert rel.id == 123 + with pytest.warns(DeprecationWarning, match="element_id"): + assert rel.start_node.id == 456 + with pytest.warns(DeprecationWarning, match="element_id"): + assert rel.end_node.id == 789 + # for backwards compatibility, the driver should compy the element_id + assert rel.element_id == "123" + assert rel.start_node.element_id == "456" + assert rel.end_node.element_id == "789" + assert rel.type == "KNOWS" + assert set(rel.keys()) == {"since"} + assert rel.get("since") == 1999 + + +@pytest.mark.parametrize("with_ids", (True, False)) +def test_can_hydrate_v2_relationship_structure(with_ids): + hydrant = DataHydrator() + + id_ = 123 if with_ids else None + start_id = 456 if with_ids else None + end_id = 789 if with_ids else None + + struct = Structure(b'R', id_, start_id, end_id, "KNOWS", {"since": 1999}, + "abc", "def", "ghi") + + rel, = hydrant.hydrate([struct]) + + with pytest.warns(DeprecationWarning, match="element_id"): + assert rel.id == id_ + with pytest.warns(DeprecationWarning, match="element_id"): + assert rel.start_node.id == start_id + with pytest.warns(DeprecationWarning, match="element_id"): + assert rel.end_node.id == end_id + # for backwards compatibility, the driver should compy the element_id + assert rel.element_id == "abc" + assert rel.start_node.element_id == "def" + assert rel.end_node.element_id == "ghi" + assert rel.type == "KNOWS" + assert set(rel.keys()) == {"since"} + assert rel.get("since") == 1999 + + def test_hydrating_unknown_structure_returns_same(): hydrant = DataHydrator() diff --git a/tests/unit/common/test_record.py b/tests/unit/common/test_record.py index 85e0bc1f..f976d78b 100644 --- a/tests/unit/common/test_record.py +++ b/tests/unit/common/test_record.py @@ -273,7 +273,7 @@ def test_record_repr(len_): {"x": {"one": 1, "two": 2}} ), ( - zip(["a"], [Node("graph", 42, "Person", {"name": "Alice"})]), + zip(["a"], [Node("graph", "42", 42, "Person", {"name": "Alice"})]), (), {"a": {"name": "Alice"}} ), diff --git a/tests/unit/common/test_types.py b/tests/unit/common/test_types.py index 979c4080..25206760 100644 --- a/tests/unit/common/test_types.py +++ b/tests/unit/common/test_types.py @@ -34,12 +34,26 @@ # Node -def test_can_create_node(): +@pytest.mark.parametrize(("id_", "element_id"), ( + (123, "123"), + (123, None), + (None, "foobar"), +)) +def test_can_create_node(id_, element_id): g = Graph() gh = Graph.Hydrator(g) - alice = gh.hydrate_node(123, {"Person"}, {"name": "Alice", "age": 33}) + + fields = [id_, {"Person"}, {"name": "Alice", "age": 33}] + if element_id is not None: + fields.append(element_id) + alice = gh.hydrate_node(*fields) assert isinstance(alice, Node) - assert alice.id == 123 + with pytest.warns(DeprecationWarning, match="element_id"): + assert alice.id == id_ + if element_id is not None: + assert alice.element_id == element_id + else: + assert alice.element_id == str(id_) assert alice.labels == {"Person"} assert set(alice.keys()) == {"name", "age"} assert set(alice.values()) == {"Alice", 33} @@ -75,54 +89,81 @@ def test_node_with_null_properties(): assert "bad" not in stuff -@pytest.mark.parametrize(("g1", "id1", "props1", "g2", "id2", "props2"), ( +@pytest.mark.parametrize(("g1", "id1", "eid1", "props1", + "g2", "id2", "eid2", "props2"), ( (*n1, *n2) for n1, n2 in product( ( - (g, id_, props) + (g, id_, element_id, props) for g in (0, 1) - for id_ in (1, 1234) + for id_, element_id in ( + (1, "1"), + (1234, "1234"), + (None, "1234"), + (None, "foobar"), + ) for props in (None, {}, {"a": 1}) ), repeat=2 ) )) -def test_node_equality(g1, id1, props1, g2, id2, props2): +def test_node_equality(g1, id1, eid1, props1, g2, id2, eid2, props2): graphs = (Graph(), Graph()) - node_1 = Node(graphs[g1], id1, props1) - node_2 = Node(graphs[g2], id2, props2) - if g1 == g2 and id1 == id2: + node_1 = Node(graphs[g1], eid1, id1, props1) + node_2 = Node(graphs[g2], eid2, id2, props2) + if g1 == g2 and eid1 == eid2: assert node_1 == node_2 else: assert node_1 != node_2 assert node_1 != "this is not a node" -def test_node_hashing(): +@pytest.mark.parametrize("legacy_id", (True, False)) +def test_node_hashing(legacy_id): g = Graph() - node_1 = Node(g, 1234) - node_2 = Node(g, 1234) - node_3 = Node(g, 5678) + node_1 = Node(g, "1234" + ("abc" if not legacy_id else ""), + 1234 if legacy_id else None) + node_2 = Node(g, "1234" + ("abc" if not legacy_id else ""), + 1234 if legacy_id else None) + node_3 = Node(g, "5678" + ("abc" if not legacy_id else ""), + 5678 if legacy_id else None) assert hash(node_1) == hash(node_2) assert hash(node_1) != hash(node_3) -def test_node_repr(): +def test_node_v1_repr(): g = Graph() gh = Graph.Hydrator(g) alice = gh.hydrate_node(1, {"Person"}, {"name": "Alice"}) - assert repr(alice) == "" + assert repr(alice) == ( + "" + ) + + +@pytest.mark.parametrize("legacy_id", (True, False)) +def test_node_v2_repr(legacy_id): + g = Graph() + gh = Graph.Hydrator(g) + id_ = 1234 if legacy_id else None + element_id = str(id_) if legacy_id else "foobar" + alice = gh.hydrate_node(id_, {"Person"}, {"name": "Alice"}, element_id) + assert repr(alice) == ( + f"" + ) # Relationship -def test_can_create_relationship(): +def test_can_create_relationship_v1(): g = Graph() gh = Graph.Hydrator(g) alice = gh.hydrate_node(1, {"Person"}, {"name": "Alice", "age": 33}) bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob", "age": 44}) - alice_knows_bob = gh.hydrate_relationship(1, alice.id, bob.id, "KNOWS", {"since": 1999}) + alice_knows_bob = gh.hydrate_relationship(1, 1, 2, "KNOWS", + {"since": 1999}) assert isinstance(alice_knows_bob, Relationship) assert alice_knows_bob.start_node == alice assert alice_knows_bob.type == "KNOWS" @@ -134,28 +175,133 @@ def test_can_create_relationship(): assert alice_knows_bob.get("since") == 1999 -def test_relationship_repr(): +@pytest.mark.parametrize("legacy_id", (True, False)) +def test_can_create_relationship_v2(legacy_id): g = Graph() gh = Graph.Hydrator(g) - alice = gh.hydrate_node(1, {"Person"}, {"name": "Alice"}) - bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob"}) - alice_knows_bob = gh.hydrate_relationship(1, alice.id, bob.id, "KNOWS", {"since": 1999}) - assert repr(alice_knows_bob) == ", ) type='KNOWS' properties={'since': 1999}>" + alice = gh.hydrate_node( + 1 if legacy_id else None, {"Person"}, {"name": "Alice", "age": 33}, + "1" if legacy_id else "alice" + ) + bob = gh.hydrate_node( + 2 if legacy_id else None, {"Person"}, {"name": "Bob", "age": 44}, + "2" if legacy_id else "bob" + ) + alice_knows_bob = gh.hydrate_relationship( + 1 if legacy_id else None, + 1 if legacy_id else None, 2 if legacy_id else None, + "KNOWS", {"since": 1999}, + "1" if legacy_id else "alice_knows_bob", + "1" if legacy_id else "alice", "2" if legacy_id else "bob" + ) + assert isinstance(alice_knows_bob, Relationship) + assert alice_knows_bob.start_node == alice + assert alice_knows_bob.type == "KNOWS" + assert alice_knows_bob.end_node == bob + assert dict(alice_knows_bob) == {"since": 1999} + assert set(alice_knows_bob.keys()) == {"since"} + assert set(alice_knows_bob.values()) == {1999} + assert set(alice_knows_bob.items()) == {("since", 1999)} + assert alice_knows_bob.get("since") == 1999 -# Path +def test_relationship_v1_repr(): + g = Graph() + gh = Graph.Hydrator(g) + _alice = gh.hydrate_node(1, {"Person"}, {"name": "Alice"}) + _bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob"}) + alice_knows_bob = gh.hydrate_relationship(3, 1, 2, "KNOWS", + {"since": 1999}) + assert repr(alice_knows_bob) == ( + ", ) " + "type='KNOWS' properties={'since': 1999}>" + ) -def test_can_create_path(): +@pytest.mark.parametrize("legacy_id", (True, False)) +def test_relationship_v2_repr(legacy_id): + g = Graph() + gh = Graph.Hydrator(g) + alice = gh.hydrate_node( + 1 if legacy_id else None, {"Person"}, {"name": "Alice"}, + "1" if legacy_id else "alice" + ) + bob = gh.hydrate_node( + 2 if legacy_id else None, {"Person"}, {"name": "Bob"}, + "2" if legacy_id else "bob" + ) + alice_knows_bob = gh.hydrate_relationship( + 1 if legacy_id else None, + 1 if legacy_id else None, 2 if legacy_id else None, + "KNOWS", {"since": 1999}, + "1" if legacy_id else "alice_knows_bob", + "1" if legacy_id else "alice", "2" if legacy_id else "bob" + ) + expected_eid = "1" if legacy_id else "alice_knows_bob" + expected_eid_alice = "1" if legacy_id else "alice" + expected_eid_bob = "2" if legacy_id else "bob" + assert repr(alice_knows_bob) == ( + f", " + f") " + "type='KNOWS' properties={'since': 1999}>" + ) + + +# Path + +def test_can_create_path_v1(): g = Graph() gh = Graph.Hydrator(g) alice = gh.hydrate_node(1, {"Person"}, {"name": "Alice", "age": 33}) bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob", "age": 44}) carol = gh.hydrate_node(3, {"Person"}, {"name": "Carol", "age": 55}) - alice_knows_bob = gh.hydrate_relationship(1, alice.id, bob.id, "KNOWS", + alice_knows_bob = gh.hydrate_relationship(1, 1, 2, "KNOWS", {"since": 1999}) - carol_dislikes_bob = gh.hydrate_relationship(2, carol.id, bob.id, - "DISLIKES", {}) + carol_dislikes_bob = gh.hydrate_relationship(2, 3, 2, "DISLIKES", {}) + path = Path(alice, alice_knows_bob, carol_dislikes_bob) + assert isinstance(path, Path) + assert path.start_node is alice + assert path.end_node is carol + assert path.nodes == (alice, bob, carol) + assert path.relationships == (alice_knows_bob, carol_dislikes_bob) + assert list(path) == [alice_knows_bob, carol_dislikes_bob] + + +@pytest.mark.parametrize("legacy_id", (True, False)) +def test_can_create_path_v2(legacy_id): + g = Graph() + gh = Graph.Hydrator(g) + alice = gh.hydrate_node( + 1 if legacy_id else None, {"Person"}, {"name": "Alice", "age": 33}, + "1" if legacy_id else "alice" + ) + bob = gh.hydrate_node( + 2 if legacy_id else None, {"Person"}, {"name": "Bob", "age": 44}, + "2" if legacy_id else "bob" + ) + carol = gh.hydrate_node( + 3 if legacy_id else None, {"Person"}, {"name": "Carol", "age": 55}, + "3" if legacy_id else "carol" + ) + alice_knows_bob = gh.hydrate_relationship( + 1 if legacy_id else None, + 1 if legacy_id else None, 2 if legacy_id else None, + "KNOWS", {"since": 1999}, "1" if legacy_id else "alice_knows_bob", + "1" if legacy_id else "alice", "2" if legacy_id else "bob" + + ) + carol_dislikes_bob = gh.hydrate_relationship( + 2 if legacy_id else None, + 3 if legacy_id else None, 2 if legacy_id else None, + "DISLIKES", {}, "2" if legacy_id else "carol_dislikes_bob", + "3" if legacy_id else "carol", "2" if legacy_id else "bob" + ) path = Path(alice, alice_knows_bob, carol_dislikes_bob) assert isinstance(path, Path) assert path.start_node is alice @@ -169,61 +315,181 @@ def test_can_create_path(): def test_can_hydrate_path(cyclic): g = Graph() gh = Graph.Hydrator(g) - alice = gh.hydrate_node(1, {"Person"}, {"name": "Alice", "age": 33}) - bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob", "age": 44}) + alice = gh.hydrate_node(1, {"Person"}, {"name": "Alice", "age": 33}, "1") + bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob", "age": 44}, "2") if cyclic: + carol_id = 1 + carol_eid = "1" carol = alice else: - carol = gh.hydrate_node(3, {"Person"}, {"name": "Carol", "age": 55}) - r = [gh.hydrate_unbound_relationship(1, "KNOWS", {"since": 1999}), - gh.hydrate_unbound_relationship(2, "DISLIKES", {})] + carol_id = 3 + carol_eid = "3" + carol = gh.hydrate_node(carol_id, {"Person"}, + {"name": "Carol", "age": 55}, carol_eid) + r = [gh.hydrate_unbound_relationship(1, "KNOWS", {"since": 1999}, "1"), + gh.hydrate_unbound_relationship(2, "DISLIKES", {}), "2"] path = gh.hydrate_path([alice, bob, carol], r, [1, 1, -2, 2]) assert path.start_node is alice assert path.end_node is carol assert path.nodes == (alice, bob, carol) - expected_alice_knows_bob = gh.hydrate_relationship(1, alice.id, bob.id, - "KNOWS", {"since": 1999}) - expected_carol_dislikes_bob = gh.hydrate_relationship(2, carol.id, bob.id, - "DISLIKES", {}) + expected_alice_knows_bob = gh.hydrate_relationship( + 1, 1, 2, "KNOWS", {"since": 1999}, "1", "1", "2" + ) + expected_carol_dislikes_bob = gh.hydrate_relationship( + 2, carol_id, 2, "DISLIKES", {}, "2", carol_eid, "2" + ) assert path.relationships == (expected_alice_knows_bob, expected_carol_dislikes_bob) - assert list(path) == [expected_alice_knows_bob, expected_carol_dislikes_bob] + assert list(path) == [expected_alice_knows_bob, + expected_carol_dislikes_bob] -def test_path_equality(): +def test_path_v1_equality(): g = Graph() gh = Graph.Hydrator(g) alice = gh.hydrate_node(1, {"Person"}, {"name": "Alice", "age": 33}) - bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob", "age": 44}) - carol = gh.hydrate_node(3, {"Person"}, {"name": "Carol", "age": 55}) - alice_knows_bob = gh.hydrate_relationship(1, alice.id, bob.id, "KNOWS", {"since": 1999}) - carol_dislikes_bob = gh.hydrate_relationship(2, carol.id, bob.id, "DISLIKES", {}) + _bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob", "age": 44}) + _carol = gh.hydrate_node(3, {"Person"}, {"name": "Carol", "age": 55}) + alice_knows_bob = gh.hydrate_relationship(1, 1, 2, "KNOWS", + {"since": 1999}) + carol_dislikes_bob = gh.hydrate_relationship(2, 3, 2, "DISLIKES", {}) path_1 = Path(alice, alice_knows_bob, carol_dislikes_bob) path_2 = Path(alice, alice_knows_bob, carol_dislikes_bob) assert path_1 == path_2 assert path_1 != "this is not a path" -def test_path_hashing(): +@pytest.mark.parametrize("legacy_id", (True, False)) +def test_path_v2_equality(legacy_id): + g = Graph() + gh = Graph.Hydrator(g) + alice = gh.hydrate_node( + 1 if legacy_id else None, {"Person"}, {"name": "Alice", "age": 33}, + "1" if legacy_id else "alice" + ) + _bob = gh.hydrate_node( + 2 if legacy_id else None, {"Person"}, {"name": "Bob", "age": 44}, + "2" if legacy_id else "bob" + ) + _carol = gh.hydrate_node( + 3 if legacy_id else None, {"Person"}, {"name": "Carol", "age": 55}, + "3" if legacy_id else "carol" + ) + alice_knows_bob = gh.hydrate_relationship( + 1 if legacy_id else None, + 1 if legacy_id else None, 2 if legacy_id else None, + "KNOWS", {"since": 1999}, "1" if legacy_id else "alice_knows_bob", + "1" if legacy_id else "alice", "2" if legacy_id else "bob" + ) + carol_dislikes_bob = gh.hydrate_relationship( + 2 if legacy_id else None, + 3 if legacy_id else None, 2 if legacy_id else None, + "DISLIKES", {}, "2" if legacy_id else "carol_dislikes_bob", + "3" if legacy_id else "carol", "2" if legacy_id else "bob" + ) + path_1 = Path(alice, alice_knows_bob, carol_dislikes_bob) + path_2 = Path(alice, alice_knows_bob, carol_dislikes_bob) + assert path_1 == path_2 + assert path_1 != "this is not a path" + + +def test_path_v1_hashing(): g = Graph() gh = Graph.Hydrator(g) alice = gh.hydrate_node(1, {"Person"}, {"name": "Alice", "age": 33}) - bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob", "age": 44}) - carol = gh.hydrate_node(3, {"Person"}, {"name": "Carol", "age": 55}) - alice_knows_bob = gh.hydrate_relationship(1, alice.id, bob.id, "KNOWS", {"since": 1999}) - carol_dislikes_bob = gh.hydrate_relationship(2, carol.id, bob.id, "DISLIKES", {}) + _bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob", "age": 44}) + _carol = gh.hydrate_node(3, {"Person"}, {"name": "Carol", "age": 55}) + alice_knows_bob = gh.hydrate_relationship(1, 1, 2, "KNOWS", + {"since": 1999}) + carol_dislikes_bob = gh.hydrate_relationship(2, 3, 2, "DISLIKES", {}) path_1 = Path(alice, alice_knows_bob, carol_dislikes_bob) path_2 = Path(alice, alice_knows_bob, carol_dislikes_bob) assert hash(path_1) == hash(path_2) -def test_path_repr(): +@pytest.mark.parametrize("legacy_id", (True, False)) +def test_path_v2_hashing(legacy_id): + g = Graph() + gh = Graph.Hydrator(g) + alice = gh.hydrate_node( + 1 if legacy_id else None, {"Person"}, {"name": "Alice", "age": 33}, + "1" if legacy_id else "alice" + ) + _bob = gh.hydrate_node( + 2 if legacy_id else None, {"Person"}, {"name": "Bob", "age": 44}, + "2" if legacy_id else "bob" + ) + _carol = gh.hydrate_node( + 3 if legacy_id else None, {"Person"}, {"name": "Carol", "age": 55}, + "3" if legacy_id else "carol" + ) + alice_knows_bob = gh.hydrate_relationship( + 1 if legacy_id else None, + 1 if legacy_id else None, 2 if legacy_id else None, + "KNOWS", {"since": 1999}, "1" if legacy_id else "alice_knows_bob", + "1" if legacy_id else "alice", "2" if legacy_id else "bob" + ) + carol_dislikes_bob = gh.hydrate_relationship( + 2 if legacy_id else None, + 3 if legacy_id else None, 2 if legacy_id else None, + "DISLIKES", {}, "2" if legacy_id else "carol_dislikes_bob", + "3" if legacy_id else "carol", "2" if legacy_id else "bob" + ) + path_1 = Path(alice, alice_knows_bob, carol_dislikes_bob) + path_2 = Path(alice, alice_knows_bob, carol_dislikes_bob) + assert hash(path_1) == hash(path_2) + + +def test_path_v1_repr(): g = Graph() gh = Graph.Hydrator(g) alice = gh.hydrate_node(1, {"Person"}, {"name": "Alice"}) - bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob"}) - carol = gh.hydrate_node(3, {"Person"}, {"name": "Carol"}) - alice_knows_bob = gh.hydrate_relationship(1, alice.id, bob.id, "KNOWS", {"since": 1999}) - carol_dislikes_bob = gh.hydrate_relationship(2, carol.id, bob.id, "DISLIKES", {}) + _bob = gh.hydrate_node(2, {"Person"}, {"name": "Bob"}) + _carol = gh.hydrate_node(3, {"Person"}, {"name": "Carol"}) + alice_knows_bob = gh.hydrate_relationship(1, 1, 2, "KNOWS", + {"since": 1999}) + carol_dislikes_bob = gh.hydrate_relationship(2, 3, 2, "DISLIKES", {}) + path = Path(alice, alice_knows_bob, carol_dislikes_bob) + assert repr(path) == ( + " end= size=2>" + ) + + +@pytest.mark.parametrize("legacy_id", (True, False)) +def test_path_v2_repr(legacy_id): + g = Graph() + gh = Graph.Hydrator(g) + alice = gh.hydrate_node( + 1 if legacy_id else None, {"Person"}, {"name": "Alice"}, + "1" if legacy_id else "alice" + + ) + bob = gh.hydrate_node( + 2 if legacy_id else None, {"Person"}, {"name": "Bob"}, + "2" if legacy_id else "bob" + + ) + carol = gh.hydrate_node( + 3 if legacy_id else None, {"Person"}, {"name": "Carol"}, + "3" if legacy_id else "carol" + + ) + alice_knows_bob = gh.hydrate_relationship( + 1 if legacy_id else None, alice.id, bob.id, "KNOWS", {"since": 1999}, + "1" if legacy_id else "alice_knows_bob", + alice.element_id, bob.element_id + ) + carol_dislikes_bob = gh.hydrate_relationship( + 2 if legacy_id else None, carol.id, bob.id, "DISLIKES", {}, + "2" if legacy_id else "carol_dislikes_bob", + carol.element_id, bob.element_id + ) path = Path(alice, alice_knows_bob, carol_dislikes_bob) - assert repr(path) == " end= size=2>" + assert repr(path) == ( + f" " + f"end= size=2>" + ) diff --git a/tests/unit/sync/io/test_class_bolt.py b/tests/unit/sync/io/test_class_bolt.py index b7d1e6c5..f6601ae0 100644 --- a/tests/unit/sync/io/test_class_bolt.py +++ b/tests/unit/sync/io/test_class_bolt.py @@ -25,38 +25,52 @@ def test_class_method_protocol_handlers(): - # python -m pytest tests/unit/io/test_class_bolt.py -s -v -k test_class_method_protocol_handlers protocol_handlers = Bolt.protocol_handlers() assert len(protocol_handlers) == 6 + assert protocol_handlers.keys() == { + (3, 0), + (4, 1), (4, 2), (4, 3), (4, 4), + (5, 0), + } @pytest.mark.parametrize( "test_input, expected", [ ((0, 0), 0), - ((4, 0), 1), + ((1, 0), 0), + ((2, 0), 0), + ((3, 0), 1), + ((4, 0), 0), + ((4, 1), 1), + ((4, 2), 1), + ((4, 3), 1), + ((4, 4), 1), + ((5, 0), 1), + ((5, 1), 0), + ((6, 0), 0), ] ) -def test_class_method_protocol_handlers_with_protocol_version(test_input, expected): - # python -m pytest tests/unit/io/test_class_bolt.py -s -v -k test_class_method_protocol_handlers_with_protocol_version - protocol_handlers = Bolt.protocol_handlers(protocol_version=test_input) +def test_class_method_protocol_handlers_with_protocol_version(test_input, + expected): + protocol_handlers = Bolt.protocol_handlers( + protocol_version=test_input + ) assert len(protocol_handlers) == expected def test_class_method_protocol_handlers_with_invalid_protocol_version(): - # python -m pytest tests/unit/io/test_class_bolt.py -s -v -k test_class_method_protocol_handlers_with_invalid_protocol_version with pytest.raises(TypeError): Bolt.protocol_handlers(protocol_version=2) def test_class_method_get_handshake(): - # python -m pytest tests/unit/io/test_class_bolt.py -s -v -k test_class_method_get_handshake handshake = Bolt.get_handshake() - assert handshake == b"\x00\x02\x04\x04\x00\x00\x01\x04\x00\x00\x00\x04\x00\x00\x00\x03" + assert (b"\x00\x00\x00\x05\x00\x02\x04\x04\x00\x00\x01\x04\x00\x00\x00\x03" + == handshake) def test_magic_preamble(): - # python -m pytest tests/unit/io/test_class_bolt.py -s -v -k test_magic_preamble preamble = 0x6060B017 preamble_bytes = preamble.to_bytes(4, byteorder="big") assert Bolt.MAGIC_PREAMBLE == preamble_bytes