@@ -621,6 +621,7 @@ class _ConnectionIOBuffer(object):
621
621
_io_buffer = None
622
622
_cql_frame_buffer = None
623
623
_connection = None
624
+ _segment_consumed = False
624
625
625
626
def __init__ (self , connection ):
626
627
self ._io_buffer = io .BytesIO ()
@@ -643,6 +644,10 @@ def set_checksumming_buffer(self):
643
644
def is_checksumming_enabled (self ):
644
645
return self ._connection ._is_checksumming_enabled
645
646
647
+ @property
648
+ def has_consumed_segment (self ):
649
+ return self ._segment_consumed ;
650
+
646
651
def readable_io_bytes (self ):
647
652
return self .io_buffer .tell ()
648
653
@@ -1118,23 +1123,33 @@ def _process_segment_buffer(self):
1118
1123
try :
1119
1124
self ._io_buffer .io_buffer .seek (0 )
1120
1125
segment_header = self ._segment_codec .decode_header (self ._io_buffer .io_buffer )
1126
+
1121
1127
if readable_bytes >= segment_header .segment_length :
1122
1128
segment = self ._segment_codec .decode (self ._iobuf , segment_header )
1129
+ self ._io_buffer ._segment_consumed = True
1123
1130
self ._io_buffer .cql_frame_buffer .write (segment .payload )
1124
1131
else :
1125
1132
# not enough data to read the segment. reset the buffer pointer at the
1126
1133
# beginning to not lose what we previously read (header).
1134
+ self ._io_buffer ._segment_consumed = False
1127
1135
self ._io_buffer .io_buffer .seek (0 )
1128
1136
except CrcException as exc :
1129
1137
# re-raise an exception that inherits from ConnectionException
1130
1138
raise CrcMismatchException (str (exc ), self .endpoint )
1139
+ else :
1140
+ self ._io_buffer ._segment_consumed = False
1131
1141
1132
1142
def process_io_buffer (self ):
1133
1143
while True :
1134
- if self ._is_checksumming_enabled :
1144
+ if self ._is_checksumming_enabled and self . _io_buffer . readable_io_bytes () :
1135
1145
self ._process_segment_buffer ()
1136
1146
self ._io_buffer .reset_io_buffer ()
1137
1147
1148
+ if self ._is_checksumming_enabled and not self ._io_buffer .has_consumed_segment :
1149
+ # We couldn't read an entire segment from the io buffer, so return
1150
+ # control to allow more bytes to be read off the wire
1151
+ return
1152
+
1138
1153
if not self ._current_frame :
1139
1154
pos = self ._read_frame_header ()
1140
1155
else :
0 commit comments