
Master timeout param #28


Closed · wants to merge 15 commits
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
.idea
2 changes: 1 addition & 1 deletion Makefile
@@ -1,5 +1,5 @@
EXTENSION = aws_s3 # the extensions name
DATA = aws_s3--0.0.1.sql # script files to install
DATA = aws_s3--1.0.0.sql # script files to install

# postgres build stuff
PG_CONFIG = pg_config
8 changes: 5 additions & 3 deletions README.md
@@ -1,3 +1,5 @@
## chimpler/postgres-aws-s3 is inactive

# postgres-aws-s3

Starting with Postgres version 11.1, AWS RDS added [support](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.S3Import.html#USER_PostgreSQL.S3Import.FileFormats) for S3 import through the extension `aws_s3`. It lets you import data from S3 into Postgres using the function `aws_s3.table_import_from_s3` and export data to S3 using the function `aws_s3.query_export_to_s3`.
@@ -89,7 +91,7 @@ table_name | the name of the table
column_list | list of columns to copy
options | options passed to the COPY command in Postgres
s3_info | An aws_commons._s3_uri_1 composite type containing the bucket, file path and region information about the s3 object
credentials | An aws_commons._aws_credentials_1 composite type containing the access key, secret key, session token credentials
credentials | An aws_commons._aws_credentials_1 composite type containing the access key, secret key, and session token. To omit a value, set it to `null`.
endpoint_url | optional endpoint to use (e.g., `http://localhost:4566`)
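
For reference, a minimal sketch of a call that matches the parameters above. The bucket, key, and table names are placeholders, and it assumes the `aws_commons.create_s3_uri` and `aws_commons.create_aws_credentials` helpers that build the composite types listed in the table; passing `null` as the session token omits it.

```
-- Illustrative sketch only: bucket, key, and table names are placeholders.
SELECT aws_s3.table_import_from_s3(
   'animals',                                    -- table_name
   '',                                           -- column_list ('' copies all columns)
   '(FORMAT csv, HEADER true)',                  -- options forwarded to COPY
   aws_commons.create_s3_uri('my-bucket', 'samples/animals.csv', 'us-east-1'),
   aws_commons.create_aws_credentials('<access_key>', '<secret_key>', null)  -- session token omitted
);
```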

##### Example
@@ -282,7 +284,7 @@ Parameter | Description
----------|------------
query | query that returns the data to export
s3_info | An aws_commons._s3_uri_1 composite type containing the bucket, file path and region information about the s3 object
credentials | An aws_commons._aws_credentials_1 composite type containing the access key, secret key, session token credentials
credentials | An aws_commons._aws_credentials_1 composite type containing the access key, secret key, and session token. To omit a value, set it to `null`.
options | options passed to the COPY command in Postgres
endpoint_url | optional endpoint to use (e.g., `http://localhost:4566`)
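
Likewise, a hedged sketch of an export call matching the parameters above; names are placeholders, the `aws_commons` helpers are assumed as before, and `null` credentials fall back to the defaults described below.

```
-- Illustrative sketch only: query, bucket, and key are placeholders.
SELECT * FROM aws_s3.query_export_to_s3(
   'SELECT * FROM animals',                      -- query
   aws_commons.create_s3_uri('my-bucket', 'exports/animals.csv', 'us-east-1'),
   NULL,                                         -- credentials omitted
   '(FORMAT csv, HEADER true)',                  -- options forwarded to COPY
   'http://localhost:4566'                       -- endpoint_url (optional, e.g. a local S3 stand-in)
);
```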

@@ -314,7 +316,7 @@ You can omit the credentials.

##### Example

#### Using the function table_import_from_s3 with all the parameters
#### Using the function query_export_to_s3 with all the parameters
```
aws_s3.query_export_to_s3(
query text,
111 changes: 79 additions & 32 deletions aws_s3--0.0.1.sql → aws_s3--1.0.0.sql
@@ -48,7 +48,8 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3 (
access_key text default null,
secret_key text default null,
session_token text default null,
endpoint_url text default null
endpoint_url text default null,
read_timeout integer default 60
) RETURNS int
LANGUAGE plpython3u
AS $$
@@ -85,33 +86,51 @@ AS $$
s3 = boto3.resource(
's3',
region_name=region,
config=boto3.session.Config(read_timeout=read_timeout),
**aws_settings
)

obj = s3.Object(bucket, file_path)
response = obj.get()
content_encoding = response.get('ContentEncoding')
body = response['Body']
user_content_encoding = response.get('x-amz-meta-content-encoding')
formatted_column_list = "({column_list})".format(column_list=column_list) if column_list else ''
num_rows = 0

with tempfile.NamedTemporaryFile() as fd:
if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
with gzip.GzipFile(fileobj=body) as gzipfile:
while fd.write(gzipfile.read(204800)):
pass
else:
while fd.write(body.read(204800)):
pass
fd.flush()
formatted_column_list = "({column_list})".format(column_list=column_list) if column_list else ''
res = plpy.execute("COPY {table_name} {formatted_column_list} FROM {filename} {options};".format(
table_name=table_name,
filename=plpy.quote_literal(fd.name),
formatted_column_list=formatted_column_list,
options=options
)
)
return res.nrows()
for file_path_item in file_path.split(","):
file_path_item = file_path_item.strip()
if not file_path_item:
continue

s3_objects = []
if file_path_item.endswith("/"): # Directory
bucket_objects = s3.Bucket(bucket).objects.filter(Prefix=file_path_item)
s3_objects = [bucket_object for bucket_object in bucket_objects]
else: # File
s3_object = s3.Object(bucket, file_path_item)
s3_objects = [s3_object]

for s3_object in s3_objects:
response = s3_object.get()
content_encoding = response.get('ContentEncoding')
body = response['Body']
user_content_encoding = response.get('x-amz-meta-content-encoding')

with tempfile.NamedTemporaryFile() as fd:
if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
with gzip.GzipFile(fileobj=body) as gzipfile:
while fd.write(gzipfile.read(204800)):
pass
else:
while fd.write(body.read(204800)):
pass
fd.flush()

res = plpy.execute("COPY {table_name} {formatted_column_list} FROM {filename} {options};".format(
table_name=table_name,
filename=plpy.quote_literal(fd.name),
formatted_column_list=formatted_column_list,
options=options
)
)
num_rows += res.nrows()
return num_rows
$$;
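
To make the rewritten import loop concrete, here is an illustrative sketch under the same assumptions as the README sketches above (placeholder names, `aws_commons` helpers). A trailing `/` in `file_path` now imports every object under that prefix, a comma-separated list imports several objects in one call, and the new `read_timeout` is forwarded to the boto3 client config.

```
-- Illustrative sketch only: bucket, prefix, and table names are placeholders.
-- 'imports/2024/' ends with '/', so every object under that prefix is imported;
-- 'a.csv,b.csv' would import exactly those two objects instead.
SELECT aws_s3.table_import_from_s3(
   'animals',
   '',
   '(FORMAT csv)',
   aws_commons.create_s3_uri('my-bucket', 'imports/2024/', 'us-east-1'),
   aws_commons.create_aws_credentials('<access_key>', '<secret_key>', null),
   read_timeout := 120   -- seconds boto3 waits on a read before timing out (default 60)
);
```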

--
@@ -124,14 +143,15 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3(
options text,
s3_info aws_commons._s3_uri_1,
credentials aws_commons._aws_credentials_1,
endpoint_url text default null
endpoint_url text default null,
read_timeout integer default 60
) RETURNS INT
LANGUAGE plpython3u
AS $$

plan = plpy.prepare(
'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9) AS num_rows',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) AS num_rows',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER']
)
return plan.execute(
[
@@ -144,7 +164,8 @@ AS $$
credentials['access_key'],
credentials['secret_key'],
credentials['session_token'],
endpoint_url
endpoint_url,
read_timeout
]
)[0]['num_rows']
$$;
@@ -159,6 +180,7 @@ CREATE OR REPLACE FUNCTION aws_s3.query_export_to_s3(
session_token text default null,
options text default null,
endpoint_url text default null,
read_timeout integer default 60,
OUT rows_uploaded bigint,
OUT files_uploaded bigint,
OUT bytes_uploaded bigint
@@ -177,8 +199,19 @@ AS $$
module_cache[module_name] = _module
return _module

def file_exists(bucket, file_path, s3_client):
try:
s3_client.head_object(Bucket=bucket, Key=file_path)
return True
except:
return False

def get_unique_file_path(base_name, counter, extension):
return f"{base_name}_part{counter}{extension}"

boto3 = cache_import('boto3')
tempfile = cache_import('tempfile')
re = cache_import("re")

plan = plpy.prepare("select name, current_setting('aws_s3.' || name, true) as value from (select unnest(array['access_key_id', 'secret_access_key', 'session_token', 'endpoint_url']) as name) a");
default_aws_settings = {
@@ -196,9 +229,19 @@ AS $$
s3 = boto3.client(
's3',
region_name=region,
config=boto3.session.Config(read_timeout=read_timeout),
**aws_settings
)

# generate unique file path
file_path_parts = re.match(r'^(.*?)(\.[^.]*$|$)', file_path)
base_name = file_path_parts.group(1)
extension = file_path_parts.group(2)
counter = 0
while file_exists(bucket, get_unique_file_path(base_name, counter, extension), s3):
counter += 1
unique_file_path = get_unique_file_path(base_name, counter, extension)

with tempfile.NamedTemporaryFile() as fd:
plan = plpy.prepare(
"COPY ({query}) TO '{filename}' {options}".format(
@@ -218,7 +261,9 @@ AS $$
num_lines += buffer.count(b'\n')
size += len(buffer)
fd.seek(0)
s3.upload_fileobj(fd, bucket, file_path)
s3.upload_fileobj(fd, bucket, unique_file_path)
if 'HEADER TRUE' in options.upper():
num_lines -= 1
yield (num_lines, 1, size)
$$;
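
And a hedged sketch of how the rewritten export handles the target key (placeholder names, `aws_commons` helpers assumed): the uploaded object always receives a `_part<N>` suffix, using the first `N` for which no object already exists, and when the COPY options contain `HEADER TRUE` the header line is subtracted from `rows_uploaded`.

```
-- Illustrative sketch only: bucket and key are placeholders.
-- With file_path 'exports/animals.csv', the object actually uploaded is
-- 'exports/animals_part0.csv' (or _part1, _part2, ... if earlier parts exist).
SELECT * FROM aws_s3.query_export_to_s3(
   'SELECT * FROM animals',
   aws_commons.create_s3_uri('my-bucket', 'exports/animals.csv', 'us-east-1'),
   options      := '(FORMAT csv, HEADER true)',  -- header line excluded from rows_uploaded
   read_timeout := 120
);
```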

Expand All @@ -228,15 +273,16 @@ CREATE OR REPLACE FUNCTION aws_s3.query_export_to_s3(
credentials aws_commons._aws_credentials_1 default null,
options text default null,
endpoint_url text default null,
read_timeout integer default 60,
OUT rows_uploaded bigint,
OUT files_uploaded bigint,
OUT bytes_uploaded bigint
) RETURNS SETOF RECORD
LANGUAGE plpython3u
AS $$
plan = plpy.prepare(
'SELECT * FROM aws_s3.query_export_to_s3($1, $2, $3, $4, $5, $6, $7, $8, $9)',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
'SELECT * FROM aws_s3.query_export_to_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER']
)
return plan.execute(
[
Expand All @@ -248,7 +294,8 @@ AS $$
credentials.get('secret_key') if credentials else None,
credentials.get('session_token') if credentials else None,
options,
endpoint_url
endpoint_url,
read_timeout
]
)
$$;
2 changes: 1 addition & 1 deletion aws_s3.control
@@ -1,5 +1,5 @@
# aws_s3 extension
comment = 'aws s3 wrapper for non rds postgres'
default_version = '0.0.1'
default_version = '1.0.0'
module_pathname = '$libdir/aws_s3'
relocatable = true