Skip to content

Support async query #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 3, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions MySQLdb/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ class Connection(_mysql.connection):
"""MySQL Database Connection Object"""

default_cursor = cursors.Cursor
waiter = None

def __init__(self, *args, **kwargs):
"""
Create a connection to the database. It is strongly recommended
that you only use keyword parameters. Consult the MySQL C API
documentation for more information.
Expand Down Expand Up @@ -150,9 +150,13 @@ class object, used to create cursors (keyword only)
If True, autocommit is enabled.
If None, autocommit isn't set and server default is used.
waiter
Callable accepts fd as an argument. It is called after sending
query and before reading response.
This is useful when using with greenlet and async io.
There are a number of undocumented, non-standard methods. See the
documentation for the MySQL C API for some hints on what they do.
"""
from MySQLdb.constants import CLIENT, FIELD_TYPE
from MySQLdb.converters import conversions
Expand Down Expand Up @@ -195,6 +199,7 @@ class object, used to create cursors (keyword only)

# PEP-249 requires autocommit to be initially off
autocommit = kwargs2.pop('autocommit', False)
self.waiter = kwargs2.pop('waiter', None)

super(Connection, self).__init__(*args, **kwargs2)
self.cursorclass = cursorclass
Expand Down Expand Up @@ -266,6 +271,14 @@ def cursor(self, cursorclass=None):
"""
return (cursorclass or self.cursorclass)(self)

def query(self, query):
if self.waiter is not None:
self.send_query(query)
self.waiter(self.fileno())
self.read_query_result()
else:
_mysql.connection.query(self, query)

def __enter__(self):
if self.get_autocommit():
self.query("BEGIN")
Expand Down
74 changes: 74 additions & 0 deletions _mysql.c
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,16 @@ static int _mysql_ConnectionObject_clear(
return 0;
}

static char _mysql_ConnectionObject_fileno__doc__[] =
"Return underlaying fd for connection";

static PyObject *
_mysql_ConnectionObject_fileno(
_mysql_ConnectionObject *self)
{
return PyInt_FromLong(self->connection.net.fd);
}

static char _mysql_ConnectionObject_close__doc__[] =
"Close the connection. No further activity possible.";

Expand Down Expand Up @@ -1963,8 +1973,10 @@ _mysql_ConnectionObject_query(
{
char *query;
int len, r;
MYSQL *mysql = &(self->connection);
if (!PyArg_ParseTuple(args, "s#:query", &query, &len)) return NULL;
check_connection(self);

Py_BEGIN_ALLOW_THREADS
r = mysql_real_query(&(self->connection), query, len);
Py_END_ALLOW_THREADS
Expand All @@ -1974,6 +1986,50 @@ _mysql_ConnectionObject_query(
}


static char _mysql_ConnectionObject_send_query__doc__[] =
"Send a query. Same to query() except not wait response.\n\n\
Use read_query_result() before calling store_result() or use_result()\n";

static PyObject *
_mysql_ConnectionObject_send_query(
_mysql_ConnectionObject *self,
PyObject *args)
{
char *query;
int len, r;
MYSQL *mysql = &(self->connection);
if (!PyArg_ParseTuple(args, "s#:query", &query, &len)) return NULL;
check_connection(self);

Py_BEGIN_ALLOW_THREADS
r = mysql_send_query(mysql, query, len);
Py_END_ALLOW_THREADS
if (r) return _mysql_Exception(self);
Py_INCREF(Py_None);
return Py_None;
}


static char _mysql_ConnectionObject_read_query_result__doc__[] =
"Read result of query sent by send_query().\n";

static PyObject *
_mysql_ConnectionObject_read_query_result(
_mysql_ConnectionObject *self)
{
char *query;
int len, r;
MYSQL *mysql = &(self->connection);
check_connection(self);

Py_BEGIN_ALLOW_THREADS
r = (int)mysql_read_query_result(mysql);
Py_END_ALLOW_THREADS
if (r) return _mysql_Exception(self);
Py_INCREF(Py_None);
return Py_None;
}

static char _mysql_ConnectionObject_select_db__doc__[] =
"Causes the database specified by db to become the default\n\
(current) database on the connection specified by mysql. In subsequent\n\
Expand Down Expand Up @@ -2344,6 +2400,12 @@ static PyMethodDef _mysql_ConnectionObject_methods[] = {
METH_VARARGS,
_mysql_ConnectionObject_close__doc__
},
{
"fileno",
(PyCFunction)_mysql_ConnectionObject_fileno,
METH_NOARGS,
_mysql_ConnectionObject_fileno__doc__
},
{
"dump_debug_info",
(PyCFunction)_mysql_ConnectionObject_dump_debug_info,
Expand Down Expand Up @@ -2428,6 +2490,18 @@ static PyMethodDef _mysql_ConnectionObject_methods[] = {
METH_VARARGS,
_mysql_ConnectionObject_query__doc__
},
{
"send_query",
(PyCFunction)_mysql_ConnectionObject_send_query,
METH_VARARGS,
_mysql_ConnectionObject_send_query__doc__,
},
{
"read_query_result",
(PyCFunction)_mysql_ConnectionObject_read_query_result,
METH_NOARGS,
_mysql_ConnectionObject_read_query_result__doc__,
},
{
"select_db",
(PyCFunction)_mysql_ConnectionObject_select_db,
Expand Down
25 changes: 25 additions & 0 deletions samples/waiter_gevent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import print_function
"""Demo using Gevent with mysqlclient."""

import gevent.hub
import select
import MySQLdb


def gevent_waiter(fd, hub=gevent.hub.get_hub()):
hub.wait(hub.loop.io(fd, 1))


def f(n):
conn = MySQLdb.connect(user='root', waiter=gevent_waiter)
cur = conn.cursor()
cur.execute("SELECT SLEEP(%s)", (n,))
cur.execute("SELECT 1+%s", (n,))
print(cur.fetchall()[0])


gevent.spawn(f, 1)
gevent.spawn(f, 2)
gevent.spawn(f, 3)
gevent.spawn(f, 4)
gevent.sleep(5)
19 changes: 19 additions & 0 deletions samples/waiter_meinheld.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import meinheld
import MySQLdb


def meinheld_waiter(fd):
meinheld.server.trampoline(fd, read=True, timeout=10)


def app(env, start):
cont = b"Hello, World\n"
conn = MySQLdb.connect(user="root", waiter=meinheld_waiter)
cur = conn.cursor()
cur.execute("SELECT SLEEP(2)")
start(b"200 OK", [('Content-Type', 'text/plain'), ('Content-Length', str(len(cont)))])
return [cont]


meinheld.server.listen(("0.0.0.0", 8080))
meinheld.server.run(app)