@@ -9,7 +9,7 @@ use fallible_iterator::FallibleIterator;
9
9
use futures_util:: { ready, Stream } ;
10
10
use log:: { debug, log_enabled, Level } ;
11
11
use pin_project_lite:: pin_project;
12
- use postgres_protocol:: message:: backend:: { CommandCompleteBody , Message , RowDescriptionBody } ;
12
+ use postgres_protocol:: message:: backend:: { CommandCompleteBody , Message } ;
13
13
use postgres_protocol:: message:: frontend;
14
14
use postgres_types:: Type ;
15
15
use std:: fmt;
61
61
} )
62
62
}
63
63
64
- enum QueryProcessingState {
65
- Empty ,
66
- ParseCompleted ,
67
- BindCompleted ,
68
- ParameterDescribed ,
69
- Final ( Vec < Column > ) ,
70
- }
71
-
72
- /// State machine for processing messages for `query_with_param_types`.
73
- impl QueryProcessingState {
74
- pub async fn process_message (
75
- self ,
76
- client : & Arc < InnerClient > ,
77
- message : Message ,
78
- ) -> Result < Self , Error > {
79
- match ( self , message) {
80
- ( QueryProcessingState :: Empty , Message :: ParseComplete ) => {
81
- Ok ( QueryProcessingState :: ParseCompleted )
82
- }
83
- ( QueryProcessingState :: ParseCompleted , Message :: BindComplete ) => {
84
- Ok ( QueryProcessingState :: BindCompleted )
85
- }
86
- ( QueryProcessingState :: BindCompleted , Message :: ParameterDescription ( _) ) => {
87
- Ok ( QueryProcessingState :: ParameterDescribed )
88
- }
89
- (
90
- QueryProcessingState :: ParameterDescribed ,
91
- Message :: RowDescription ( row_description) ,
92
- ) => Self :: form_final ( client, Some ( row_description) ) . await ,
93
- ( QueryProcessingState :: ParameterDescribed , Message :: NoData ) => {
94
- Self :: form_final ( client, None ) . await
95
- }
96
- ( _, Message :: ErrorResponse ( body) ) => Err ( Error :: db ( body) ) ,
97
- _ => Err ( Error :: unexpected_message ( ) ) ,
98
- }
99
- }
100
-
101
- async fn form_final (
102
- client : & Arc < InnerClient > ,
103
- row_description : Option < RowDescriptionBody > ,
104
- ) -> Result < Self , Error > {
105
- let mut columns = vec ! [ ] ;
106
- if let Some ( row_description) = row_description {
107
- let mut it = row_description. fields ( ) ;
108
- while let Some ( field) = it. next ( ) . map_err ( Error :: parse) ? {
109
- let type_ = get_type ( client, field. type_oid ( ) ) . await ?;
110
- let column = Column {
111
- name : field. name ( ) . to_string ( ) ,
112
- table_oid : Some ( field. table_oid ( ) ) . filter ( |n| * n != 0 ) ,
113
- column_id : Some ( field. column_id ( ) ) . filter ( |n| * n != 0 ) ,
114
- r#type : type_,
115
- } ;
116
- columns. push ( column) ;
117
- }
118
- }
119
-
120
- Ok ( Self :: Final ( columns) )
121
- }
122
- }
123
-
124
64
pub async fn query_with_param_types < ' a , P , I > (
125
65
client : & Arc < InnerClient > ,
126
66
query : & str ,
@@ -155,20 +95,33 @@ where
155
95
156
96
let mut responses = client. send ( RequestMessages :: Single ( FrontendMessage :: Raw ( buf) ) ) ?;
157
97
158
- let mut state = QueryProcessingState :: Empty ;
159
-
160
98
loop {
161
- let message = responses. next ( ) . await ?;
162
-
163
- state = state. process_message ( client, message) . await ?;
164
-
165
- if let QueryProcessingState :: Final ( columns) = state {
166
- return Ok ( RowStream {
167
- statement : Statement :: unnamed ( vec ! [ ] , columns) ,
168
- responses,
169
- rows_affected : None ,
170
- _p : PhantomPinned ,
171
- } ) ;
99
+ match responses. next ( ) . await ? {
100
+ Message :: ParseComplete
101
+ | Message :: BindComplete
102
+ | Message :: ParameterDescription ( _)
103
+ | Message :: NoData => { }
104
+ Message :: RowDescription ( row_description) => {
105
+ let mut columns: Vec < Column > = vec ! [ ] ;
106
+ let mut it = row_description. fields ( ) ;
107
+ while let Some ( field) = it. next ( ) . map_err ( Error :: parse) ? {
108
+ let type_ = get_type ( client, field. type_oid ( ) ) . await ?;
109
+ let column = Column {
110
+ name : field. name ( ) . to_string ( ) ,
111
+ table_oid : Some ( field. table_oid ( ) ) . filter ( |n| * n != 0 ) ,
112
+ column_id : Some ( field. column_id ( ) ) . filter ( |n| * n != 0 ) ,
113
+ r#type : type_,
114
+ } ;
115
+ columns. push ( column) ;
116
+ }
117
+ return Ok ( RowStream {
118
+ statement : Statement :: unnamed ( vec ! [ ] , columns) ,
119
+ responses,
120
+ rows_affected : None ,
121
+ _p : PhantomPinned ,
122
+ } ) ;
123
+ }
124
+ _ => return Err ( Error :: unexpected_message ( ) ) ,
172
125
}
173
126
}
174
127
}
0 commit comments