|
42 | 42 |
|
43 | 43 |
|
44 | 44 | # Signature bytes for each message type
|
45 |
| -INIT = b"\x01" # 0000 0001 // INIT <user_agent> |
| 45 | +INIT = b"\x01" # 0000 0001 // INIT <user_agent> <auth> |
| 46 | +ACK_FAILURE = b"\x0E" # 0000 1110 // ACK_FAILURE |
46 | 47 | RESET = b"\x0F" # 0000 1111 // RESET
|
47 | 48 | RUN = b"\x10" # 0001 0000 // RUN <statement> <parameters>
|
48 | 49 | DISCARD_ALL = b"\x2F" # 0010 1111 // DISCARD *
|
|
57 | 58 |
|
58 | 59 | message_names = {
|
59 | 60 | INIT: "INIT",
|
| 61 | + ACK_FAILURE: "ACK_FAILURE", |
60 | 62 | RESET: "RESET",
|
61 | 63 | RUN: "RUN",
|
62 | 64 | DISCARD_ALL: "DISCARD_ALL",
|
@@ -224,7 +226,7 @@ def __init__(self, sock, **config):
|
224 | 226 | self.der_encoded_server_certificate = config.get("der_encoded_server_certificate")
|
225 | 227 |
|
226 | 228 | def on_failure(metadata):
|
227 |
| - raise ProtocolError(metadata.get("message", "Inititalisation failed")) |
| 229 | + raise ProtocolError(metadata.get("message", "INIT failed")) |
228 | 230 |
|
229 | 231 | response = Response(self)
|
230 | 232 | response.on_failure = on_failure
|
@@ -253,6 +255,23 @@ def append(self, signature, fields=(), response=None):
|
253 | 255 | self.channel.flush(end_of_message=True)
|
254 | 256 | self.responses.append(response)
|
255 | 257 |
|
| 258 | + def acknowledge_failure(self): |
| 259 | + """ Add an ACK_FAILURE message to the outgoing queue, send |
| 260 | + it and consume all remaining messages. |
| 261 | + """ |
| 262 | + response = Response(self) |
| 263 | + |
| 264 | + def on_failure(metadata): |
| 265 | + raise ProtocolError("ACK_FAILURE failed") |
| 266 | + |
| 267 | + response.on_failure = on_failure |
| 268 | + |
| 269 | + self.append(ACK_FAILURE, response=response) |
| 270 | + self.send() |
| 271 | + fetch = self.fetch |
| 272 | + while not response.complete: |
| 273 | + fetch() |
| 274 | + |
256 | 275 | def reset(self):
|
257 | 276 | """ Add a RESET message to the outgoing queue, send
|
258 | 277 | it and consume all remaining messages.
|
@@ -304,7 +323,7 @@ def fetch(self):
|
304 | 323 | response.complete = True
|
305 | 324 | self.responses.popleft()
|
306 | 325 | if signature == FAILURE:
|
307 |
| - self.reset() |
| 326 | + self.acknowledge_failure() |
308 | 327 | handler_name = "on_%s" % message_names[signature].lower()
|
309 | 328 | try:
|
310 | 329 | handler = getattr(response, handler_name)
|
|
0 commit comments