@@ -141,7 +141,7 @@ def decode(self, value: EncodableT, force=False) -> EncodableT:
141
141
class BaseParser :
142
142
"""Plain Python parsing class"""
143
143
144
- __slots__ = "_stream" , "_read_size"
144
+ __slots__ = "_stream" , "_read_size" , "_connected"
145
145
146
146
EXCEPTION_CLASSES : ExceptionMappingT = {
147
147
"ERR" : {
@@ -172,6 +172,7 @@ class BaseParser:
172
172
def __init__ (self , socket_read_size : int ):
173
173
self ._stream : Optional [asyncio .StreamReader ] = None
174
174
self ._read_size = socket_read_size
175
+ self ._connected = False
175
176
176
177
def __del__ (self ):
177
178
try :
@@ -208,7 +209,7 @@ async def read_response(
208
209
class PythonParser (BaseParser ):
209
210
"""Plain Python parsing class"""
210
211
211
- __slots__ = BaseParser . __slots__ + ("encoder" , "_buffer" , "_pos" , "_chunks" )
212
+ __slots__ = ("encoder" , "_buffer" , "_pos" , "_chunks" )
212
213
213
214
def __init__ (self , socket_read_size : int ):
214
215
super ().__init__ (socket_read_size )
@@ -226,28 +227,28 @@ def on_connect(self, connection: "Connection"):
226
227
self ._stream = connection ._reader
227
228
if self ._stream is None :
228
229
raise RedisError ("Buffer is closed." )
229
-
230
230
self .encoder = connection .encoder
231
+ self ._clear ()
232
+ self ._connected = True
231
233
232
234
def on_disconnect (self ):
233
235
"""Called when the stream disconnects"""
234
- if self ._stream is not None :
235
- self ._stream = None
236
- self .encoder = None
237
- self ._clear ()
236
+ self ._connected = False
238
237
239
238
async def can_read_destructive (self ) -> bool :
239
+ if not self ._connected :
240
+ raise RedisError ("Buffer is closed." )
240
241
if self ._buffer :
241
242
return True
242
- if self ._stream is None :
243
- raise RedisError ("Buffer is closed." )
244
243
try :
245
244
async with async_timeout .timeout (0 ):
246
245
return await self ._stream .read (1 )
247
246
except asyncio .TimeoutError :
248
247
return False
249
248
250
249
async def read_response (self , disable_decoding : bool = False ):
250
+ if not self ._connected :
251
+ raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
251
252
if self ._chunks :
252
253
# augment parsing buffer with previously read data
253
254
self ._buffer += b"" .join (self ._chunks )
@@ -261,8 +262,6 @@ async def read_response(self, disable_decoding: bool = False):
261
262
async def _read_response (
262
263
self , disable_decoding : bool = False
263
264
) -> Union [EncodableT , ResponseError , None ]:
264
- if not self ._stream or not self .encoder :
265
- raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
266
265
raw = await self ._readline ()
267
266
response : Any
268
267
byte , response = raw [:1 ], raw [1 :]
@@ -350,14 +349,13 @@ async def _readline(self) -> bytes:
350
349
class HiredisParser (BaseParser ):
351
350
"""Parser class for connections using Hiredis"""
352
351
353
- __slots__ = BaseParser . __slots__ + ("_reader" , "_connected" )
352
+ __slots__ = ("_reader" ,)
354
353
355
354
def __init__ (self , socket_read_size : int ):
356
355
if not HIREDIS_AVAILABLE :
357
356
raise RedisError ("Hiredis is not available." )
358
357
super ().__init__ (socket_read_size = socket_read_size )
359
358
self ._reader : Optional [hiredis .Reader ] = None
360
- self ._connected : bool = False
361
359
362
360
def on_connect (self , connection : "Connection" ):
363
361
self ._stream = connection ._reader
0 commit comments