diff --git a/asv_bench/benchmarks/gil.py b/asv_bench/benchmarks/gil.py index fdeace108f76e..2eb6786356511 100644 --- a/asv_bench/benchmarks/gil.py +++ b/asv_bench/benchmarks/gil.py @@ -1,15 +1,24 @@ from .pandas_vb_common import * from pandas.core import common as com + +try: + from cStringIO import StringIO +except ImportError: + from io import StringIO + try: from pandas.util.testing import test_parallel + have_real_test_parallel = True except ImportError: have_real_test_parallel = False + def test_parallel(num_threads=1): def wrapper(fname): return fname + return wrapper @@ -321,6 +330,7 @@ def run(arr): algos.kth_smallest(arr, self.k) run() + class nogil_datetime_fields(object): goal_time = 0.2 @@ -435,4 +445,47 @@ def time_nogil_rolling_std(self): @test_parallel(num_threads=2) def run(arr, win): rolling_std(arr, win) - run(self.arr, self.win) \ No newline at end of file + run(self.arr, self.win) + + +class nogil_read_csv(object): + number = 1 + repeat = 5 + + def setup(self): + if (not have_real_test_parallel): + raise NotImplementedError + # Using the values + self.df = DataFrame(np.random.randn(10000, 50)) + self.df.to_csv('__test__.csv') + + self.rng = date_range('1/1/2000', periods=10000) + self.df_date_time = DataFrame(np.random.randn(10000, 50), index=self.rng) + self.df_date_time.to_csv('__test_datetime__.csv') + + self.df_object = DataFrame('foo', index=self.df.index, columns=self.create_cols('object')) + self.df_object.to_csv('__test_object__.csv') + + def create_cols(self, name): + return [('%s%03d' % (name, i)) for i in range(5)] + + @test_parallel(num_threads=2) + def pg_read_csv(self): + read_csv('__test__.csv', sep=',', header=None, float_precision=None) + + def time_nogil_read_csv(self): + self.pg_read_csv() + + @test_parallel(num_threads=2) + def pg_read_csv_object(self): + read_csv('__test_object__.csv', sep=',') + + def time_nogil_read_csv_object(self): + self.pg_read_csv_object() + + @test_parallel(num_threads=2) + def pg_read_csv_datetime(self): + read_csv('__test_datetime__.csv', sep=',', header=None) + + def time_nogil_read_csv_datetime(self): + self.pg_read_csv_datetime() diff --git a/doc/source/whatsnew/v0.17.1.txt b/doc/source/whatsnew/v0.17.1.txt index 3ca56ecc00d36..0d0f4c66c1fec 100755 --- a/doc/source/whatsnew/v0.17.1.txt +++ b/doc/source/whatsnew/v0.17.1.txt @@ -60,6 +60,7 @@ Performance Improvements - Release the GIL on most datetime field operations (e.g. ``DatetimeIndex.year``, ``Series.dt.year``), normalization, and conversion to and from ``Period``, ``DatetimeIndex.to_period`` and ``PeriodIndex.to_timestamp`` (:issue:`11263`) - Release the GIL on some srolling algos (``rolling_median``, ``rolling_mean``, ``rolling_max``, ``rolling_min``, ``rolling_var``, ``rolling_kurt``, `rolling_skew`` (:issue:`11450`) +- Release the GIL when reading and parsing text files in ``read_csv``, ``read_table`` (:issue:`11272`) - Improved performance of ``rolling_median`` (:issue:`11450`) - Improved performance to ``to_excel`` (:issue:`11352`) diff --git a/pandas/parser.pyx b/pandas/parser.pyx index 8ac1f64f2d50e..f9b8d921f02d1 100644 --- a/pandas/parser.pyx +++ b/pandas/parser.pyx @@ -161,7 +161,7 @@ cdef extern from "parser/tokenizer.h": void *skipset int64_t skip_first_N_rows int skip_footer - double (*converter)(const char *, char **, char, char, char, int) + double (*converter)(const char *, char **, char, char, char, int) nogil # error handling char *warn_msg @@ -174,8 +174,8 @@ cdef extern from "parser/tokenizer.h": int *line_start int col - void coliter_setup(coliter_t *it, parser_t *parser, int i, int start) - void COLITER_NEXT(coliter_t, const char *) + void coliter_setup(coliter_t *it, parser_t *parser, int i, int start) nogil + void COLITER_NEXT(coliter_t, const char *) nogil parser_t* parser_new() @@ -193,26 +193,26 @@ cdef extern from "parser/tokenizer.h": void debug_print_parser(parser_t *self) - int tokenize_all_rows(parser_t *self) - int tokenize_nrows(parser_t *self, size_t nrows) + int tokenize_all_rows(parser_t *self) nogil + int tokenize_nrows(parser_t *self, size_t nrows) nogil int64_t str_to_int64(char *p_item, int64_t int_min, - int64_t int_max, int *error, char tsep) + int64_t int_max, int *error, char tsep) nogil # uint64_t str_to_uint64(char *p_item, uint64_t uint_max, int *error) double xstrtod(const char *p, char **q, char decimal, char sci, - char tsep, int skip_trailing) + char tsep, int skip_trailing) nogil double precise_xstrtod(const char *p, char **q, char decimal, char sci, - char tsep, int skip_trailing) + char tsep, int skip_trailing) nogil double round_trip(const char *p, char **q, char decimal, char sci, - char tsep, int skip_trailing) + char tsep, int skip_trailing) nogil # inline int to_complex(char *item, double *p_real, # double *p_imag, char sci, char decimal) - inline int to_longlong(char *item, long long *p_value) + inline int to_longlong(char *item, long long *p_value) nogil # inline int to_longlong_thousands(char *item, long long *p_value, # char tsep) - int to_boolean(const char *item, uint8_t *val) + int to_boolean(const char *item, uint8_t *val) nogil cdef extern from "parser/io.h": @@ -255,16 +255,19 @@ cdef class TextReader: cdef: parser_t *parser object file_handle, na_fvalues + object true_values, false_values bint na_filter, verbose, has_usecols, has_mi_columns int parser_start list clocks char *c_encoding + kh_str_t *false_set + kh_str_t *true_set cdef public: int leading_cols, table_width, skip_footer, buffer_lines object allow_leading_cols object delimiter, converters, delim_whitespace - object na_values, true_values, false_values + object na_values object memory_map object as_recarray object header, orig_header, names, header_start, header_end @@ -418,11 +421,14 @@ cdef class TextReader: self.na_values = na_values if na_fvalues is None: - na_fvalues = set() + na_fvalues = set() self.na_fvalues = na_fvalues - self.true_values = _maybe_encode(true_values) - self.false_values = _maybe_encode(false_values) + self.true_values = _maybe_encode(true_values) + _true_values + self.false_values = _maybe_encode(false_values) + _false_values + + self.true_set = kset_from_list(self.true_values) + self.false_set = kset_from_list(self.false_values) self.converters = converters @@ -522,6 +528,8 @@ cdef class TextReader: def __dealloc__(self): parser_free(self.parser) + kh_destroy_str(self.true_set) + kh_destroy_str(self.false_set) def set_error_bad_lines(self, int status): self.parser.error_bad_lines = status @@ -676,10 +684,10 @@ cdef class TextReader: if hr == self.header[-1]: lc = len(this_header) ic = len(self.index_col) if self.index_col is not None else 0 - if lc != unnamed_count and lc-ic > unnamed_count: - hr -= 1 - self.parser_start -= 1 - this_header = [ None ] * lc + if lc != unnamed_count and lc - ic > unnamed_count: + hr -= 1 + self.parser_start -= 1 + this_header = [None] * lc data_line = hr + 1 header.append(this_header) @@ -809,7 +817,8 @@ cdef class TextReader: cdef _tokenize_rows(self, size_t nrows): cdef int status - status = tokenize_nrows(self.parser, nrows) + with nogil: + status = tokenize_nrows(self.parser, nrows) if self.parser.warn_msg != NULL: print >> sys.stderr, self.parser.warn_msg @@ -836,7 +845,8 @@ cdef class TextReader: raise ValueError('skip_footer can only be used to read ' 'the whole file') else: - status = tokenize_all_rows(self.parser) + with nogil: + status = tokenize_all_rows(self.parser) if self.parser.warn_msg != NULL: print >> sys.stderr, self.parser.warn_msg @@ -1055,9 +1065,6 @@ cdef class TextReader: bint user_dtype, kh_str_t *na_hashset, object na_flist): - cdef kh_str_t *true_set - cdef kh_str_t *false_set - if dtype[1] == 'i' or dtype[1] == 'u': result, na_count = _try_int64(self.parser, i, start, end, na_filter, na_hashset) @@ -1073,25 +1080,16 @@ cdef class TextReader: elif dtype[1] == 'f': result, na_count = _try_double(self.parser, i, start, end, - na_filter, na_hashset, na_flist) + na_filter, na_hashset, na_flist) if result is not None and dtype[1:] != 'f8': result = result.astype(dtype) return result, na_count elif dtype[1] == 'b': - if self.true_values is not None or self.false_values is not None: - - true_set = kset_from_list(self.true_values + _true_values) - false_set = kset_from_list(self.false_values + _false_values) - result, na_count = _try_bool_flex(self.parser, i, start, end, - na_filter, na_hashset, - true_set, false_set) - kh_destroy_str(true_set) - kh_destroy_str(false_set) - else: - result, na_count = _try_bool(self.parser, i, start, end, - na_filter, na_hashset) + result, na_count = _try_bool_flex(self.parser, i, start, end, + na_filter, na_hashset, + self.true_set, self.false_set) return result, na_count elif dtype[1] == 'c': raise NotImplementedError("the dtype %s is not supported for parsing" % dtype) @@ -1442,8 +1440,7 @@ cdef _string_box_decode(parser_t *parser, int col, cdef _to_fw_string(parser_t *parser, int col, int line_start, int line_end, size_t width): cdef: - int error - Py_ssize_t i, j + Py_ssize_t i coliter_t it const char *word = NULL char *data @@ -1452,6 +1449,18 @@ cdef _to_fw_string(parser_t *parser, int col, int line_start, result = np.empty(line_end - line_start, dtype='|S%d' % width) data = result.data + with nogil: + _to_fw_string_nogil(parser, col, line_start, line_end, width, data) + + return result + +cdef inline void _to_fw_string_nogil(parser_t *parser, int col, int line_start, + int line_end, size_t width, char *data) nogil: + cdef: + Py_ssize_t i + coliter_t it + const char *word = NULL + coliter_setup(&it, parser, col, line_start) for i in range(line_end - line_start): @@ -1459,8 +1468,6 @@ cdef _to_fw_string(parser_t *parser, int col, int line_start, strncpy(data, word, width) data += width - return result - cdef char* cinf = b'inf' cdef char* cneginf = b'-inf' @@ -1474,14 +1481,40 @@ cdef _try_double(parser_t *parser, int col, int line_start, int line_end, char *p_end double *data double NA = na_values[np.float64] + kh_float64_t *na_fset ndarray result khiter_t k bint use_na_flist = len(na_flist) > 0 - global errno lines = line_end - line_start result = np.empty(lines, dtype=np.float64) data = result.data + na_fset = kset_float64_from_list(na_flist) + with nogil: + error = _try_double_nogil(parser, col, line_start, line_end, + na_filter, na_hashset, use_na_flist, na_fset, NA, data, &na_count) + kh_destroy_float64(na_fset) + if error != 0: + return None, None + return result, na_count + +cdef inline int _try_double_nogil(parser_t *parser, int col, int line_start, int line_end, + bint na_filter, kh_str_t *na_hashset, bint use_na_flist, + const kh_float64_t *na_flist, + double NA, + double *data, int *na_count) nogil: + cdef: + int error, + size_t i + size_t lines = line_end - line_start + coliter_t it + const char *word = NULL + char *p_end + khiter_t k, k64 + + global errno + + na_count[0] = 0 coliter_setup(&it, parser, col, line_start) if na_filter: @@ -1491,39 +1524,41 @@ cdef _try_double(parser_t *parser, int col, int line_start, int line_end, k = kh_get_str(na_hashset, word) # in the hash table if k != na_hashset.n_buckets: - na_count += 1 + na_count[0] += 1 data[0] = NA else: data[0] = parser.converter(word, &p_end, parser.decimal, parser.sci, - parser.thousands, 1) + parser.thousands, 1) if errno != 0 or p_end[0] or p_end == word: if strcasecmp(word, cinf) == 0: data[0] = INF elif strcasecmp(word, cneginf) == 0: data[0] = NEGINF else: - return None, None + # Just return a non-zero value since the errno is never consumed. + return 1 if use_na_flist: - if data[0] in na_flist: - na_count += 1 + k64 = kh_get_float64(na_flist, data[0]) + if k64 != na_flist.n_buckets: + na_count[0] += 1 data[0] = NA data += 1 else: for i in range(lines): COLITER_NEXT(it, word) data[0] = parser.converter(word, &p_end, parser.decimal, parser.sci, - parser.thousands, 1) + parser.thousands, 1) if errno != 0 or p_end[0] or p_end == word: if strcasecmp(word, cinf) == 0: data[0] = INF elif strcasecmp(word, cneginf) == 0: data[0] = NEGINF else: - return None, None + # Just return a non-zero value since the errno is never consumed. + return 1 data += 1 - return result, na_count - + return 0 cdef _try_int64(parser_t *parser, int col, int line_start, int line_end, bint na_filter, kh_str_t *na_hashset): @@ -1531,7 +1566,6 @@ cdef _try_int64(parser_t *parser, int col, int line_start, int line_end, int error, na_count = 0 size_t i, lines coliter_t it - const char *word = NULL int64_t *data ndarray result @@ -1542,6 +1576,29 @@ cdef _try_int64(parser_t *parser, int col, int line_start, int line_end, result = np.empty(lines, dtype=np.int64) data = result.data coliter_setup(&it, parser, col, line_start) + with nogil: + error = _try_int64_nogil(parser, col, line_start, line_end, na_filter, na_hashset, NA, data, &na_count) + if error != 0: + if error == ERROR_OVERFLOW: + # Can't get the word variable + raise OverflowError('Overflow') + return None, None + + return result, na_count + +cdef inline int _try_int64_nogil(parser_t *parser, int col, int line_start, int line_end, + bint na_filter, const kh_str_t *na_hashset, int64_t NA, int64_t *data, + int *na_count) nogil: + cdef: + int error + size_t i + size_t lines = line_end - line_start + coliter_t it + const char *word = NULL + khiter_t k + + na_count[0] = 0 + coliter_setup(&it, parser, col, line_start) if na_filter: for i in range(lines): @@ -1549,46 +1606,54 @@ cdef _try_int64(parser_t *parser, int col, int line_start, int line_end, k = kh_get_str(na_hashset, word) # in the hash table if k != na_hashset.n_buckets: - na_count += 1 + na_count[0] += 1 data[i] = NA continue data[i] = str_to_int64(word, INT64_MIN, INT64_MAX, &error, parser.thousands) if error != 0: - if error == ERROR_OVERFLOW: - raise OverflowError(word) - - return None, None + return error else: for i in range(lines): COLITER_NEXT(it, word) data[i] = str_to_int64(word, INT64_MIN, INT64_MAX, &error, parser.thousands) if error != 0: - if error == ERROR_OVERFLOW: - raise OverflowError(word) - return None, None - - return result, na_count + return error + return 0 cdef _try_bool(parser_t *parser, int col, int line_start, int line_end, bint na_filter, kh_str_t *na_hashset): cdef: - int error, na_count = 0 - size_t i, lines - coliter_t it - const char *word = NULL + int na_count + size_t lines = line_end - line_start uint8_t *data - ndarray result + cnp.ndarray[cnp.uint8_t, ndim=1] result uint8_t NA = na_values[np.bool_] - khiter_t k - lines = line_end - line_start - result = np.empty(lines, dtype=np.uint8) + result = np.empty(lines) data = result.data + + with nogil: + error = _try_bool_nogil(parser, col, line_start, line_end, na_filter, na_hashset, NA, data, &na_count) + if error != 0: + return None, None + return result.view(np.bool_), na_count + +cdef inline int _try_bool_nogil(parser_t *parser, int col, int line_start, int line_end, + bint na_filter, const kh_str_t *na_hashset, uint8_t NA, uint8_t *data, + int *na_count) nogil: + cdef: + int error + size_t lines = line_end - line_start + size_t i + coliter_t it + const char *word = NULL + khiter_t k + na_count[0] = 0 coliter_setup(&it, parser, col, line_start) if na_filter: @@ -1598,14 +1663,14 @@ cdef _try_bool(parser_t *parser, int col, int line_start, int line_end, k = kh_get_str(na_hashset, word) # in the hash table if k != na_hashset.n_buckets: - na_count += 1 + na_count[0] += 1 data[0] = NA data += 1 continue error = to_boolean(word, data) if error != 0: - return None, None + return error data += 1 else: for i in range(lines): @@ -1613,15 +1678,13 @@ cdef _try_bool(parser_t *parser, int col, int line_start, int line_end, error = to_boolean(word, data) if error != 0: - return None, None + return error data += 1 - - return result.view(np.bool_), na_count - + return 0 cdef _try_bool_flex(parser_t *parser, int col, int line_start, int line_end, - bint na_filter, kh_str_t *na_hashset, - kh_str_t *true_hashset, kh_str_t *false_hashset): + bint na_filter, const kh_str_t *na_hashset, + const kh_str_t *true_hashset, const kh_str_t *false_hashset): cdef: int error, na_count = 0 size_t i, lines @@ -1636,6 +1699,26 @@ cdef _try_bool_flex(parser_t *parser, int col, int line_start, int line_end, lines = line_end - line_start result = np.empty(lines, dtype=np.uint8) data = result.data + with nogil: + error = _try_bool_flex_nogil(parser, col, line_start, line_end, na_filter, na_hashset, + true_hashset, false_hashset, NA, data, &na_count) + if error != 0: + return None, None + return result.view(np.bool_), na_count + +cdef inline int _try_bool_flex_nogil(parser_t *parser, int col, int line_start, int line_end, + bint na_filter, const kh_str_t *na_hashset, + const kh_str_t *true_hashset, const kh_str_t *false_hashset, + uint8_t NA, uint8_t *data, int *na_count) nogil: + cdef: + int error = 0 + size_t i + size_t lines = line_end - line_start + coliter_t it + const char *word = NULL + khiter_t k + + na_count[0] = 0 coliter_setup(&it, parser, col, line_start) if na_filter: @@ -1645,7 +1728,7 @@ cdef _try_bool_flex(parser_t *parser, int col, int line_start, int line_end, k = kh_get_str(na_hashset, word) # in the hash table if k != na_hashset.n_buckets: - na_count += 1 + na_count[0] += 1 data[0] = NA data += 1 continue @@ -1655,7 +1738,6 @@ cdef _try_bool_flex(parser_t *parser, int col, int line_start, int line_end, data[0] = 1 data += 1 continue - k = kh_get_str(false_hashset, word) if k != false_hashset.n_buckets: data[0] = 0 @@ -1664,7 +1746,7 @@ cdef _try_bool_flex(parser_t *parser, int col, int line_start, int line_end, error = to_boolean(word, data) if error != 0: - return None, None + return error data += 1 else: for i in range(lines): @@ -1684,10 +1766,10 @@ cdef _try_bool_flex(parser_t *parser, int col, int line_start, int line_end, error = to_boolean(word, data) if error != 0: - return None, None + return error data += 1 - return result.view(np.bool_), na_count + return 0 cdef kh_str_t* kset_from_list(list values) except NULL: # caller takes responsibility for freeing the hash table @@ -1712,6 +1794,25 @@ cdef kh_str_t* kset_from_list(list values) except NULL: return table +cdef kh_float64_t* kset_float64_from_list(values) except NULL: + # caller takes responsibility for freeing the hash table + cdef: + Py_ssize_t i + khiter_t k + kh_float64_t *table + int ret = 0 + cnp.float64_t val + object value + + table = kh_init_float64() + + for value in values: + val = float(value) + + k = kh_put_float64(table, val, &ret) + + return table + # if at first you don't succeed...