Skip to content

Commit 04b6748

Browse files
committed
Revert "Stop treating ReadyForQuery as a universal result indicator"
This reverts commit 7a81613. This has caused race regressions and will need to be rethought.
1 parent 8c5c1bf commit 04b6748

File tree

3 files changed

+151
-75
lines changed

3 files changed

+151
-75
lines changed

asyncpg/protocol/coreproto.pxd

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ cdef enum ProtocolState:
1919
PROTOCOL_CANCELLED = 3
2020

2121
PROTOCOL_AUTH = 10
22-
PROTOCOL_PARSE_DESCRIBE = 11
22+
PROTOCOL_PREPARE = 11
2323
PROTOCOL_BIND_EXECUTE = 12
2424
PROTOCOL_BIND_EXECUTE_MANY = 13
2525
PROTOCOL_CLOSE_STMT_PORTAL = 14
@@ -105,7 +105,7 @@ cdef class CoreProtocol:
105105
bint result_execute_completed
106106

107107
cdef _process__auth(self, char mtype)
108-
cdef _process__parse_describe(self, char mtype)
108+
cdef _process__prepare(self, char mtype)
109109
cdef _process__bind_execute(self, char mtype)
110110
cdef _process__bind_execute_many(self, char mtype)
111111
cdef _process__close_stmt_portal(self, char mtype)

asyncpg/protocol/coreproto.pyx

Lines changed: 145 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -55,47 +55,11 @@ cdef class CoreProtocol:
5555
# 'N' - NoticeResponse
5656
self._on_notice(self._parse_msg_error_response(False))
5757

58-
elif mtype == b'E':
59-
# ErrorResponse
60-
self._parse_msg_error_response(True)
61-
# In all cases, except Auth, ErrorResponse will
62-
# be followed by a ReadyForQuery, which is when
63-
# _push_result() will be called.
64-
if state == PROTOCOL_AUTH:
65-
self._push_result()
66-
67-
elif mtype == b'Z':
68-
# ReadyForQuery
69-
self._parse_msg_ready_for_query()
70-
71-
if state != PROTOCOL_BIND_EXECUTE_MANY:
72-
self._push_result()
73-
74-
else:
75-
if self.result_type == RESULT_FAILED:
76-
self._push_result()
77-
else:
78-
try:
79-
buf = <WriteBuffer>next(self._execute_iter)
80-
except StopIteration:
81-
self._push_result()
82-
except Exception as e:
83-
self.result_type = RESULT_FAILED
84-
self.result = e
85-
self._push_result()
86-
else:
87-
# Next iteration over the executemany()
88-
# arg sequence.
89-
self._send_bind_message(
90-
self._execute_portal_name,
91-
self._execute_stmt_name,
92-
buf, 0)
93-
9458
elif state == PROTOCOL_AUTH:
9559
self._process__auth(mtype)
9660

97-
elif state == PROTOCOL_PARSE_DESCRIBE:
98-
self._process__parse_describe(mtype)
61+
elif state == PROTOCOL_PREPARE:
62+
self._process__prepare(mtype)
9963

10064
elif state == PROTOCOL_BIND_EXECUTE:
10165
self._process__bind_execute(mtype)
@@ -130,26 +94,42 @@ cdef class CoreProtocol:
13094

13195
elif state == PROTOCOL_CANCELLED:
13296
# discard all messages until the sync message
133-
self.buffer.discard_message()
97+
if mtype == b'E':
98+
self._parse_msg_error_response(True)
99+
elif mtype == b'Z':
100+
self._parse_msg_ready_for_query()
101+
self._push_result()
102+
else:
103+
self.buffer.discard_message()
134104

135105
elif state == PROTOCOL_ERROR_CONSUME:
136106
# Error in protocol (on asyncpg side);
137107
# discard all messages until sync message
138-
self.buffer.discard_message()
108+
109+
if mtype == b'Z':
110+
# Sync point, self to push the result
111+
if self.result_type != RESULT_FAILED:
112+
self.result_type = RESULT_FAILED
113+
self.result = apg_exc.InternalClientError(
114+
'unknown error in protocol implementation')
115+
116+
self._push_result()
117+
118+
else:
119+
self.buffer.discard_message()
139120

140121
else:
141122
raise apg_exc.InternalClientError(
142123
'protocol is in an unknown state {}'.format(state))
143124

144125
except Exception as ex:
145-
self.state = PROTOCOL_ERROR_CONSUME
146126
self.result_type = RESULT_FAILED
147127
self.result = ex
148128

149129
if mtype == b'Z':
150-
# This should only happen if _parse_msg_ready_for_query()
151-
# has failed.
152130
self._push_result()
131+
else:
132+
self.state = PROTOCOL_ERROR_CONSUME
153133

154134
finally:
155135
self.buffer.finish_message()
@@ -171,27 +151,43 @@ cdef class CoreProtocol:
171151
# BackendKeyData
172152
self._parse_msg_backend_key_data()
173153

174-
# push_result() will be initiated by handling
175-
# ReadyForQuery or ErrorResponse in the main loop.
154+
elif mtype == b'E':
155+
# ErrorResponse
156+
self.con_status = CONNECTION_BAD
157+
self._parse_msg_error_response(True)
158+
self._push_result()
176159

177-
cdef _process__parse_describe(self, char mtype):
178-
if mtype == b'1':
179-
# ParseComplete
180-
self.buffer.discard_message()
160+
elif mtype == b'Z':
161+
# ReadyForQuery
162+
self._parse_msg_ready_for_query()
163+
self.con_status = CONNECTION_OK
164+
self._push_result()
181165

182-
elif mtype == b't':
183-
# ParameterDescription
166+
cdef _process__prepare(self, char mtype):
167+
if mtype == b't':
168+
# Parameters description
184169
self.result_param_desc = self.buffer.consume_message()
185170

171+
elif mtype == b'1':
172+
# ParseComplete
173+
self.buffer.discard_message()
174+
186175
elif mtype == b'T':
187-
# RowDescription
176+
# Row description
188177
self.result_row_desc = self.buffer.consume_message()
178+
179+
elif mtype == b'E':
180+
# ErrorResponse
181+
self._parse_msg_error_response(True)
182+
183+
elif mtype == b'Z':
184+
# ReadyForQuery
185+
self._parse_msg_ready_for_query()
189186
self._push_result()
190187

191188
elif mtype == b'n':
192189
# NoData
193190
self.buffer.discard_message()
194-
self._push_result()
195191

196192
cdef _process__bind_execute(self, char mtype):
197193
if mtype == b'D':
@@ -201,22 +197,28 @@ cdef class CoreProtocol:
201197
elif mtype == b's':
202198
# PortalSuspended
203199
self.buffer.discard_message()
204-
self._push_result()
205200

206201
elif mtype == b'C':
207202
# CommandComplete
208203
self.result_execute_completed = True
209204
self._parse_msg_command_complete()
210-
self._push_result()
205+
206+
elif mtype == b'E':
207+
# ErrorResponse
208+
self._parse_msg_error_response(True)
211209

212210
elif mtype == b'2':
213211
# BindComplete
214212
self.buffer.discard_message()
215213

214+
elif mtype == b'Z':
215+
# ReadyForQuery
216+
self._parse_msg_ready_for_query()
217+
self._push_result()
218+
216219
elif mtype == b'I':
217220
# EmptyQueryResponse
218221
self.buffer.discard_message()
219-
self._push_result()
220222

221223
cdef _process__bind_execute_many(self, char mtype):
222224
cdef WriteBuffer buf
@@ -233,24 +235,64 @@ cdef class CoreProtocol:
233235
# CommandComplete
234236
self._parse_msg_command_complete()
235237

238+
elif mtype == b'E':
239+
# ErrorResponse
240+
self._parse_msg_error_response(True)
241+
236242
elif mtype == b'2':
237243
# BindComplete
238244
self.buffer.discard_message()
239245

246+
elif mtype == b'Z':
247+
# ReadyForQuery
248+
self._parse_msg_ready_for_query()
249+
if self.result_type == RESULT_FAILED:
250+
self._push_result()
251+
else:
252+
try:
253+
buf = <WriteBuffer>next(self._execute_iter)
254+
except StopIteration:
255+
self._push_result()
256+
except Exception as e:
257+
self.result_type = RESULT_FAILED
258+
self.result = e
259+
self._push_result()
260+
else:
261+
# Next iteration over the executemany() arg sequence
262+
self._send_bind_message(
263+
self._execute_portal_name, self._execute_stmt_name,
264+
buf, 0)
265+
240266
elif mtype == b'I':
241267
# EmptyQueryResponse
242268
self.buffer.discard_message()
243269

244270
cdef _process__bind(self, char mtype):
245-
if mtype == b'2':
271+
if mtype == b'E':
272+
# ErrorResponse
273+
self._parse_msg_error_response(True)
274+
275+
elif mtype == b'2':
246276
# BindComplete
247277
self.buffer.discard_message()
278+
279+
elif mtype == b'Z':
280+
# ReadyForQuery
281+
self._parse_msg_ready_for_query()
248282
self._push_result()
249283

250284
cdef _process__close_stmt_portal(self, char mtype):
251-
if mtype == b'3':
285+
if mtype == b'E':
286+
# ErrorResponse
287+
self._parse_msg_error_response(True)
288+
289+
elif mtype == b'3':
252290
# CloseComplete
253291
self.buffer.discard_message()
292+
293+
elif mtype == b'Z':
294+
# ReadyForQuery
295+
self._parse_msg_ready_for_query()
254296
self._push_result()
255297

256298
cdef _process__simple_query(self, char mtype):
@@ -260,21 +302,42 @@ cdef class CoreProtocol:
260302
# 'T' - RowDescription
261303
self.buffer.discard_message()
262304

305+
elif mtype == b'E':
306+
# ErrorResponse
307+
self._parse_msg_error_response(True)
308+
309+
elif mtype == b'Z':
310+
# ReadyForQuery
311+
self._parse_msg_ready_for_query()
312+
self._push_result()
313+
263314
elif mtype == b'C':
264315
# CommandComplete
265316
self._parse_msg_command_complete()
317+
266318
else:
267319
# We don't really care about COPY IN etc
268320
self.buffer.discard_message()
269321

270322
cdef _process__copy_out(self, char mtype):
271-
if mtype == b'H':
323+
if mtype == b'E':
324+
self._parse_msg_error_response(True)
325+
326+
elif mtype == b'H':
272327
# CopyOutResponse
273328
self._set_state(PROTOCOL_COPY_OUT_DATA)
274329
self.buffer.discard_message()
275330

331+
elif mtype == b'Z':
332+
# ReadyForQuery
333+
self._parse_msg_ready_for_query()
334+
self._push_result()
335+
276336
cdef _process__copy_out_data(self, char mtype):
277-
if mtype == b'd':
337+
if mtype == b'E':
338+
self._parse_msg_error_response(True)
339+
340+
elif mtype == b'd':
278341
# CopyData
279342
self._parse_copy_data_msgs()
280343

@@ -286,18 +349,37 @@ cdef class CoreProtocol:
286349
elif mtype == b'C':
287350
# CommandComplete
288351
self._parse_msg_command_complete()
352+
353+
elif mtype == b'Z':
354+
# ReadyForQuery
355+
self._parse_msg_ready_for_query()
289356
self._push_result()
290357

291358
cdef _process__copy_in(self, char mtype):
292-
if mtype == b'G':
359+
if mtype == b'E':
360+
self._parse_msg_error_response(True)
361+
362+
elif mtype == b'G':
293363
# CopyInResponse
294364
self._set_state(PROTOCOL_COPY_IN_DATA)
295365
self.buffer.discard_message()
296366

367+
elif mtype == b'Z':
368+
# ReadyForQuery
369+
self._parse_msg_ready_for_query()
370+
self._push_result()
371+
297372
cdef _process__copy_in_data(self, char mtype):
298-
if mtype == b'C':
373+
if mtype == b'E':
374+
self._parse_msg_error_response(True)
375+
376+
elif mtype == b'C':
299377
# CommandComplete
300378
self._parse_msg_command_complete()
379+
380+
elif mtype == b'Z':
381+
# ReadyForQuery
382+
self._parse_msg_ready_for_query()
301383
self._push_result()
302384

303385
cdef _parse_msg_command_complete(self):
@@ -659,7 +741,7 @@ cdef class CoreProtocol:
659741
WriteBuffer buf
660742

661743
self._ensure_connected()
662-
self._set_state(PROTOCOL_PARSE_DESCRIBE)
744+
self._set_state(PROTOCOL_PREPARE)
663745

664746
buf = WriteBuffer.new_message(b'P')
665747
buf.write_str(stmt_name, self.encoding)

0 commit comments

Comments
 (0)