@@ -120,17 +120,15 @@ def __init__(self, bq_service, project_id, swallow_results=True):
120
120
self .swallow_results = swallow_results
121
121
self .cache = {}
122
122
123
- def query (self , query , max_results = None , timeout = 0 , dry_run = False ):
124
- """Submit a query to BigQuery.
123
+ def _submit_query_job (self , query_data ):
124
+
125
+ """ Submit a query job to BigQuery
126
+
127
+ This is similar to BigQueryClient.query, but gives the user
125
128
126
129
Args:
127
- query: BigQuery query string.
128
- max_results: maximum number of rows to return per page of results.
129
- timeout: how long to wait for the query to complete, in seconds,
130
- before the request times out and returns.
131
- dry_run: if True, the query isn't actually run. A valid query will
132
- return an empty response, while an invalid one will return
133
- the same error message it would if it wasn't a dry run.
130
+ query_data: query object as per "configuration.query" in
131
+ https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query
134
132
135
133
Returns:
136
134
job id and query results if query completed. If dry_run is True,
@@ -141,21 +139,15 @@ def query(self, query, max_results=None, timeout=0, dry_run=False):
141
139
BigQueryTimeoutException on timeout
142
140
"""
143
141
144
- logging .debug ('Executing query: %s' % query )
142
+ logging .debug ('Submitting query job : %s' % query_data )
145
143
146
144
job_collection = self .bigquery .jobs ()
147
- query_data = {
148
- 'query' : query ,
149
- 'timeoutMs' : timeout * 1000 ,
150
- 'dryRun' : dry_run ,
151
- 'maxResults' : max_results ,
152
- }
153
145
154
146
try :
155
147
query_reply = job_collection .query (
156
148
projectId = self .project_id , body = query_data ).execute ()
157
149
except HttpError as e :
158
- if dry_run :
150
+ if query_data . get ( "dryRun" , False ) :
159
151
return None , json .loads (e .content )
160
152
raise
161
153
@@ -166,12 +158,43 @@ def query(self, query, max_results=None, timeout=0, dry_run=False):
166
158
167
159
# raise exceptions if it's not an async query
168
160
# and job is not completed after timeout
169
- if not job_complete and timeout :
161
+ if not job_complete and query_data . get ( "timeoutMs" , False ) :
170
162
logging .error ('BigQuery job %s timeout' % job_id )
171
163
raise BigQueryTimeoutException ()
172
164
173
165
return job_id , [self ._transform_row (row , schema ) for row in rows ]
174
166
167
+ def query (self , query , max_results = None , timeout = 0 , dry_run = False ):
168
+ """Submit a query to BigQuery.
169
+
170
+ Args:
171
+ query: BigQuery query string.
172
+ max_results: maximum number of rows to return per page of results.
173
+ timeout: how long to wait for the query to complete, in seconds,
174
+ before the request times out and returns.
175
+ dry_run: if True, the query isn't actually run. A valid query will
176
+ return an empty response, while an invalid one will return
177
+ the same error message it would if it wasn't a dry run.
178
+
179
+ Returns:
180
+ job id and query results if query completed. If dry_run is True,
181
+ job id will be None and results will be empty if the query is valid
182
+ or a dict containing the response if invalid.
183
+
184
+ Raises:
185
+ BigQueryTimeoutException on timeout
186
+ """
187
+
188
+ logging .debug ('Executing query: %s' % query )
189
+
190
+ query_data = {
191
+ 'query' : query ,
192
+ 'timeoutMs' : timeout * 1000 ,
193
+ 'dryRun' : dry_run ,
194
+ 'maxResults' : max_results ,
195
+ }
196
+ return self ._submit_query_job (query_data )
197
+
175
198
def get_query_schema (self , job_id ):
176
199
"""Retrieve the schema of a query by job id.
177
200
0 commit comments