Skip to content

Commit a04c592

Browse files
authored
Merge pull request #1100 from beobal/python-1232-follow-up
Follow up to PYTHON-1232; better handling of incomplete segments
2 parents 1de685b + eba143a commit a04c592

File tree

1 file changed

+19
-3
lines changed

1 file changed

+19
-3
lines changed

cassandra/connection.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,7 @@ class _ConnectionIOBuffer(object):
621621
_io_buffer = None
622622
_cql_frame_buffer = None
623623
_connection = None
624+
_segment_consumed = False
624625

625626
def __init__(self, connection):
626627
self._io_buffer = io.BytesIO()
@@ -643,6 +644,10 @@ def set_checksumming_buffer(self):
643644
def is_checksumming_enabled(self):
644645
return self._connection._is_checksumming_enabled
645646

647+
@property
648+
def has_consumed_segment(self):
649+
return self._segment_consumed;
650+
646651
def readable_io_bytes(self):
647652
return self.io_buffer.tell()
648653

@@ -1118,22 +1123,33 @@ def _process_segment_buffer(self):
11181123
try:
11191124
self._io_buffer.io_buffer.seek(0)
11201125
segment_header = self._segment_codec.decode_header(self._io_buffer.io_buffer)
1126+
11211127
if readable_bytes >= segment_header.segment_length:
11221128
segment = self._segment_codec.decode(self._iobuf, segment_header)
1129+
self._io_buffer._segment_consumed = True
11231130
self._io_buffer.cql_frame_buffer.write(segment.payload)
11241131
else:
1125-
# not enough data to read the segment
1126-
self._io_buffer.io_buffer.seek(0, 2)
1132+
# not enough data to read the segment. reset the buffer pointer at the
1133+
# beginning to not lose what we previously read (header).
1134+
self._io_buffer._segment_consumed = False
1135+
self._io_buffer.io_buffer.seek(0)
11271136
except CrcException as exc:
11281137
# re-raise an exception that inherits from ConnectionException
11291138
raise CrcMismatchException(str(exc), self.endpoint)
1139+
else:
1140+
self._io_buffer._segment_consumed = False
11301141

11311142
def process_io_buffer(self):
11321143
while True:
1133-
if self._is_checksumming_enabled:
1144+
if self._is_checksumming_enabled and self._io_buffer.readable_io_bytes():
11341145
self._process_segment_buffer()
11351146
self._io_buffer.reset_io_buffer()
11361147

1148+
if self._is_checksumming_enabled and not self._io_buffer.has_consumed_segment:
1149+
# We couldn't read an entire segment from the io buffer, so return
1150+
# control to allow more bytes to be read off the wire
1151+
return
1152+
11371153
if not self._current_frame:
11381154
pos = self._read_frame_header()
11391155
else:

0 commit comments

Comments
 (0)