
ENH: Add Arrow CSV Reader #43072

Merged: 13 commits, merged Aug 27, 2021
4 changes: 2 additions & 2 deletions asv_bench/benchmarks/io/csv.py
@@ -206,7 +206,7 @@ def time_read_csv(self, bad_date_value):
class ReadCSVSkipRows(BaseIO):

    fname = "__test__.csv"
-    params = ([None, 10000], ["c", "python"])
+    params = ([None, 10000], ["c", "python", "pyarrow"])
    param_names = ["skiprows", "engine"]

    def setup(self, skiprows, engine):
@@ -320,7 +320,7 @@ def time_read_csv_python_engine(self, sep, decimal, float_precision):


class ReadCSVEngine(StringIORewind):
-    params = ["c", "python"]
+    params = ["c", "python", "pyarrow"]
    param_names = ["engine"]

    def setup(self, engine):
29 changes: 21 additions & 8 deletions doc/source/user_guide/io.rst
@@ -160,9 +160,14 @@ dtype : Type name or dict of column -> type, default ``None``
    (unsupported with ``engine='python'``). Use ``str`` or ``object`` together
    with suitable ``na_values`` settings to preserve and
    not interpret dtype.
-engine : {``'c'``, ``'python'``}
-    Parser engine to use. The C engine is faster while the Python engine is
-    currently more feature-complete.
+engine : {``'c'``, ``'python'``, ``'pyarrow'``}
+    Parser engine to use. The C and pyarrow engines are faster, while the python engine
+    is currently more feature-complete. Multithreading is currently only supported by
+    the pyarrow engine.
+
+    .. versionadded:: 1.4.0
+       The "pyarrow" engine was added as an *experimental* engine, and some features
+       are unsupported, or may not work correctly, with this engine.
converters : dict, default ``None``
Dict of functions for converting values in certain columns. Keys can either be
integers or column labels.
@@ -1622,11 +1627,19 @@ Specifying ``iterator=True`` will also return the ``TextFileReader`` object:
Specifying the parser engine
''''''''''''''''''''''''''''

-Under the hood pandas uses a fast and efficient parser implemented in C as well
-as a Python implementation which is currently more feature-complete. Where
-possible pandas uses the C parser (specified as ``engine='c'``), but may fall
-back to Python if C-unsupported options are specified. Currently, C-unsupported
-options include:
+Pandas currently supports three engines: the C engine, the python engine, and an
+experimental pyarrow engine (which requires the ``pyarrow`` package). In general,
+the pyarrow engine is fastest on larger workloads and is equivalent in speed to
+the C engine on most other workloads. The python engine tends to be slower than
+the pyarrow and C engines on most workloads. However, the pyarrow engine is much
+less robust than the C engine, which in turn lacks a few features present in the
+Python engine.
+
+Where possible pandas uses the C parser (specified as ``engine='c'``), but it may
+fall back to Python if C-unsupported options are specified. If pyarrow-unsupported
+options are specified while using ``engine='pyarrow'``, the parser raises an error
+(a full list of unsupported options is available at
+``pandas.io.parsers._pyarrow_unsupported``).
+
+Currently, C-unsupported options include:

* ``sep`` other than a single character (e.g. regex separators)
* ``skipfooter``
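For concreteness, a minimal sketch of selecting the new engine; the file name below is hypothetical and the snippet is an editorial illustration, not part of the PR::

    import pandas as pd

    # With engine="pyarrow", unsupported options raise a ValueError up front
    # rather than silently falling back to another engine, unlike the
    # C-to-Python fallback described above.
    df = pd.read_csv("data.csv", engine="pyarrow")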
9 changes: 6 additions & 3 deletions doc/source/whatsnew/v1.4.0.rst
@@ -63,10 +63,13 @@ In Pandas 2.0, :class:`NumericIndex` will become the default numeric index type

See :ref:`here <advanced.numericindex>` for more.

-.. _whatsnew_140.enhancements.enhancement2:
+.. _whatsnew_140.enhancements.pyarrow_csv_engine:

-enhancement2
-^^^^^^^^^^^^
+Multithreaded CSV reading with a new CSV engine based on pyarrow
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+:func:`pandas.read_csv` now accepts ``engine="pyarrow"`` as an argument, allowing for
+faster CSV parsing on multicore machines with pyarrow installed. See the
+:doc:`I/O docs </user_guide/io>` for more info. (:issue:`23697`)
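As an illustrative sketch (the file name is hypothetical, not from the PR), opting in is a one-keyword change::

    import pandas as pd

    # Only the engine keyword changes; pyarrow parses the file with
    # multiple threads by default, which is where the multicore speedup
    # comes from.
    df = pd.read_csv("large.csv", engine="pyarrow")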

.. _whatsnew_140.enhancements.other:

107 changes: 107 additions & 0 deletions pandas/io/parsers/arrow_parser_wrapper.py
@@ -0,0 +1,107 @@
from __future__ import annotations

from pandas.compat._optional import import_optional_dependency

from pandas.core.dtypes.inference import is_integer

from pandas.io.common import get_handle
from pandas.io.parsers.base_parser import ParserBase


class ArrowParserWrapper(ParserBase):
    """
    Wrapper for the pyarrow engine for read_csv()
    """

    def __init__(self, src, **kwds):
        self.kwds = kwds
        self.src = src

        ParserBase.__init__(self, kwds)

        self._parse_kwds()

    def _parse_kwds(self):
        encoding: str | None = self.kwds.get("encoding")
        self.encoding = "utf-8" if encoding is None else encoding

        self.usecols, self.usecols_dtype = self._validate_usecols_arg(
            self.kwds["usecols"]
        )
        na_values = self.kwds["na_values"]
        if isinstance(na_values, dict):
            raise ValueError(
                "The pyarrow engine doesn't support passing a dict for na_values"
            )
        self.na_values = list(self.kwds["na_values"])

    def _get_pyarrow_options(self):
        # rename some arguments to pass to pyarrow
        mapping = {
            "usecols": "include_columns",
            "na_values": "null_values",
            "escapechar": "escape_char",
            "skip_blank_lines": "ignore_empty_lines",
        }
        for pandas_name, pyarrow_name in mapping.items():
            if pandas_name in self.kwds and self.kwds.get(pandas_name) is not None:
                self.kwds[pyarrow_name] = self.kwds.pop(pandas_name)

        self.parse_options = {
            option_name: option_value
            for option_name, option_value in self.kwds.items()
            if option_value is not None
            and option_name
            in ("delimiter", "quote_char", "escape_char", "ignore_empty_lines")
        }
        self.convert_options = {
            option_name: option_value
            for option_name, option_value in self.kwds.items()
            if option_value is not None
            and option_name
            in ("include_columns", "null_values", "true_values", "false_values")
        }
        self.read_options = {
            # with a header row, pyarrow reads the column names itself after
            # skipping ``header`` rows; without one, names are autogenerated
            # and the user-supplied ``skiprows`` is forwarded instead
            "autogenerate_column_names": self.header is None,
            "skip_rows": self.header
            if self.header is not None
            else self.kwds["skiprows"],
        }

    def _finalize_output(self, frame):
        num_cols = len(frame.columns)
        if self.header is None:
            if self.names is None:
                if self.prefix is not None:
                    self.names = [f"{self.prefix}{i}" for i in range(num_cols)]
                elif self.header is None:
                    self.names = range(num_cols)
            frame.columns = self.names
        # we only need the frame not the names
        frame.columns, frame = self._do_date_conversions(frame.columns, frame)
        if self.index_col is not None:
            for i, item in enumerate(self.index_col):
                if is_integer(item):
                    # convert positional index_col references to labels
                    self.index_col[i] = frame.columns[item]
            frame.set_index(self.index_col, drop=True, inplace=True)

        if self.kwds.get("dtype") is not None:
            frame = frame.astype(self.kwds.get("dtype"))
        return frame

    def read(self):
        pyarrow_csv = import_optional_dependency("pyarrow.csv")
        self._get_pyarrow_options()

        with get_handle(
            self.src, "rb", encoding=self.encoding, is_text=False
        ) as handles:
            table = pyarrow_csv.read_csv(
                handles.handle,
                read_options=pyarrow_csv.ReadOptions(**self.read_options),
                parse_options=pyarrow_csv.ParseOptions(**self.parse_options),
                convert_options=pyarrow_csv.ConvertOptions(**self.convert_options),
            )

        frame = table.to_pandas()
        return self._finalize_output(frame)
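For reference, a rough standalone sketch of the pyarrow call that ``read()`` assembles above; the file name and option values here are hypothetical, not taken from the PR::

    from pyarrow import csv

    # The wrapper builds these three option objects from the read_csv kwargs.
    table = csv.read_csv(
        "data.csv",
        read_options=csv.ReadOptions(skip_rows=0, autogenerate_column_names=False),
        parse_options=csv.ParseOptions(delimiter=",", ignore_empty_lines=True),
        convert_options=csv.ConvertOptions(null_values=["NA", ""]),
    )
    # Convert the Arrow table to a DataFrame for pandas post-processing.
    frame = table.to_pandas()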
119 changes: 95 additions & 24 deletions pandas/io/parsers/readers.py
@@ -42,6 +42,7 @@
from pandas.core.indexes.api import RangeIndex

from pandas.io.common import validate_header_arg
+from pandas.io.parsers.arrow_parser_wrapper import ArrowParserWrapper
from pandas.io.parsers.base_parser import (
    ParserBase,
    is_index_col,
@@ -143,9 +144,14 @@
    to preserve and not interpret dtype.
    If converters are specified, they will be applied INSTEAD
    of dtype conversion.
-engine : {{'c', 'python'}}, optional
-    Parser engine to use. The C engine is faster while the python engine is
-    currently more feature-complete.
+engine : {{'c', 'python', 'pyarrow'}}, optional
+    Parser engine to use. The C and pyarrow engines are faster, while the python engine
+    is currently more feature-complete. Multithreading is currently only supported by
+    the pyarrow engine.
+
+    .. versionadded:: 1.4.0
+        The "pyarrow" engine was added as an *experimental* engine, and some features
+        are unsupported, or may not work correctly, with this engine.
converters : dict, optional
Dict of functions for converting values in certain columns. Keys can either
be integers or column labels.
@@ -406,6 +412,33 @@

_c_unsupported = {"skipfooter"}
_python_unsupported = {"low_memory", "float_precision"}
+_pyarrow_unsupported = {
+    "skipfooter",
+    "float_precision",
+    "chunksize",
+    "comment",
+    "nrows",
+    "thousands",
+    "memory_map",
+    "dialect",
+    "warn_bad_lines",
+    "error_bad_lines",
+    # TODO(1.4)
+    # This doesn't error properly ATM, fix for release
+    # but not blocker for initial PR
+    # "on_bad_lines",
+    "delim_whitespace",
+    "quoting",
+    "lineterminator",
+    "converters",
+    "decimal",
+    "iterator",
+    "dayfirst",
+    "infer_datetime_format",
+    "verbose",
+    "skipinitialspace",
+    "low_memory",
+}

_deprecated_defaults: dict[str, Any] = {"error_bad_lines": None, "warn_bad_lines": None}
_deprecated_args: set[str] = {"error_bad_lines", "warn_bad_lines"}
@@ -472,7 +505,20 @@ def _read(filepath_or_buffer: FilePathOrBuffer, kwds):

    # Extract some of the arguments (pass chunksize on).
    iterator = kwds.get("iterator", False)
-    chunksize = validate_integer("chunksize", kwds.get("chunksize", None), 1)
+    chunksize = kwds.get("chunksize", None)
+    if kwds.get("engine") == "pyarrow":
+        if iterator:
+            raise ValueError(
+                "The 'iterator' option is not supported with the 'pyarrow' engine"
+            )
+
+        if chunksize is not None:
+            raise ValueError(
+                "The 'chunksize' option is not supported with the 'pyarrow' engine"
+            )
+    else:
+        chunksize = validate_integer("chunksize", kwds.get("chunksize", None), 1)

    nrows = kwds.get("nrows", None)

# Check for duplicates in names.
@@ -785,6 +831,10 @@ def __init__(self, f, engine=None, **kwds):

        dialect = _extract_dialect(kwds)
        if dialect is not None:
+            if engine == "pyarrow":
+                raise ValueError(
+                    "The 'dialect' option is not supported with the 'pyarrow' engine"
+                )
            kwds = _merge_with_dialect_properties(dialect, kwds)

if kwds.get("header", "infer") == "infer":
@@ -823,7 +873,17 @@ def _get_options_with_defaults(self, engine):
            value = kwds.get(argname, default)

-            # see gh-12935
-            if argname == "mangle_dupe_cols" and not value:
+            if (
+                engine == "pyarrow"
+                and argname in _pyarrow_unsupported
+                and value != default
+            ):
+                raise ValueError(
+                    f"The {repr(argname)} option is not supported with the "
+                    f"'pyarrow' engine"
+                )
+            elif argname == "mangle_dupe_cols" and value is False:
+                # GH12935
                raise ValueError("Setting mangle_dupe_cols=False is not supported yet")
            else:
                options[argname] = value
@@ -878,7 +938,7 @@ def _clean_options(self, options, engine):
delim_whitespace = options["delim_whitespace"]

if sep is None and not delim_whitespace:
if engine == "c":
if engine in ("c", "pyarrow"):
fallback_reason = (
"the 'c' engine does not support "
"sep=None with delim_whitespace=False"
@@ -891,7 +951,7 @@ def _clean_options(self, options, engine):
elif engine not in ("python", "python-fwf"):
# wait until regex engine integrated
fallback_reason = (
"the 'c' engine does not support "
f"the '{engine}' engine does not support "
"regex separators (separators > 1 char and "
r"different from '\s+' are interpreted as regex)"
)
@@ -910,7 +970,7 @@ def _clean_options(self, options, engine):
        if not encodeable and engine not in ("python", "python-fwf"):
            fallback_reason = (
                f"the separator encoded in {encoding} "
-                "is > 1 char long, and the 'c' engine "
+                f"is > 1 char long, and the '{engine}' engine "
                "does not support such separators"
            )
            engine = "python"
@@ -925,7 +985,7 @@ def _clean_options(self, options, engine):
            fallback_reason = (
                "ord(quotechar) > 127, meaning the "
                "quotechar is larger than one byte, "
-                "and the 'c' engine does not support such quotechars"
+                f"and the '{engine}' engine does not support such quotechars"
            )
            engine = "python"

@@ -1001,8 +1061,15 @@ def _clean_options(self, options, engine):
        na_values, na_fvalues = _clean_na_values(na_values, keep_default_na)

        # handle skiprows; this is internally handled by the
-        # c-engine, so only need for python parsers
-        if engine != "c":
+        # c-engine, so only need for python and pyarrow parsers
+        if engine == "pyarrow":
+            if not is_integer(skiprows) and skiprows is not None:
+                # pyarrow expects skiprows to be passed as an integer
+                raise ValueError(
+                    "skiprows argument must be an integer when using "
+                    "engine='pyarrow'"
+                )
+        else:
            if is_integer(skiprows):
                skiprows = list(range(skiprows))
            if skiprows is None:
@@ -1030,6 +1097,7 @@ def _make_engine(self, engine="c"):
        mapping: dict[str, type[ParserBase]] = {
            "c": CParserWrapper,
            "python": PythonParser,
+            "pyarrow": ArrowParserWrapper,
            "python-fwf": FixedWidthFieldParser,
        }
        if engine not in mapping:
@@ -1043,22 +1111,25 @@ def _failover_to_python(self):
        raise AbstractMethodError(self)

    def read(self, nrows=None):
-        nrows = validate_integer("nrows", nrows)
-        index, columns, col_dict = self._engine.read(nrows)
-
-        if index is None:
-            if col_dict:
-                # Any column is actually fine:
-                new_rows = len(next(iter(col_dict.values())))
-                index = RangeIndex(self._currow, self._currow + new_rows)
-            else:
-                new_rows = 0
-        else:
-            new_rows = len(index)
-
-        df = DataFrame(col_dict, columns=columns, index=index)
-
-        self._currow += new_rows
+        if self.engine == "pyarrow":
+            df = self._engine.read()
+        else:
+            nrows = validate_integer("nrows", nrows)
+            index, columns, col_dict = self._engine.read(nrows)
+
+            if index is None:
+                if col_dict:
+                    # Any column is actually fine:
+                    new_rows = len(next(iter(col_dict.values())))
+                    index = RangeIndex(self._currow, self._currow + new_rows)
+                else:
+                    new_rows = 0
+            else:
+                new_rows = len(index)
+
+            df = DataFrame(col_dict, columns=columns, index=index)
+
+            self._currow += new_rows

        if self.squeeze and len(df.columns) == 1:
            return df[df.columns[0]].copy()
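To round out the validation logic above, a hedged sketch of how an unsupported option surfaces with the new engine; the data is made up, and the exact message comes from the check in ``_get_options_with_defaults``::

    from io import StringIO

    import pandas as pd

    data = StringIO("a,b\n1,2\n3,4\n")

    # skipfooter is listed in _pyarrow_unsupported, so the pyarrow engine
    # rejects it up front rather than falling back to a different parser.
    try:
        pd.read_csv(data, engine="pyarrow", skipfooter=1)
    except ValueError as err:
        print(err)  # e.g. "The 'skipfooter' option is not supported with the 'pyarrow' engine"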