blog icon indicating copy to clipboard operation
blog copied to clipboard

Python的多线程与GIL

Open junnplus opened this issue 7 years ago • 5 comments

Python从0.9.8版就开始支持多线程(thread模块),1.5.1版引入了threading高级模块,是对thread模块的封装。

在Python3中,thread模块被重命名为_thread,强调其作为底层模块应该避免使用。

_thread模块对应源码在Modules/_threadmodule.c中,我们看下_thread模块提供的接口函数:

static PyMethodDef thread_methods[] = {
    {"start_new_thread",        (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"start_new",               (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"allocate_lock",           (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"allocate",                (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"exit_thread",             (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"exit",                    (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"interrupt_main",          (PyCFunction)thread_PyThread_interrupt_main,
     METH_NOARGS, interrupt_doc},
    {"get_ident",               (PyCFunction)thread_get_ident,
     METH_NOARGS, get_ident_doc},
    {"_count",                  (PyCFunction)thread__count,
     METH_NOARGS, _count_doc},
    {"stack_size",              (PyCFunction)thread_stack_size,
     METH_VARARGS, stack_size_doc},
    {"_set_sentinel",           (PyCFunction)thread__set_sentinel,
     METH_NOARGS, _set_sentinel_doc},
    {NULL,                      NULL}           /* sentinel */
};

junnplus avatar Mar 19 '18 03:03 junnplus

0x00 主线程

Python 默认会有一个主线程,其实就是进程本身。

// Python/pylifecycle.c
_PyInitError
_Py_InitializeCore(const _PyCoreConfig *core_config)
{
...
    tstate = PyThreadState_New(interp);
    if (tstate == NULL)
        return _Py_INIT_ERR("can't make first thread");

_Py_InitializeCore 初始化调用PyThreadState_New来创建第一个线程状态。

// Python/pystate.c
PyThreadState *
PyThreadState_New(PyInterpreterState *interp)
{
    return new_threadstate(interp, 1);
}

static PyThreadState *
new_threadstate(PyInterpreterState *interp, int init)
{
    PyThreadState *tstate = (PyThreadState *)PyMem_RawMalloc(sizeof(PyThreadState));

    if (_PyThreadState_GetFrame == NULL)
        _PyThreadState_GetFrame = threadstate_getframe;

    if (tstate != NULL) {
        tstate->interp = interp;
...
        if (init)
            _PyThreadState_Init(tstate);
}

new_threadstate 创建一个线程状态。

在Python3.7之前,Python默认是没有开启支持多线程的,换句话说,Python中支持多线程的数据结构以及GIL都是没有创建的,Python之所以有这种行为是因为大多数的Python程序都不需要多线程的支持。当用户使用_thread模块创建新线程,通过调用PyEval_InitThreads来初始化多线程环境,这个时候,Python虚拟机会自动建立多线程机制需要的数据结构、环境以及那个至关重要的GIL。

而从3.7开始之后,PyEval_InitThreads函数会在Python启动的时候默认被调用 https://github.com/python/cpython/commit/2914bb32e2adf8dff77c0ca58b33201bc94e398c#diff-baf5eab51059d96fb8837152dab0d1a4R689

PyEval_InitThreads函数定义在Python/ceval.c

void
PyEval_InitThreads(void)
{
    // [A] 检查是否存在GIL
    if (gil_created())
        return;
    // [B] 创建并初始化GIL
    create_gil();
    // [C] 主线程获取GIL
    take_gil(PyThreadState_GET());
    // [D] 检查和初始化原生系统线程环境
    _PyRuntime.ceval.pending.main_thread = PyThread_get_thread_ident();
    if (!_PyRuntime.ceval.pending.lock)
        _PyRuntime.ceval.pending.lock = PyThread_allocate_lock();
}

[A] 检查是否存在GIL

gil_created函数定义在Python/ceval_gil.h

static int gil_created(void)
{
    return (_Py_atomic_load_explicit(&_PyRuntime.ceval.gil.locked,
                                     _Py_memory_order_acquire)
            ) >= 0;
}

_Py_atomic_*是一组原子操作,封装了不同的平台的原子接口。

_Py_atomic_load_explicit定义在Include/pyatomic.h

#define _Py_atomic_load_explicit(ATOMIC_VAL, ORDER) \
    atomic_load_explicit(&(ATOMIC_VAL)->_value, ORDER)

atomic_load_explicit函数是c11标准的原子读操作,参考Atomic operations library

_Py_memory_order_acquire是枚举元素:

typedef enum _Py_memory_order {
    _Py_memory_order_relaxed = memory_order_relaxed,
    _Py_memory_order_acquire = memory_order_acquire,
    _Py_memory_order_release = memory_order_release,
    _Py_memory_order_acq_rel = memory_order_acq_rel,
    _Py_memory_order_seq_cst = memory_order_seq_cst
} _Py_memory_order;

memory_order_*指定常规的非原子内存访问如何围绕原子操作排序,这边我们不细究这些原子操作排序的不同,感兴趣可以参考memory order

我们只关心两种原子操作,一种load原子读和一种store原子写。

_PyRuntime是一个_PyRuntimeState对象,表示Python运行时状态对象。 ceval指向_ceval_runtime_state对象,表示Python字节码执行器的运行时状态对象,定义在Include/internal/ceval.h

struct _ceval_runtime_state {
    int recursion_limit;
    /* Records whether tracing is on for any thread.  Counts the number
       of threads for which tstate->c_tracefunc is non-NULL, so if the
       value is 0, we know we don't have to check this thread's
       c_tracefunc.  This speeds up the if statement in
       PyEval_EvalFrameEx() after fast_next_opcode. */
    int tracing_possible;
    /* This single variable consolidates all requests to break out of
       the fast path in the eval loop. */
    _Py_atomic_int eval_breaker;
    /* Request for dropping the GIL */
    _Py_atomic_int gil_drop_request;
    struct _pending_calls pending;
    struct _gil_runtime_state gil;
};

ceval.gil指向_gil_runtime_state对象,表示GIL的运行状态对象,定义在Include/internal/gil.h

struct _gil_runtime_state {
    /* microseconds (the Python API uses seconds, though) */
    unsigned long interval;
    /* Last PyThreadState holding / having held the GIL. This helps us
       know whether anyone else was scheduled after we dropped the GIL. */
    _Py_atomic_address last_holder;
    /* Whether the GIL is already taken (-1 if uninitialized). This is
       atomic because it can be read without any lock taken in ceval.c. */
    _Py_atomic_int locked;
    /* Number of GIL switches since the beginning. */
    unsigned long switch_number;
    /* This condition variable allows one or several threads to wait
       until the GIL is released. In addition, the mutex also protects
       the above variables. */
    PyCOND_T cond;
    PyMUTEX_T mutex;
#ifdef FORCE_SWITCHING
    /* This condition variable helps the GIL-releasing thread wait for
       a GIL-awaiting thread to be scheduled and take the GIL. */
    PyCOND_T switch_cond;
    PyMUTEX_T switch_mutex;
#endif
};

locked就是传说中的GIL,是一个_Py_atomic_int对象,定义在Include/pyatomic.h

typedef struct _Py_atomic_int {
    atomic_int _value;
} _Py_atomic_int;

所以,gil_created函数通过原子读操作检查原子位locked是否初始化。未初始化时,locked为-1。当GIL被线程获取时,locked为1,反之为0。

[B] 创建并初始化GIL

static void create_gil(void)
{
    MUTEX_INIT(_PyRuntime.ceval.gil.mutex);
#ifdef FORCE_SWITCHING
    MUTEX_INIT(_PyRuntime.ceval.gil.switch_mutex);
#endif
    COND_INIT(_PyRuntime.ceval.gil.cond);
#ifdef FORCE_SWITCHING
    COND_INIT(_PyRuntime.ceval.gil.switch_cond);
#endif
    _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder, 0);
    _Py_ANNOTATE_RWLOCK_CREATE(&_PyRuntime.ceval.gil.locked);
    _Py_atomic_store_explicit(&_PyRuntime.ceval.gil.locked, 0,
                              _Py_memory_order_release);
}

_gil_runtime_state结构体中的condmutex用来保护locked

The GIL is just a boolean variable (locked) whose access is protected by a mutex (gil_mutex), and whose changes are signalled by a condition variable (gil_cond). gil_mutex is taken for short periods of time, and therefore mostly uncontended.

GIL利用条件机制和互斥锁<cond, mutex>保护一个锁变量locked作为实现。

PyCOND_TPyMUTEX_T定义在Include/internal/condvar.h,根据平台不一样具体实现也不一样,以POSIX Thread(pthread)为例:

#define PyMUTEX_T pthread_mutex_t
#define PyCOND_T pthread_cond_t

create_gil函数初始化锁变量mutex和初始化条件变量cond之后,通过_Py_atomic_store_relaxed原子写初始化last_holderlast_holder是一个_Py_atomic_address对象,定义在Include/pyatomic.h

typedef struct _Py_atomic_address {
    atomic_uintptr_t _value;
} _Py_atomic_address;

atomic_uintptr_t是一个原子类型指针,当一个线程获得GIL之后,_value将指向线程状态PyThreadState对象。

并通过_Py_atomic_store_explicit原子写初始化locked

[C] 主线程获取GIL

take_gil函数定义在Python/ceval_gil.h

static void take_gil(PyThreadState *tstate)
{
    int err;
    if (tstate == NULL)
        Py_FatalError("take_gil: NULL tstate");

    err = errno;
    MUTEX_LOCK(_PyRuntime.ceval.gil.mutex);

    if (!_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked))
        goto _ready;

    while (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked)) {
        int timed_out = 0;
        unsigned long saved_switchnum;

        saved_switchnum = _PyRuntime.ceval.gil.switch_number;
        COND_TIMED_WAIT(_PyRuntime.ceval.gil.cond, _PyRuntime.ceval.gil.mutex,
                        INTERVAL, timed_out);
        /* If we timed out and no switch occurred in the meantime, it is time
           to ask the GIL-holding thread to drop it. */
        if (timed_out &&
            _Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked) &&
            _PyRuntime.ceval.gil.switch_number == saved_switchnum) {
            SET_GIL_DROP_REQUEST();
        }
    }
_ready:
#ifdef FORCE_SWITCHING
    /* This mutex must be taken before modifying
       _PyRuntime.ceval.gil.last_holder (see drop_gil()). */
    MUTEX_LOCK(_PyRuntime.ceval.gil.switch_mutex);
#endif
    /* We now hold the GIL */
    _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.locked, 1);
    _Py_ANNOTATE_RWLOCK_ACQUIRED(&_PyRuntime.ceval.gil.locked, /*is_write=*/1);

    if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(
                    &_PyRuntime.ceval.gil.last_holder))
    {
        _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder,
                                 (uintptr_t)tstate);
        ++_PyRuntime.ceval.gil.switch_number;
    }

#ifdef FORCE_SWITCHING
    COND_SIGNAL(_PyRuntime.ceval.gil.switch_cond);
    MUTEX_UNLOCK(_PyRuntime.ceval.gil.switch_mutex);
#endif
    if (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil_drop_request)) {
        RESET_GIL_DROP_REQUEST();
    }
    if (tstate->async_exc != NULL) {
        _PyEval_SignalAsyncExc();
    }

    MUTEX_UNLOCK(_PyRuntime.ceval.gil.mutex);
    errno = err;
}

首先获取mutex互斥锁,获取mutex互斥锁之后检查是否有线程持有GIL(locked为1),如果GIL被占有的话,设置cond信号量等待。等待的时候线程就会挂起,释放mutex互斥锁。cond信号被唤醒时,mutex互斥锁会自动加锁。while语句避免了意外唤醒时条件不满足,大多数情况下都是条件满足被唤醒。

如果等待超时并且这期间没有发生线程切换,就通过SET_GIL_DROP_REQUEST请求持有GIL的线程释放GIL。反之获得GIL就将locked置为1,并将当前线程状态PyThreadState对象保存到last_holderswitch_number切换次数加1。

[D] 检查和初始化原生系统线程环境

pending是一个_pending_calls结构对象:

struct _pending_calls {
    unsigned long main_thread;
    pythread_type_lock lock;
    /* request for running pending calls. */
    _py_atomic_int calls_to_do;
    /* request for looking at the `async_exc` field of the current
       thread state.
       guarded by the gil. */
    int async_exc;
#define npendingcalls 32
    struct {
        int (*func)(void *);
        void *arg;
    } calls[npendingcalls];
    int first;
    int last;
};

pending cells实现了一个机制:

Mechanism whereby asynchronously executing callbacks (e.g. UNIX signal handlers or Mac I/O completion routines) can schedule calls to a function to be called synchronously.

也就是主线程执行一些被注册的函数,pending.main_thread记录了主线程id:

unsigned long
PyThread_get_thread_ident(void)
{
    volatile pthread_t threadid;
    if (!initialized)
        PyThread_init_thread();
    threadid = pthread_self();
    return (unsigned long) threadid;
}

获得线程id之前会检查initialized变量,如果说GIL指示着Python的多线程环境是否已经建立,那么这个initialized变量就指示着为了使用底层平台所提供的原生thread,必须的初始化动作是否完成。

initializedPyThread_init_thread定义在Python/thread.c

static int initialized;

void
PyThread_init_thread(void)
{
#ifdef Py_DEBUG
    const char *p = Py_GETENV("PYTHONTHREADDEBUG");

    if (p) {
        if (*p)
            thread_debug = atoi(p);
        else
            thread_debug = 1;
    }
#endif /* Py_DEBUG */
    if (initialized)
        return;
    initialized = 1;
    dprintf(("PyThread_init_thread called\n"));
    PyThread__init_thread();
}

POSIX Thread(pthread) 下PyThread__init_thread会根据编译器和平台来确认是否要进行初始化动作,定义在Python/thread_pthread.h

static void
PyThread__init_thread(void)
{
#if defined(_AIX) && defined(__GNUC__)
    extern void pthread_init(void);
    pthread_init();
#endif
}

pending.lock用来确保线程安全,是一个Python级别的线程锁。

junnplus avatar Jun 13 '20 10:06 junnplus

0x01 线程的创建

static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
    PyObject *func, *args, *keyw = NULL;
    struct bootstate *boot;
    unsigned long ident;

    ...
    
    // [1] 创建bootstate结构boot并初始化
    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL)
        return PyErr_NoMemory();
    boot->interp = PyThreadState_GET()->interp;
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;
    boot->tstate = _PyThreadState_Prealloc(boot->interp);
    if (boot->tstate == NULL) {
        PyMem_DEL(boot);
        return PyErr_NoMemory();
    }
    Py_INCREF(func);
    Py_INCREF(args);
    Py_XINCREF(keyw);
    // [2] 初始化多线程环境
    PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
    // [3] 创建原生系统线程
    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
    ...
    return PyLong_FromUnsignedLong(ident);
}

[1] 创建bootstate结构boot并初始化。

bootstate结构体记录了线程的信息:

struct bootstate {
    PyInterpreterState *interp;
    PyObject *func;
    PyObject *args;
    PyObject *keyw;
    PyThreadState *tstate;
};

func, args, keyw记录线程执行的函数以及参数。 interptstate分别保存了进程状态PyInterpreterState对象和线程状态PyThreadState对象。

通过PyThreadState_GET获取当前线程状态PyThreadState对象,进而可以获取解释器状态PyInterpreterState对象。

通过_PyThreadState_Prealloc函数创建线程状态PyThreadState对象,定义在Python/pystate.c

PyThreadState *
_PyThreadState_Prealloc(PyInterpreterState *interp)
{
    return new_threadstate(interp, 0);
}

[3] 创建原生系统线程

POSIX Thread(pthread) 下的PyThread_start_new_thread定义在Python/thread_pthread.h

unsigned long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
    pthread_t th;
    int status;
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    pthread_attr_t attrs;
#endif
#if defined(THREAD_STACK_SIZE)
    size_t      tss;
#endif

    dprintf(("PyThread_start_new_thread called\n"));
    if (!initialized)
        PyThread_init_thread();

#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    if (pthread_attr_init(&attrs) != 0)
        return PYTHREAD_INVALID_THREAD_ID;
#endif
#if defined(THREAD_STACK_SIZE)
    PyThreadState *tstate = PyThreadState_GET();
    size_t stacksize = tstate ? tstate->interp->pythread_stacksize : 0;
    tss = (stacksize != 0) ? stacksize : THREAD_STACK_SIZE;
    if (tss != 0) {
        if (pthread_attr_setstacksize(&attrs, tss) != 0) {
            pthread_attr_destroy(&attrs);
            return PYTHREAD_INVALID_THREAD_ID;
        }
    }
#endif
#if defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM);
#endif

    status = pthread_create(&th,
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
                             &attrs,
#else
                             (pthread_attr_t*)NULL,
#endif
                             (void* (*)(void *))func,
                             (void *)arg
                             );

#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    pthread_attr_destroy(&attrs);
#endif
    if (status != 0)
        return PYTHREAD_INVALID_THREAD_ID;

    pthread_detach(th);

#if SIZEOF_PTHREAD_T <= SIZEOF_LONG
    return (unsigned long) th;
#else
    return (unsigned long) *(unsigned long *) &th;
#endif
}

PyThread_start_new_thread函数通过pthread_create函数创建原生的系统线程,然后调用pthread_detach函数将线程状态改为unjoinable状态,确保线程运行结束后资源的释放,最后返回线程id。

junnplus avatar Jun 13 '20 10:06 junnplus

0x02 线程的执行

传给了pthread_create函数用来创建线程的func参数是t_bootstrap函数,arg参数包装了线程信息的boot对象,也就是说主线程创建的子线程将会执行t_bootstrap(boot)

t_bootstrap定义在Modules/_threadmodule.c

static void
t_bootstrap(void *boot_raw)
{
    struct bootstate *boot = (struct bootstate *) boot_raw;
    PyThreadState *tstate;
    PyObject *res;

    tstate = boot->tstate;
    tstate->thread_id = PyThread_get_thread_ident();
    _PyThreadState_Init(tstate);
    PyEval_AcquireThread(tstate);
    tstate->interp->num_threads++;
    res = PyObject_Call(boot->func, boot->args, boot->keyw);
    ...
    Py_DECREF(boot->func);
    Py_DECREF(boot->args);
    Py_XDECREF(boot->keyw);
    PyMem_DEL(boot_raw);
    tstate->interp->num_threads--;
    PyThreadState_Clear(tstate);
    PyThreadState_DeleteCurrent();
    PyThread_exit_thread();
}

t_bootstrap函数先对线程状态对象设置子线程的id,并初始化线程状态对象。

调用PyEval_AcquireThread获取GIL,定义在Python/ceval.c

void
PyEval_AcquireThread(PyThreadState *tstate)
{
    if (tstate == NULL)
        Py_FatalError("PyEval_AcquireThread: NULL new thread state");
    /* Check someone has called PyEval_InitThreads() to create the lock */
    assert(gil_created());
    take_gil(tstate);
    if (PyThreadState_Swap(tstate) != NULL)
        Py_FatalError(
            "PyEval_AcquireThread: non-NULL old thread state");
}

PyEval_AcquireThread实际上就是调用了take_gil来获取GIL。

获取GIL之后对进程状态对象(interp)累加线程数,并调用PyObject_Call执行子线程的函数。

在子线程的全部计算完成之后,Python将销毁子线程。

junnplus avatar Jun 13 '20 10:06 junnplus

0x03 线程的销毁

PyThreadState_Clear清除当前线程对应的线程状态对象,所谓清理,实际上比较简单,就是对线程状态对象中维护的东西进行引用计数的维护。

PyThreadState_DeleteCurrent释放线程状态对象并释放GIL。

void
PyThreadState_DeleteCurrent()
{
    PyThreadState *tstate = GET_TSTATE();
    if (tstate == NULL)
        Py_FatalError(
            "PyThreadState_DeleteCurrent: no current tstate");
    tstate_delete_common(tstate);
    if (_PyRuntime.gilstate.autoInterpreterState &&
        PyThread_tss_get(&_PyRuntime.gilstate.autoTSSkey) == tstate)
    {
        PyThread_tss_set(&_PyRuntime.gilstate.autoTSSkey, NULL);
    }
    SET_TSTATE(NULL);
    PyEval_ReleaseLock();
}

static void
tstate_delete_common(PyThreadState *tstate)
{
    PyInterpreterState *interp;
    if (tstate == NULL)
        Py_FatalError("PyThreadState_Delete: NULL tstate");
    interp = tstate->interp;
    if (interp == NULL)
        Py_FatalError("PyThreadState_Delete: NULL interp");
    HEAD_LOCK();
    if (tstate->prev)
        tstate->prev->next = tstate->next;
    else
        interp->tstate_head = tstate->next;
    if (tstate->next)
        tstate->next->prev = tstate->prev;
    HEAD_UNLOCK();
    if (tstate->on_delete != NULL) {
        tstate->on_delete(tstate->on_delete_data);
    }
    PyMem_RawFree(tstate);
}

PyThreadState_DeleteCurrent中,首先会删除清理后的当前线程状态对象,然后通过PyEval_ReleaseLock释放GIL。

PyEval_ReleaseLock定义在Python/ceval.c

void
PyEval_ReleaseLock(void)
{
    /* This function must succeed when the current thread state is NULL.
       We therefore avoid PyThreadState_GET() which dumps a fatal error
       in debug mode.
    */
    drop_gil((PyThreadState*)_Py_atomic_load_relaxed(
        &_PyThreadState_Current));
}

PyEval_ReleaseLock调用了drop_gil函数,定义在Python/ceval_gil.h

static void drop_gil(PyThreadState *tstate)
{
    if (!_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked))
        Py_FatalError("drop_gil: GIL is not locked");
    /* tstate is allowed to be NULL (early interpreter init) */
    if (tstate != NULL) {
        /* Sub-interpreter support: threads might have been switched
           under our feet using PyThreadState_Swap(). Fix the GIL last
           holder variable so that our heuristics work. */
        _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder,
                                 (uintptr_t)tstate);
    }

    MUTEX_LOCK(_PyRuntime.ceval.gil.mutex);
    _Py_ANNOTATE_RWLOCK_RELEASED(&_PyRuntime.ceval.gil.locked, /*is_write=*/1);
    _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.locked, 0);
    COND_SIGNAL(_PyRuntime.ceval.gil.cond);
    MUTEX_UNLOCK(_PyRuntime.ceval.gil.mutex);

#ifdef FORCE_SWITCHING
    if (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil_drop_request) &&
        tstate != NULL)
    {
        MUTEX_LOCK(_PyRuntime.ceval.gil.switch_mutex);
        /* Not switched yet => wait */
        if (((PyThreadState*)_Py_atomic_load_relaxed(
                    &_PyRuntime.ceval.gil.last_holder)
            ) == tstate)
        {
        RESET_GIL_DROP_REQUEST();
            /* NOTE: if COND_WAIT does not atomically start waiting when
               releasing the mutex, another thread can run through, take
               the GIL and drop it again, and reset the condition
               before we even had a chance to wait for it. */
            COND_WAIT(_PyRuntime.ceval.gil.switch_cond,
                      _PyRuntime.ceval.gil.switch_mutex);
    }
        MUTEX_UNLOCK(_PyRuntime.ceval.gil.switch_mutex);
    }
#endif
}

drop_gil函数获取mutex互斥锁之后将locked置为0,释放GIL,并通知条件变量cond

清除和释放工作完成后,子线程就调用PyThread_exit_thread退出了。

void
PyThread_exit_thread(void)
{
    dprintf(("PyThread_exit_thread called\n"));
    if (!initialized)
        exit(0);
    pthread_exit(0);
}

PyThread_exit_thread是一个平台相关的操作,完成各个平台上不同的销毁原生线程的工作。在POSIX Thread(pthread) 下,实际上就是调用pthread_exit函数。

junnplus avatar Jun 13 '20 11:06 junnplus

0x04 线程的调度

时间调度

当然,子线程是不会一直执行t_bootstrap到释放GIL,Python中持有GIL的线程会在某个时间释放GIL。

In the GIL-holding thread, the main loop (PyEval_EvalFrameEx) must be able to release the GIL on demand by another thread. A volatile boolean variable (gil_drop_request) is used for that purpose, which is checked at every turn of the eval loop. That variable is set after a wait of interval microseconds on gil_cond has timed out.

[Actually, another volatile boolean variable (eval_breaker) is used which ORs several conditions into one. Volatile booleans are sufficient as inter-thread signalling means since Python is run on cache-coherent architectures only.]

A thread wanting to take the GIL will first let pass a given amount of time (interval microseconds) before setting gil_drop_request. This encourages a defined switching period, but doesn't enforce it since opcodes can take an arbitrary time to execute.

The interval value is available for the user to read and modify using the Python API sys.{get,set}switchinterval().

我们看PyEval_EvalFrameEx函数,定义在Python/ceval.c

PyObject *
PyEval_EvalFrameEx(PyFrameObject *f, int throwflag)
{
    PyThreadState *tstate = PyThreadState_GET();
    return tstate->interp->eval_frame(f, throwflag);
}

PyObject* _Py_HOT_FUNCTION
_PyEval_EvalFrameDefault(PyFrameObject *f, int throwflag)
{
    ...
main_loop:
    for (;;) {
        ...

        if (_Py_atomic_load_relaxed(&_PyRuntime.ceval.eval_breaker)) {
            ...
            
            if (_Py_atomic_load_relaxed(
                        &_PyRuntime.ceval.gil_drop_request))
            {
                /* Give another thread a chance */
                if (PyThreadState_Swap(NULL) != tstate)
                    Py_FatalError("ceval: tstate mix-up");
                drop_gil(tstate);

                /* Other threads may run now */

                take_gil(tstate);

                /* Check if we should make a quick exit. */
                if (_Py_IsFinalizing() &&
                    !_Py_CURRENTLY_FINALIZING(tstate))
                {
                    drop_gil(tstate);
                    PyThread_exit_thread();
                }

                if (PyThreadState_Swap(tstate) != NULL)
                    Py_FatalError("ceval: orphan tstate");
            }
            ...
        }
        ...
    }
}

_PyEval_EvalFrameDefault是虚拟机执行字节码的主入口函数,当满足_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil_drop_request)时,当前线程会释放GIL,其他等待GIL的线程会尝试获取GIL并执行。

gil_drop_request会在cond等待超时被设置。而超时时间被设置在Python/ceval_gil.h

#define INTERVAL (_PyRuntime.ceval.gil.interval >= 1 ? _PyRuntime.ceval.gil.interval : 1)

gil.interval会在_PyRuntimeState初始化的时候会被设置。

#define DEFAULT_INTERVAL 5000

static void _gil_initialize(struct _gil_runtime_state *state)
{
    _Py_atomic_int uninitialized = {-1};
    state->locked = uninitialized;
    state->interval = DEFAULT_INTERVAL;
}

默认是0.005s,可以通过sys.getswitchinterval来查看interval时间间隔:

> import sys
> sys.getswitchinterval()
> 0.005

阻塞调度

Python3中除了时间调度之外,还有一种阻塞调度:当线程执行I/O操作,或者sleep时,线程将会挂起,释放GIL。

time.sleep为例,实现在Modules/timemodule.c

/* Implement pysleep() for various platforms.
   When interrupted (or when another error occurs), return -1 and
   set an exception; else return 0. */

static int
pysleep(_PyTime_t secs)
{
    _PyTime_t deadline, monotonic;
    
    _PyTime_t millisecs;
    unsigned long ul_millis;
    DWORD rc;
    HANDLE hInterruptEvent;

    deadline = _PyTime_GetMonotonicClock() + secs;

    do {
        if (_PyTime_AsTimeval(secs, &timeout, _PyTime_ROUND_CEILING) < 0)
            return -1;

        Py_BEGIN_ALLOW_THREADS
        err = select(0, (fd_set *)0, (fd_set *)0, (fd_set *)0, &timeout);
        Py_END_ALLOW_THREADS

        if (err == 0)
            break;

        if (errno != EINTR) {
            PyErr_SetFromErrno(PyExc_OSError);
            return -1;
        }

        /* sleep was interrupted by SIGINT */
        if (PyErr_CheckSignals())
            return -1;

        monotonic = _PyTime_GetMonotonicClock();
        secs = deadline - monotonic;
        if (secs < 0)
            break;
        /* retry with the recomputed delay */
    } while (1);

    return 0;
}

pysleep函数是跨平台实现,上面只保留非windows平台的代码。

Python在这里使用select来实现sleep阻塞,可以看到select前后有两个宏定义:

  • Py_BEGIN_ALLOW_THREADS
  • Py_END_ALLOW_THREADS

定义在Include/ceval.h

#define Py_BEGIN_ALLOW_THREADS { \
                        PyThreadState *_save; \
                        _save = PyEval_SaveThread();
#define Py_BLOCK_THREADS        PyEval_RestoreThread(_save);
#define Py_UNBLOCK_THREADS      _save = PyEval_SaveThread();
#define Py_END_ALLOW_THREADS    PyEval_RestoreThread(_save); \
                 }

Py_BEGIN_ALLOW_THREADSPy_END_ALLOW_THREADS宏分别调用了PyEval_SaveThreadPyEval_RestoreThread函数:

/* Functions save_thread and restore_thread are always defined so
   dynamically loaded modules needn't be compiled separately for use
   with and without threads: */

PyThreadState *
PyEval_SaveThread(void)
{
    PyThreadState *tstate = PyThreadState_Swap(NULL);
    if (tstate == NULL)
        Py_FatalError("PyEval_SaveThread: NULL tstate");
    assert(gil_created());
    drop_gil(tstate);
    return tstate;
}

void
PyEval_RestoreThread(PyThreadState *tstate)
{
    if (tstate == NULL)
        Py_FatalError("PyEval_RestoreThread: NULL tstate");
    assert(gil_created());

    int err = errno;
    take_gil(tstate);
    /* _Py_Finalizing is protected by the GIL */
    if (_Py_IsFinalizing() && !_Py_CURRENTLY_FINALIZING(tstate)) {
        drop_gil(tstate);
        PyThread_exit_thread();
        Py_UNREACHABLE();
    }
    errno = err;

    PyThreadState_Swap(tstate);
}

PyThreadState_Swap定义在Python/pystate.c

PyThreadState *
PyThreadState_Swap(PyThreadState *newts)
{
    PyThreadState *oldts = GET_TSTATE();

    SET_TSTATE(newts);
    ...
    return oldts;
}
  • PyEval_SaveThread:设置当前线程状态对象为NULL,释放GIL,并保存到_save变量。
  • PyEval_RestoreThread:获取GIL,重新设置当前线程状态对象。

junnplus avatar Jun 13 '20 11:06 junnplus