Gunicorn Source Code Walkthrough
Gunicorn 'Green Unicorn' is a Python WSGI HTTP Server for UNIX. It's a pre-fork worker model. The Gunicorn server is broadly compatible with various web frameworks, simply implemented, light on server resources, and fairly speedy.
About gunicorn's design
Server Model
Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.
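In other words, gunicorn runs one master process that manages several worker processes, and every request is handled by a worker process.
Reading the source
The example on the official gunicorn website looks like this: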
$ pip install gunicorn
$ cat myapp.py
def app(environ, start_response):
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data)))
    ])
    return iter([data])
$ gunicorn -w 4 myapp:app
[2014-09-10 10:22:28 +0000] [30869] [INFO] Listening at: http://127.0.0.1:8000 (30869)
[2014-09-10 10:22:28 +0000] [30869] [INFO] Using worker: sync
[2014-09-10 10:22:28 +0000] [30874] [INFO] Booting worker with pid: 30874
[2014-09-10 10:22:28 +0000] [30875] [INFO] Booting worker with pid: 30875
[2014-09-10 10:22:28 +0000] [30876] [INFO] Booting worker with pid: 30876
[2014-09-10 10:22:28 +0000] [30877] [INFO] Booting worker with pid: 30877
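The first step in reading source code is to locate the entry point. We know how gunicorn is invoked:
gunicorn -w 4 myapp:app
Anyone who has written a Python package knows where to look for the entry point: the setup.py file.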
setup(
    ...,
    entry_points="""
    [console_scripts]
    gunicorn=gunicorn.app.wsgiapp:run
    gunicorn_paster=gunicorn.app.pasterapp:run
    ...
    """
)
So the entry point lives in gunicorn/app/wsgiapp.py. Let's go straight to the run function:
def run():
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()
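As we can see, it simply instantiates a WSGIApplication object and then calls its run method.
gunicorn configuration
Let's first look at what instantiating a WSGIApplication object does.
Neither WSGIApplication nor its parent class Application implements __init__, so we go straight to the __init__ method of BaseApplication, the parent class of Application.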
class BaseApplication(object):
    """
    An application interface for configuring and loading
    the various necessities for any given web framework.
    """
    def __init__(self, usage=None, prog=None):
        ...
        self.do_load_config()

    def do_load_config(self):
        """
        Loads the configuration
        """
        try:
            self.load_default_config()
            self.load_config()
        except Exception as e:
            ...

    def load_default_config(self):
        # init configuration
        self.cfg = Config(self.usage, prog=self.prog)
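The call chain during instantiation looks like this:
__init__ -> do_load_config -> load_default_config & load_config
In other words, instantiating a WSGIApplication object loads the configuration: self.cfg = Config().
Now let's see how the Config object loads the configuration.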
KNOWN_SETTINGS = []
...
def make_settings(ignore=None):
    settings = {}
    ignore = ignore or ()
    for s in KNOWN_SETTINGS:
        setting = s()
        if setting.name in ignore:
            continue
        settings[setting.name] = setting.copy()
    return settings
...
class Config(object):
    def __init__(self, usage=None, prog=None):
        self.settings = make_settings()
        ...

    def __getattr__(self, name):
        if name not in self.settings:
            raise AttributeError("No configuration setting for: %s" % name)
        return self.settings[name].get()

    def __setattr__(self, name, value):
        if name != "settings" and name in self.settings:
            raise AttributeError("Invalid access!")
        super(Config, self).__setattr__(name, value)
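From the snippet above we can see that instantiating a Config object iterates over the elements of the KNOWN_SETTINGS list. In the code, however, KNOWN_SETTINGS is an empty list, which raises a question: when are elements added to KNOWN_SETTINGS?
A global search for KNOWN_SETTINGS in this file turns up an interesting trick: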
class SettingMeta(type):
    def __new__(cls, name, bases, attrs):
        super_new = super(SettingMeta, cls).__new__
        parents = [b for b in bases if isinstance(b, SettingMeta)]
        if not parents:
            return super_new(cls, name, bases, attrs)
        attrs["order"] = len(KNOWN_SETTINGS)
        attrs["validator"] = wrap_method(attrs["validator"])
        new_class = super_new(cls, name, bases, attrs)
        new_class.fmt_desc(attrs.get("desc", ""))
        KNOWN_SETTINGS.append(new_class)
        return new_class
...
class Setting(object):
    ...
Setting = SettingMeta('Setting', (Setting,), {})
...
class ConfigFile(Setting):
    ...
class Bind(Setting):
    ...
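This snippet uses a Python metaclass. Setting is a class created by the SettingMeta metaclass, so every subclass of Setting is created by SettingMeta as well.
Each time such a class is created, it is appended to the KNOWN_SETTINGS list.
So make_settings returns an instance of every class that inherits from Setting, except those named in ignore. Attribute reads on a Config instance are then proxied to the corresponding setting instance.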
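To make the registration trick concrete, here is a minimal sketch in Python 3 syntax. It is not gunicorn's code: the names KNOWN, Workers and the default values are assumptions chosen for illustration, and the validator and description handling of the real SettingMeta are left out.

```python
# Illustrative registry; gunicorn's real list is KNOWN_SETTINGS.
KNOWN = []


class SettingMeta(type):
    def __new__(mcs, name, bases, attrs):
        new_class = super().__new__(mcs, name, bases, attrs)
        # Register subclasses only; the base class has no SettingMeta parent.
        if any(isinstance(base, SettingMeta) for base in bases):
            new_class.order = len(KNOWN)
            KNOWN.append(new_class)
        return new_class


class Setting(metaclass=SettingMeta):
    name = None
    default = None

    def __init__(self):
        self.value = self.default

    def get(self):
        return self.value


class Bind(Setting):
    name = "bind"
    default = "127.0.0.1:8000"  # assumed default, for the sketch only


class Workers(Setting):
    name = "workers"
    default = 1


class Config:
    def __init__(self):
        # One fresh instance per registered class, keyed by setting name.
        self.settings = {cls.name: cls() for cls in KNOWN}

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        settings = self.__dict__.get("settings", {})
        if name not in settings:
            raise AttributeError("No configuration setting for: %s" % name)
        return settings[name].get()
```

Merely defining Bind and Workers is enough to register them; Config().bind then reads the value held by the Bind instance.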
Now back to the run method. WSGIApplication does not implement run, so the part that matters is the run implementation of the base class BaseApplication.
class BaseApplication(object):
    ...
    def run(self):
        try:
            Arbiter(self).run()
        except RuntimeError as e:
            print("\nError: %s\n" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)

class Application(BaseApplication):
    ...
    def run(self):
        ...
        super(Application, self).run()
The Arbiter class is central to gunicorn. You could say that WSGIApplication only manages gunicorn's configuration, while Arbiter is what manages the workers.
Master process
The master process is a simple loop that listens for various process signals and reacts accordingly. It manages the list of running workers by listening for signals like TTIN, TTOU, and CHLD. TTIN and TTOU tell the master to increase or decrease the number of running workers. CHLD indicates that a child process has terminated, in this case the master process automatically restarts the failed worker.
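The master process runs a loop that listens for signals and handles them; this is how it manages the number of running workers.
The run method is where the master process's loop lives.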
class Arbiter(object):
    ...
    def run(self):
        "Main master loop."
        self.start()
        util._setproctitle("master [%s]" % self.proc_name)
        try:
            self.manage_workers()
            while True:
                self.maybe_promote_master()
                sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
                if sig is None:
                    self.sleep()
                    self.murder_workers()
                    self.manage_workers()
                    continue
                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                if not handler:
                    self.log.error("Unhandled signal: %s", signame)
                    continue
                self.log.info("Handling signal: %s", signame)
                handler()
                self.wakeup()
        ...
Let's see what start does:
class Arbiter(object):
    ...
    def start(self):
        """\
        Initialize the arbiter. Start listening and set pidfile if needed.
        """
        self.log.info("Starting gunicorn %s", __version__)
        if 'GUNICORN_PID' in os.environ:
            self.master_pid = int(os.environ.get('GUNICORN_PID'))
            self.proc_name = self.proc_name + ".2"
            self.master_name = "Master.2"
        self.pid = os.getpid()
        if self.cfg.pidfile is not None:
            pidname = self.cfg.pidfile
            if self.master_pid != 0:
                pidname += ".2"
            self.pidfile = Pidfile(pidname)
            self.pidfile.create(self.pid)
        self.cfg.on_starting(self)
        self.init_signals()
        if not self.LISTENERS:
            fds = None
            listen_fds = systemd.listen_fds()
            if listen_fds:
                self.systemd = True
                fds = range(systemd.SD_LISTEN_FDS_START,
                            systemd.SD_LISTEN_FDS_START + listen_fds)
            elif self.master_pid:
                fds = []
                for fd in os.environ.pop('GUNICORN_FD').split(','):
                    fds.append(int(fd))
            self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)
        listeners_str = ",".join([str(l) for l in self.LISTENERS])
        self.log.debug("Arbiter booted")
        self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
        self.log.info("Using worker: %s", self.cfg.worker_class_str)
        # check worker class requirements
        if hasattr(self.worker_class, "check_config"):
            self.worker_class.check_config(self.cfg, self.log)
        self.cfg.when_ready(self)
- Call self.init_signals to register the signal handlers
- Create LISTENERS
Registering signal handlers
class Arbiter(object):
    ...
    def init_signals(self):
        """\
        Initialize master signal handling. Most of the signals
        are queued. Child signals only wake up the master.
        """
        # close old PIPE
        if self.PIPE:
            [os.close(p) for p in self.PIPE]
        # initialize the pipe
        self.PIPE = pair = os.pipe()
        for p in pair:
            util.set_non_blocking(p)
            util.close_on_exec(p)
        self.log.close_on_exec()
        # initialize all signals
        [signal.signal(s, self.signal) for s in self.SIGNALS]
        signal.signal(signal.SIGCHLD, self.handle_chld)

    def signal(self, sig, frame):
        if len(self.SIG_QUEUE) < 5:
            self.SIG_QUEUE.append(sig)
            self.wakeup()
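init_signals first closes the existing pipe pair self.PIPE, if there is one, then creates a new pipe pair, initializes it, and registers the signal handlers. Every signal except SIGCHLD is handled by the signal method, which appends the signal to the signal queue and then wakes the master up, provided the queue is not full. Once the queue is full (it holds at most 5 entries), incoming signals are dropped without any processing.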
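The queue-and-drop behaviour can be sketched in isolation. This is a simplified illustration, not gunicorn's implementation: SignalQueue, MAX_QUEUED and the injected wakeup callback are hypothetical names, and the pipe is replaced by whatever callable the caller passes in.

```python
import signal


class SignalQueue:
    """Illustrative only: queue incoming signals and cap the backlog."""

    MAX_QUEUED = 5  # mirrors the literal 5 in Arbiter.signal

    def __init__(self, wakeup):
        self.queue = []
        self.wakeup = wakeup  # callable that wakes the main loop

    def install(self, signums):
        # signal.signal may only be called from the main thread.
        for signum in signums:
            signal.signal(signum, self.on_signal)

    def on_signal(self, signum, frame):
        # Drop the signal when the backlog is full; otherwise queue and wake.
        if len(self.queue) < self.MAX_QUEUED:
            self.queue.append(signum)
            self.wakeup()
```

Keeping the handler this small is a common design choice: the handler only records that something happened, and the main loop does the actual work later.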
Creating LISTENERS
def _sock_type(addr):
    if isinstance(addr, tuple):
        if util.is_ipv6(addr[0]):
            sock_type = TCP6Socket
        else:
            sock_type = TCPSocket
    elif isinstance(addr, string_types):
        sock_type = UnixSocket
    else:
        raise TypeError("Unable to create socket from: %r" % addr)
    return sock_type

def create_sockets(conf, log, fds=None):
    """
    Create a new socket for the configured addresses or file descriptors.
    If a configured address is a tuple then a TCP socket is created.
    If it is a string, a Unix socket is created. Otherwise, a TypeError is
    raised.
    """
    listeners = []
    # get it only once
    laddr = conf.address
    # check ssl config early to raise the error on startup
    # only the certfile is needed since it can contains the keyfile
    if conf.certfile and not os.path.exists(conf.certfile):
        raise ValueError('certfile "%s" does not exist' % conf.certfile)
    if conf.keyfile and not os.path.exists(conf.keyfile):
        raise ValueError('keyfile "%s" does not exist' % conf.keyfile)
    # sockets are already bound
    if fds is not None:
        for fd in fds:
            sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
            sock_name = sock.getsockname()
            sock_type = _sock_type(sock_name)
            listener = sock_type(sock_name, conf, log, fd=fd)
            listeners.append(listener)
        return listeners
    # no sockets is bound, first initialization of gunicorn in this env.
    for addr in laddr:
        sock_type = _sock_type(addr)
        sock = None
        for i in range(5):
            try:
                sock = sock_type(addr, conf, log)
            except socket.error as e:
                ...
            else:
                break
        if sock is None:
            log.error("Can't connect to %s", str(addr))
            sys.exit(1)
        listeners.append(sock)
    return listeners
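create_sockets creates sockets from the configured addresses or file descriptors. If a configured address is a tuple, it creates a TCP socket; if it is a string, it creates a Unix socket.
These sockets are ultimately consumed by the workers: every time a worker is created, the sockets are passed to it as an argument.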
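The address-based dispatch can be sketched with the standard library alone. This is an approximation under stated assumptions: socket_family and is_ipv6 are hypothetical helpers that return a socket address family, whereas gunicorn's _sock_type returns one of its own wrapper classes.

```python
import socket


def is_ipv6(host):
    """Return True if host parses as an IPv6 address literal."""
    try:
        socket.inet_pton(socket.AF_INET6, host)
    except (OSError, ValueError):
        return False
    return True


def socket_family(addr):
    # (host, port) tuples mean TCP; plain strings mean a Unix socket path.
    if isinstance(addr, tuple):
        return socket.AF_INET6 if is_ipv6(addr[0]) else socket.AF_INET
    if isinstance(addr, str):
        return socket.AF_UNIX
    raise TypeError("Unable to create socket from: %r" % (addr,))
```

For example, socket_family(("::1", 8000)) selects IPv6, while a string such as "/tmp/app.sock" (a made-up path) selects a Unix socket.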
Back in Arbiter's run method, the next call after start is manage_workers.
class Arbiter(object):
    ...
    def manage_workers(self):
        """\
        Maintain the number of workers by spawning or killing
        as required.
        """
        if len(self.WORKERS.keys()) < self.num_workers:
            self.spawn_workers()
        workers = self.WORKERS.items()
        workers = sorted(workers, key=lambda w: w[1].age)
        while len(workers) > self.num_workers:
            (pid, _) = workers.pop(0)
            self.kill_worker(pid, signal.SIGTERM)
        active_worker_count = len(workers)
        if self._last_logged_active_worker_count != active_worker_count:
            self._last_logged_active_worker_count = active_worker_count
            self.log.debug("{0} workers".format(active_worker_count),
                           extra={"metric": "gunicorn.workers",
                                  "value": active_worker_count,
                                  "mtype": "gauge"})
manage_workers keeps the number of workers at num_workers; the worker processes themselves are created in the spawn_worker method.
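The reconciliation performed by manage_workers can be expressed as a pure function, which makes the rule easy to check. This is a sketch with assumed names: plan_workers is hypothetical, and workers is simplified to a dict mapping pid to age rather than to worker objects.

```python
def plan_workers(workers, num_workers):
    """Return (how many to spawn, which pids to stop), oldest first.

    workers maps pid -> age, where a smaller age means spawned earlier.
    """
    to_spawn = max(num_workers - len(workers), 0)
    oldest_first = sorted(workers.items(), key=lambda item: item[1])
    surplus = max(len(workers) - num_workers, 0)
    to_kill = [pid for pid, _age in oldest_first[:surplus]]
    return to_spawn, to_kill
```

With two workers and a target of four, the plan is to spawn two; with three workers and a target of one, the two oldest are selected for SIGTERM. In the source, the spawning itself is done by spawn_workers and spawn_worker: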
class Arbiter(object):
    ...
    def spawn_workers(self):
        """\
        Spawn new workers as needed.
        This is where a worker process leaves the main loop
        of the master process.
        """
        for i in range(self.num_workers - len(self.WORKERS.keys())):
            self.spawn_worker()
            time.sleep(0.1 * random.random())

    def spawn_worker(self):
        self.worker_age += 1
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
        self.cfg.pre_fork(self, worker)
        pid = os.fork()
        if pid != 0:
            worker.pid = pid
            self.WORKERS[pid] = worker
            return pid
        # Process Child
        worker.pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s", worker.pid)
            self.cfg.post_fork(self, worker)
            worker.init_process()
            sys.exit(0)
        except SystemExit:
            raise
        except ...
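The master process first instantiates worker_class; the default worker_class is SyncWorker.
Before the child is forked, you can run some preparation through the pre_fork hook; see the Prefork class in gunicorn.config.
The fork produces a child process. The parent (the master) stores the worker object it instantiated in self.WORKERS; the pid here is the child's process ID. The parent is then done with spawn_worker and simply returns: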
worker.pid = pid
self.WORKERS[pid] = worker
return pid
The forked child, on the other hand, carries on with the rest of spawn_worker. The main logic is:
try:
    util._setproctitle("worker [%s]" % self.proc_name)
    self.log.info("Booting worker with pid: %s", worker.pid)
    self.cfg.post_fork(self, worker)
    worker.init_process()
    sys.exit(0)
except SystemExit:
    raise
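This raises a question: doesn't sys.exit(0) terminate the child process? And the SystemExit exception is caught, but only re-raised rather than handled?
In fact, the worker process normally does not exit at this point, because of how worker.init_process() is implemented: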
class Worker(object):
    ...
    def init_process(self):
        ...
        self.run()

class SyncWorker(base.Worker):
    def run_for_one(self, timeout):
        listener = self.sockets[0]
        while self.alive:
            ...

    def run_for_multiple(self, timeout):
        while self.alive:
            ...

    def run(self):
        ...
        if len(self.sockets) > 1:
            self.run_for_multiple(timeout)
        else:
            self.run_for_one(timeout)
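As we can see, run_for_multiple and run_for_one in the worker subclass both spend their time in a loop, so init_process does not return, and sys.exit(0) is not reached, until that loop ends.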
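The way a single fork call splits into a parent that returns and a child that never does can be sketched as follows. This is a Unix-only illustration, not gunicorn's code: the work callable and the reap helper are hypothetical, and the child leaves through os._exit instead of the sys.exit(0) used in the source, so that it skips any cleanup handlers inherited from the parent.

```python
import os


def spawn_worker(work):
    """Fork once. The parent gets the child's pid; the child never returns."""
    pid = os.fork()
    if pid != 0:
        return pid  # parent: the caller records the pid and moves on
    # Child: run the work loop, then exit without returning to the caller.
    status = 0
    try:
        work()
    except BaseException:
        status = 1
    finally:
        os._exit(status)


def reap(pids):
    """Wait for each child and return {pid: exit_code}."""
    codes = {}
    for pid in pids:
        _, status = os.waitpid(pid, 0)
        codes[pid] = os.WEXITSTATUS(status) if os.WIFEXITED(status) else None
    return codes
```

As long as work() keeps looping, the exit call in the child is simply not reached, which is the situation a running worker is in.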
Back to Arbiter's run method once more: it has now entered its loop.
while True:
    self.maybe_promote_master()
    sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
    if sig is None:
        self.sleep()
        self.murder_workers()
        self.manage_workers()
        continue
    if sig not in self.SIG_NAMES:
        self.log.info("Ignoring unknown signal: %s", sig)
        continue
    signame = self.SIG_NAMES.get(sig)
    handler = getattr(self, "handle_%s" % signame, None)
    if not handler:
        self.log.error("Unhandled signal: %s", signame)
        continue
    self.log.info("Handling signal: %s", signame)
    handler()
    self.wakeup()
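On each iteration, the loop takes one signal from the signal queue and hands it to the matching handle_<signame> method. If there is no signal to handle, the master goes to sleep until it is woken up or the sleep times out, and then runs murder_workers and manage_workers. This is the basic job of the master process.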
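The name-based dispatch through getattr can be tried on its own. The sketch below is illustrative: Dispatcher, its two-entry SIG_NAMES table and the returned status strings are assumptions; the real loop logs instead of returning a value.

```python
import signal


class Dispatcher:
    """Illustrative only: map signal numbers to handle_<name> methods."""

    SIG_NAMES = {signal.SIGHUP: "hup", signal.SIGTERM: "term"}

    def __init__(self):
        self.handled = []

    def handle_hup(self):
        self.handled.append("hup")

    def dispatch(self, sig):
        name = self.SIG_NAMES.get(sig)
        if name is None:
            return "ignored"  # unknown signal
        handler = getattr(self, "handle_%s" % name, None)
        if handler is None:
            return "unhandled"  # known signal without a handler method
        handler()
        return "handled"
```

Adding support for another signal then only requires a table entry and a method with the matching name.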
When is the master woken up after it goes to sleep, and how?
Let's look at how the master process sleeps and wakes up.
class Arbiter(object):
    ...
    def wakeup(self):
        """\
        Wake up the arbiter by writing to the PIPE
        """
        try:
            os.write(self.PIPE[1], b'.')
        except IOError as e:
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise
    ...
    def sleep(self):
        """\
        Sleep until PIPE is readable or we timeout.
        A readable PIPE means a signal occurred.
        """
        try:
            ready = select.select([self.PIPE[0]], [], [], 1.0)
            if not ready[0]:
                return
            while os.read(self.PIPE[0], 1):
                pass
        except select.error as e:
            if e.args[0] not in [errno.EAGAIN, errno.EINTR]:
                raise
        except OSError as e:
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise
        except KeyboardInterrupt:
            sys.exit()
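As we can see, Arbiter's sleep method watches the read end of the pipe created earlier, PIPE[0], and returns once that end has data or the 1.0-second select timeout expires.
wakeup is called after a signal has been appended to the signal queue, and writes a byte to the write end of the pipe, PIPE[1].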
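The sleep and wakeup pair is an instance of the self-pipe technique. A minimal standalone sketch follows, assuming Python 3 on Unix; make_pipe is a hypothetical helper, and unlike the original this sleep returns a boolean so the outcome is observable.

```python
import os
import select


def make_pipe():
    """Create a pipe with both ends non-blocking."""
    r, w = os.pipe()
    os.set_blocking(r, False)
    os.set_blocking(w, False)
    return r, w


def wakeup(pipe):
    try:
        os.write(pipe[1], b".")
    except BlockingIOError:
        pass  # pipe is full, so a wakeup is already pending


def sleep(pipe, timeout=1.0):
    """Return True if woken by a write, False if the timeout expired."""
    ready, _, _ = select.select([pipe[0]], [], [], timeout)
    if not ready:
        return False
    try:
        while os.read(pipe[0], 1):
            pass  # drain every pending wakeup byte
    except BlockingIOError:
        pass  # nothing left to read
    return True
```

Because the read end is drained on each wakeup, several wakeup calls in a row collapse into a single return from sleep.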
Worker process
Here we focus on the implementation of init_process in Worker:
class Worker(object):
    ...
    def init_process(self):
        """\
        If you override this method in a subclass, the last statement
        in the function should be to call this method with
        super(MyWorkerClass, self).init_process() so that the ``run()``
        loop is initiated.
        """
        # set environment' variables
        if self.cfg.env:
            for k, v in self.cfg.env.items():
                os.environ[k] = v
        util.set_owner_process(self.cfg.uid, self.cfg.gid,
                               initgroups=self.cfg.initgroups)
        # Reseed the random number generator
        util.seed()
        # For waking ourselves up
        self.PIPE = os.pipe()
        ...
        self.wait_fds = self.sockets + [self.PIPE[0]]
        self.init_signals()
        ...
        self.load_wsgi()
        self.cfg.post_worker_init(self)
        # Enter main run loop
        self.booted = True
        self.run()
- Create a pipe
- Register the signal handlers
- Load the WSGI application
- Execute run
Registering signal handlers
class Worker(object):
    ...
    def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)
        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)
        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
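When registering its signal handlers, the worker process also sets a wakeup file descriptor (self.PIPE[1]) through signal.set_wakeup_fd. When a signal arrives, a single byte is written to that fd, here the write end of the pipe (a '\0' byte according to the Python 2 documentation; recent Python 3 versions write the signal number instead). This wakes up a pending poll or select call so that the signal can be processed.
Executing run
run is implemented by each subclass. Let's look at SyncWorker's run method: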
class SyncWorker(base.Worker):
    ...
    def accept(self, listener):
        client, addr = listener.accept()
        client.setblocking(1)
        util.close_on_exec(client)
        self.handle(listener, client, addr)

    def run_for_one(self, timeout):
        listener = self.sockets[0]
        while self.alive:
            self.notify()
            # Accept a connection. If we get an error telling us
            # that no connection is waiting we fall down to the
            # select which is where we'll wait for a bit for new
            # workers to come give us some love.
            try:
                self.accept(listener)
                # Keep processing clients until no one is waiting. This
                # prevents the need to select() for every client that we
                # process.
                continue
            except ...
            if not self.is_parent_alive():
                return
            try:
                self.wait(timeout)
            except StopWaiting:
                return

    def run_for_multiple(self, timeout):
        while self.alive:
            self.notify()
            try:
                ready = self.wait(timeout)
            except StopWaiting:
                return
            if ready is not None:
                for listener in ready:
                    if listener == self.PIPE[0]:
                        continue
                    try:
                        self.accept(listener)
                    except ...
            if not self.is_parent_alive():
                return

    def run(self):
        # if no timeout is given the worker will never wait and will
        # use the CPU for nothing. This minimal timeout prevent it.
        timeout = self.timeout or 0.5
        # self.socket appears to lose its blocking status after
        # we fork in the arbiter. Reset it here.
        for s in self.sockets:
            s.setblocking(0)
        if len(self.sockets) > 1:
            self.run_for_multiple(timeout)
        else:
            self.run_for_one(timeout)
run_for_multiple, like run_for_one, calls the wait method:
def wait(self, timeout):
    try:
        self.notify()
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]
    except ...
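wait blocks in select on the wait_fds list, which contains the socket list self.sockets and the read end of the worker's pipe, self.PIPE[0]. If any of them are readable, it returns the readable ones. In other words, the worker process wakes up either when a socket has an incoming request or when a signal arrives (through signal.set_wakeup_fd).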
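Waiting on sockets and the wakeup pipe at the same time can be sketched like this. It is a simplified stand-in for Worker.wait under stated assumptions: the function takes its descriptors as arguments, returns an empty list on timeout, and omits the notify call and the error handling of the original.

```python
import os
import select
import socket


def wait(wait_fds, pipe_r, timeout):
    """Return the readable entries of wait_fds, or [] on timeout."""
    ready, _, _ = select.select(wait_fds, [], [], timeout)
    if pipe_r in ready:
        os.read(pipe_r, 1)  # consume the wakeup byte
    return ready


# Example setup: one end of a socketpair stands in for a listening socket.
server_end, client_end = socket.socketpair()
pipe_r, pipe_w = os.pipe()
fds = [server_end, pipe_r]
```

Calling wait(fds, pipe_r, 0.5) returns an empty list until either client_end sends data or something writes to pipe_w.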
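As we can see, run_for_one or run_for_multiple takes one or more sockets from the sockets list, calls accept to establish the connection, and calls handle to process the request. Request handling here is blocking: a worker can only handle one request at a time.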
class SyncWorker(base.Worker):
    ...
    def handle(self, listener, client, addr):
        req = None
        try:
            if self.cfg.is_ssl:
                client = ssl.wrap_socket(client, server_side=True,
                                         **self.cfg.ssl_options)
            parser = http.RequestParser(self.cfg, client)
            req = six.next(parser)
            self.handle_request(listener, req, client, addr)
        except ...

    def handle_request(self, listener, req, client, addr):
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, client, addr,
                                        listener.getsockname(), self.cfg)
            # Force the connection closed until someone shows
            # a buffering proxy that supports Keep-Alive to
            # the backend.
            resp.force_close()
            self.nr += 1
            if self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                self.alive = False
            respiter = self.wsgi(environ, resp.start_response)
        except ...
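handle parses the request and calls handle_request, which builds the WSGI environ and response objects and passes them to the WSGI application. Finally, once the number of requests handled reaches max_requests, the worker sets alive to False and exits after finishing the current request.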
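The last step, calling the WSGI application and counting requests, can be sketched without any networking. MiniWorker is a hypothetical class written for this illustration: it reuses the app function from the start of the article, captures the status in a local start_response, and omits everything gunicorn does around parsing and writing the response.

```python
def app(environ, start_response):
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data))),
    ])
    return iter([data])


class MiniWorker:
    """Illustrative only: call a WSGI app and enforce a request limit."""

    def __init__(self, wsgi, max_requests):
        self.wsgi = wsgi
        self.max_requests = max_requests
        self.nr = 0
        self.alive = True

    def handle_request(self, environ):
        captured = {}

        def start_response(status, headers, exc_info=None):
            captured["status"] = status
            captured["headers"] = headers

        self.nr += 1
        if self.nr >= self.max_requests:
            self.alive = False  # finish this request, then leave the loop
        body = b"".join(self.wsgi(environ, start_response))
        return captured["status"], body
```

With max_requests set to 2, the worker stays alive after the first request and marks itself for shutdown while serving the second.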
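Much respect. Bookmarking this first.
@lvhuiyang Much respect. Bookmarking this first.
Caught me QAQ. It took me a year to update this =。=
Much respect 0v0
By the way, how do you go about reading source code? I like to single-step through a minimal demo with ipdb, but gunicorn forks processes, and ipdb seems to misbehave in a multi-process environment...
PS: does Meituan Cloud also use Gunicorn as its WSGI server?
@yetingsky I used to single-step through source code with ipdb, but not any more. Single-stepping gets interrupted easily, which hurts reading efficiency, and fork is hard to debug that way. These days I just read source snippets directly in vim; jumping between code blocks with vim's ycm is very convenient.
Also, the WSGI server for the Meituan Cloud console is gunicorn + gevent.
@Junnplus In Python I find that, as I read on, I lose track of what type something is orz... Anyway, much respect -w-
@yetingsky Come on, you are the expert from Douban QWQ
You can usually guess an object's type from the context; it is a dynamic language, after all.
@Junnplus I'm no longer at Douban orz =。=
Still an expert qaq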
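Front row, paying my respects to the cross-dressing expert!
@Sn0rt QWQ
Much respect.
I saw someone with the same avatar as mine and got excited for a moment, then realized it was me, back in February.
Much respect...
Expert.
Bookmarked, and finished reading.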
mark
mark
Much respect, mark.