diff --git a/neo4j/io/_bolt3.py b/neo4j/io/_bolt3.py index 002ad7d5..e00bd3ee 100644 --- a/neo4j/io/_bolt3.py +++ b/neo4j/io/_bolt3.py @@ -82,7 +82,7 @@ def __init__(self, unresolved_address, sock, *, auth=None, **config): self.socket = sock self.server = ServerInfo(Address(sock.getpeername()), Bolt3.PROTOCOL_VERSION) self.outbox = Outbox() - self.inbox = Inbox(BufferedSocket(self.socket, 32768), on_error=self._set_defunct) + self.inbox = Inbox(self.socket, on_error=self._set_defunct) self.packer = Packer(self.outbox) self.unpacker = Unpacker(self.inbox) self.responses = deque() @@ -463,96 +463,6 @@ def view(self): return memoryview(self._data[:end]) -class BufferedSocket: - """ Wrapper for a regular socket, with an added a dynamically-resizing - receive buffer to reduce the number of calls to recv. - - NOTE: not all socket methods are implemented yet - """ - - def __init__(self, socket_, initial_capacity=0): - self.socket = socket_ - self.buffer = bytearray(initial_capacity) - self.r_pos = 0 - self.w_pos = 0 - - def _fill_buffer(self, min_bytes): - """ Fill the buffer with at least `min_bytes` bytes, requesting more if - the buffer has space. Internally, this method attempts to do as little - allocation as possible and make as few calls to socket.recv as - possible. - """ - # First, we need to calculate how much spare space exists between the - # write cursor and the end of the buffer. - space_at_end = len(self.buffer) - self.w_pos - if min_bytes <= space_at_end: - # If there's at least enough here for the minimum number of bytes - # we need, then do nothing - # - pass - elif min_bytes <= space_at_end + self.r_pos: - # If the buffer contains enough space, but it's split between the - # end of the buffer and recyclable space at the start of the - # buffer, then recycle that space by pushing the remaining data - # towards the front. - # - # print("Recycling {} bytes".format(self.r_pos)) - size = self.w_pos - self.r_pos - view = memoryview(self.buffer) - self.buffer[0:size] = view[self.r_pos:self.w_pos] - self.r_pos = 0 - self.w_pos = size - else: - # Otherwise, there's just not enough space whichever way you shake - # it. So, rebuild the buffer from scratch, taking the unread data - # and appending empty space big enough to hold the minimum number - # of bytes we're looking for. - # - # print("Rebuilding buffer from {} bytes ({} used) to " - # "{} bytes".format(len(self.buffer), - # self.w_pos - self.r_pos, - # self.w_pos - self.r_pos + min_bytes)) - self.buffer = (self.buffer[self.r_pos:self.w_pos] + - bytearray(min_bytes)) - self.w_pos -= self.r_pos - self.r_pos = 0 - min_end = self.w_pos + min_bytes - end = len(self.buffer) - view = memoryview(self.buffer) - self.socket.setblocking(0) - while self.w_pos < min_end: - ready_to_read, _, _ = select([self.socket], [], []) - subview = view[self.w_pos:end] - n = self.socket.recv_into(subview, end - self.w_pos) - if n == 0: - raise OSError("No data") - self.w_pos += n - - def recv_into(self, buffer, n_bytes=0, flags=0): - """ Intercepts a regular socket.recv_into call, taking data from the - internal buffer, if available. If not enough data exists in the buffer, - more will be retrieved first. - - Unlike the lower-level call, this method will never return 0, instead - raising an OSError if no data is returned on the underlying socket. - - :param buffer: - :param n_bytes: - :param flags: - :raises OSError: - :return: - """ - available = self.w_pos - self.r_pos - required = n_bytes - available - if required > 0: - self._fill_buffer(required) - view = memoryview(self.buffer) - end = self.r_pos + n_bytes - buffer[:] = view[self.r_pos:end] - self.r_pos = end - return n_bytes - - class Inbox(MessageInbox): def __next__(self): diff --git a/neo4j/io/_bolt4x0.py b/neo4j/io/_bolt4x0.py index b00f9026..3f60eba6 100644 --- a/neo4j/io/_bolt4x0.py +++ b/neo4j/io/_bolt4x0.py @@ -82,7 +82,7 @@ def __init__(self, unresolved_address, sock, *, auth=None, **config): self.socket = sock self.server = ServerInfo(Address(sock.getpeername()), Bolt4x0.PROTOCOL_VERSION) self.outbox = Outbox() - self.inbox = Inbox(BufferedSocket(self.socket, 32768), on_error=self._set_defunct) + self.inbox = Inbox(self.socket, on_error=self._set_defunct) self.packer = Packer(self.outbox) self.unpacker = Unpacker(self.inbox) self.responses = deque() @@ -461,96 +461,6 @@ def view(self): return memoryview(self._data[:end]) -class BufferedSocket: - """ Wrapper for a regular socket, with an added a dynamically-resizing - receive buffer to reduce the number of calls to recv. - - NOTE: not all socket methods are implemented yet - """ - - def __init__(self, socket_, initial_capacity=0): - self.socket = socket_ - self.buffer = bytearray(initial_capacity) - self.r_pos = 0 - self.w_pos = 0 - - def _fill_buffer(self, min_bytes): - """ Fill the buffer with at least `min_bytes` bytes, requesting more if - the buffer has space. Internally, this method attempts to do as little - allocation as possible and make as few calls to socket.recv as - possible. - """ - # First, we need to calculate how much spare space exists between the - # write cursor and the end of the buffer. - space_at_end = len(self.buffer) - self.w_pos - if min_bytes <= space_at_end: - # If there's at least enough here for the minimum number of bytes - # we need, then do nothing - # - pass - elif min_bytes <= space_at_end + self.r_pos: - # If the buffer contains enough space, but it's split between the - # end of the buffer and recyclable space at the start of the - # buffer, then recycle that space by pushing the remaining data - # towards the front. - # - # print("Recycling {} bytes".format(self.r_pos)) - size = self.w_pos - self.r_pos - view = memoryview(self.buffer) - self.buffer[0:size] = view[self.r_pos:self.w_pos] - self.r_pos = 0 - self.w_pos = size - else: - # Otherwise, there's just not enough space whichever way you shake - # it. So, rebuild the buffer from scratch, taking the unread data - # and appending empty space big enough to hold the minimum number - # of bytes we're looking for. - # - # print("Rebuilding buffer from {} bytes ({} used) to " - # "{} bytes".format(len(self.buffer), - # self.w_pos - self.r_pos, - # self.w_pos - self.r_pos + min_bytes)) - self.buffer = (self.buffer[self.r_pos:self.w_pos] + - bytearray(min_bytes)) - self.w_pos -= self.r_pos - self.r_pos = 0 - min_end = self.w_pos + min_bytes - end = len(self.buffer) - view = memoryview(self.buffer) - self.socket.setblocking(0) - while self.w_pos < min_end: - ready_to_read, _, _ = select([self.socket], [], []) - subview = view[self.w_pos:end] - n = self.socket.recv_into(subview, end - self.w_pos) - if n == 0: - raise OSError("No data") - self.w_pos += n - - def recv_into(self, buffer, n_bytes=0, flags=0): - """ Intercepts a regular socket.recv_into call, taking data from the - internal buffer, if available. If not enough data exists in the buffer, - more will be retrieved first. - - Unlike the lower-level call, this method will never return 0, instead - raising an OSError if no data is returned on the underlying socket. - - :param buffer: - :param n_bytes: - :param flags: - :raises OSError: - :return: - """ - available = self.w_pos - self.r_pos - required = n_bytes - available - if required > 0: - self._fill_buffer(required) - view = memoryview(self.buffer) - end = self.r_pos + n_bytes - buffer[:] = view[self.r_pos:end] - self.r_pos = end - return n_bytes - - class Inbox(MessageInbox): def __next__(self): diff --git a/tests/unit/io/conftest.py b/tests/unit/io/conftest.py index 9dcdac18..8a4f2307 100644 --- a/tests/unit/io/conftest.py +++ b/tests/unit/io/conftest.py @@ -31,9 +31,6 @@ def __init__(self, address): self.captured = b"" self.messages = MessageInbox(self, on_error=print) - def setblocking(self, flag): - pass - def getsockname(self): return "127.0.0.1", 0xFFFF diff --git a/tests/unit/io/test_direct.py b/tests/unit/io/test_direct.py index bc718443..40975292 100644 --- a/tests/unit/io/test_direct.py +++ b/tests/unit/io/test_direct.py @@ -41,9 +41,6 @@ class FakeSocket: def __init__(self, address): self.address = address - def setblocking(self, flag): - pass - def getpeername(self): return self.address diff --git a/tests/unit/test_addressing.py b/tests/unit/test_addressing.py index 7b7d02fc..d10ac387 100644 --- a/tests/unit/test_addressing.py +++ b/tests/unit/test_addressing.py @@ -236,7 +236,14 @@ def test_address_resolve_with_custom_resolver(): resolved = address.resolve(resolver=custom_resolver) assert isinstance(resolved, Address) is False assert isinstance(resolved, list) is True - assert len(resolved) == 3 - assert resolved[0] == IPv4Address(('127.0.0.1', 7687)) - assert resolved[1] == IPv6Address(('::1', 1234, 0, 0)) - assert resolved[2] == IPv4Address(('127.0.0.1', 1234)) + if len(resolved) == 2: + # IPv4 only + assert resolved[0] == IPv4Address(('127.0.0.1', 7687)) + assert resolved[1] == IPv4Address(('127.0.0.1', 1234)) + elif len(resolved) == 3: + # IPv4 and IPv6 + assert resolved[0] == IPv4Address(('127.0.0.1', 7687)) + assert resolved[1] == IPv6Address(('::1', 1234, 0, 0)) + assert resolved[2] == IPv4Address(('127.0.0.1', 1234)) + else: + assert False