Skip to content

Fix for non-blocking sendall #378

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 1 addition & 91 deletions neo4j/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(self, unresolved_address, sock, *, auth=None, **config):
self.socket = sock
self.server = ServerInfo(Address(sock.getpeername()), Bolt3.PROTOCOL_VERSION)
self.outbox = Outbox()
self.inbox = Inbox(BufferedSocket(self.socket, 32768), on_error=self._set_defunct)
self.inbox = Inbox(self.socket, on_error=self._set_defunct)
self.packer = Packer(self.outbox)
self.unpacker = Unpacker(self.inbox)
self.responses = deque()
Expand Down Expand Up @@ -463,96 +463,6 @@ def view(self):
return memoryview(self._data[:end])


class BufferedSocket:
""" Wrapper for a regular socket, with an added a dynamically-resizing
receive buffer to reduce the number of calls to recv.

NOTE: not all socket methods are implemented yet
"""

def __init__(self, socket_, initial_capacity=0):
self.socket = socket_
self.buffer = bytearray(initial_capacity)
self.r_pos = 0
self.w_pos = 0

def _fill_buffer(self, min_bytes):
""" Fill the buffer with at least `min_bytes` bytes, requesting more if
the buffer has space. Internally, this method attempts to do as little
allocation as possible and make as few calls to socket.recv as
possible.
"""
# First, we need to calculate how much spare space exists between the
# write cursor and the end of the buffer.
space_at_end = len(self.buffer) - self.w_pos
if min_bytes <= space_at_end:
# If there's at least enough here for the minimum number of bytes
# we need, then do nothing
#
pass
elif min_bytes <= space_at_end + self.r_pos:
# If the buffer contains enough space, but it's split between the
# end of the buffer and recyclable space at the start of the
# buffer, then recycle that space by pushing the remaining data
# towards the front.
#
# print("Recycling {} bytes".format(self.r_pos))
size = self.w_pos - self.r_pos
view = memoryview(self.buffer)
self.buffer[0:size] = view[self.r_pos:self.w_pos]
self.r_pos = 0
self.w_pos = size
else:
# Otherwise, there's just not enough space whichever way you shake
# it. So, rebuild the buffer from scratch, taking the unread data
# and appending empty space big enough to hold the minimum number
# of bytes we're looking for.
#
# print("Rebuilding buffer from {} bytes ({} used) to "
# "{} bytes".format(len(self.buffer),
# self.w_pos - self.r_pos,
# self.w_pos - self.r_pos + min_bytes))
self.buffer = (self.buffer[self.r_pos:self.w_pos] +
bytearray(min_bytes))
self.w_pos -= self.r_pos
self.r_pos = 0
min_end = self.w_pos + min_bytes
end = len(self.buffer)
view = memoryview(self.buffer)
self.socket.setblocking(0)
while self.w_pos < min_end:
ready_to_read, _, _ = select([self.socket], [], [])
subview = view[self.w_pos:end]
n = self.socket.recv_into(subview, end - self.w_pos)
if n == 0:
raise OSError("No data")
self.w_pos += n

def recv_into(self, buffer, n_bytes=0, flags=0):
""" Intercepts a regular socket.recv_into call, taking data from the
internal buffer, if available. If not enough data exists in the buffer,
more will be retrieved first.

Unlike the lower-level call, this method will never return 0, instead
raising an OSError if no data is returned on the underlying socket.

:param buffer:
:param n_bytes:
:param flags:
:raises OSError:
:return:
"""
available = self.w_pos - self.r_pos
required = n_bytes - available
if required > 0:
self._fill_buffer(required)
view = memoryview(self.buffer)
end = self.r_pos + n_bytes
buffer[:] = view[self.r_pos:end]
self.r_pos = end
return n_bytes


class Inbox(MessageInbox):

def __next__(self):
Expand Down
92 changes: 1 addition & 91 deletions neo4j/io/_bolt4x0.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(self, unresolved_address, sock, *, auth=None, **config):
self.socket = sock
self.server = ServerInfo(Address(sock.getpeername()), Bolt4x0.PROTOCOL_VERSION)
self.outbox = Outbox()
self.inbox = Inbox(BufferedSocket(self.socket, 32768), on_error=self._set_defunct)
self.inbox = Inbox(self.socket, on_error=self._set_defunct)
self.packer = Packer(self.outbox)
self.unpacker = Unpacker(self.inbox)
self.responses = deque()
Expand Down Expand Up @@ -461,96 +461,6 @@ def view(self):
return memoryview(self._data[:end])


class BufferedSocket:
""" Wrapper for a regular socket, with an added a dynamically-resizing
receive buffer to reduce the number of calls to recv.

NOTE: not all socket methods are implemented yet
"""

def __init__(self, socket_, initial_capacity=0):
self.socket = socket_
self.buffer = bytearray(initial_capacity)
self.r_pos = 0
self.w_pos = 0

def _fill_buffer(self, min_bytes):
""" Fill the buffer with at least `min_bytes` bytes, requesting more if
the buffer has space. Internally, this method attempts to do as little
allocation as possible and make as few calls to socket.recv as
possible.
"""
# First, we need to calculate how much spare space exists between the
# write cursor and the end of the buffer.
space_at_end = len(self.buffer) - self.w_pos
if min_bytes <= space_at_end:
# If there's at least enough here for the minimum number of bytes
# we need, then do nothing
#
pass
elif min_bytes <= space_at_end + self.r_pos:
# If the buffer contains enough space, but it's split between the
# end of the buffer and recyclable space at the start of the
# buffer, then recycle that space by pushing the remaining data
# towards the front.
#
# print("Recycling {} bytes".format(self.r_pos))
size = self.w_pos - self.r_pos
view = memoryview(self.buffer)
self.buffer[0:size] = view[self.r_pos:self.w_pos]
self.r_pos = 0
self.w_pos = size
else:
# Otherwise, there's just not enough space whichever way you shake
# it. So, rebuild the buffer from scratch, taking the unread data
# and appending empty space big enough to hold the minimum number
# of bytes we're looking for.
#
# print("Rebuilding buffer from {} bytes ({} used) to "
# "{} bytes".format(len(self.buffer),
# self.w_pos - self.r_pos,
# self.w_pos - self.r_pos + min_bytes))
self.buffer = (self.buffer[self.r_pos:self.w_pos] +
bytearray(min_bytes))
self.w_pos -= self.r_pos
self.r_pos = 0
min_end = self.w_pos + min_bytes
end = len(self.buffer)
view = memoryview(self.buffer)
self.socket.setblocking(0)
while self.w_pos < min_end:
ready_to_read, _, _ = select([self.socket], [], [])
subview = view[self.w_pos:end]
n = self.socket.recv_into(subview, end - self.w_pos)
if n == 0:
raise OSError("No data")
self.w_pos += n

def recv_into(self, buffer, n_bytes=0, flags=0):
""" Intercepts a regular socket.recv_into call, taking data from the
internal buffer, if available. If not enough data exists in the buffer,
more will be retrieved first.

Unlike the lower-level call, this method will never return 0, instead
raising an OSError if no data is returned on the underlying socket.

:param buffer:
:param n_bytes:
:param flags:
:raises OSError:
:return:
"""
available = self.w_pos - self.r_pos
required = n_bytes - available
if required > 0:
self._fill_buffer(required)
view = memoryview(self.buffer)
end = self.r_pos + n_bytes
buffer[:] = view[self.r_pos:end]
self.r_pos = end
return n_bytes


class Inbox(MessageInbox):

def __next__(self):
Expand Down
3 changes: 0 additions & 3 deletions tests/unit/io/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ def __init__(self, address):
self.captured = b""
self.messages = MessageInbox(self, on_error=print)

def setblocking(self, flag):
pass

def getsockname(self):
return "127.0.0.1", 0xFFFF

Expand Down
3 changes: 0 additions & 3 deletions tests/unit/io/test_direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ class FakeSocket:
def __init__(self, address):
self.address = address

def setblocking(self, flag):
pass

def getpeername(self):
return self.address

Expand Down
15 changes: 11 additions & 4 deletions tests/unit/test_addressing.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,14 @@ def test_address_resolve_with_custom_resolver():
resolved = address.resolve(resolver=custom_resolver)
assert isinstance(resolved, Address) is False
assert isinstance(resolved, list) is True
assert len(resolved) == 3
assert resolved[0] == IPv4Address(('127.0.0.1', 7687))
assert resolved[1] == IPv6Address(('::1', 1234, 0, 0))
assert resolved[2] == IPv4Address(('127.0.0.1', 1234))
if len(resolved) == 2:
# IPv4 only
assert resolved[0] == IPv4Address(('127.0.0.1', 7687))
assert resolved[1] == IPv4Address(('127.0.0.1', 1234))
elif len(resolved) == 3:
# IPv4 and IPv6
assert resolved[0] == IPv4Address(('127.0.0.1', 7687))
assert resolved[1] == IPv6Address(('::1', 1234, 0, 0))
assert resolved[2] == IPv4Address(('127.0.0.1', 1234))
else:
assert False