ENH: add arrow engine to read_csv #31817
@@ -20,6 +20,7 @@
 from pandas._libs.parsers import STR_NA_VALUES
 from pandas._libs.tslibs import parsing
 from pandas._typing import FilePathOrBuffer
+from pandas.compat._optional import import_optional_dependency
 from pandas.errors import (
     AbstractMethodError,
     EmptyDataError,
@@ -165,9 +166,10 @@
     to preserve and not interpret dtype.
     If converters are specified, they will be applied INSTEAD
     of dtype conversion.
-engine : {{'c', 'python'}}, optional
-    Parser engine to use. The C engine is faster while the python engine is
-    currently more feature-complete.
+engine : {{'c', 'python', 'arrow'}}, optional
+    Parser engine to use. The C and arrow engines are faster, while the python engine is
+    currently more feature-complete. The arrow engine requires ``pyarrow``
+    as a dependency however.
 converters : dict, optional
     Dict of functions for converting values in certain columns. Keys can either
     be integers or column labels.
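For context, a minimal usage sketch of the option documented in this hunk. This assumes a development build of pandas that includes this PR (the arrow engine is not available in released pandas at the time of this change) plus a working ``pyarrow`` installation:

```python
from io import StringIO

import pandas as pd

data = StringIO("a,b,c\n1,2,3\n4,5,6\n")

# With this PR applied, engine="arrow" routes parsing through pyarrow.csv
# instead of the C or python parsers.
df = pd.read_csv(data, engine="arrow")
print(df)
```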
@@ -506,7 +508,6 @@ def _read(filepath_or_buffer: FilePathOrBuffer, kwds):
     "skip_blank_lines": True,
 }

-
 _c_parser_defaults = {
     "delim_whitespace": False,
     "na_filter": True,
@@ -520,6 +521,7 @@ def _read(filepath_or_buffer: FilePathOrBuffer, kwds):
 _fwf_defaults = {"colspecs": "infer", "infer_nrows": 100, "widths": None}

 _c_unsupported = {"skipfooter"}
+_arrow_unsupported = {"skipfooter", "low_memory", "float_precision", "chunksize"}
 _python_unsupported = {"low_memory", "float_precision"}

 _deprecated_defaults: Dict[str, Any] = {}
@@ -705,7 +707,6 @@ def read_fwf(
     infer_nrows=100,
     **kwds,
 ):
-
     r"""
     Read a table of fixed-width formatted lines into DataFrame.

@@ -944,17 +945,22 @@ def _clean_options(self, options, engine):
         sep = options["delimiter"]
         delim_whitespace = options["delim_whitespace"]

-        # C engine not supported yet
-        if engine == "c":
+        # arrow engine not supported yet
+        if engine == "arrow":
+            if options["chunksize"] is not None:
+                fallback_reason = f"the arrow engine does not support chunksize"
+                engine = "python"
+        # C and arrow engine not supported yet
+        if engine == "c" or engine == "arrow":
             if options["skipfooter"] > 0:
-                fallback_reason = "the 'c' engine does not support skipfooter"
+                fallback_reason = f"the {engine} engine does not support skipfooter"
                 engine = "python"

         encoding = sys.getfilesystemencoding() or "utf-8"
         if sep is None and not delim_whitespace:
-            if engine == "c":
+            if engine == "c" or engine == "arrow":
                 fallback_reason = (
-                    "the 'c' engine does not support "
+                    f"the {engine} engine does not support "
                     "sep=None with delim_whitespace=False"
                 )
                 engine = "python"
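A short illustration of the fallback logic above. Because "arrow" can only be reached by passing ``engine="arrow"`` explicitly, the recorded ``fallback_reason`` surfaces as an error in the surrounding ``_clean_options`` code (not shown in this hunk) rather than as a silent engine switch; this assumes a build with the PR applied:

```python
from io import StringIO

import pandas as pd

data = StringIO("a,b,c\n1,2,3\n4,5,6\ntrailer line\n")

# skipfooter is one of the options the arrow engine cannot honour.
try:
    pd.read_csv(data, engine="arrow", skipfooter=1)
except ValueError as err:
    print(err)  # e.g. "the arrow engine does not support skipfooter"
```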
@@ -1081,14 +1087,20 @@ def _clean_options(self, options, engine):
         na_values, na_fvalues = _clean_na_values(na_values, keep_default_na)

         # handle skiprows; this is internally handled by the
-        # c-engine, so only need for python parsers
+        # c-engine, so only need for python parser
         if engine != "c":
-            if is_integer(skiprows):
-                skiprows = list(range(skiprows))
-            if skiprows is None:
-                skiprows = set()
-            elif not callable(skiprows):
-                skiprows = set(skiprows)
+            if engine == "arrow":
+                if not is_integer(skiprows) and skiprows is not None:
+                    raise ValueError(
+                        "skiprows argument must be an integer when using engine='arrow'"
+                    )
+            else:
+                if is_integer(skiprows):
+                    skiprows = list(range(skiprows))
+                if skiprows is None:
+                    skiprows = set()
+                elif not callable(skiprows):
+                    skiprows = set(skiprows)

         # put stuff back
         result["names"] = names
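The added branch restricts ``skiprows`` to a plain integer when the arrow engine is selected, since pyarrow only exposes a "skip the first N rows" option. A quick illustration, again assuming a build with this PR applied:

```python
from io import StringIO

import pandas as pd

csv = "junk line\na,b\n1,2\n3,4\n"

# An integer skiprows is passed through to pyarrow's skip_rows.
df = pd.read_csv(StringIO(csv), engine="arrow", skiprows=1)

# A list of row numbers works for the c/python engines but is rejected here.
try:
    pd.read_csv(StringIO(csv), engine="arrow", skiprows=[0, 2])
except ValueError as err:
    print(err)  # "skiprows argument must be an integer when using engine='arrow'"
```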
@@ -1109,6 +1121,8 @@ def __next__(self):
     def _make_engine(self, engine="c"):
         if engine == "c":
             self._engine = CParserWrapper(self.f, **self.options)
+        elif engine == "arrow":
+            self._engine = ArrowParserWrapper(self.f, **self.options)
         else:
             if engine == "python":
                 klass = PythonParser
@@ -1125,29 +1139,32 @@ def _failover_to_python(self):
         raise AbstractMethodError(self)

     def read(self, nrows=None):
-        nrows = _validate_integer("nrows", nrows)
-        ret = self._engine.read(nrows)
+        if isinstance(self._engine, ArrowParserWrapper):
+            return self._engine.read(nrows)
+        else:
+            nrows = _validate_integer("nrows", nrows)
+            ret = self._engine.read(nrows)

-        # May alter columns / col_dict
-        index, columns, col_dict = self._create_index(ret)
+            # May alter columns / col_dict
+            index, columns, col_dict = self._create_index(ret)

-        if index is None:
-            if col_dict:
-                # Any column is actually fine:
-                new_rows = len(next(iter(col_dict.values())))
-                index = RangeIndex(self._currow, self._currow + new_rows)
-            else:
-                new_rows = 0
-        else:
-            new_rows = len(index)
+            if index is None:
+                if col_dict:
+                    # Any column is actually fine:
+                    new_rows = len(next(iter(col_dict.values())))
+                    index = RangeIndex(self._currow, self._currow + new_rows)
+                else:
+                    new_rows = 0
+            else:
+                new_rows = len(index)

-        df = DataFrame(col_dict, columns=columns, index=index)
+            df = DataFrame(col_dict, columns=columns, index=index)

-        self._currow += new_rows
+            self._currow += new_rows

-        if self.squeeze and len(df.columns) == 1:
-            return df[df.columns[0]].copy()
-        return df
+            if self.squeeze and len(df.columns) == 1:
+                return df[df.columns[0]].copy()
+            return df

     def _create_index(self, ret):
         index, columns, col_dict = ret

Review thread on this change (resolved): a reviewer suggested the nrows validation could still be done at the top of the function. Reply: "Yeah, I guess I could do that, but I think my way is cleaner, since all the pyarrow code would be in the if, and the other parser code would be in the else."
@@ -2139,6 +2156,53 @@ def _maybe_parse_dates(self, values, index, try_parse_dates=True):
         return values


+class ArrowParserWrapper(ParserBase):
+    """
+
+    """
+
+    def __init__(self, src, **kwds):
+        self.kwds = kwds
+        self.src = src
+        kwds = kwds.copy()
+
+        ParserBase.__init__(self, kwds)
+
+        # #2442
+        kwds["allow_leading_cols"] = self.index_col is not False
+
+        # GH20529, validate usecol arg before TextReader
+        self.usecols, self.usecols_dtype = _validate_usecols_arg(kwds["usecols"])
+
+    def read(self, nrows=None):
+        pyarrow = import_optional_dependency(
+            "pyarrow.csv", extra="pyarrow is required to use arrow engine"
+        )
+        nrows = _validate_integer("nrows", nrows)
+        table = pyarrow.read_csv(
+            self.src,
+            read_options=pyarrow.ReadOptions(
+                skip_rows=self.kwds.get("skiprows"), column_names=self.names
+            ),
+            parse_options=pyarrow.ParseOptions(
+                delimiter=self.kwds.get("delimiter"),
+                quote_char=self.kwds.get("quotechar"),
+            ),
+            convert_options=pyarrow.ConvertOptions(
+                include_columns=self.usecols, column_types=self.kwds.get("dtype")
+            ),
+        )
+        if nrows:
+            table = table[:nrows]
+        table_width = len(table.column_names)
+        if self.names is None:
+            if self.prefix:
+                self.names = [f"{self.prefix}{i}" for i in range(table_width)]
+        if self.names:
+            table = table.rename_columns(self.names)
+        return table.to_pandas()
+
+
 def TextParser(*args, **kwds):
     """
     Converts lists of lists/tuples into DataFrames with proper type inference

Review threads on the new class (from the inline discussion): "you will need to refactor this as the current code is very different from this. Also I really don't like doing all of this validation in a single function." Reply: "Can you clarify a bit more what you mean? Or point to recent changes related to this? For example also on master, the C parser is using a very similar mechanism with the CParserWrapper class." On the pyarrow.read_csv call: "pls add line breaks between section and comments."
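For readers unfamiliar with ``pyarrow.csv``, the wrapper above is essentially a thin translation layer from ``read_csv`` keywords to pyarrow option objects (``skiprows`` to ``ReadOptions.skip_rows``, ``delimiter``/``quotechar`` to ``ParseOptions``, ``usecols``/``dtype`` to ``ConvertOptions``). A standalone sketch of that mapping using pyarrow directly, outside pandas, with illustrative data and option values:

```python
import io

from pyarrow import csv as pa_csv

data = io.BytesIO(b"a,b,c\n1,2,3\n4,5,6\n")

table = pa_csv.read_csv(
    data,
    read_options=pa_csv.ReadOptions(skip_rows=0),
    parse_options=pa_csv.ParseOptions(delimiter=",", quote_char='"'),
    convert_options=pa_csv.ConvertOptions(include_columns=["a", "b"]),
)

# pyarrow reads the whole input eagerly; the wrapper applies nrows afterwards
# by slicing the Table (table[:nrows]), then converts to a DataFrame.
df = table.slice(0, 10).to_pandas()
print(df)
```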
@@ -3340,7 +3404,6 @@ def _try_convert_dates(parser, colspec, data_dict, columns):


 def _clean_na_values(na_values, keep_default_na=True):
-
     if na_values is None:
         if keep_default_na:
             na_values = STR_NA_VALUES