42
42
43
43
# Signature bytes for each message type
44
44
INIT = b"\x01 " # 0000 0001 // INIT <user_agent>
45
- ACK_FAILURE = b"\x0F " # 0000 1111 // ACK_FAILURE
45
+ RESET = b"\x0F " # 0000 1111 // RESET
46
46
RUN = b"\x10 " # 0001 0000 // RUN <statement> <parameters>
47
47
DISCARD_ALL = b"\x2F " # 0010 1111 // DISCARD *
48
48
PULL_ALL = b"\x3F " # 0011 1111 // PULL *
56
56
57
57
message_names = {
58
58
INIT : "INIT" ,
59
- ACK_FAILURE : "ACK_FAILURE " ,
59
+ RESET : "RESET " ,
60
60
RUN : "RUN" ,
61
61
DISCARD_ALL : "DISCARD_ALL" ,
62
62
PULL_ALL : "PULL_ALL" ,
@@ -200,12 +200,6 @@ def on_ignored(self, metadata=None):
200
200
pass
201
201
202
202
203
- class AckFailureResponse (Response ):
204
-
205
- def on_failure (self , metadata ):
206
- raise ProtocolError ("Could not acknowledge failure" )
207
-
208
-
209
203
class Connection (object ):
210
204
""" Server connection through which all protocol messages
211
205
are sent and received. This class is designed for protocol
@@ -215,6 +209,7 @@ class Connection(object):
215
209
"""
216
210
217
211
def __init__ (self , sock , ** config ):
212
+ self .defunct = False
218
213
self .channel = ChunkChannel (sock )
219
214
self .packer = Packer (self .channel )
220
215
self .responses = deque ()
@@ -237,6 +232,10 @@ def on_failure(metadata):
237
232
238
233
def append (self , signature , fields = (), response = None ):
239
234
""" Add a message to the outgoing queue.
235
+
236
+ :arg signature: the signature of the message
237
+ :arg fields: the fields of the message as a tuple
238
+ :arg response: a response object to handle callbacks
240
239
"""
241
240
if __debug__ :
242
241
log_info ("C: %s %s" , message_names [signature ], " " .join (map (repr , fields )))
@@ -247,6 +246,18 @@ def append(self, signature, fields=(), response=None):
247
246
self .channel .flush (end_of_message = True )
248
247
self .responses .append (response )
249
248
249
+ def append_reset (self ):
250
+ """ Add a RESET message to the outgoing queue.
251
+ """
252
+
253
+ def on_failure (metadata ):
254
+ raise ProtocolError ("Reset failed" )
255
+
256
+ response = Response (self )
257
+ response .on_failure = on_failure
258
+
259
+ self .append (RESET , response = response )
260
+
250
261
def send (self ):
251
262
""" Send all queued messages to the server.
252
263
"""
@@ -257,8 +268,12 @@ def fetch_next(self):
257
268
"""
258
269
raw = BytesIO ()
259
270
unpack = Unpacker (raw ).unpack
260
- raw .writelines (self .channel .chunk_reader ())
261
-
271
+ try :
272
+ raw .writelines (self .channel .chunk_reader ())
273
+ except ProtocolError :
274
+ self .defunct = True
275
+ self .close ()
276
+ return
262
277
# Unpack from the raw byte stream and call the relevant message handler(s)
263
278
raw .seek (0 )
264
279
response = self .responses [0 ]
@@ -276,7 +291,7 @@ def fetch_next(self):
276
291
response .complete = True
277
292
self .responses .popleft ()
278
293
if signature == FAILURE :
279
- self .append ( ACK_FAILURE , response = AckFailureResponse ( self ) )
294
+ self .append_reset ( )
280
295
raw .close ()
281
296
282
297
def close (self ):
0 commit comments