Skip to content

Add UDF support to write table #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,34 @@ try:
except BigQueryTimeoutException:
print "Timeout"

# write to a permanent table using an external UDF referenced in the query
external_udf_uris = ["gs://bigquery-sandbox-udf/url_decode.js"]
query = """SELECT requests, title
FROM
urlDecode(
SELECT
title, sum(requests) AS num_requests
FROM
[fh-bigquery:wikipedia.pagecounts_201504]
WHERE language = 'fr'
GROUP EACH BY title
)
WHERE title LIKE '%ç%'
ORDER BY requests DESC
LIMIT 100
"""
job = client.write_to_table(
query,
'dataset',
'table',
external_udf_uris=external_udf_uris
)

try:
job_resource = client.wait_for_job(job, timeout=60)
print job_resource
except BigQueryTimeoutException:
print "Timeout"

# write to temporary table
job = client.write_to_table('SELECT * FROM dataset.original_table LIMIT 100')
Expand All @@ -176,6 +204,8 @@ try:
print job_resource
except BigQueryTimeoutException:
print "Timeout"


```

# Import data from Google cloud storage
Expand Down
16 changes: 15 additions & 1 deletion bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,7 @@ def write_to_table(
query,
dataset=None,
table=None,
external_udf_uris=[],
allow_large_results=None,
use_query_cache=None,
priority=None,
Expand All @@ -874,6 +875,11 @@ def write_to_table(
query: required BigQuery query string.
dataset: optional string id of the dataset
table: optional string id of the table
external_udf_uris: optional list of external UDF URIs
    (if given, each URI must point to a Google
    Cloud Storage file with a .js extension)
allow_large_results: optional boolean
use_query_cache: optional boolean
priority: optional string
Expand Down Expand Up @@ -919,6 +925,14 @@ def write_to_table(
if write_disposition:
configuration['writeDisposition'] = write_disposition

configuration['userDefinedFunctionResources'] = []
for external_udf_uri in external_udf_uris:
configuration['userDefinedFunctionResources'].append(
{
"resourceUri": external_udf_uri
}
)

body = {
"configuration": {
'query': configuration
Expand Down Expand Up @@ -1230,7 +1244,7 @@ def _transform_row(self, row, schema):

elif col_dict['type'] == 'BOOLEAN':
row_value = row_value in ('True', 'true', 'TRUE')

elif col_dict['type'] == 'TIMESTAMP':
row_value = float(row_value)

Expand Down
5 changes: 5 additions & 0 deletions bigquery/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ def setUp(self):
self.project_id = 'project'
self.dataset_id = 'dataset'
self.table_id = 'table'
self.external_udf_uris = ['gs://bucket/external_udf.js']
self.use_query_cache = False
self.priority = "INTERACTIVE"
self.client = client.BigQueryClient(self.mock_api,
Expand All @@ -1032,6 +1033,9 @@ def test_write(self):
"tableId": self.table_id
},
"query": self.query,
"userDefinedFunctionResources": [{
"resourceUri": self.external_udf_uris[0]
}],
"useQueryCache": self.use_query_cache,
"priority": self.priority,
}
Expand All @@ -1042,6 +1046,7 @@ def test_write(self):
result = self.client.write_to_table(self.query,
self.dataset_id,
self.table_id,
external_udf_uris=self.external_udf_uris,
use_query_cache=False,
priority=self.priority)

Expand Down