blog
blog copied to clipboard
Python的多线程与GIL
Python从0.9.8版就开始支持多线程(thread模块),1.5.1版引入了threading高级模块,是对thread模块的封装。
在Python3中,thread模块被重命名为_thread,强调其作为底层模块应该避免使用。
_thread模块对应源码在Modules/_threadmodule.c中,我们看下_thread模块提供的接口函数:
static PyMethodDef thread_methods[] = {
{"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS, start_new_doc},
{"start_new", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS, start_new_doc},
{"allocate_lock", (PyCFunction)thread_PyThread_allocate_lock,
METH_NOARGS, allocate_doc},
{"allocate", (PyCFunction)thread_PyThread_allocate_lock,
METH_NOARGS, allocate_doc},
{"exit_thread", (PyCFunction)thread_PyThread_exit_thread,
METH_NOARGS, exit_doc},
{"exit", (PyCFunction)thread_PyThread_exit_thread,
METH_NOARGS, exit_doc},
{"interrupt_main", (PyCFunction)thread_PyThread_interrupt_main,
METH_NOARGS, interrupt_doc},
{"get_ident", (PyCFunction)thread_get_ident,
METH_NOARGS, get_ident_doc},
{"_count", (PyCFunction)thread__count,
METH_NOARGS, _count_doc},
{"stack_size", (PyCFunction)thread_stack_size,
METH_VARARGS, stack_size_doc},
{"_set_sentinel", (PyCFunction)thread__set_sentinel,
METH_NOARGS, _set_sentinel_doc},
{NULL, NULL} /* sentinel */
};
0x00 主线程
Python 默认会有一个主线程,其实就是进程本身。
// Python/pylifecycle.c
_PyInitError
_Py_InitializeCore(const _PyCoreConfig *core_config)
{
...
tstate = PyThreadState_New(interp);
if (tstate == NULL)
return _Py_INIT_ERR("can't make first thread");
_Py_InitializeCore 初始化调用PyThreadState_New来创建第一个线程状态。
// Python/pystate.c
PyThreadState *
PyThreadState_New(PyInterpreterState *interp)
{
return new_threadstate(interp, 1);
}
static PyThreadState *
new_threadstate(PyInterpreterState *interp, int init)
{
PyThreadState *tstate = (PyThreadState *)PyMem_RawMalloc(sizeof(PyThreadState));
if (_PyThreadState_GetFrame == NULL)
_PyThreadState_GetFrame = threadstate_getframe;
if (tstate != NULL) {
tstate->interp = interp;
...
if (init)
_PyThreadState_Init(tstate);
}
new_threadstate 创建一个线程状态。
在Python3.7之前,Python默认是没有开启支持多线程的,换句话说,Python中支持多线程的数据结构以及GIL都是没有创建的,Python之所以有这种行为是因为大多数的Python程序都不需要多线程的支持。当用户使用_thread模块创建新线程,通过调用PyEval_InitThreads来初始化多线程环境,这个时候,Python虚拟机会自动建立多线程机制需要的数据结构、环境以及那个至关重要的GIL。
而从3.7开始之后,PyEval_InitThreads函数会在Python启动的时候默认被调用 https://github.com/python/cpython/commit/2914bb32e2adf8dff77c0ca58b33201bc94e398c#diff-baf5eab51059d96fb8837152dab0d1a4R689
PyEval_InitThreads函数定义在Python/ceval.c:
void
PyEval_InitThreads(void)
{
// [A] 检查是否存在GIL
if (gil_created())
return;
// [B] 创建并初始化GIL
create_gil();
// [C] 主线程获取GIL
take_gil(PyThreadState_GET());
// [D] 检查和初始化原生系统线程环境
_PyRuntime.ceval.pending.main_thread = PyThread_get_thread_ident();
if (!_PyRuntime.ceval.pending.lock)
_PyRuntime.ceval.pending.lock = PyThread_allocate_lock();
}
[A] 检查是否存在GIL
gil_created函数定义在Python/ceval_gil.h:
static int gil_created(void)
{
return (_Py_atomic_load_explicit(&_PyRuntime.ceval.gil.locked,
_Py_memory_order_acquire)
) >= 0;
}
_Py_atomic_*是一组原子操作,封装了不同的平台的原子接口。
_Py_atomic_load_explicit定义在Include/pyatomic.h:
#define _Py_atomic_load_explicit(ATOMIC_VAL, ORDER) \
atomic_load_explicit(&(ATOMIC_VAL)->_value, ORDER)
atomic_load_explicit函数是c11标准的原子读操作,参考Atomic operations library。
_Py_memory_order_acquire是枚举元素:
typedef enum _Py_memory_order {
_Py_memory_order_relaxed = memory_order_relaxed,
_Py_memory_order_acquire = memory_order_acquire,
_Py_memory_order_release = memory_order_release,
_Py_memory_order_acq_rel = memory_order_acq_rel,
_Py_memory_order_seq_cst = memory_order_seq_cst
} _Py_memory_order;
memory_order_*指定常规的非原子内存访问如何围绕原子操作排序,这边我们不细究这些原子操作排序的不同,感兴趣可以参考memory order。
我们只关心两种原子操作,一种load原子读和一种store原子写。
_PyRuntime是一个_PyRuntimeState对象,表示Python运行时状态对象。
ceval指向_ceval_runtime_state对象,表示Python字节码执行器的运行时状态对象,定义在Include/internal/ceval.h:
struct _ceval_runtime_state {
int recursion_limit;
/* Records whether tracing is on for any thread. Counts the number
of threads for which tstate->c_tracefunc is non-NULL, so if the
value is 0, we know we don't have to check this thread's
c_tracefunc. This speeds up the if statement in
PyEval_EvalFrameEx() after fast_next_opcode. */
int tracing_possible;
/* This single variable consolidates all requests to break out of
the fast path in the eval loop. */
_Py_atomic_int eval_breaker;
/* Request for dropping the GIL */
_Py_atomic_int gil_drop_request;
struct _pending_calls pending;
struct _gil_runtime_state gil;
};
ceval.gil指向_gil_runtime_state对象,表示GIL的运行状态对象,定义在Include/internal/gil.h:
struct _gil_runtime_state {
/* microseconds (the Python API uses seconds, though) */
unsigned long interval;
/* Last PyThreadState holding / having held the GIL. This helps us
know whether anyone else was scheduled after we dropped the GIL. */
_Py_atomic_address last_holder;
/* Whether the GIL is already taken (-1 if uninitialized). This is
atomic because it can be read without any lock taken in ceval.c. */
_Py_atomic_int locked;
/* Number of GIL switches since the beginning. */
unsigned long switch_number;
/* This condition variable allows one or several threads to wait
until the GIL is released. In addition, the mutex also protects
the above variables. */
PyCOND_T cond;
PyMUTEX_T mutex;
#ifdef FORCE_SWITCHING
/* This condition variable helps the GIL-releasing thread wait for
a GIL-awaiting thread to be scheduled and take the GIL. */
PyCOND_T switch_cond;
PyMUTEX_T switch_mutex;
#endif
};
locked就是传说中的GIL,是一个_Py_atomic_int对象,定义在Include/pyatomic.h:
typedef struct _Py_atomic_int {
atomic_int _value;
} _Py_atomic_int;
所以,gil_created函数通过原子读操作检查原子位locked是否初始化。未初始化时,locked为-1。当GIL被线程获取时,locked为1,反之为0。
[B] 创建并初始化GIL
static void create_gil(void)
{
MUTEX_INIT(_PyRuntime.ceval.gil.mutex);
#ifdef FORCE_SWITCHING
MUTEX_INIT(_PyRuntime.ceval.gil.switch_mutex);
#endif
COND_INIT(_PyRuntime.ceval.gil.cond);
#ifdef FORCE_SWITCHING
COND_INIT(_PyRuntime.ceval.gil.switch_cond);
#endif
_Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder, 0);
_Py_ANNOTATE_RWLOCK_CREATE(&_PyRuntime.ceval.gil.locked);
_Py_atomic_store_explicit(&_PyRuntime.ceval.gil.locked, 0,
_Py_memory_order_release);
}
_gil_runtime_state结构体中的cond和mutex用来保护locked。
The GIL is just a boolean variable (locked) whose access is protected by a mutex (gil_mutex), and whose changes are signalled by a condition variable (gil_cond). gil_mutex is taken for short periods of time, and therefore mostly uncontended.
GIL利用条件机制和互斥锁<cond, mutex>保护一个锁变量locked作为实现。
PyCOND_T和PyMUTEX_T定义在Include/internal/condvar.h,根据平台不一样具体实现也不一样,以POSIX Thread(pthread)为例:
#define PyMUTEX_T pthread_mutex_t
#define PyCOND_T pthread_cond_t
create_gil函数初始化锁变量mutex和初始化条件变量cond之后,通过_Py_atomic_store_relaxed原子写初始化last_holder,last_holder是一个_Py_atomic_address对象,定义在Include/pyatomic.h:
typedef struct _Py_atomic_address {
atomic_uintptr_t _value;
} _Py_atomic_address;
atomic_uintptr_t是一个原子类型指针,当一个线程获得GIL之后,_value将指向线程状态PyThreadState对象。
并通过_Py_atomic_store_explicit原子写初始化locked。
[C] 主线程获取GIL
take_gil函数定义在Python/ceval_gil.h:
static void take_gil(PyThreadState *tstate)
{
int err;
if (tstate == NULL)
Py_FatalError("take_gil: NULL tstate");
err = errno;
MUTEX_LOCK(_PyRuntime.ceval.gil.mutex);
if (!_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked))
goto _ready;
while (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked)) {
int timed_out = 0;
unsigned long saved_switchnum;
saved_switchnum = _PyRuntime.ceval.gil.switch_number;
COND_TIMED_WAIT(_PyRuntime.ceval.gil.cond, _PyRuntime.ceval.gil.mutex,
INTERVAL, timed_out);
/* If we timed out and no switch occurred in the meantime, it is time
to ask the GIL-holding thread to drop it. */
if (timed_out &&
_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked) &&
_PyRuntime.ceval.gil.switch_number == saved_switchnum) {
SET_GIL_DROP_REQUEST();
}
}
_ready:
#ifdef FORCE_SWITCHING
/* This mutex must be taken before modifying
_PyRuntime.ceval.gil.last_holder (see drop_gil()). */
MUTEX_LOCK(_PyRuntime.ceval.gil.switch_mutex);
#endif
/* We now hold the GIL */
_Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.locked, 1);
_Py_ANNOTATE_RWLOCK_ACQUIRED(&_PyRuntime.ceval.gil.locked, /*is_write=*/1);
if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(
&_PyRuntime.ceval.gil.last_holder))
{
_Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder,
(uintptr_t)tstate);
++_PyRuntime.ceval.gil.switch_number;
}
#ifdef FORCE_SWITCHING
COND_SIGNAL(_PyRuntime.ceval.gil.switch_cond);
MUTEX_UNLOCK(_PyRuntime.ceval.gil.switch_mutex);
#endif
if (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil_drop_request)) {
RESET_GIL_DROP_REQUEST();
}
if (tstate->async_exc != NULL) {
_PyEval_SignalAsyncExc();
}
MUTEX_UNLOCK(_PyRuntime.ceval.gil.mutex);
errno = err;
}
首先获取mutex互斥锁,获取mutex互斥锁之后检查是否有线程持有GIL(locked为1),如果GIL被占有的话,设置cond信号量等待。等待的时候线程就会挂起,释放mutex互斥锁。cond信号被唤醒时,mutex互斥锁会自动加锁。while语句避免了意外唤醒时条件不满足,大多数情况下都是条件满足被唤醒。
如果等待超时并且这期间没有发生线程切换,就通过SET_GIL_DROP_REQUEST请求持有GIL的线程释放GIL。反之获得GIL就将locked置为1,并将当前线程状态PyThreadState对象保存到last_holder,switch_number切换次数加1。
[D] 检查和初始化原生系统线程环境
pending是一个_pending_calls结构对象:
struct _pending_calls {
unsigned long main_thread;
pythread_type_lock lock;
/* request for running pending calls. */
_py_atomic_int calls_to_do;
/* request for looking at the `async_exc` field of the current
thread state.
guarded by the gil. */
int async_exc;
#define npendingcalls 32
struct {
int (*func)(void *);
void *arg;
} calls[npendingcalls];
int first;
int last;
};
pending cells实现了一个机制:
Mechanism whereby asynchronously executing callbacks (e.g. UNIX signal handlers or Mac I/O completion routines) can schedule calls to a function to be called synchronously.
也就是主线程执行一些被注册的函数,pending.main_thread记录了主线程id:
unsigned long
PyThread_get_thread_ident(void)
{
volatile pthread_t threadid;
if (!initialized)
PyThread_init_thread();
threadid = pthread_self();
return (unsigned long) threadid;
}
获得线程id之前会检查initialized变量,如果说GIL指示着Python的多线程环境是否已经建立,那么这个initialized变量就指示着为了使用底层平台所提供的原生thread,必须的初始化动作是否完成。
initialized和PyThread_init_thread定义在Python/thread.c:
static int initialized;
void
PyThread_init_thread(void)
{
#ifdef Py_DEBUG
const char *p = Py_GETENV("PYTHONTHREADDEBUG");
if (p) {
if (*p)
thread_debug = atoi(p);
else
thread_debug = 1;
}
#endif /* Py_DEBUG */
if (initialized)
return;
initialized = 1;
dprintf(("PyThread_init_thread called\n"));
PyThread__init_thread();
}
POSIX Thread(pthread) 下PyThread__init_thread会根据编译器和平台来确认是否要进行初始化动作,定义在Python/thread_pthread.h:
static void
PyThread__init_thread(void)
{
#if defined(_AIX) && defined(__GNUC__)
extern void pthread_init(void);
pthread_init();
#endif
}
pending.lock用来确保线程安全,是一个Python级别的线程锁。
0x01 线程的创建
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
PyObject *func, *args, *keyw = NULL;
struct bootstate *boot;
unsigned long ident;
...
// [1] 创建bootstate结构boot并初始化
boot = PyMem_NEW(struct bootstate, 1);
if (boot == NULL)
return PyErr_NoMemory();
boot->interp = PyThreadState_GET()->interp;
boot->func = func;
boot->args = args;
boot->keyw = keyw;
boot->tstate = _PyThreadState_Prealloc(boot->interp);
if (boot->tstate == NULL) {
PyMem_DEL(boot);
return PyErr_NoMemory();
}
Py_INCREF(func);
Py_INCREF(args);
Py_XINCREF(keyw);
// [2] 初始化多线程环境
PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
// [3] 创建原生系统线程
ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
...
return PyLong_FromUnsignedLong(ident);
}
[1] 创建bootstate结构boot并初始化。
bootstate结构体记录了线程的信息:
struct bootstate {
PyInterpreterState *interp;
PyObject *func;
PyObject *args;
PyObject *keyw;
PyThreadState *tstate;
};
func, args, keyw记录线程执行的函数以及参数。
interp和tstate分别保存了进程状态PyInterpreterState对象和线程状态PyThreadState对象。
通过PyThreadState_GET获取当前线程状态PyThreadState对象,进而可以获取解释器状态PyInterpreterState对象。
通过_PyThreadState_Prealloc函数创建线程状态PyThreadState对象,定义在Python/pystate.c:
PyThreadState *
_PyThreadState_Prealloc(PyInterpreterState *interp)
{
return new_threadstate(interp, 0);
}
[3] 创建原生系统线程
POSIX Thread(pthread) 下的PyThread_start_new_thread定义在Python/thread_pthread.h:
unsigned long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
pthread_t th;
int status;
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
pthread_attr_t attrs;
#endif
#if defined(THREAD_STACK_SIZE)
size_t tss;
#endif
dprintf(("PyThread_start_new_thread called\n"));
if (!initialized)
PyThread_init_thread();
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
if (pthread_attr_init(&attrs) != 0)
return PYTHREAD_INVALID_THREAD_ID;
#endif
#if defined(THREAD_STACK_SIZE)
PyThreadState *tstate = PyThreadState_GET();
size_t stacksize = tstate ? tstate->interp->pythread_stacksize : 0;
tss = (stacksize != 0) ? stacksize : THREAD_STACK_SIZE;
if (tss != 0) {
if (pthread_attr_setstacksize(&attrs, tss) != 0) {
pthread_attr_destroy(&attrs);
return PYTHREAD_INVALID_THREAD_ID;
}
}
#endif
#if defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM);
#endif
status = pthread_create(&th,
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
&attrs,
#else
(pthread_attr_t*)NULL,
#endif
(void* (*)(void *))func,
(void *)arg
);
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
pthread_attr_destroy(&attrs);
#endif
if (status != 0)
return PYTHREAD_INVALID_THREAD_ID;
pthread_detach(th);
#if SIZEOF_PTHREAD_T <= SIZEOF_LONG
return (unsigned long) th;
#else
return (unsigned long) *(unsigned long *) &th;
#endif
}
PyThread_start_new_thread函数通过pthread_create函数创建原生的系统线程,然后调用pthread_detach函数将线程状态改为unjoinable状态,确保线程运行结束后资源的释放,最后返回线程id。
0x02 线程的执行
传给了pthread_create函数用来创建线程的func参数是t_bootstrap函数,arg参数包装了线程信息的boot对象,也就是说主线程创建的子线程将会执行t_bootstrap(boot)。
t_bootstrap定义在Modules/_threadmodule.c:
static void
t_bootstrap(void *boot_raw)
{
struct bootstate *boot = (struct bootstate *) boot_raw;
PyThreadState *tstate;
PyObject *res;
tstate = boot->tstate;
tstate->thread_id = PyThread_get_thread_ident();
_PyThreadState_Init(tstate);
PyEval_AcquireThread(tstate);
tstate->interp->num_threads++;
res = PyObject_Call(boot->func, boot->args, boot->keyw);
...
Py_DECREF(boot->func);
Py_DECREF(boot->args);
Py_XDECREF(boot->keyw);
PyMem_DEL(boot_raw);
tstate->interp->num_threads--;
PyThreadState_Clear(tstate);
PyThreadState_DeleteCurrent();
PyThread_exit_thread();
}
t_bootstrap函数先对线程状态对象设置子线程的id,并初始化线程状态对象。
调用PyEval_AcquireThread获取GIL,定义在Python/ceval.c:
void
PyEval_AcquireThread(PyThreadState *tstate)
{
if (tstate == NULL)
Py_FatalError("PyEval_AcquireThread: NULL new thread state");
/* Check someone has called PyEval_InitThreads() to create the lock */
assert(gil_created());
take_gil(tstate);
if (PyThreadState_Swap(tstate) != NULL)
Py_FatalError(
"PyEval_AcquireThread: non-NULL old thread state");
}
PyEval_AcquireThread实际上就是调用了take_gil来获取GIL。
获取GIL之后对进程状态对象(interp)累加线程数,并调用PyObject_Call执行子线程的函数。
在子线程的全部计算完成之后,Python将销毁子线程。
0x03 线程的销毁
PyThreadState_Clear清除当前线程对应的线程状态对象,所谓清理,实际上比较简单,就是对线程状态对象中维护的东西进行引用计数的维护。
PyThreadState_DeleteCurrent释放线程状态对象并释放GIL。
void
PyThreadState_DeleteCurrent()
{
PyThreadState *tstate = GET_TSTATE();
if (tstate == NULL)
Py_FatalError(
"PyThreadState_DeleteCurrent: no current tstate");
tstate_delete_common(tstate);
if (_PyRuntime.gilstate.autoInterpreterState &&
PyThread_tss_get(&_PyRuntime.gilstate.autoTSSkey) == tstate)
{
PyThread_tss_set(&_PyRuntime.gilstate.autoTSSkey, NULL);
}
SET_TSTATE(NULL);
PyEval_ReleaseLock();
}
static void
tstate_delete_common(PyThreadState *tstate)
{
PyInterpreterState *interp;
if (tstate == NULL)
Py_FatalError("PyThreadState_Delete: NULL tstate");
interp = tstate->interp;
if (interp == NULL)
Py_FatalError("PyThreadState_Delete: NULL interp");
HEAD_LOCK();
if (tstate->prev)
tstate->prev->next = tstate->next;
else
interp->tstate_head = tstate->next;
if (tstate->next)
tstate->next->prev = tstate->prev;
HEAD_UNLOCK();
if (tstate->on_delete != NULL) {
tstate->on_delete(tstate->on_delete_data);
}
PyMem_RawFree(tstate);
}
在PyThreadState_DeleteCurrent中,首先会删除清理后的当前线程状态对象,然后通过PyEval_ReleaseLock释放GIL。
PyEval_ReleaseLock定义在Python/ceval.c:
void
PyEval_ReleaseLock(void)
{
/* This function must succeed when the current thread state is NULL.
We therefore avoid PyThreadState_GET() which dumps a fatal error
in debug mode.
*/
drop_gil((PyThreadState*)_Py_atomic_load_relaxed(
&_PyThreadState_Current));
}
PyEval_ReleaseLock调用了drop_gil函数,定义在Python/ceval_gil.h:
static void drop_gil(PyThreadState *tstate)
{
if (!_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked))
Py_FatalError("drop_gil: GIL is not locked");
/* tstate is allowed to be NULL (early interpreter init) */
if (tstate != NULL) {
/* Sub-interpreter support: threads might have been switched
under our feet using PyThreadState_Swap(). Fix the GIL last
holder variable so that our heuristics work. */
_Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder,
(uintptr_t)tstate);
}
MUTEX_LOCK(_PyRuntime.ceval.gil.mutex);
_Py_ANNOTATE_RWLOCK_RELEASED(&_PyRuntime.ceval.gil.locked, /*is_write=*/1);
_Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.locked, 0);
COND_SIGNAL(_PyRuntime.ceval.gil.cond);
MUTEX_UNLOCK(_PyRuntime.ceval.gil.mutex);
#ifdef FORCE_SWITCHING
if (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil_drop_request) &&
tstate != NULL)
{
MUTEX_LOCK(_PyRuntime.ceval.gil.switch_mutex);
/* Not switched yet => wait */
if (((PyThreadState*)_Py_atomic_load_relaxed(
&_PyRuntime.ceval.gil.last_holder)
) == tstate)
{
RESET_GIL_DROP_REQUEST();
/* NOTE: if COND_WAIT does not atomically start waiting when
releasing the mutex, another thread can run through, take
the GIL and drop it again, and reset the condition
before we even had a chance to wait for it. */
COND_WAIT(_PyRuntime.ceval.gil.switch_cond,
_PyRuntime.ceval.gil.switch_mutex);
}
MUTEX_UNLOCK(_PyRuntime.ceval.gil.switch_mutex);
}
#endif
}
drop_gil函数获取mutex互斥锁之后将locked置为0,释放GIL,并通知条件变量cond。
清除和释放工作完成后,子线程就调用PyThread_exit_thread退出了。
void
PyThread_exit_thread(void)
{
dprintf(("PyThread_exit_thread called\n"));
if (!initialized)
exit(0);
pthread_exit(0);
}
PyThread_exit_thread是一个平台相关的操作,完成各个平台上不同的销毁原生线程的工作。在POSIX Thread(pthread) 下,实际上就是调用pthread_exit函数。
0x04 线程的调度
时间调度
当然,子线程是不会一直执行t_bootstrap到释放GIL,Python中持有GIL的线程会在某个时间释放GIL。
In the GIL-holding thread, the main loop (PyEval_EvalFrameEx) must be able to release the GIL on demand by another thread. A volatile boolean variable (gil_drop_request) is used for that purpose, which is checked at every turn of the eval loop. That variable is set after a wait of
intervalmicroseconds ongil_condhas timed out.[Actually, another volatile boolean variable (eval_breaker) is used which ORs several conditions into one. Volatile booleans are sufficient as inter-thread signalling means since Python is run on cache-coherent architectures only.]
A thread wanting to take the GIL will first let pass a given amount of time (
intervalmicroseconds) before setting gil_drop_request. This encourages a defined switching period, but doesn't enforce it since opcodes can take an arbitrary time to execute.The
intervalvalue is available for the user to read and modify using the Python APIsys.{get,set}switchinterval().
我们看PyEval_EvalFrameEx函数,定义在Python/ceval.c:
PyObject *
PyEval_EvalFrameEx(PyFrameObject *f, int throwflag)
{
PyThreadState *tstate = PyThreadState_GET();
return tstate->interp->eval_frame(f, throwflag);
}
PyObject* _Py_HOT_FUNCTION
_PyEval_EvalFrameDefault(PyFrameObject *f, int throwflag)
{
...
main_loop:
for (;;) {
...
if (_Py_atomic_load_relaxed(&_PyRuntime.ceval.eval_breaker)) {
...
if (_Py_atomic_load_relaxed(
&_PyRuntime.ceval.gil_drop_request))
{
/* Give another thread a chance */
if (PyThreadState_Swap(NULL) != tstate)
Py_FatalError("ceval: tstate mix-up");
drop_gil(tstate);
/* Other threads may run now */
take_gil(tstate);
/* Check if we should make a quick exit. */
if (_Py_IsFinalizing() &&
!_Py_CURRENTLY_FINALIZING(tstate))
{
drop_gil(tstate);
PyThread_exit_thread();
}
if (PyThreadState_Swap(tstate) != NULL)
Py_FatalError("ceval: orphan tstate");
}
...
}
...
}
}
_PyEval_EvalFrameDefault是虚拟机执行字节码的主入口函数,当满足_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil_drop_request)时,当前线程会释放GIL,其他等待GIL的线程会尝试获取GIL并执行。
gil_drop_request会在cond等待超时被设置。而超时时间被设置在Python/ceval_gil.h:
#define INTERVAL (_PyRuntime.ceval.gil.interval >= 1 ? _PyRuntime.ceval.gil.interval : 1)
gil.interval会在_PyRuntimeState初始化的时候会被设置。
#define DEFAULT_INTERVAL 5000
static void _gil_initialize(struct _gil_runtime_state *state)
{
_Py_atomic_int uninitialized = {-1};
state->locked = uninitialized;
state->interval = DEFAULT_INTERVAL;
}
默认是0.005s,可以通过sys.getswitchinterval来查看interval时间间隔:
> import sys
> sys.getswitchinterval()
> 0.005
阻塞调度
Python3中除了时间调度之外,还有一种阻塞调度:当线程执行I/O操作,或者sleep时,线程将会挂起,释放GIL。
以time.sleep为例,实现在Modules/timemodule.c:
/* Implement pysleep() for various platforms.
When interrupted (or when another error occurs), return -1 and
set an exception; else return 0. */
static int
pysleep(_PyTime_t secs)
{
_PyTime_t deadline, monotonic;
_PyTime_t millisecs;
unsigned long ul_millis;
DWORD rc;
HANDLE hInterruptEvent;
deadline = _PyTime_GetMonotonicClock() + secs;
do {
if (_PyTime_AsTimeval(secs, &timeout, _PyTime_ROUND_CEILING) < 0)
return -1;
Py_BEGIN_ALLOW_THREADS
err = select(0, (fd_set *)0, (fd_set *)0, (fd_set *)0, &timeout);
Py_END_ALLOW_THREADS
if (err == 0)
break;
if (errno != EINTR) {
PyErr_SetFromErrno(PyExc_OSError);
return -1;
}
/* sleep was interrupted by SIGINT */
if (PyErr_CheckSignals())
return -1;
monotonic = _PyTime_GetMonotonicClock();
secs = deadline - monotonic;
if (secs < 0)
break;
/* retry with the recomputed delay */
} while (1);
return 0;
}
pysleep函数是跨平台实现,上面只保留非windows平台的代码。
Python在这里使用select来实现sleep阻塞,可以看到select前后有两个宏定义:
Py_BEGIN_ALLOW_THREADSPy_END_ALLOW_THREADS
定义在Include/ceval.h:
#define Py_BEGIN_ALLOW_THREADS { \
PyThreadState *_save; \
_save = PyEval_SaveThread();
#define Py_BLOCK_THREADS PyEval_RestoreThread(_save);
#define Py_UNBLOCK_THREADS _save = PyEval_SaveThread();
#define Py_END_ALLOW_THREADS PyEval_RestoreThread(_save); \
}
Py_BEGIN_ALLOW_THREADS和Py_END_ALLOW_THREADS宏分别调用了PyEval_SaveThread和PyEval_RestoreThread函数:
/* Functions save_thread and restore_thread are always defined so
dynamically loaded modules needn't be compiled separately for use
with and without threads: */
PyThreadState *
PyEval_SaveThread(void)
{
PyThreadState *tstate = PyThreadState_Swap(NULL);
if (tstate == NULL)
Py_FatalError("PyEval_SaveThread: NULL tstate");
assert(gil_created());
drop_gil(tstate);
return tstate;
}
void
PyEval_RestoreThread(PyThreadState *tstate)
{
if (tstate == NULL)
Py_FatalError("PyEval_RestoreThread: NULL tstate");
assert(gil_created());
int err = errno;
take_gil(tstate);
/* _Py_Finalizing is protected by the GIL */
if (_Py_IsFinalizing() && !_Py_CURRENTLY_FINALIZING(tstate)) {
drop_gil(tstate);
PyThread_exit_thread();
Py_UNREACHABLE();
}
errno = err;
PyThreadState_Swap(tstate);
}
PyThreadState_Swap定义在Python/pystate.c:
PyThreadState *
PyThreadState_Swap(PyThreadState *newts)
{
PyThreadState *oldts = GET_TSTATE();
SET_TSTATE(newts);
...
return oldts;
}
PyEval_SaveThread:设置当前线程状态对象为NULL,释放GIL,并保存到_save变量。PyEval_RestoreThread:获取GIL,重新设置当前线程状态对象。