Skip to content

Commit 3fc1b2c

Browse files
authored
Improve performance of packstream packer (#622)
The chunking bit of the protocol implementation could be sped up by not constantly checking if chunking is necessary but postponing the chunking until it's needed (i.e., before wrapping up a bolt message or when collecting the byte stream to send it over the wire). This change includes an extra copy of the data in memory, but since data is usually written in small amounts per buffer write and chunking occurs rarely compared to this, the cost of constant checking outweighs the cost of the extra copy for most practical use-cases.
1 parent 26c002b commit 3fc1b2c

File tree

4 files changed

+79
-174
lines changed

4 files changed

+79
-174
lines changed

neo4j/io/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,8 +487,7 @@ def _append(self, signature, fields=(), response=None):
487487
:param response: a response object to handle callbacks
488488
"""
489489
self.packer.pack_struct(signature, fields)
490-
self.outbox.chunk()
491-
self.outbox.chunk()
490+
self.outbox.wrap_message()
492491
self.responses.append(response)
493492

494493
def _send_all(self):

neo4j/io/_common.py

Lines changed: 37 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -89,54 +89,51 @@ def __next__(self):
8989

9090
class Outbox:
9191

92-
def __init__(self, capacity=8192, max_chunk_size=16384):
92+
def __init__(self, max_chunk_size=16384):
9393
self._max_chunk_size = max_chunk_size
94-
self._header = 0
95-
self._start = 2
96-
self._end = 2
97-
self._data = bytearray(capacity)
94+
self._chunked_data = bytearray()
95+
self._raw_data = bytearray()
96+
self.write = self._raw_data.extend
9897

9998
def max_chunk_size(self):
10099
return self._max_chunk_size
101100

102101
def clear(self):
103-
self._header = 0
104-
self._start = 2
105-
self._end = 2
106-
self._data[0:2] = b"\x00\x00"
107-
108-
def write(self, b):
109-
to_write = len(b)
110-
max_chunk_size = self._max_chunk_size
111-
pos = 0
112-
while to_write > 0:
113-
chunk_size = self._end - self._start
114-
remaining = max_chunk_size - chunk_size
115-
if remaining == 0 or remaining < to_write <= max_chunk_size:
116-
self.chunk()
117-
else:
118-
wrote = min(to_write, remaining)
119-
new_end = self._end + wrote
120-
self._data[self._end:new_end] = b[pos:pos+wrote]
121-
self._end = new_end
122-
pos += wrote
123-
new_chunk_size = self._end - self._start
124-
self._data[self._header:(self._header + 2)] = struct_pack(">H", new_chunk_size)
125-
to_write -= wrote
126-
127-
def chunk(self):
128-
self._header = self._end
129-
self._start = self._header + 2
130-
self._end = self._start
131-
self._data[self._header:self._start] = b"\x00\x00"
102+
self._chunked_data = bytearray()
103+
self._raw_data.clear()
104+
105+
def _chunk_data(self):
106+
data_len = len(self._raw_data)
107+
num_full_chunks, chunk_rest = divmod(
108+
data_len, self._max_chunk_size
109+
)
110+
num_chunks = num_full_chunks + bool(chunk_rest)
111+
112+
data_view = memoryview(self._raw_data)
113+
header_start = len(self._chunked_data)
114+
data_start = header_start + 2
115+
raw_data_start = 0
116+
for i in range(num_chunks):
117+
chunk_size = min(data_len - raw_data_start,
118+
self._max_chunk_size)
119+
self._chunked_data[header_start:data_start] = struct_pack(
120+
">H", chunk_size
121+
)
122+
self._chunked_data[data_start:(data_start + chunk_size)] = \
123+
data_view[raw_data_start:(raw_data_start + chunk_size)]
124+
header_start += chunk_size + 2
125+
data_start = header_start + 2
126+
raw_data_start += chunk_size
127+
del data_view
128+
self._raw_data.clear()
129+
130+
def wrap_message(self):
131+
self._chunk_data()
132+
self._chunked_data += b"\x00\x00"
132133

133134
def view(self):
134-
end = self._end
135-
chunk_size = end - self._start
136-
if chunk_size == 0:
137-
return memoryview(self._data[:self._header])
138-
else:
139-
return memoryview(self._data[:end])
135+
self._chunk_data()
136+
return memoryview(self._chunked_data)
140137

141138

142139
class ConnectionErrorHandler:

neo4j/packstream.py

Lines changed: 9 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,9 @@ def _pack(self, value):
125125
self.pack_raw(encoded)
126126

127127
# Bytes
128-
elif isinstance(value, bytes):
128+
elif isinstance(value, (bytes, bytearray)):
129129
self.pack_bytes_header(len(value))
130130
self.pack_raw(value)
131-
elif isinstance(value, bytearray):
132-
self.pack_bytes_header(len(value))
133-
self.pack_raw(bytes(value))
134131

135132
# List
136133
elif isinstance(value, list):
@@ -169,38 +166,8 @@ def pack_bytes_header(self, size):
169166

170167
def pack_string_header(self, size):
171168
write = self._write
172-
if size == 0x00:
173-
write(b"\x80")
174-
elif size == 0x01:
175-
write(b"\x81")
176-
elif size == 0x02:
177-
write(b"\x82")
178-
elif size == 0x03:
179-
write(b"\x83")
180-
elif size == 0x04:
181-
write(b"\x84")
182-
elif size == 0x05:
183-
write(b"\x85")
184-
elif size == 0x06:
185-
write(b"\x86")
186-
elif size == 0x07:
187-
write(b"\x87")
188-
elif size == 0x08:
189-
write(b"\x88")
190-
elif size == 0x09:
191-
write(b"\x89")
192-
elif size == 0x0A:
193-
write(b"\x8A")
194-
elif size == 0x0B:
195-
write(b"\x8B")
196-
elif size == 0x0C:
197-
write(b"\x8C")
198-
elif size == 0x0D:
199-
write(b"\x8D")
200-
elif size == 0x0E:
201-
write(b"\x8E")
202-
elif size == 0x0F:
203-
write(b"\x8F")
169+
if size <= 0x0F:
170+
write(bytes((0x80 | size,)))
204171
elif size < 0x100:
205172
write(b"\xD0")
206173
write(PACKED_UINT_8[size])
@@ -215,38 +182,8 @@ def pack_string_header(self, size):
215182

216183
def pack_list_header(self, size):
217184
write = self._write
218-
if size == 0x00:
219-
write(b"\x90")
220-
elif size == 0x01:
221-
write(b"\x91")
222-
elif size == 0x02:
223-
write(b"\x92")
224-
elif size == 0x03:
225-
write(b"\x93")
226-
elif size == 0x04:
227-
write(b"\x94")
228-
elif size == 0x05:
229-
write(b"\x95")
230-
elif size == 0x06:
231-
write(b"\x96")
232-
elif size == 0x07:
233-
write(b"\x97")
234-
elif size == 0x08:
235-
write(b"\x98")
236-
elif size == 0x09:
237-
write(b"\x99")
238-
elif size == 0x0A:
239-
write(b"\x9A")
240-
elif size == 0x0B:
241-
write(b"\x9B")
242-
elif size == 0x0C:
243-
write(b"\x9C")
244-
elif size == 0x0D:
245-
write(b"\x9D")
246-
elif size == 0x0E:
247-
write(b"\x9E")
248-
elif size == 0x0F:
249-
write(b"\x9F")
185+
if size <= 0x0F:
186+
write(bytes((0x90 | size,)))
250187
elif size < 0x100:
251188
write(b"\xD4")
252189
write(PACKED_UINT_8[size])
@@ -264,38 +201,8 @@ def pack_list_stream_header(self):
264201

265202
def pack_map_header(self, size):
266203
write = self._write
267-
if size == 0x00:
268-
write(b"\xA0")
269-
elif size == 0x01:
270-
write(b"\xA1")
271-
elif size == 0x02:
272-
write(b"\xA2")
273-
elif size == 0x03:
274-
write(b"\xA3")
275-
elif size == 0x04:
276-
write(b"\xA4")
277-
elif size == 0x05:
278-
write(b"\xA5")
279-
elif size == 0x06:
280-
write(b"\xA6")
281-
elif size == 0x07:
282-
write(b"\xA7")
283-
elif size == 0x08:
284-
write(b"\xA8")
285-
elif size == 0x09:
286-
write(b"\xA9")
287-
elif size == 0x0A:
288-
write(b"\xAA")
289-
elif size == 0x0B:
290-
write(b"\xAB")
291-
elif size == 0x0C:
292-
write(b"\xAC")
293-
elif size == 0x0D:
294-
write(b"\xAD")
295-
elif size == 0x0E:
296-
write(b"\xAE")
297-
elif size == 0x0F:
298-
write(b"\xAF")
204+
if size <= 0x0F:
205+
write(bytes((0xA0 | size,)))
299206
elif size < 0x100:
300207
write(b"\xD8")
301208
write(PACKED_UINT_8[size])
@@ -316,38 +223,8 @@ def pack_struct(self, signature, fields):
316223
raise ValueError("Structure signature must be a single byte value")
317224
write = self._write
318225
size = len(fields)
319-
if size == 0x00:
320-
write(b"\xB0")
321-
elif size == 0x01:
322-
write(b"\xB1")
323-
elif size == 0x02:
324-
write(b"\xB2")
325-
elif size == 0x03:
326-
write(b"\xB3")
327-
elif size == 0x04:
328-
write(b"\xB4")
329-
elif size == 0x05:
330-
write(b"\xB5")
331-
elif size == 0x06:
332-
write(b"\xB6")
333-
elif size == 0x07:
334-
write(b"\xB7")
335-
elif size == 0x08:
336-
write(b"\xB8")
337-
elif size == 0x09:
338-
write(b"\xB9")
339-
elif size == 0x0A:
340-
write(b"\xBA")
341-
elif size == 0x0B:
342-
write(b"\xBB")
343-
elif size == 0x0C:
344-
write(b"\xBC")
345-
elif size == 0x0D:
346-
write(b"\xBD")
347-
elif size == 0x0E:
348-
write(b"\xBE")
349-
elif size == 0x0F:
350-
write(b"\xBF")
226+
if size <= 0x0F:
227+
write(bytes((0xB0 | size,)))
351228
else:
352229
raise OverflowError("Structure size out of range")
353230
write(signature)

tests/unit/io/test__common.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import pytest
2+
3+
from neo4j.io._common import Outbox
4+
5+
6+
@pytest.mark.parametrize(("chunk_size", "data", "result"), (
7+
(
8+
2,
9+
(bytes(range(10, 15)),),
10+
bytes((0, 2, 10, 11, 0, 2, 12, 13, 0, 1, 14))
11+
),
12+
(
13+
2,
14+
(bytes(range(10, 14)),),
15+
bytes((0, 2, 10, 11, 0, 2, 12, 13))
16+
),
17+
(
18+
2,
19+
(bytes((5, 6, 7)), bytes((8, 9))),
20+
bytes((0, 2, 5, 6, 0, 2, 7, 8, 0, 1, 9))
21+
),
22+
))
23+
def test_outbox_chunking(chunk_size, data, result):
24+
outbox = Outbox(max_chunk_size=chunk_size)
25+
assert bytes(outbox.view()) == b""
26+
for d in data:
27+
outbox.write(d)
28+
assert bytes(outbox.view()) == result
29+
# make sure this works multiple times
30+
assert bytes(outbox.view()) == result
31+
outbox.clear()
32+
assert bytes(outbox.view()) == b""

0 commit comments

Comments
 (0)