Add SQL Support for ADBC Drivers #53869
Changes from 29 commits
@@ -113,6 +113,8 @@ dependencies:
   - pygments  # Code highlighting

   - pip:
+    - adbc_driver_postgresql>=0.6.0
+    - adbc_driver_sqlite>=0.6.0
     - dataframe-api-compat>=0.1.7
     - sphinx-toggleprompt  # conda-forge version has stricter pins on jinja2
     - typing_extensions; python_version<"3.11"

Review comment: The conda packages should be available nowadays, so I think you can move these to the normal packages list.

Reply: Not yet for Windows, so I think we can leave this to a follow-up after apache/arrow-adbc#1149.
@@ -45,7 +45,10 @@
     is_dict_like,
     is_list_like,
 )
-from pandas.core.dtypes.dtypes import DatetimeTZDtype
+from pandas.core.dtypes.dtypes import (
+    ArrowDtype,
+    DatetimeTZDtype,
+)
 from pandas.core.dtypes.missing import isna

 from pandas import get_option
@@ -642,6 +645,17 @@ def read_sql(
        int_column date_column
     0           0  2012-11-10
     1           1  2010-11-12

+    .. versionadded:: 2.1.0
+
+    pandas now supports reading via ADBC drivers
+
+    >>> from adbc_driver_postgresql import dbapi
+    >>> with dbapi.connect('postgres:///db_name') as conn:  # doctest:+SKIP
+    ...     pd.read_sql('SELECT int_column FROM test_data', conn)
+       int_column
+    0           0
+    1           1
     """

     check_dtype_backend(dtype_backend)
@@ -850,6 +864,10 @@ def pandasSQL_builder( | |||||||||||
if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Connectable)): | ||||||||||||
return SQLDatabase(con, schema, need_transaction) | ||||||||||||
|
||||||||||||
adbc = import_optional_dependency("adbc_driver_manager.dbapi", errors="ignore") | ||||||||||||
if adbc and isinstance(con, adbc.Connection): | ||||||||||||
return ADBCDatabase(con) | ||||||||||||
|
||||||||||||
warnings.warn( | ||||||||||||
"pandas only supports SQLAlchemy connectable (engine/connection) or " | ||||||||||||
"database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 " | ||||||||||||
|
@@ -2024,6 +2042,255 @@ def _create_sql_schema(


# ---- SQL without SQLAlchemy ---


class ADBCDatabase(PandasSQL):
    """
    This class enables conversion between DataFrame and SQL databases
    using ADBC to handle database abstraction.

    Parameters
    ----------
    con : adbc_driver_manager.dbapi.Connection
    """

    def __init__(self, con) -> None:
        self.con = con

    def execute(self, sql: str | Select | TextClause, params=None):
        with self.con.cursor() as cur:
            return cur.execute(sql)

    def read_table(
        self,
        table_name: str,
        index_col: str | list[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        columns=None,
        schema: str | None = None,
        chunksize: int | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> DataFrame | Iterator[DataFrame]:
        """
        Read SQL database table into a DataFrame. The only keyword arguments
        used are table_name and schema; the rest raise NotImplementedError.

        Parameters
        ----------
        table_name : str
            Name of SQL table in database.
        schema : string, default None
            Name of SQL schema in database to read from.

        Returns
        -------
        DataFrame

        See Also
        --------
        pandas.read_sql_table
        SQLDatabase.read_query
        """
        if index_col:
            raise NotImplementedError("'index_col' is not implemented for ADBC drivers")
        if coerce_float is not True:
            raise NotImplementedError(
                "'coerce_float' is not implemented for ADBC drivers"
            )
        if parse_dates:
            raise NotImplementedError(
                "'parse_dates' is not implemented for ADBC drivers"
            )
        if columns:
            raise NotImplementedError("'columns' is not implemented for ADBC drivers")
Review comment: Doesn't necessarily need to happen for this initial PR, but since we are generating the SQL query string below, it should be relatively straightforward to support selecting a subset of columns instead of selecting everything.

Reply: Yeah, that's a good point. I can't remember why I didn't do this in the first place. Should be straightforward to add here or in a follow-up though, especially since ADBC should handle sanitizing.
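A rough sketch of what that could look like (a hypothetical helper, not part of this PR; it assumes identifiers that need no quoting):

def _build_select(table_name: str, schema: str | None, columns: list[str] | None) -> str:
    # Hypothetical: build the statement read_table would issue, selecting
    # only the requested columns instead of "*".
    select_list = ", ".join(columns) if columns else "*"
    target = f"{schema}.{table_name}" if schema else table_name
    return f"SELECT {select_list} FROM {target}"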
        if chunksize:
            raise NotImplementedError("'chunksize' is not implemented for ADBC drivers")
Review comment (on the chunksize check above): Again not for this PR, but something to note for future improvements: I think it should be possible to support chunksize? Because we can get a RecordBatchReader from ADBC, and then read from that iterator and convert to pandas in chunks.

Reply: It's a good question. I'm not sure I see anything in the ADBC specification around batch/chunk handling; I might be overlooking the general approach to that. @lidavidm always knows best.

Reply: Not necessarily in ADBC itself, but you should be able to get a RecordBatchReader instead of a materialized Table, and pyarrow then provides APIs to consume that reader chunk by chunk. It might not exactly support the user-specified chunksize, but it does give you a similar result: a generator of pandas DataFrames. (a RecordBatchReader allows you to ...)

Reply: Oops, I missed this. Drivers have various parameters to request a batch size, but perhaps we should standardize on one.
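A hedged sketch of that chunked-read idea (assumes the ADBC dbapi cursor's fetch_record_batch(), which returns a pyarrow.RecordBatchReader; chunk boundaries are driver-determined rather than matching the user's chunksize):

def _read_query_chunked(con, stmt: str, types_mapper=None):
    # Hypothetical generator: stream the result as DataFrame chunks
    # instead of materializing a single Arrow Table.
    with con.cursor() as cur:
        cur.execute(stmt)
        reader = cur.fetch_record_batch()  # pyarrow.RecordBatchReader
        for batch in reader:
            yield batch.to_pandas(types_mapper=types_mapper)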

        if schema:
            stmt = f"SELECT * FROM {schema}.{table_name}"
        else:
            stmt = f"SELECT * FROM {table_name}"
        mapping: type[ArrowDtype] | None | Callable
        if dtype_backend == "pyarrow":
            mapping = ArrowDtype
        elif dtype_backend == "numpy_nullable":
            from pandas.io._util import _arrow_dtype_mapping

            mapping = _arrow_dtype_mapping().get
        else:
            mapping = None

        with self.con.cursor() as cur:
            cur.execute(stmt)
            return cur.fetch_arrow_table().to_pandas(types_mapper=mapping)

Review comment (for another PR): we should probably factor out the mapping logic above into a helper that gives you the mapping based on the dtype_backend.
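A sketch of such a helper (hypothetical name and location; it simply hoists the branch above so both read paths could share it):

def _arrow_types_mapper(dtype_backend):
    # Hypothetical helper: return the types_mapper that to_pandas()
    # should use for a given dtype_backend.
    if dtype_backend == "pyarrow":
        return ArrowDtype
    if dtype_backend == "numpy_nullable":
        from pandas.io._util import _arrow_dtype_mapping

        return _arrow_dtype_mapping().get
    return None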
Review comment (suggested change): use cur.execute(stmt) instead of calling the cursor directly as cur(stmt). (I mentioned it before, but I don't fully understand how this PR is working, because if I try that locally with an adbc dbapi connection, I get "TypeError: 'Cursor' object is not callable".)

Reply: Ah, I think because you are basically only testing [...]. With a quick hacky patch I can get the tests to fail with the "'Cursor' object is not callable" error:

--- a/pandas/tests/io/test_sql.py
+++ b/pandas/tests/io/test_sql.py
@@ -146,7 +146,9 @@ def create_and_load_iris_sqlite3(conn: sqlite3.Connection, iris_file: Path):
         reader = csv.reader(csvfile)
         next(reader)
         stmt = "INSERT INTO iris VALUES(?, ?, ?, ?, ?)"
-        cur.executemany(stmt, reader)
+        cur.executemany(stmt, list(reader))
+    conn.commit()
+    cur.close()


 def create_and_load_iris(conn, iris_file: Path, dialect: str):
@@ -532,6 +534,23 @@ def sqlite_iris_conn(sqlite_iris_engine):
     yield conn


+@pytest.fixture
+def sqlite_iris_adbc_conn(iris_path):
+    if pa_version_under8p0:
+        pytest.skip("ADBC requires pyarrow >= 8.0.0")
+    pytest.importorskip("adbc_driver_sqlite")
+    from adbc_driver_sqlite import dbapi
+
+    with tm.ensure_clean() as name:
+        uri = f"file:{name}"
+        with dbapi.connect(uri) as conn:
+            create_and_load_iris_sqlite3(conn, iris_path)
+
+            yield conn
+            with conn.cursor() as cur:
+                cur.execute("DROP TABLE IF EXISTS test_frame")
+
+
 @pytest.fixture
 def sqlite_buildin():
     with contextlib.closing(sqlite3.connect(":memory:")) as closing_conn:
@@ -566,6 +585,7 @@ sqlite_iris_connectable = [
     "sqlite_iris_engine",
     "sqlite_iris_conn",
     "sqlite_iris_str",
+    "sqlite_iris_adbc_conn",
 ]

 sqlalchemy_connectable = mysql_connectable + postgresql_connectable + sqlite_connectable

Reply: Ah cool - nice find

    def read_query(
        self,
        sql: str,
        index_col: str | list[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        params=None,
        chunksize: int | None = None,
        dtype: DtypeArg | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> DataFrame | Iterator[DataFrame]:
        """
        Read SQL query into a DataFrame. Keyword arguments other than sql
        and dtype_backend raise NotImplementedError.

        Parameters
        ----------
        sql : str
            SQL query to be executed.

        Returns
        -------
        DataFrame

        See Also
        --------
        read_sql_table : Read SQL database table into a DataFrame.
        read_sql
        """
        if index_col:
            raise NotImplementedError("'index_col' is not implemented for ADBC drivers")
        if coerce_float is not True:
            raise NotImplementedError(
                "'coerce_float' is not implemented for ADBC drivers"
            )
        if parse_dates:
            raise NotImplementedError(
                "'parse_dates' is not implemented for ADBC drivers"
            )
        if params:
            raise NotImplementedError("'params' is not implemented for ADBC drivers")
        if chunksize:
            raise NotImplementedError("'chunksize' is not implemented for ADBC drivers")
        if dtype:
            raise NotImplementedError("'dtype' is not implemented for ADBC drivers")

        mapping: type[ArrowDtype] | None | Callable
        if dtype_backend == "pyarrow":
            mapping = ArrowDtype
        elif dtype_backend == "numpy_nullable":
            from pandas.io._util import _arrow_dtype_mapping

            mapping = _arrow_dtype_mapping().get
        else:
            mapping = None

        with self.con.cursor() as cur:
            cur.execute(sql)
            return cur.fetch_arrow_table().to_pandas(types_mapper=mapping)

    read_sql = read_query
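
End to end, the dtype_backend plumbing above can be exercised like this (a hedged example; file and column names are made up):

import pandas as pd
from adbc_driver_sqlite import dbapi

with dbapi.connect("file:demo.db") as conn:
    df = pd.read_sql("SELECT 1 AS a", conn, dtype_backend="pyarrow")
    # df["a"] is now backed by pandas' ArrowDtype rather than a numpy dtype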

    def to_sql(
        self,
        frame,
        name: str,
        if_exists: Literal["fail", "replace", "append"] = "fail",
        index: bool = True,
        index_label=None,
        schema: str | None = None,
        chunksize: int | None = None,
        dtype: DtypeArg | None = None,
        method: Literal["multi"] | Callable | None = None,
        engine: str = "auto",
        **engine_kwargs,
    ) -> int | None:
        """
        Write records stored in a DataFrame to a SQL database.
        Only frame, name, if_exists, index and schema are used; the other
        arguments raise NotImplementedError.

        Parameters
        ----------
        frame : DataFrame
        name : string
            Name of SQL table.
        if_exists : {'fail', 'replace', 'append'}, default 'fail'
            - fail: If table exists, raise a ValueError.
            - replace: If table exists, drop it, recreate it, and insert data.
            - append: If table exists, insert data. Create if does not exist.
        schema : string, default None
            Name of SQL schema in database to write to (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQLDatabase object.
        """
        if index_label:
            raise NotImplementedError(
                "'index_label' is not implemented for ADBC drivers"
            )
        if chunksize:
            raise NotImplementedError("'chunksize' is not implemented for ADBC drivers")
        if dtype:
            raise NotImplementedError("'dtype' is not implemented for ADBC drivers")
        if method:
            raise NotImplementedError("'method' is not implemented for ADBC drivers")
        if engine != "auto":
            raise NotImplementedError("'engine' is not implemented for ADBC drivers")

        if schema:
            table_name = f"{schema}.{name}"
        else:
            table_name = name

        # TODO: pandas if_exists="append" will still create the
        # table if it does not exist; ADBC has append/create
        # as applicable modes, so the semantics get blurred across
        # the libraries
        mode = "create"
        if self.has_table(name, schema):
            if if_exists == "fail":
                raise ValueError(f"Table '{table_name}' already exists.")
            elif if_exists == "replace":
                with self.con.cursor() as cur:
                    cur.execute(f"DROP TABLE {table_name}")
            elif if_exists == "append":
                mode = "append"

        import pyarrow as pa

        tbl = pa.Table.from_pandas(frame, preserve_index=index)
        with self.con.cursor() as cur:
            total_inserted = cur.adbc_ingest(table_name, tbl, mode=mode)

        self.con.commit()
        return total_inserted
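A brief usage sketch of the write path and its mode mapping (file and table names are made up; assumes adbc_driver_sqlite is installed):

import pandas as pd
from adbc_driver_sqlite import dbapi

df = pd.DataFrame({"a": [1, 2, 3]})
with dbapi.connect("file:demo.db") as conn:
    df.to_sql("demo", conn, index=False)                      # ingested with mode="create"
    df.to_sql("demo", conn, if_exists="append", index=False)  # ingested with mode="append"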

    def has_table(self, name: str, schema: str | None = None) -> bool:
        meta = self.con.adbc_get_objects(
            db_schema_filter=schema, table_name_filter=name
        ).read_all()

        for catalog_schema in meta["catalog_db_schemas"].to_pylist():
            if not catalog_schema:
                continue
            for schema_record in catalog_schema:
                if not schema_record:
                    continue

                for table_record in schema_record["db_schema_tables"]:
                    if table_record["table_name"] == name:
                        return True

        return False
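For orientation, one element of meta["catalog_db_schemas"].to_pylist() walked above looks roughly like this (field names follow the ADBC GetObjects schema; the values are illustrative only):

[
    {
        "db_schema_name": "main",
        "db_schema_tables": [
            {"table_name": "test_frame", "table_type": "table"},
        ],
    },
]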

    def _create_sql_schema(
        self,
        frame: DataFrame,
        table_name: str,
        keys: list[str] | None = None,
        dtype: DtypeArg | None = None,
        schema: str | None = None,
    ):
        raise NotImplementedError("not implemented for adbc")


# sqlite-specific sql strings and handler class
# dictionary used for readability purposes
_SQL_TYPES = {