@@ -350,13 +350,14 @@ async def _readline(self) -> bytes:
350
350
class HiredisParser (BaseParser ):
351
351
"""Parser class for connections using Hiredis"""
352
352
353
- __slots__ = BaseParser .__slots__ + ("_reader" ,)
353
+ __slots__ = BaseParser .__slots__ + ("_reader" , "_connected" )
354
354
355
355
def __init__ (self , socket_read_size : int ):
356
356
if not HIREDIS_AVAILABLE :
357
357
raise RedisError ("Hiredis is not available." )
358
358
super ().__init__ (socket_read_size = socket_read_size )
359
359
self ._reader : Optional [hiredis .Reader ] = None
360
+ self ._connected : bool = False
360
361
361
362
def on_connect (self , connection : "Connection" ):
362
363
self ._stream = connection ._reader
@@ -369,13 +370,13 @@ def on_connect(self, connection: "Connection"):
369
370
kwargs ["errors" ] = connection .encoder .encoding_errors
370
371
371
372
self ._reader = hiredis .Reader (** kwargs )
373
+ self ._connected = False
372
374
373
375
def on_disconnect (self ):
374
- self ._stream = None
375
- self ._reader = None
376
+ self ._connected = False
376
377
377
378
async def can_read_destructive (self ):
378
- if not self ._stream or not self . _reader :
379
+ if not self ._connected :
379
380
raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
380
381
if self ._reader .gets ():
381
382
return True
@@ -397,8 +398,10 @@ async def read_from_socket(self):
397
398
async def read_response (
398
399
self , disable_decoding : bool = False
399
400
) -> Union [EncodableT , List [EncodableT ]]:
400
- if not self ._stream or not self ._reader :
401
- self .on_disconnect ()
401
+ # If `on_disconnect()` has been called, prohibit any more reads
402
+ # even if they could happen because data might be present.
403
+ # We still allow reads in progress to finish
404
+ if not self ._connected :
402
405
raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR ) from None
403
406
404
407
response = self ._reader .gets ()
0 commit comments