-
-
Notifications
You must be signed in to change notification settings - Fork 18.5k
Add SQL Support for ADBC Drivers #53869
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
4f2b760
a4ebbb5
b2cd149
512bd00
f49115c
a8512b5
c1c68ef
3d7fb15
1093bc8
926e567
093dd86
fcc21a8
88642f7
156096d
dd26edb
4d8a233
5238e69
51c6c98
39b462b
428c4f7
84d95bb
a4d5b31
21b35f6
e709d52
6077fa9
5bba566
236e12b
b35374c
8d814e1
cfac2c7
47caaf1
a22e5d1
c51b7f4
7f5e6ac
9ee6255
a8b645f
902df4f
90ca2cb
d753c3c
d469e24
2bc11a1
f205f90
2755100
3577a59
c5bf7f8
98d22ce
4f72010
7223e63
c2cd90a
de65ec0
7645727
3dc914c
6dbaae5
3bf550c
492301f
fc463a4
cc72ecd
f5fd529
1207bc4
e8d93c7
3eed897
adef2f2
67101fd
ea5dcb9
fb38411
a0bed67
150e267
1e77f2b
97ed24f
7dc07da
52ee8d3
1de8488
21edaea
2d077e9
accbd49
64b63bd
f84f63a
391d045
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -629,6 +629,19 @@ def read_sql( | |
int_column date_column | ||
0 0 2012-11-10 | ||
1 1 2010-11-12 | ||
|
||
.. versionadded:: 2.1.0 | ||
|
||
pandas now supports reading via ADBC drivers | ||
|
||
>>> from adbc_driver_postgresql import dbapi | ||
>>> with dbapi.connect('postgres:///db_name') as conn: | ||
... pd.read_sql('SELECT int_column, | ||
... conn, | ||
... parse_dates={"date_column": {"format": "%d/%m/%y"}}) | ||
int_column | ||
0 0 | ||
1 1 | ||
""" | ||
|
||
check_dtype_backend(dtype_backend) | ||
|
@@ -837,6 +850,10 @@ def pandasSQL_builder( | |
if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Connectable)): | ||
return SQLDatabase(con, schema, need_transaction) | ||
|
||
adbc = import_optional_dependency("adbc_driver_manager") | ||
if adbc and isinstance(con, adbc.dbapi.Connection): | ||
return ADBCDatabase(con) | ||
|
||
warnings.warn( | ||
"pandas only supports SQLAlchemy connectable (engine/connection) or " | ||
"database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 " | ||
|
@@ -2002,6 +2019,188 @@ def _create_sql_schema( | |
|
||
|
||
# ---- SQL without SQLAlchemy --- | ||
|
||
|
||
class ADBCDatabase(PandasSQL): | ||
""" | ||
This class enables conversion between DataFrame and SQL databases | ||
using ADBC to handle DataBase abstraction. | ||
|
||
Parameters | ||
---------- | ||
con : adbc_driver_manager.dbapi.Connection | ||
""" | ||
|
||
def __init__(self, con) -> None: | ||
self.con = con | ||
|
||
def execute(self, sql: str | Select | TextClause, params=None): | ||
with self.con.cursor() as cur: | ||
return cur(sql) | ||
WillAyd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def read_table( | ||
self, | ||
table_name: str, | ||
index_col: str | list[str] | None = None, | ||
coerce_float: bool = True, | ||
parse_dates=None, | ||
columns=None, | ||
schema: str | None = None, | ||
chunksize: int | None = None, | ||
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", | ||
) -> DataFrame | Iterator[DataFrame]: | ||
""" | ||
Read SQL database table into a DataFrame. Only keyword arguments used | ||
WillAyd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
are table_name and schema. The rest are silently discarded. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They raise an error now, I think |
||
|
||
Parameters | ||
---------- | ||
table_name : str | ||
Name of SQL table in database. | ||
schema : string, default None | ||
Name of SQL schema in database to read from | ||
|
||
Returns | ||
------- | ||
DataFrame | ||
|
||
See Also | ||
-------- | ||
pandas.read_sql_table | ||
SQLDatabase.read_query | ||
|
||
""" | ||
if schema: | ||
stmt = f"SELECT * FROM {schema}.{table_name}" | ||
else: | ||
stmt = f"SELECT * FROM {table_name}" | ||
|
||
with self.con.cursor() as cur: | ||
return cur(stmt).fetch_arrow_table().to_pandas() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would be nice to at minimum support There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should always just return arrow backed types. Related to the other conversation around kwargs I am unsure of the best way to handle this. If we raise for non-default arguments this wouldn't work; alternately we could except the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You'd have to add Not sure how I'd feel about arrow backed only, this makes sense but we went in a different way for other readers... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah didn't realize that. Thanks for the heads up There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. xref #51846 for long-term |
||
|
||
def read_query( | ||
self, | ||
sql: str, | ||
index_col: str | list[str] | None = None, | ||
coerce_float: bool = True, | ||
parse_dates=None, | ||
params=None, | ||
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", | ||
) -> DataFrame | Iterator[DataFrame]: | ||
""" | ||
Read SQL query into a DataFrame. Keyword arguments are discarded. | ||
|
||
Parameters | ||
---------- | ||
sql : str | ||
SQL query to be executed. | ||
|
||
Returns | ||
------- | ||
DataFrame | ||
|
||
See Also | ||
-------- | ||
read_sql_table : Read SQL database table into a DataFrame. | ||
read_sql | ||
|
||
""" | ||
with self.con.cursor() as cur: | ||
return cur(sql).fetch_arrow_table().to_pandas() | ||
|
||
read_sql = read_query | ||
|
||
def to_sql( | ||
self, | ||
frame, | ||
name: str, | ||
if_exists: Literal["fail", "replace", "append"] = "fail", | ||
index: bool = True, | ||
index_label=None, | ||
schema: str | None = None, | ||
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
method: Literal["multi"] | Callable | None = None, | ||
engine: str = "auto", | ||
**engine_kwargs, | ||
) -> int | None: | ||
""" | ||
Write records stored in a DataFrame to a SQL database. | ||
Only frame, name, if_exists and schema are valid arguments. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
Parameters | ||
---------- | ||
frame : DataFrame | ||
name : string | ||
Name of SQL table. | ||
if_exists : {'fail', 'replace', 'append'}, default 'fail' | ||
- fail: If table exists, do nothing. | ||
- replace: If table exists, drop it, recreate it, and insert data. | ||
- append: If table exists, insert data. Create if does not exist. | ||
schema : string, default None | ||
Name of SQL schema in database to write to (if database flavor | ||
supports this). If specified, this overwrites the default | ||
schema of the SQLDatabase object. | ||
""" | ||
if schema: | ||
table_name = f"{schema}.{name}" | ||
else: | ||
table_name = name | ||
|
||
# TODO: pandas if_exists="append" will still create the | ||
# table if it does not exist; ADBC has append/create | ||
# as applicable modes, so the semantics get blurred across | ||
# the libraries | ||
mode = "create" | ||
if self.has_table(name, schema): | ||
if if_exists == "fail": | ||
raise ValueError(f"Table '{table_name}' already exists.") | ||
elif if_exists == "replace": | ||
with self.con.cursor() as cur: | ||
cur.execute(f"DROP TABLE {table_name}") | ||
elif if_exists == "append": | ||
mode = "append" | ||
|
||
import pyarrow as pa | ||
|
||
tbl = pa.Table.from_pandas(frame) | ||
WillAyd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
with self.con.cursor() as cur: | ||
total_inserted = cur.adbc_ingest(table_name, tbl, mode=mode) | ||
|
||
self.con.commit() | ||
return total_inserted | ||
|
||
def has_table(self, name: str, schema: str | None = None) -> bool: | ||
meta = self.con.adbc_get_objects( | ||
db_schema_filter=schema, table_name_filter=name | ||
).read_all() | ||
|
||
for catalog_schema in meta["catalog_db_schemas"].to_pylist(): | ||
if not catalog_schema: | ||
continue | ||
for schema_record in catalog_schema: | ||
if not schema_record: | ||
continue | ||
|
||
for table_record in schema_record["db_schema_tables"]: | ||
if table_record["table_name"] == name: | ||
return True | ||
|
||
return False | ||
|
||
def _create_sql_schema( | ||
self, | ||
frame: DataFrame, | ||
table_name: str, | ||
keys: list[str] | None = None, | ||
dtype: DtypeArg | None = None, | ||
schema: str | None = None, | ||
): | ||
raise NotImplementedError("not implemented for adbc") | ||
|
||
|
||
# sqlite-specific sql strings and handler class | ||
# dictionary used for readability purposes | ||
_SQL_TYPES = { | ||
|
Uh oh!
There was an error while loading. Please reload this page.