Skip to content

bpo-32604: [_xxsubinterpreters] Add channel_send_wait(). #19829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Include/cpython/pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ PyAPI_FUNC(const PyConfig*) _PyInterpreterState_GetConfig(PyInterpreterState *in
PyAPI_FUNC(const PyConfig*) _Py_GetConfig(void);


/* cross-interpreter operations */

PyAPI_FUNC(int) _Py_DECREF_in_interpreter(PyInterpreterState *, PyObject *);

/* cross-interpreter data */

struct _xid;
Expand Down
6 changes: 6 additions & 0 deletions Include/pythread.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ PyAPI_FUNC(int) PyThread_tss_set(Py_tss_t *key, void *value);
PyAPI_FUNC(void *) PyThread_tss_get(Py_tss_t *key);
#endif /* New in 3.7 */

#ifndef Py_LIMITED_API
PyAPI_DATA(const PY_TIMEOUT_T) _PyThread_TIMEOUT_NOT_SET;
// This is for use with PyArg_ParseTupleAndKeywords():
PyAPI_FUNC(int) _PyThread_timeout_arg_converter(PyObject *, void *);
#endif

#ifdef __cplusplus
}
#endif
Expand Down
208 changes: 202 additions & 6 deletions Modules/_xxsubinterpretersmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/* low-level access to interpreter primitives */

#include "Python.h"
#include "pythread.h"
#include "frameobject.h"
#include "interpreteridobject.h"

Expand Down Expand Up @@ -276,6 +277,149 @@ _sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
}
}

/* locks */

typedef struct _lockobj {
PyObject_HEAD
PyThread_type_lock lock;
PyInterpreterState *owner;
int done;
} _lockobj;

static int
_lockobj_init(_lockobj *lock)
{
lock->lock = PyThread_allocate_lock();
if (lock->lock == NULL) {
return -1;
}
lock->done = 0;
lock->owner = _get_current();
return 0;
}

static void
_lockobj_dealloc(_lockobj *lock)
{
PyThread_free_lock(lock->lock);
PyObject_Del(lock);
}

// This is cross-interpreter safe.
static int
_lockobj_acquire(_lockobj *lock)
{
// Do not wait.
return PyThread_acquire_lock(lock->lock, 0);
}

// This is cross-interpreter safe.
static void
_lockobj_release(_lockobj *lock)
{
PyThread_release_lock(lock->lock);
}

static PyObject *
_lockobj_call(PyObject *self, PyObject *args, PyObject *kwargs)
{
static char *kwlist[] = {"timeout", NULL};
PY_TIMEOUT_T timeout = _PyThread_TIMEOUT_NOT_SET;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O&:__call__", kwlist,
_PyThread_timeout_arg_converter,
&timeout)) {
return NULL;
}
_lockobj *lock = (_lockobj *)self;

if (lock->done) {
Py_RETURN_TRUE;
}

// Wait for the lock to be released.
_PyTime_t end = timeout > 0 ? _PyTime_GetMonotonicClock() + timeout : 0;
PyLockStatus r = PY_LOCK_FAILURE;
do {
_PyTime_t microseconds = _PyTime_AsMicroseconds(timeout,
_PyTime_ROUND_CEILING);

/* first a simple non-blocking try without releasing the GIL */
r = PyThread_acquire_lock_timed(lock->lock, 0, 0);
if (r == PY_LOCK_FAILURE && microseconds != 0) {
Py_BEGIN_ALLOW_THREADS
r = PyThread_acquire_lock_timed(lock, microseconds, 0);
Py_END_ALLOW_THREADS
}

if (r == PY_LOCK_INTR) {
/* Run signal handlers if we were interrupted. Propagate
* exceptions from signal handlers, such as KeyboardInterrupt, by
* passing up PY_LOCK_INTR. */
if (Py_MakePendingCalls() < 0) {
return NULL;
}

/* If we're using a timeout, recompute the timeout after processing
* signals, since those can take time. */
if (timeout > 0) {
timeout = end - _PyTime_GetMonotonicClock();

/* Check for negative values, since those mean block forever.
*/
if (timeout < 0) {
r = PY_LOCK_FAILURE;
}
}
}
} while (r == PY_LOCK_INTR); /* Retry if we were interrupted. */
if (r == PY_LOCK_FAILURE) {
Py_RETURN_FALSE;
}

// Success!
_lockobj_release(lock);
lock->done = 1;
Py_RETURN_TRUE;
}

static PyTypeObject _lockobjtype = {
PyVarObject_HEAD_INIT(&PyType_Type, 0)
.tp_name = "_xxsubinterpreters.lock",
.tp_doc = PyDoc_STR("a basic waitable wrapper around a mutex"),
.tp_flags = Py_TPFLAGS_DEFAULT,
.tp_basicsize = sizeof(_lockobj),
// functionality
.tp_new = NULL, // It cannot be instantiated from Python code.
.tp_dealloc = (destructor)_lockobj_dealloc,
.tp_call = _lockobj_call,
};

static _lockobj *
_lockobj_new(void)
{
_lockobj *lock = PyObject_New(_lockobj, &_lockobjtype);
if (lock == NULL) {
return NULL;
}
if (_lockobj_init(lock) != 0) {
PyMem_Free(lock);
return NULL;
}
return lock;
}

static void
_lockobj_free(_lockobj *lock)
{
_lockobj_release(lock);
if (lock->owner == NULL || lock->owner == _get_current()) {
Py_DECREF(lock);
} else {
int res = _Py_DECREF_in_interpreter(lock->owner, (PyObject *)lock);
assert(res == 0);
}
}


/* channel-specific code ****************************************************/

Expand Down Expand Up @@ -353,6 +497,7 @@ struct _channelitem;

typedef struct _channelitem {
_PyCrossInterpreterData *data;
_lockobj *recvlock;
struct _channelitem *next;
} _channelitem;

Expand All @@ -365,6 +510,7 @@ _channelitem_new(void)
return NULL;
}
item->data = NULL;
item->recvlock = NULL;
item->next = NULL;
return item;
}
Expand All @@ -377,6 +523,9 @@ _channelitem_clear(_channelitem *item)
PyMem_Free(item->data);
item->data = NULL;
}
if (item->recvlock != NULL) {
_lockobj_free(item->recvlock);
}
item->next = NULL;
}

Expand All @@ -402,6 +551,7 @@ _channelitem_popped(_channelitem *item)
{
_PyCrossInterpreterData *data = item->data;
item->data = NULL;
// The lock (if any) is released here:
_channelitem_free(item);
return data;
}
Expand Down Expand Up @@ -443,13 +593,15 @@ _channelqueue_free(_channelqueue *queue)
}

static int
_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data,
_lockobj *recvlock)
{
_channelitem *item = _channelitem_new();
if (item == NULL) {
return -1;
}
item->data = data;
item->recvlock = recvlock;

queue->count += 1;
if (queue->first == NULL) {
Expand Down Expand Up @@ -761,7 +913,7 @@ _channel_free(_PyChannelState *chan)

static int
_channel_add(_PyChannelState *chan, int64_t interp,
_PyCrossInterpreterData *data)
_PyCrossInterpreterData *data, _lockobj *recvlock)
{
int res = -1;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
Expand All @@ -774,7 +926,7 @@ _channel_add(_PyChannelState *chan, int64_t interp,
goto done;
}

if (_channelqueue_put(chan->queue, data) != 0) {
if (_channelqueue_put(chan->queue, data, recvlock) != 0) {
goto done;
}

Expand Down Expand Up @@ -1280,7 +1432,8 @@ _channel_destroy(_channels *channels, int64_t id)
}

static int
_channel_send(_channels *channels, int64_t id, PyObject *obj)
_channel_send(_channels *channels, int64_t id, PyObject *obj,
_lockobj *recvlock)
{
PyInterpreterState *interp = _get_current();
if (interp == NULL) {
Expand Down Expand Up @@ -1314,7 +1467,8 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj)
}

// Add the data to the channel.
int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
int res = _channel_add(chan, PyInterpreterState_GetID(interp), data,
recvlock);
PyThread_release_lock(mutex);
if (res != 0) {
_PyCrossInterpreterData_Release(data);
Expand Down Expand Up @@ -2412,7 +2566,7 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds)
return NULL;
}

if (_channel_send(&_globals.channels, cid, obj) != 0) {
if (_channel_send(&_globals.channels, cid, obj, NULL) != 0) {
return NULL;
}
Py_RETURN_NONE;
Expand All @@ -2423,6 +2577,43 @@ PyDoc_STRVAR(channel_send_doc,
\n\
Add the object's data to the channel's queue.");

static PyObject *
channel_send_wait(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "obj", NULL};
int64_t cid;
PyObject *obj;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
channel_id_converter, &cid, &obj)) {
return NULL;
}

_lockobj *lock = _lockobj_new();
if (lock == NULL) {
return NULL;
}
if (_lockobj_acquire(lock) != 0) {
PyErr_SetString(PyExc_RuntimeError, "could not acquire lock");
_lockobj_dealloc(lock);
return NULL;
}
if (_channel_send(&_globals.channels, cid, obj, lock) != 0) {
_lockobj_dealloc(lock);
return NULL;
}
Py_INCREF(lock);
return (PyObject*)lock;
}

PyDoc_STRVAR(channel_send_wait_doc,
"channel_send_wait(cid, obj)\n\
\n\
Add the object's data to the channel's queue.\n\
\n\
The returned callable will block until the object is received.\n\
Note that it takes an optional 'timeout' arg like\n\
threading.Lock.acquire() does.");

static PyObject *
channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
{
Expand Down Expand Up @@ -2575,6 +2766,8 @@ static PyMethodDef module_functions[] = {
METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
{"channel_send", (PyCFunction)(void(*)(void))channel_send,
METH_VARARGS | METH_KEYWORDS, channel_send_doc},
{"channel_send_wait", (PyCFunction)(void(*)(void))channel_send_wait,
METH_VARARGS | METH_KEYWORDS, channel_send_wait_doc},
{"channel_recv", (PyCFunction)(void(*)(void))channel_recv,
METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
{"channel_close", (PyCFunction)(void(*)(void))channel_close,
Expand Down Expand Up @@ -2618,6 +2811,9 @@ PyInit__xxsubinterpreters(void)
if (PyType_Ready(&ChannelIDtype) != 0) {
return NULL;
}
if (PyType_Ready(&_lockobjtype) != 0) {
return NULL;
}

/* Create the module */
PyObject *module = PyModule_Create(&interpretersmodule);
Expand Down
Loading