myblog icon indicating copy to clipboard operation
myblog copied to clipboard

envoy源码学习

Open hongmaoxiao opened this issue 6 years ago • 0 comments

最近深感时间不够用,想做的事情太多,想看的源码太多,而看源码并记录到博客里又是一件很耗费时间的事情。

所以我现在很能理解那些辛辛苦苦码字后被人随意转载且不留作者名字之后作者们愤愤不平想打官司的心情了。这让我想起了一个在微博看到的调侃这方面的一个笑话

以前看《青年文摘》的时候看见作者那一栏里总是写着"佚名"两个字,我当时觉得"佚名"一定是一个很厉害的人物,因为他什么题材都能写,而且写得还不算差,发誓长大后一定要做一个像"佚名"一样厉害的人物,写天下文章。
长大后我我才知道,"佚名"那不过是《青年文摘》编辑们盗取别人劳动成果的遮羞布。

笑话是不是这么讲的我已经忘了,照着自己的想法翻译了一遍。扯得有些远了,今天要写的是学习python大神Kenneth Reitz写的envoy库。

我照着一个个commit敲的代码放在envoy_source上。

代码结构如下

~/source/backend/python/envoy master
❯ tree
.
├── AUTHORS
├── envoy
│   ├── core.py
│   └── __init__.py
├── ext
│   └── in_action.png
├── LICENSE
├── MANIFEST.in
├── README.md
├── setup.cfg
├── setup.py
└── test_envoy.py

2 directories, 10 files

核心代码就一个core.py,直接研究core.py里的代码,看看它有多少行代码

~/source/backend/python/envoy master
❯ cat envoy/core.py | wc -l
256

也就256行代码,不多。

核心代码

#!/usr/bin/env python
# encoding: utf-8

"""
envoy.core
~~~~~~~~~~

This module provides envoy awesomeness
"""



import os
import sys
import shlex
import signal
import subprocess
import threading
import traceback


__version__ = '0.0.3'
__license__ = 'MIT'
__author__ = 'Kenneth Reitz'

def _terminate_process(process):
    if sys.platform == 'win32':
        import ctypes
        PROCESS_TERMINATE = 1
        handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, process.pid)
        ctypes.windll.kernel32.TerminateProcess(handle, -1)
        ctypes.windll.kernel32.CloseHandle(handle)
    else:
        os.kill(process.pid, signal.SIGTERM)


def _kill_process(process):
    if sys.platform == 'win32':
        _terminate_process(process)
    else:
        os.kill(process.pid, signal.SIGKILL)


def _is_alive(thread):
    if hasattr(thread, "is_alive"):
        return thread.is_alive()
    else:
        return thread.isAlive()


class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None
        self.out = None
        self.err = None
        self.returncode = None
        self.data = None
        self.exc = None

    def run(self, data, timeout, kill_timeout, env, cwd):
        self.data = data
        environ = dict(os.environ)
        environ.update(env or {})

        def target():

            try:
                self.process = subprocess.Popen(self.cmd,
                    universal_newlines=True,
                    shell=False,
                    env=environ,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    bufsize=0,
                    cwd=cwd,
                )

                if sys.version_info[0] >= 3:
                    self.out, self.err = self.process.communicate(
                        input = bytes(self.data, "UTF-8") if self.data else None
                    )
                else:
                    self.out, self.err = self.process.communicate(self.data)
            except Exception as exc:
                self.exc = exc


        thread = threading.Thread(target=target)
        thread.start()

        thread.join(timeout)
        if self.exc:
            raise self.exc
        if _is_alive(thread):
            _terminate_process(self.process)
            thread.join(kill_timeout)
            if _is_alive(thread):
                _kill_process(self.process)
                thread.join()
        self.returncode = self.process.returncode
        return self.out, self.err


class ConnectedCommand(object):
    def __init__(self,
        process=None,
        std_in=None,
        std_out=None,
        std_err=None):

        self._process = process
        self.std_in = std_in
        self.std_out = std_out
        self.std_err = std_out
        self._status_code = None

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.kill()

    @property
    def status_code(self):
        """The status code of the process.
        If the code is None, assume that it's still running.
        """
        return self._status_code

    @property
    def pid(self):
        """The process' PID."""
        return self._process.pid

    def kill(self):
        """Kills the process."""
        return self._process.kill()

    def expect(self, bytes, stream=None):
        """Block until given bytes appear in the stream."""
        if stream is None:
            stream = self.std_out

    def send(self, str, end='\n'):
        """Sends a line to std_in."""
        return self._process.stdin.write(str+end)

    def block(self):
        """Blocks until command finishes. Returns Response instance."""
        self._status_code = self._process.wait()



class Response(object):
    """A command's response"""

    def __init__(self, process=None):
        super(Response, self).__init__()

        self._process = process
        self.command = None
        self.std_err = None
        self.std_out = None
        self.status_code = None
        self.history = []


    def __repr__(self):
        if len(self.command):
            return '<Response [{0}]>'.format(self.command[0])
        else:
            return '<Response>'


def expand_args(command):
    """Parses command strings and returns a Popen-ready list."""

    # Prepare arguments.
    if isinstance(command, (str, unicode)):
        splitter = shlex.shlex(command.encode('utf-8'))
        splitter.whitespace = '|'
        splitter.whitespace_split = True
        command = []

        while True:
            token = splitter.get_token()
            if token:
                command.append(token)
            else:
                break

        command = list(map(shlex.split, command))

    return command


def run(command, data=None, timeout=None, kill_timeout=None, env=None, cwd=None):
    """Executes a given command and returns Response.

    Blocks until process is complete, or timeout is reached.
    """

    command = expand_args(command)

    history = []
    for c in command:

        if len(history):
            # due to broken pipe problems pass only first 10 KiB
            data = history[-1].std_out[0:10*1024]

        cmd = Command(c)
        try:
            out, err = cmd.run(data, timeout, kill_timeout, env, cwd)
            status_code = cmd.returncode
        except OSError as e:
            out, err = '', u"\n".join([e.strerror, traceback.format_exc()])
            status_code = 127

        r = Response(process=cmd)

        r.command = c
        r.std_out = out
        r.std_err = err
        r.status_code = status_code

        history.append(r)

    r = history.pop()
    r.history = history

    return r


def connect(command, data=None, env=None, cwd=None):
    """Spawns a new process from the given command."""

    # TODO: support piped commands
    command_str = expand_args(command).pop()
    environ = dict(os.environ)
    environ.update(env or {})

    process = subprocess.Popen(command_str,
        universal_newlines=True,
        shell=False,
        env=environ,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=0,
        cwd=cwd,
    )

    return ConnectedCommand(process=process)

其实还好,算是一个非常小的库了,但这个库在github上的star有3023多个,可见这个库还是很有用的,那么这个库干什么用呢,看看github上的介绍。

Envoy: Python Subprocesses for Humans.

Note: Delegator is a replacement for Envoy.

This is a convenience wrapper around the subprocess module.

You don't need this.

But you want it.

Usage

Run a command, get the response:

>>> r = envoy.run('git config', data='data to pipe in', timeout=2)

>>> r.status_code
129
>>> r.std_out
'usage: git config [options]'
>>> r.std_err
''

Pipe stuff around too:

>>> r = envoy.run('uptime | pbcopy')

>>> r.command
'pbcopy'
>>> r.status_code
0

>>> r.history
[<Response 'uptime'>]

大概的意思就是在python里运行linux命令,可以查看命令运行的状态status_code、输出std_out、错误std_err、命令command和历史history等等。

import os
import sys
import shlex
import signal
import subprocess
import threading
import traceback


__version__ = '0.0.3'
__license__ = 'MIT'
__author__ = 'Kenneth Reitz'

开头导入后续会用到的库,还有当前的版本、开源许可证和库版本。

然后定义了一个终止进程的函数_terminate_process

def _terminate_process(process):
    if sys.platform == 'win32':
        import ctypes
        PROCESS_TERMINATE = 1
        handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, process.pid)
        ctypes.windll.kernel32.TerminateProcess(handle, -1)
        ctypes.windll.kernel32.CloseHandle(handle)
    else:
        os.kill(process.pid, signal.SIGTERM)

这个终止进程不同系统平台的实现方式不一样,从代码可以看出,至少windows 32位的实现方式和别的不一样,所以需要单拎出来,先看普遍的实现方法os.kill(pid, sig)吧,它有两个参数,pid指的是进程的pid,sig值signal模块的信号码,这里终止进程的信号码是signal.SIGTERM。对于windows 32位的实现方法我查了一下文档都没有讲这么详细的函数,倒是在stackoverflow的时候查到了一个相关的In Python 2.5, how do I kill a subprocess?

。就是这里的实现方式,然后在python文档里说到,python3.2以后貌似os.kill已经支持了所有的windows 32位系统。

接下来看终止进程函数_kill_process

def _kill_process(process):
    if sys.platform == 'win32':
        _terminate_process(process)
    else:
        os.kill(process.pid, signal.SIGKILL)

这里也是判断系统,如果是windows 32位系统,则调用_terminate_process函数终止,否则,调用标准的os.kill函数。这里可能有人会说在_kill_process函数里已经判断了系统版本了,那在_terminate_process还要判断一次,那不是多此一举吗?其实我觉得不能这么看,这个现在是先调用_kill_process,这在_terminate_process不判断没有问题,但是如果是直接调用_terminate_process的话就有问题了。所以这个也是为了独立函数的完备性吧。

然后是_is_alive函数

def _is_alive(thread):
    if hasattr(thread, "is_alive"):
        return thread.is_alive()
    else:
        return thread.isAlive()

这个_is_alive函数存在的意义是由于不同版本的python实现该方法的名称不同,所以要保证兼容性要做完备判断,如果thread函数is_alive属性,说明该版本对该方法的实现就是thread.is_alive(),所以返回它就是了,否则,统统返回thread.isAlive()。<br><br> 然后是Command`类了

class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None
        self.out = None
        self.err = None
        self.returncode = None
        self.data = None
        self.exc = None

    def run(self, data, timeout, kill_timeout, env, cwd):
        self.data = data
        environ = dict(os.environ)
        environ.update(env or {})

        def target():

            try:
                self.process = subprocess.Popen(self.cmd,
                    universal_newlines=True,
                    shell=False,
                    env=environ,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    bufsize=0,
                    cwd=cwd,
                )

                if sys.version_info[0] >= 3:
                    self.out, self.err = self.process.communicate(
                        input = bytes(self.data, "UTF-8") if self.data else None
                    )
                else:
                    self.out, self.err = self.process.communicate(self.data)
            except Exception as exc:
                self.exc = exc


        thread = threading.Thread(target=target)
        thread.start()

        thread.join(timeout)
        if self.exc:
            raise self.exc
        if _is_alive(thread):
            _terminate_process(self.process)
            thread.join(kill_timeout)
            if _is_alive(thread):
                _kill_process(self.process)
                thread.join()
        self.returncode = self.process.returncode
        return self.out, self.err

首先就是__init__函数了,这个就是实例化Command类的时候,就会自动调用的,这里就是初始化一些变量。主要就是把参数cmd存到self.cmd中,其他的就不多说了。接下来是一个类的run方法。先把data变量存到self.data中,然后dict(os.environ)把系统的环境变量转成字典形式存到environ中,再调用environ.update(env or {})env变量更新environ变量,更新的意思是environ没有的项而env有的,就增量加到environ中,如果envenviron中有相同的键,但值不一样,那么就用env的值替换environ对应项的值。update函数里还有个or表达式,意思是如果env是None的话,就用空字典取更新environ,其实就是跟没更新一样。可以看看我电脑里的环境变量

In [1]: import os

In [2]: print(os.environ)
environ({'XDG_DATA_DIRS': '/usr/share/ubuntu:/usr/share/gnome:/usr/local/share/:/usr/share/:/var/lib/snapd/desktop', 'MANPATH': '/home/mao/.nvm/versions/node/v6.9.1/share/man:/usr/local/man:/usr/local/share/man:/usr/share/man', 'GTK2_MODULES': 'overlay-scrollbar', 'LESS': '-R', 'SESSIONTYPE': 'gnome-session', 'QT_IM_MODULE': 'fcitx', 'PWD': '/home/mao', 'HOME': '/home/mao', 'ZSH': '/home/mao/.oh-my-zsh', 'CLUTTER_IM_MODULE': 'xim', 'NVM_NODEJS_ORG_MIRROR': 'https://nodejs.org/dist', 'DBUS_SESSION_BUS_ADDRESS': 'unix:abstract=/tmp/dbus-4k8QmdHrvZ', 'USER': 'mao', 'GDM_LANG': 'zh_CN', 'GPG_AGENT_INFO': '/home/mao/.gnupg/S.gpg-agent:0:1', 'UPSTART_SESSION': 'unix:abstract=/com/ubuntu/upstart-session/1000/1746', 'XDG_VTNR': '7', 'XDG_SEAT_PATH': '/org/freedesktop/DisplayManager/Seat0', 'XDG_CONFIG_DIRS': '/etc/xdg/xdg-ubuntu:/usr/share/upstart/xdg:/etc/xdg', 'GTK_IM_MODULE': 'fcitx', 'GNOME_KEYRING_PID': '', 'UPSTART_JOB': 'unity-settings-daemon', 'LOGNAME': 'mao', 'TERM': 'xterm-256color', 'XDG_RUNTIME_DIR': '/run/user/1000', 'NVM_BIN': '/home/mao/.nvm/versions/node/v6.9.1/bin', '_': '/usr/local/bin/ipython3', 'NVM_IOJS_ORG_MIRROR': 'https://iojs.org/dist', 'QT_LINUX_ACCESSIBILITY_ALWAYS_ON': '1', 'IM_CONFIG_PHASE': '1', 'XDG_SEAT': 'seat0', 'XMODIFIERS': '@im=fcitx', 'NVM_DIR': '/home/mao/.nvm', 'QT_ACCESSIBILITY': '1', 'PAGER': 'less', 'GNOME_DESKTOP_SESSION_ID': 'this-is-deprecated', 'NVM_PATH': '/home/mao/.nvm/versions/node/v6.9.1/lib/node', 'SHELL': '/bin/zsh', 'OLDPWD': '/home/mao', 'XDG_SESSION_ID': 'c2', 'GOPATH': '/home/mao/go:/home/mao/work:/home/mao/yesman', 'GTK_MODULES': 'gail:atk-bridge:unity-gtk-module', 'DESKTOP_SESSION': 'ubuntu', 'COMPIZ_CONFIG_PROFILE': 'ubuntu', 'PATH': '/home/mao/.nvm/versions/node/v6.9.1/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/usr/local/go/bin:/home/mao/go/bin:/home/mao/go:/home/mao/work:/home/mao/yesman/bin', 'VTE_VERSION': '4205', 'NVM_CD_FLAGS': '', 'SHLVL': '1', 'INSTANCE': 'Unity', 'SSH_AUTH_SOCK': '/run/user/1000/keyring/ssh', 'XDG_CURRENT_DESKTOP': 'Unity', 'QT_QPA_PLATFORMTHEME': 'appmenu-qt5', 'DISPLAY': ':0', 'QT4_IM_MODULE': 'fcitx', 'XDG_GREETER_DATA_DIR': '/var/lib/lightdm-data/mao', 'UPSTART_EVENTS': 'started starting', 'LSCOLORS': 'Gxfxcxdxbxegedabagacad', 'DEFAULTS_PATH': '/usr/share/gconf/ubuntu.default.path', 'XAUTHORITY': '/home/mao/.Xauthority', 'LC_CTYPE': 'zh_CN.UTF-8', 'XDG_SESSION_TYPE': 'x11', 'JOB': 'gnome-session', 'XDG_SESSION_PATH': '/org/freedesktop/DisplayManager/Session0', 'GOROOT': '/usr/local/go', 'XDG_SESSION_DESKTOP': 'ubuntu', 'UPSTART_INSTANCE': '', 'LS_COLORS': 'no=00:fi=00:di=34:ow=34;40:ln=35:pi=30;44:so=35;44:do=35;44:bd=33;44:cd=37;44:or=05;37;41:mi=05;37;41:ex=01;31:*.cmd=01;31:*.exe=01;31:*.com=01;31:*.bat=01;31:*.reg=01;31:*.app=01;31:*.txt=32:*.org=32:*.md=32:*.mkd=32:*.h=32:*.hpp=32:*.c=32:*.C=32:*.cc=32:*.cpp=32:*.cxx=32:*.objc=32:*.cl=32:*.sh=32:*.bash=32:*.csh=32:*.zsh=32:*.el=32:*.vim=32:*.java=32:*.pl=32:*.pm=32:*.py=32:*.rb=32:*.hs=32:*.php=32:*.htm=32:*.html=32:*.shtml=32:*.erb=32:*.haml=32:*.xml=32:*.rdf=32:*.css=32:*.sass=32:*.scss=32:*.less=32:*.js=32:*.coffee=32:*.man=32:*.0=32:*.1=32:*.2=32:*.3=32:*.4=32:*.5=32:*.6=32:*.7=32:*.8=32:*.9=32:*.l=32:*.n=32:*.p=32:*.pod=32:*.tex=32:*.go=32:*.sql=32:*.csv=32:*.bmp=33:*.cgm=33:*.dl=33:*.dvi=33:*.emf=33:*.eps=33:*.gif=33:*.jpeg=33:*.jpg=33:*.JPG=33:*.mng=33:*.pbm=33:*.pcx=33:*.pdf=33:*.pgm=33:*.png=33:*.PNG=33:*.ppm=33:*.pps=33:*.ppsx=33:*.ps=33:*.svg=33:*.svgz=33:*.tga=33:*.tif=33:*.tiff=33:*.xbm=33:*.xcf=33:*.xpm=33:*.xwd=33:*.xwd=33:*.yuv=33:*.aac=33:*.au=33:*.flac=33:*.m4a=33:*.mid=33:*.midi=33:*.mka=33:*.mp3=33:*.mpa=33:*.mpeg=33:*.mpg=33:*.ogg=33:*.opus=33:*.ra=33:*.wav=33:*.anx=33:*.asf=33:*.avi=33:*.axv=33:*.flc=33:*.fli=33:*.flv=33:*.gl=33:*.m2v=33:*.m4v=33:*.mkv=33:*.mov=33:*.MOV=33:*.mp4=33:*.mp4v=33:*.mpeg=33:*.mpg=33:*.nuv=33:*.ogm=33:*.ogv=33:*.ogx=33:*.qt=33:*.rm=33:*.rmvb=33:*.swf=33:*.vob=33:*.webm=33:*.wmv=33:*.doc=31:*.docx=31:*.rtf=31:*.odt=31:*.dot=31:*.dotx=31:*.ott=31:*.xls=31:*.xlsx=31:*.ods=31:*.ots=31:*.ppt=31:*.pptx=31:*.odp=31:*.otp=31:*.fla=31:*.psd=31:*.7z=1;35:*.apk=1;35:*.arj=1;35:*.bin=1;35:*.bz=1;35:*.bz2=1;35:*.cab=1;35:*.deb=1;35:*.dmg=1;35:*.gem=1;35:*.gz=1;35:*.iso=1;35:*.jar=1;35:*.msi=1;35:*.rar=1;35:*.rpm=1;35:*.tar=1;35:*.tbz=1;35:*.tbz2=1;35:*.tgz=1;35:*.tx=1;35:*.war=1;35:*.xpi=1;35:*.xz=1;35:*.z=1;35:*.Z=1;35:*.zip=1;35:*.ANSI-30-black=30:*.ANSI-01;30-brblack=01;30:*.ANSI-31-red=31:*.ANSI-01;31-brred=01;31:*.ANSI-32-green=32:*.ANSI-01;32-brgreen=01;32:*.ANSI-33-yellow=33:*.ANSI-01;33-bryellow=01;33:*.ANSI-34-blue=34:*.ANSI-01;34-brblue=01;34:*.ANSI-35-magenta=35:*.ANSI-01;35-brmagenta=01;35:*.ANSI-36-cyan=36:*.ANSI-01;36-brcyan=01;36:*.ANSI-37-white=37:*.ANSI-01;37-brwhite=01;37:*.log=01;32:*~=01;32:*#=01;32:*.bak=01;33:*.BAK=01;33:*.old=01;33:*.OLD=01;33:*.org_archive=01;33:*.off=01;33:*.OFF=01;33:*.dist=01;33:*.DIST=01;33:*.orig=01;33:*.ORIG=01;33:*.swp=01;33:*.swo=01;33:*,v=01;33:*.gpg=34:*.gpg=34:*.pgp=34:*.asc=34:*.3des=34:*.aes=34:*.enc=34:*.sqlite=34:', 'MANDATORY_PATH': '/usr/share/gconf/ubuntu.mandatory.path', 'PROMPT_EOL_MARK': '', 'LANG': 'zh_CN.UTF-8', 'LANGUAGE': 'zh_CN:zh', 'GNOME_KEYRING_CONTROL': '', 'WINDOWID': '88080394', 'GDMSESSION': 'ubuntu'})

当然你自己的环境变量基本上不可能和我的完全一样。
好,现在就轮到run方法里的target函数了,这里主要就是调用一个subprocess.Popen方法,并将结果存到self.process中。这个函数有很多很多参数,文档里非常详细,但这里就不一一重复了,只需要知道这个方法是创建进程并管理的方法就好。因为如果要研究的话应该在研究python源代码的时候去研究它。接下来进行一个if判断,这个sys.version_info是关于当前python版本的相关信息。它会返回含有5个元素的元组major, minor, micro, releaselevel, and serial。其中除了releaselevel以外都是整型。releaselevel的值有'alpha', 'beta', 'candidate'或者'final',具体意思的很明显。现在来看这个python3.5.2版本和python2.7.12版本的sys.version_info有什么不同。

❯ python
Python 3.5.2 (default, Nov 17 2016, 17:05:23) 
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> sys.version_info
sys.version_info(major=3, minor=5, micro=2, releaselevel='final', serial=0)

❯ python2.7
Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> sys.version_info
sys.version_info(major=2, minor=7, micro=12, releaselevel='final', serial=0)

对比一看就知道major就是主版本号,minor姑且叫副版本号,micro就是小版本的意思了。releaselevel就是版本是最终版还是测试版等,serial这个字面意思是串行的,序列接口的,这个我看不太明白它的意思。但不影响这个条件的判断。 回到条件判断应该就是python主版本号在大于等于3以后的和3之前的处理字符转换这些方面的方法已经不相同了,这又是一个很大的课题,这个据说很多python程序员在这一块上耗费的时间特别长,貌似特别繁琐,据说在3以后已经好很多了。当版本号大于等于3的时候self.process.communicate要用bytes转换成utf-8的编码方式作为参数。而其他的版本则直接以self.data作为参数。target()函数可能会在运行过程中有异常,如果有,把异常存到self.exc中。

然后是一个threading模块的Thread方法,这个方法的__doc__如下:

In [4]: print(threading.Thread.__doc__)
A class that represents a thread of control.

    This class can be safely subclassed in a limited fashion. There are two ways
    to specify the activity: by passing a callable object to the constructor, or
    by overriding the run() method in a subclass.

这是一个实现线程管理的类。这是一个可以在一定的限制条件下被安全继承的类。激活线程活动有两种方法,一种是传一个可调用对象给该类的构造函数,另一个是在子线程中通过run()方法重写。
官网介绍的threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)方法有六个个参数:

  1. group应当为None,这是为了将来ThreadGroup类实现后的扩展预留的。
  1. target是一个可以通过run()方法调用的可调用对象,默认为None,表明没什么可以调用的。
  2. name是thread的名字,如果未指定,则自动命名为一个唯一的类似 “Thread-N” 形式的名字,其中,N代表一个整数。如“Thread-1”、“Thread-2”等等。
  3. args是一个供target调用时候的参数元组,默认为空元组。
  4. kwargs是一个供target调用时候的字典,默认为空字典。
  5. daemon是指守护进程,如果不为None,就是指定该daemon为守护进程;默认是None,意思是将从当前线程继承(这块不太懂,不知道翻译得对不对)。

thread = threading.Thread(target=target)实例化一个threading.Thread()存到thread中,target就是在这之前定义的可调用的target函数,然后调用threadstart方法。

In [5]: print(threading.Thread.start.__doc__)
Start the thread's activity.

        It must be called at most once per thread object. It arranges for the
        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the
        same thread object.

start就是启动线程的方法。每个线程最多只能被启动一次,它让对象的run()方法在单独的控制线程中被调用。如果一个线程被调用多次,将会抛出值为RuntimeError的异常。

来看看thread.join(timeout)的_.doc_

In [6]: print(threading.Thread.join.__doc__)
Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is
        called terminates -- either normally or through an unhandled exception
        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof). As join() always returns None, you must call
        isAlive() after join() to decide whether a timeout happened -- if the
        thread is still alive, the join() call timed out.

        When the timeout argument is not present or None, the operation will
        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current
        thread as that would cause a deadlock. It is also an error to join() a
        thread before it has been started and attempts to do so raises the same
        exception.

join方法是线程等待超时时间,也就是说如果在timeout时间内,线程还未结束,则不再阻塞进程。由于join方法的返回值一直是None,所以在调用完join方法后必须得调用isAlive()/is_alive()方法来检测是不是过了超时时间,如果线程timeout未指定或者为None,进程将一直阻塞直到线程运行完成;一个线程可以调用join多次,如果试图加入当前线程会引起死锁的话join会抛出一个值为RuntimeError的异常。如果join方法在线程启动之前调用,也会抛出一样的异常。

有了这些基础知识之后,再回头来理解代码,先创建一个以threading.Thread(target=target)函数为target参数的线程,然后thread.start()启动线程,加上等待超时时间thread.join(timeout),防止线程一直阻塞进程。如果运行线程过程中有异常,则抛出一个self.exc的异常。通过_is_alive(thread)来检测是否过了超时时间,如果是,就终止self.process,然后再加上一个等待超时时间thread.join(kill_timeout),这里kill_timeout的意思是,如果在kill_timeout时间内还没运行结束,则完全终止self.processjoin完后再进行_is_alive(thread)的判断,如果超时,调用_kill_process(self.process)完全终止。然后再thread.join()(这块看不太懂,为啥还要join一次)。self.returncode = self.process.returncode要把线程运行的returncode返回存到self.returncode中,run方法最终返回self.outself.err

接下来是一个ConnectedCommand

class ConnectedCommand(object):
    def __init__(self,
        process=None,
        std_in=None,
        std_out=None,
        std_err=None):

        self._process = process
        self.std_in = std_in
        self.std_out = std_out
        self.std_err = std_out
        self._status_code = None

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.kill()

    @property
    def status_code(self):
        """The status code of the process.
        If the code is None, assume that it's still running.
        """
        return self._status_code

    @property
    def pid(self):
        """The process' PID."""
        return self._process.pid

    def kill(self):
        """Kills the process."""
        return self._process.kill()

    def expect(self, bytes, stream=None):
        """Block until given bytes appear in the stream."""
        if stream is None:
            stream = self.std_out

    def send(self, str, end='\n'):
        """Sends a line to std_in."""
        return self._process.stdin.write(str+end)

    def block(self):
        """Blocks until command finishes. Returns Response instance."""
        self._status_code = self._process.wait()

同样还是先__init__初始化一些变量赋值。

def __enter__(self):
    return self

__enter__返回实例本身。

def __exit__(self, type, value, traceback):
    self.kill()

__exit__杀掉进程方法。这里用到了类本身的kill方法。

def kill(self):
    """Kills the process."""
    return self._process.kill()

然后是一个返回状态码的方法

@property
def status_code(self):
    """The status code of the process.
    If the code is None, assume that it's still running.
    """
    return self._status_code

这方法就是直接返回self._status_code,这没什么的,如果这个值是None,说明线程还在运行。主要是来看看这个@property到底是做什么的。看文档说property(fget=None, fset=None, fdel=None, doc=None)是一个类,作用是返回属性。fget是用于返回属性值;fset用于设置属性值;fdel用于删除属性值;doc是属性值的docstring。官网的例子

class C:
    def __init__(self):
        self._x = None

    def getx(self):
        return self._x

    def setx(self, value):
        self._x = value

    def delx(self):
        del self._x

    x = property(getx, setx, delx, "I'm the 'x' property.")

如果c = C();,那么c.x会调用getx函数;如果c.x = value,那么会调用setx函数赋值;如果del c.x会调用delx删除。doc参数现在是"I'm the 'x' property.",那么属性的docstring就是"I'm the 'x' property.",如果doc未定义,那么属性的docstring默认是fget方法的docstring

上面是Property类所做的事情,如果在代码中用@property又会怎么样呢。再来看一个例子

class Parrot:
    def __init__(self):
        self._voltage = 100000

    @property
    def voltage(self):
        """Get the current voltage."""
        return self._voltage

加了@property(中文应该叫装饰器)以后,相当于把voltage函数变成了getter,读取属性值。最典型的一个例子如下

class C:
    def __init__(self):
        self._x = None

    @property
    def x(self):
        """I'm the 'x' property."""
        return self._x

    @x.setter
    def x(self, value):
        self._x = value

    @x.deleter
    def x(self):
        del self._x

这个例子和前面的Class C例子是完全一样的,但这个写比较pythonic。这里有关于装饰器的更详细的资料

回到之前的代码就很容易知道了,两个@property一个是为了获取状态码_status_code,一个是为了获取进程的pid码_process.pid

@property
def status_code(self):
    """The status code of the process.
    If the code is None, assume that it's still running.
    """
    return self._status_code

@property
def pid(self):
    """The process' PID."""
    return self._process.pid

往下看后三个函数

def expect(self, bytes, stream=None):
    """Block until given bytes appear in the stream."""
    if stream is None:
        stream = self.std_out

def send(self, str, end='\n'):
    """Sends a line to std_in."""
    return self._process.stdin.write(str+end)

def block(self):
    """Blocks until command finishes. Returns Response instance."""
    self._status_code = self._process.wait()

根据expect函数的docstring说法是阻塞直到给定字节出现在流中,这么看不太懂,等看到哪些地方调用再回头看吧。函数里判断stream,如果是None,把self.std_out赋值给stream
send函数作用是发送一行信息到std_in,参数是要发送的信息strend=\n换行符。这个方法的实现是直接返回self._process.stdin.write(str+end)str+end保证了发生一行信息。
block函数阻塞知道命令行运行结束,然后返回Response实例。直接调用进程的wait()方法返回状态码。

往下就是Response类了

class Response(object):
    """A command's response"""

    def __init__(self, process=None):
        super(Response, self).__init__()

        self._process = process
        self.command = None
        self.std_err = None
        self.std_out = None
        self.status_code = None
        self.history = []


    def __repr__(self):
        if len(self.command):
            return '<Response [{0}]>'.format(self.command[0])
        else:
            return '<Response>'

这个类就是运行命令行的应答。__init__先初始化各种参数,主要传入的是process,把它存到_process中。然后定义__repr__,如果command不为None,则输出第一个command,比如返回Response ls这样的。否则只返回Response

往下是expand_args

def expand_args(command):
    """Parses command strings and returns a Popen-ready list."""

    # Prepare arguments.
    if isinstance(command, (str, unicode)):
        splitter = shlex.shlex(command.encode('utf-8'))
        splitter.whitespace = '|'
        splitter.whitespace_split = True
        command = []

        while True:
            token = splitter.get_token()
            if token:
                command.append(token)
            else:
                break

        command = list(map(shlex.split, command))

    return command

这个expand_args就是负责解析命令行字符,返回Popen-ready列表。isinstance函数就是判断前者是不是后者的实例,或者是后者的直接或者间接的子类型。具体到当前的函数就是判断command是不是str或者unicode的子类型。官方文档shlex.shlex(instream=None, infile=None, posix=False, punctuation_chars=False)有四个参数,instream指从哪里读取参数,类型必须是a file-/stream-like object with read() and readline() methods(感觉真的是英文说的很直白,翻译成中文倒是很别扭,所以还是放英文吧)这样的文档流或者是字符串。如果该参数为空,默认会从sys.stdin获取;第二个参数infile是一个文件名字符串,这个在调试的时候方便区分。如果第一个参数instream为空,infile默认值就是"stdin"posix标识操作系统类型,默认指定为False,表明是兼容模式;如果该值为POSIX,则表明是类unix操作系统模式,将按照类unix的解析规则对instream进行解析。punctuation_chars参数的解释The punctuation_chars argument provides a way to make the behaviour even closer to how real shells parse,大约说的是这个参数提供了一个使解析的行为方式更靠近真实的shells parse的方法。它的默认值是False,操作的规则就是在python3.5以前的规则,因为这个参数是在python3.6加的,只有在3.6以后才有意义,如果该值位True,那么在解析中关于();<>|&的解析会有不同,那是怎么样一种规则呢,来看例子

# fxm @ bogon in ~/work/robot-admin on git:master x [12:12:02]
$ python
Python 3.6.1 (default, Jun  1 2017, 10:34:26)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import shlex
>>> text = "a && b; c && d || e; f >'abc'; (def \"ghi\")"
>>> list(shlex.shlex(text))
['a', '&', '&', 'b', ';', 'c', '&', '&', 'd', '|', '|', 'e', ';', 'f', '>', "'abc'", ';', '(', 'def', '"ghi"', ')']
>>> list(shlex.shlex(text, punctuation_chars=True))
['a', '&&', 'b', ';', 'c', '&&', 'd', '||', 'e', ';', 'f', '>', "'abc'", ';', '(', 'def', '"ghi"', ')']

对比很明显,看起来punctuation_chars=True更符合我们所需要的解析效果。
除此之外,这个参数还可以取字符串作为参数,主要就是特指();<>|&中的一部分来解析。如仅仅指定"|"和仅仅指定"&"的例子如下

>>> list(shlex.shlex(text, punctuation_chars="|"))
['a', '&', '&', 'b', ';', 'c', '&', '&', 'd', '||', 'e', ';', 'f', '>', "'abc'", ';', '(', 'def', '"ghi"', ')']
>>> list(shlex.shlex(text, punctuation_chars="&"))
['a', '&&', 'b', ';', 'c', '&&', 'd', '|', '|', 'e', ';', 'f', '>', "'abc'", ';', '(', 'def', '"ghi"', ')']

对比明显,一眼就能明白,所以有些时候文档看得不太明白,但是看了例子就很容易明白。
另外一点要注意的是由于punctuation_chars最好的情况是结合posix=True来使用。

然后是shlex.whitespace,官方文档的解释是

Characters that will be considered whitespace and skipped. Whitespace bounds tokens. By default, includes space, tab, linefeed and carriage-return.

这里说的意思是把whitespace指定的当做空格处理,默认的有空格、tab、换行符和回车符。回到代码里splitter.whitespace = '|'就是把'|'当做空格一样来处理的意思,等会讲完whitespace_split会给例子。

然后就是splitter.whitespace_split = True这行了。官方文档关于shlex.whitespace_split的解释是

If True, tokens will only be split in whitespaces. This is useful, for example, for parsing command lines with shlex, getting tokens in a similar way to shell arguments. If this attribute is True, punctuation_chars will have no effect, and splitting will happen only on whitespaces. When using punctuation_chars, which is intended to provide parsing closer to that implemented by shells, it is advisable to leave whitespace_split as False (the default value).

取值为True的时候,仅仅根据whitespaces来切分,也就是前面shlex.whitespace指定的值,如果没有指定,那就是默认空格,tab等等,而且此时的punctuation_chars将不起作用。上个例子看看

# 默认情况下
~/source/backend/python/envoy master
❯ ipython
Python 3.5.2 (default, Nov 17 2016, 17:05:23) 
Type "copyright", "credits" or "license" for more information.

IPython 5.1.0 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.

In [1]: import shlex

In [2]: command = "cat | tr [:lower:] [:upper:]"

In [3]: splitter = shlex.shlex(command)

In [4]: splitter.get_token()
Out[4]: 'cat'

In [5]: splitter.get_token()
Out[5]: '|'

In [6]: splitter.get_token()
Out[6]: 'tr'

# 加了whitespace和whitespace_split的情况下
~/source/backend/python/envoy master 4m 34s
❯ ipython
Python 3.5.2 (default, Nov 17 2016, 17:05:23) 
Type "copyright", "credits" or "license" for more information.

IPython 5.1.0 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.

In [1]: import shlex

In [2]: command = "cat | tr [:lower:] [:upper:]"

In [3]: splitter = shlex.shlex(command)

In [4]: splitter.whitespace = '|'

In [5]: splitter.whitespace_split = True

In [6]: splitter.get_token()
Out[6]: 'cat '

In [7]: splitter.get_token()
Out[7]: ' tr [:lower:] [:upper:]'

没加whitespacewhitespace_split的时候,是以空格为默认切分的,加了whitespace '|'whitespace_split = True之后就是以'|'为切分依据的。看了例子就很明显了。例子里已经顺便演示了一下shlex.push_token()的作用,再回到代码中的话,就是做一个while循环,拿到切分的结果存到token中,如果token存在,就把它附加在command列表中。否则结束while循环。

 while True:
     token = splitter.get_token()
     if token:
         command.append(token)
     else:
         break

然后是command = list(map(shlex.split, command)),这里是一个map的方法,它的作用就是第一个参数的方法shlex.split分别运用于第二个参数的command列表的每个元素,然后将结果返回一个新的列表。这里返回的结果再用list()方法进行操作后变成了列表的列表覆盖掉原先的command列表。最后的最后return command把command列表返回,作为expand_args函数的返回结果。

然后是run函数

def run(command, data=None, timeout=None, kill_timeout=None, env=None, cwd=None):
    """Executes a given command and returns Response.

    Blocks until process is complete, or timeout is reached.
    """

    command = expand_args(command)

    history = []
    for c in command:

        if len(history):
            # due to broken pipe problems pass only first 10 KiB
            data = history[-1].std_out[0:10*1024]

        cmd = Command(c)
        try:
            out, err = cmd.run(data, timeout, kill_timeout, env, cwd)
            status_code = cmd.returncode
        except OSError as e:
            out, err = '', u"\n".join([e.strerror, traceback.format_exc()])
            status_code = 127

        r = Response(process=cmd)

        r.command = c
        r.std_out = out
        r.std_err = err
        r.status_code = status_code

        history.append(r)

    r = history.pop()
    r.history = history

    return r

run函数执行提供的命令行,阻塞直到进程结束或者是过了超时时间,最后返回Response类。首先调用expand_args(command)把待执行的命令行解析成想要的列表存到command中;接着创建一个history空列表;接着循环command列表,如果history列表长度大于0,把history列表最后一个元素的std_out属性值的[0:10*1024](指的是命令行管道一次最多只能有10kb的限额),也就是std_out小于等于10kb,将完全存到data中,否则截取前10kb的部分存到data中;然后,以循环项c为参数实例化Command类存到cmd中;接着cmd.run(data, timeout, kill_timeout, env, cwd)datatimeoutkill_timeoutenvcwd为参数执行Command类的run方法,把结果存到outerr中。status_code = cmd.returncode同样把cmdreturncode存到status_code中。如果执行run方法的过程中有异常,out就是空字符串,err就是用换行符join异常的strerrortraceback.format_exc()获取的堆栈异常信息;接着以cmd为参数实例化一个Response类存到r中,并把cmd.run的运行结果存到r.std_outr.std_err等中;然后把r附加到history列表中。这个其实很好理解,history就是历史命令行列表,for循环的时候把所有的命令的运行结果r都附加到列表中;for循环结束后r = history.pop()拿出history列表中的最后一项作为当前的项,r.history = history把拿掉最后一项的列表作为最后一项的history属性值。最后把r返回作为run函数的结果。这个很好理解了吧,举个例子

history = [x, y, z]
r = history.pop() # r = z
r.history = history # z.history = [x, y]

很明显了。当然这个x, y, z代表的是一个个的命令行运行结果的Response对象的实例化结果。

最后还有一个connect函数没看

def connect(command, data=None, env=None, cwd=None):
    """Spawns a new process from the given command."""

    # TODO: support piped commands
    command_str = expand_args(command).pop()
    environ = dict(os.environ)
    environ.update(env or {})

    process = subprocess.Popen(command_str,
        universal_newlines=True,
        shell=False,
        env=environ,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=0,
        cwd=cwd,
    )

    return ConnectedCommand(process=process)

connect函数从给定的命令产生一个新的进程,首先调用command_str = expand_args(command).pop()解析命令command生成的列表取最后一项存到command_str中;然后拿到系统的环境后在根据传入的环境env取更新系统环境;这之前有类似的了。然后把command_str作为subprocess.Popen的参数产生一个进程存到process中,最后调用ConnectedCommand(process=process)生成ConnectedCommand类的一个实例并返回它。

到这里为止,core.py算是讲完了;在envoy子文件夹里还有一个和core.py同级的__.init__.py

from .core import Command, ConnectedCommand, Response
from .core import expand_args, run, connect

from .core import __version__

这个文件做的是一些初始化的工作,比如这里导出了core.py文件里的多个方法以及库的版本号__version__,一般而言这个文件也可以什么内容都不写。

测试

代码写完了,肯定得要测试一翻才能知道它是否能够工作。 先上三个简单测试吧,在envoy项目根目录下建一个test.py,代码如下

~/source/backend/python/envoy master*
❯ cat test.py 
#!/usr/bin/env python
# encoding: utf-8

import unittest
import envoy
import time

class SimpleTest(unittest.TestCase):

    def testInput(self):
        r = envoy.run("sed s/i/I/g", "Hi")
        self.assertEqual(r.std_out, "HI")
        self.assertEqual(r.status_code, 0)

    def testPipe(self):
        r = envoy.run("echo -n 'hi'| tr [:lower:] [:upper:]")
        self.assertEqual(r.std_out, "HI")
        self.assertEqual(r.status_code, 0)

    def testTimeout(self):
        r = envoy.run("yes | head", timeout=1)
        self.assertEqual(r.std_out, "y\ny\ny\ny\ny\ny\ny\ny\ny\ny\n")
        self.assertEqual(r.status_code, 0)
        
if __name__ == '__main__':
    unittest.main()

有三个测试用例testInputtestPipetestTimeout,现在来执行代码。

~/source/backend/python/envoy master*
❯ python2 test.py 
...
----------------------------------------------------------------------
Ran 3 tests in 2.880s

OK

这三个测试通过了,当然用的是python2.7,如果用python3会报unicode不存在

~/source/backend/python/envoy master*
❯ python3 test.py 
EEE
======================================================================
ERROR: testInput (__main__.SimpleTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 11, in testInput
    r = envoy.run("sed s/i/I/g", "Hi")
  File "/home/mao/source/backend/python/envoy/envoy/core.py", line 205, in run
    command = expand_args(command)
  File "/home/mao/source/backend/python/envoy/envoy/core.py", line 181, in expand_args
    if isinstance(command, (str, unicode)):
NameError: name 'unicode' is not defined

======================================================================
ERROR: testPipe (__main__.SimpleTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 16, in testPipe
    r = envoy.run("echo -n 'hi'| tr [:lower:] [:upper:]")
  File "/home/mao/source/backend/python/envoy/envoy/core.py", line 205, in run
    command = expand_args(command)
  File "/home/mao/source/backend/python/envoy/envoy/core.py", line 181, in expand_args
    if isinstance(command, (str, unicode)):
NameError: name 'unicode' is not defined

======================================================================
ERROR: testTimeout (__main__.SimpleTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 21, in testTimeout
    r = envoy.run("yes | head", timeout=1)
  File "/home/mao/source/backend/python/envoy/envoy/core.py", line 205, in run
    command = expand_args(command)
  File "/home/mao/source/backend/python/envoy/envoy/core.py", line 181, in expand_args
    if isinstance(command, (str, unicode)):
NameError: name 'unicode' is not defined

----------------------------------------------------------------------
Ran 3 tests in 0.001s

FAILED (errors=3)

现在这个代码是不兼容python3的,因为作者已经不维护这个代码了,库的介绍里也给了一个新的链接,推荐使用delegator这个库了,当然了学习的意义依然存在。

下一个测试

~/source/backend/python/envoy master*
❯ cat test.py 
#!/usr/bin/env python
# encoding: utf-8

import unittest
import envoy
import time

class SimpleTest(unittest.TestCase):

    # THIS TEST FAILS BECAUSE expand_args DOESN'T HANDLE QUOTES PROPERLY
    def test_quoted_args(self):
        sentinel = 'quoted_args' * 3
        r = envoy.run("python -c 'print \"%s\"'" % sentinel)
        self.assertEqual(r.std_out.rstrip(), sentinel)
        self.assertEqual(r.status_code, 0)

    def test_non_existing_command(self):
        r = envoy.run("blah")
        self.assertEqual(r.status_code, 127)

if __name__ == '__main__':
    unittest.main()

执行测试,查看测试结果

~/source/backend/python/envoy master*
❯ python2 test.py
..
----------------------------------------------------------------------
Ran 2 tests in 0.012s

OK

测试通过,test_non_existing_command这个测试测的就是一个不存在的命令行blah,如果成功,返回的状态码status_code应该是0,现在返回的状态码等于127,说明这个blah命令行没有正确执行。

再来一个测试

~/source/backend/python/envoy master*
❯ cat test.py 
#!/usr/bin/env python
# encoding: utf-8

import unittest
import envoy
import time

class ConnectedCommandTests(unittest.TestCase):

    def test_status_code_none(self):
        c = envoy.connect("sleep 5")
        self.assertEqual(c.status_code, None)

    def test_status_code_success(self):
        c = envoy.connect("sleep 1")
        time.sleep(2)
        self.assertEqual(c.status_code, 0)

    def test_status_code_failure(self):
        c = envoy.connect("sleep 1")
        self.assertEqual(c.status_code, 127)

    def test_input(self):
        test_string = 'asdfQWER'
        r = envoy.connect("cat | tr [:lower:] [:upper:]")
        r.send(test_string)
        self.assertEqual(r.std_out, test_string.upper())
        self.assertEqual(r.status_code, 0)

if __name__ == '__main__':
    unittest.main()

执行测试看结果

~/source/backend/python/envoy master*
❯ python2 test.py 
FF.F
======================================================================
FAIL: test_input (__main__.ConnectedCommandTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 27, in test_input
    self.assertEqual(r.std_out, test_string.upper())
AssertionError: None != 'ASDFQWER'

======================================================================
FAIL: test_status_code_failure (__main__.ConnectedCommandTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 21, in test_status_code_failure
    self.assertEqual(c.status_code, 127)
AssertionError: None != 127

======================================================================
FAIL: test_status_code_success (__main__.ConnectedCommandTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 17, in test_status_code_success
    self.assertEqual(c.status_code, 0)
AssertionError: None != 0

----------------------------------------------------------------------
Ran 4 tests in 2.009s

FAILED (failures=3)

四个小测试,有一个通过,三个没通过,那还是得具体看看调试信息。在开头导入logging模块,并配置它的levelDEBUG在,然后在connect函数和ConnectedCommand分别加上如下的调试信息

# 开头导入logging
import logging
logging.basicConfig(level=logging.DEBUG)

# connect函数调试
logging.debug("command: %s" % command_str)


# ConnectedCommand函数调试
@property
    def status_code(self):
        """The status code of the process.
        If the code is None, assume that it's still running.
        """
        logging.debug("status_code: %s" % self._status_code) # 新加调试部分
        return self._status_code

def expect(self, bytes, stream=None):
        """Block until given bytes appear in the stream."""
        logging.debug("stream: %s" % stream) # 新加调试部分
        logging.debug("stream: %s" % self.std_out) # 新加调试部分
        if stream is None:
            stream = self.std_out

    def send(self, str, end='\n'):
        """Sends a line to std_in."""
        logging.debug("str: %s" % (str+end)) # 新加调试部分
        return self._process.stdin.write(str+end)

    def block(self):
        """Blocks until command finishes. Returns Response instance."""
        logging.debug("_status_code: %s" % self._process.wait()) # 新加调试部分
        self._status_code = self._process.wait()

为了看得更清楚,四个小测试单个进行测试

第一个测试

# 测试用例
def test_status_code_none(self):
    c = envoy.connect("sleep 5")
    self.assertEqual(c.status_code, None)

# 测试结果
# fxm @ bogon in ~/source/backend/python/envoy_source on git:master x [10:03:14] C:1
$ python2 test.py
DEBUG:root:command: ['sleep', '5']
DEBUG:root:status_code: None
.
----------------------------------------------------------------------
Ran 1 test in 0.036s

OK

测了一个休眠5秒钟的,什么也没执行,返回None,测试通过。

第二个测试

# 测试用例
def test_status_code_success(self):
    c = envoy.connect("sleep 1")
    time.sleep(2)
    self.assertEqual(c.status_code, 0)

# 测试结果
# fxm @ bogon in ~/source/backend/python/envoy_source on git:master x [10:12:24]
$ python2 test.py
DEBUG:root:command: ['sleep', '1']
DEBUG:root:status_code: None
F
======================================================================
FAIL: test_status_code_success (__main__.ConnectedCommandTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 17, in test_status_code_success
    self.assertEqual(c.status_code, 0)
AssertionError: None != 0

----------------------------------------------------------------------
Ran 1 test in 2.042s

FAILED (failures=1)

测试的是休眠1秒,然后time.sleep(2)休眠两秒,返回的status_codeNone而不是0。好吧,结果就是这样,我的测试用例都是从作者的库里直接拿过来的,可能当时作者测试是通过的。可能后来python2的shlex库做了修改了,要让测试通过的话,把status_code改成None就行了。。。现在直接从github拉取的作者代码库做测试也是一样的结果,作者已经有一年半时间不维护了。

第三个测试

# 测试用例
def test_status_code_failure(self):
    c = envoy.connect("sleep 1")
    self.assertEqual(c.status_code, 127)

# 测试结果
# fxm @ bogon in ~/source/backend/python/envoy_source on git:master x [10:26:44] C:1
$ python2.7 test.py
DEBUG:root:command: ['sleep', '1']
DEBUG:root:status_code: None
F
======================================================================
FAIL: test_status_code_failure (__main__.ConnectedCommandTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 21, in test_status_code_failure
    self.assertEqual(c.status_code, 127)
AssertionError: None != 127

----------------------------------------------------------------------
Ran 1 test in 0.004s

FAILED (failures=1)

这个一样,也是返回None,预设是127当然通不过。这个测试用例我改了,作者的原测试是c = envoy.connect("sleeep 1")sleep里故意多了一个e,但是测试时报的是这个错

# 测试用例
def test_status_code_failure(self):
    c = envoy.connect("sleeep 1")
    self.assertEqual(c.status_code, 127)

# 测试结果
# fxm @ bogon in ~/source/backend/python/envoy_source on git:master x [10:29:54] C:1
$ python2.7 test.py
DEBUG:root:command: ['sleeep', '1']
E
======================================================================
ERROR: test_status_code_failure (__main__.ConnectedCommandTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 20, in test_status_code_failure
    c = envoy.connect("sleeep 1")
  File "/Users/fxm/source/backend/python/envoy_source/envoy/core.py", line 261, in connect
    cwd=cwd,
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py", line 390, in __init__
    errread, errwrite)
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py", line 1024, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory

----------------------------------------------------------------------
Ran 1 test in 0.030s

FAILED (errors=1)

就先不深究了,关键还是学习他的代码怎么写

第四个测试

# 测试用例
def test_input(self):
    test_string = 'asdfQWER'
    r = envoy.connect("cat | tr [:lower:] [:upper:]")
    r.send(test_string)
    self.assertEqual(r.std_out, test_string.upper())
    self.assertEqual(r.status_code, 0)

# 测试结果
# fxm @ bogon in ~/source/backend/python/envoy_source on git:master x [10:33:24] C:1
$ python2.7 test.py
DEBUG:root:command: ['tr', '[:lower:]', '[:upper:]']
DEBUG:root:str: asdfQWER

F
======================================================================
FAIL: test_input (__main__.ConnectedCommandTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 27, in test_input
    self.assertEqual(r.std_out, test_string.upper())
AssertionError: None != 'ASDFQWER'

----------------------------------------------------------------------
Ran 1 test in 0.038s

FAILED (failures=1)

envoy.connect("cat | tr [:lower:] [:upper:]")这个命令解析后拿的是后半部分,在linux命令行里tr [:lower:] [:upper:]作用是把输入流的字符串小写改成大写。r.send(test_string)里做的就是把test_string = 'asdfQWER'作为输入流的字符串,目的是把'asdfQWER'全部变成大写'ASDFQWER',然而测试的结果并没有,结果的r.std_outNone

总结

这个库到此就算是学习结束了,有几点需要总结一下

  1. python2和python3的差异较大,尤其在编码方面的处理很不同,据说python2这方面的坑太多了,所以写代码的时候要考虑兼容性问题,而且也还有考虑兼容多系统的问题。
  2. threading这个库对于初学者来说还是有点深,进程线程我还是傻傻分不清楚,并发怎么实现也不清楚,所以在这篇文章里写的关于threading可能不正确,只是记录学习过程。回头如果理解到位再修改好了。进程线程并发这方面还得多看代码多写代码才能理解。
  3. shlex这个库相当强大,有时间一定要研究一下这个库,这个库可以做很大跟命令行有关的有意思的事情。
  4. 目前为止,我已经算是把两个比较简单的关于python的源码学习了;接下去要研究两个中型的python源码库,跟爬虫相结合的话,我想requests这个库是绕不过去的,所以下一个就是它了。当然,如果精力允许我还想研究一下scrapy专业的爬虫库。

hongmaoxiao avatar Dec 23 '17 23:12 hongmaoxiao