From 5ba60802c7cd89302ae722e11856645e54d7055b Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Thu, 3 Jul 2014 14:26:38 +0900 Subject: [PATCH 1/3] Add low level APIs for semi-async query. --- _mysql.c | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/_mysql.c b/_mysql.c index 3c19caaf..c7061029 100644 --- a/_mysql.c +++ b/_mysql.c @@ -756,6 +756,16 @@ static int _mysql_ConnectionObject_clear( return 0; } +static char _mysql_ConnectionObject_fileno__doc__[] = +"Return underlaying fd for connection"; + +static PyObject * +_mysql_ConnectionObject_fileno( + _mysql_ConnectionObject *self) +{ + return PyInt_FromLong(self->connection.net.fd); +} + static char _mysql_ConnectionObject_close__doc__[] = "Close the connection. No further activity possible."; @@ -1963,8 +1973,10 @@ _mysql_ConnectionObject_query( { char *query; int len, r; + MYSQL *mysql = &(self->connection); if (!PyArg_ParseTuple(args, "s#:query", &query, &len)) return NULL; check_connection(self); + Py_BEGIN_ALLOW_THREADS r = mysql_real_query(&(self->connection), query, len); Py_END_ALLOW_THREADS @@ -1974,6 +1986,50 @@ _mysql_ConnectionObject_query( } +static char _mysql_ConnectionObject_send_query__doc__[] = +"Send a query. Same to query() except not wait response.\n\n\ +Use read_query_result() before calling store_result() or use_result()\n"; + +static PyObject * +_mysql_ConnectionObject_send_query( + _mysql_ConnectionObject *self, + PyObject *args) +{ + char *query; + int len, r; + MYSQL *mysql = &(self->connection); + if (!PyArg_ParseTuple(args, "s#:query", &query, &len)) return NULL; + check_connection(self); + + Py_BEGIN_ALLOW_THREADS + r = mysql_send_query(mysql, query, len); + Py_END_ALLOW_THREADS + if (r) return _mysql_Exception(self); + Py_INCREF(Py_None); + return Py_None; +} + + +static char _mysql_ConnectionObject_read_query_result__doc__[] = +"Read result of query sent by send_query().\n"; + +static PyObject * +_mysql_ConnectionObject_read_query_result( + _mysql_ConnectionObject *self) +{ + char *query; + int len, r; + MYSQL *mysql = &(self->connection); + check_connection(self); + + Py_BEGIN_ALLOW_THREADS + r = (int)mysql_read_query_result(mysql); + Py_END_ALLOW_THREADS + if (r) return _mysql_Exception(self); + Py_INCREF(Py_None); + return Py_None; +} + static char _mysql_ConnectionObject_select_db__doc__[] = "Causes the database specified by db to become the default\n\ (current) database on the connection specified by mysql. In subsequent\n\ @@ -2344,6 +2400,12 @@ static PyMethodDef _mysql_ConnectionObject_methods[] = { METH_VARARGS, _mysql_ConnectionObject_close__doc__ }, + { + "fileno", + (PyCFunction)_mysql_ConnectionObject_fileno, + METH_NOARGS, + _mysql_ConnectionObject_fileno__doc__ + }, { "dump_debug_info", (PyCFunction)_mysql_ConnectionObject_dump_debug_info, @@ -2428,6 +2490,18 @@ static PyMethodDef _mysql_ConnectionObject_methods[] = { METH_VARARGS, _mysql_ConnectionObject_query__doc__ }, + { + "send_query", + (PyCFunction)_mysql_ConnectionObject_send_query, + METH_VARARGS, + _mysql_ConnectionObject_send_query__doc__, + }, + { + "read_query_result", + (PyCFunction)_mysql_ConnectionObject_read_query_result, + METH_NOARGS, + _mysql_ConnectionObject_read_query_result__doc__, + }, { "select_db", (PyCFunction)_mysql_ConnectionObject_select_db, From 7e15cf86171ca1dd95467e37abda107138607ca3 Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Thu, 3 Jul 2014 20:52:53 +0900 Subject: [PATCH 2/3] Add `waiter` to MySQLdb.Connection. --- MySQLdb/connections.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/MySQLdb/connections.py b/MySQLdb/connections.py index cd803767..8d5c2d8a 100644 --- a/MySQLdb/connections.py +++ b/MySQLdb/connections.py @@ -63,10 +63,10 @@ class Connection(_mysql.connection): """MySQL Database Connection Object""" default_cursor = cursors.Cursor + waiter = None def __init__(self, *args, **kwargs): """ - Create a connection to the database. It is strongly recommended that you only use keyword parameters. Consult the MySQL C API documentation for more information. @@ -150,9 +150,13 @@ class object, used to create cursors (keyword only) If True, autocommit is enabled. If None, autocommit isn't set and server default is used. + waiter + Callable accepts fd as an argument. It is called after sending + query and before reading response. + This is useful when using with greenlet and async io. + There are a number of undocumented, non-standard methods. See the documentation for the MySQL C API for some hints on what they do. - """ from MySQLdb.constants import CLIENT, FIELD_TYPE from MySQLdb.converters import conversions @@ -195,6 +199,7 @@ class object, used to create cursors (keyword only) # PEP-249 requires autocommit to be initially off autocommit = kwargs2.pop('autocommit', False) + self.waiter = kwargs2.pop('waiter', None) super(Connection, self).__init__(*args, **kwargs2) self.cursorclass = cursorclass @@ -266,6 +271,14 @@ def cursor(self, cursorclass=None): """ return (cursorclass or self.cursorclass)(self) + def query(self, query): + if self.waiter is not None: + self.send_query(query) + self.waiter(self.fileno()) + self.read_query_result() + else: + _mysql.connection.query(self, query) + def __enter__(self): if self.get_autocommit(): self.query("BEGIN") From cac44d4179ec64581b7425b59068619b662dfbb1 Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Thu, 3 Jul 2014 21:05:39 +0900 Subject: [PATCH 3/3] Add waiter samples. --- samples/waiter_gevent.py | 25 +++++++++++++++++++++++++ samples/waiter_meinheld.py | 19 +++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 samples/waiter_gevent.py create mode 100644 samples/waiter_meinheld.py diff --git a/samples/waiter_gevent.py b/samples/waiter_gevent.py new file mode 100644 index 00000000..7ee835f4 --- /dev/null +++ b/samples/waiter_gevent.py @@ -0,0 +1,25 @@ +from __future__ import print_function +"""Demo using Gevent with mysqlclient.""" + +import gevent.hub +import select +import MySQLdb + + +def gevent_waiter(fd, hub=gevent.hub.get_hub()): + hub.wait(hub.loop.io(fd, 1)) + + +def f(n): + conn = MySQLdb.connect(user='root', waiter=gevent_waiter) + cur = conn.cursor() + cur.execute("SELECT SLEEP(%s)", (n,)) + cur.execute("SELECT 1+%s", (n,)) + print(cur.fetchall()[0]) + + +gevent.spawn(f, 1) +gevent.spawn(f, 2) +gevent.spawn(f, 3) +gevent.spawn(f, 4) +gevent.sleep(5) diff --git a/samples/waiter_meinheld.py b/samples/waiter_meinheld.py new file mode 100644 index 00000000..838c9153 --- /dev/null +++ b/samples/waiter_meinheld.py @@ -0,0 +1,19 @@ +import meinheld +import MySQLdb + + +def meinheld_waiter(fd): + meinheld.server.trampoline(fd, read=True, timeout=10) + + +def app(env, start): + cont = b"Hello, World\n" + conn = MySQLdb.connect(user="root", waiter=meinheld_waiter) + cur = conn.cursor() + cur.execute("SELECT SLEEP(2)") + start(b"200 OK", [('Content-Type', 'text/plain'), ('Content-Length', str(len(cont)))]) + return [cont] + + +meinheld.server.listen(("0.0.0.0", 8080)) +meinheld.server.run(app)