Gevent 拾遗

2017 年 11 月 2 日 Python开发者

(点击上方蓝字,快速关注我们)


来源:xybaby 

www.cnblogs.com/xybaby/p/6394188.html

如有好文章投稿,请点击 → 这里了解详情


在前文《Gevent 调度流程解析》已经介绍过了gevent的调度流程,本文介绍gevent一些重要的模块,包括Timeout,Event\AsynResult, Semphore, socket patch,这些模块都涉及当前协程与hub的切换。本文分析的gevent版本为1.2。


Timeout


这个类在gevent.timeout模块,其作用是超时后在当前协程抛出异常,这样执行流程也强制回到了当前协程。看一个简单的例子:


SLEEP = 6

TIMEOUT = 5

 

timeout = Timeout(TIMEOUT)

timeout.start()

 

def wait():

    gevent.sleep(SLEEP)

    print('log in wait')

 

begin = time.time()

try:

    gevent.spawn(wait).join()

except Timeout:

    print('after %s catch Timeout Exception' % (time.time() - begin))

finally:    

    timeout.cancel()


输出为:after 5.00100016594 catch Timeout Exception。可以看出,在5s之后在main协程抛出了Timeout异常(继承自BaseException)。Timeout的实现很简单,核心在start函数:


def start(self):

        """Schedule the timeout."""

        assert not self.pending, '%r is already started; to restart it, cancel it first' % self

        if self.seconds is None:  # "fake" timeout (never expires)

            return

 

        if self.exception is None or self.exception is False or isinstance(self.exception, string_types):

            # timeout that raises self

            self.timer.start(getcurrent().throw, self)

        else:  # regular timeout with user-provided exception

            self.timer.start(getcurrent().throw, self.exception)


从源码可以看到,在超时之后调用了getcurrent().throw(),throw方法会切换协程,并抛出异常(在上面的代码中默认抛出Timeout异常)。使用Timeout有两点需要注意:


第一:一定要记得在finally调用cancel,否则如果协程先于TIMEOUT时间恢复,之后还会抛出异常,例如下面的代码:


import gevent

from gevent import Timeout

 

SLEEP = 4

TIMEOUT = 5

 

timeout = Timeout(TIMEOUT)

timeout.start()

 

def wait():

    gevent.sleep(SLEEP)

    print('log in wait')

 

begin = time.time()

try:

    gevent.spawn(wait).join()

except Timeout:

    print('after %s catch Timeout Exception'  % (time.time() - begin))

# finally:    

#     timeout.cancel()

 

gevent.sleep(2)

print 'program will finish'


上述的代码运行会抛出Timeout异常,在这个例子中,协程先于超时恢复(SLEEP < TIMEOUT),且没有在finally中调用Timeout.cancel。最后的两行保证程序不要过早结束退出,那么在hub调度的时候会重新抛出异常。


由于Timeout实现了with协议(__enter__和__exit__方法),更好的写法是将TImeout写在with语句中,如下面的代码:


import gevent

from gevent import Timeout

 

SLEEP = 4

TIMEOUT = 5

 

 

def wait():

    gevent.sleep(SLEEP)

    print('log in wait')

 

with Timeout(TIMEOUT):

    begin = time.time()

    try:

        gevent.spawn(wait).join()

    except Timeout:

        print('after %s catch Timeout Exception'  % (time.time() - begin))

 

gevent.sleep(2)

print 'program will finish'

 

Timeout with


第二:Timeout只是切换到当前协程,并不会取消已经注册的协程(上面通过spawn发起的协程),我们改改代码:


import gevent

from gevent import Timeout

 

SLEEP = 6

TIMEOUT = 5

 

timeout = Timeout(TIMEOUT)

timeout.start()

 

def wait():

    gevent.sleep(SLEEP)

    print('log in wait')

 

begin = time.time()

try:

    gevent.spawn(wait).join()

except Timeout:

    print('after %s catch Timeout Exception'  % (time.time() - begin))

finally:    

    timeout.cancel()

 

gevent.sleep(2)

print 'program will finish'

# output:

# after 5.00100016594 catch Timeout Exception

# log in wait

# program will finish


从输出可以看到,即使因为超时切回了main greenlet,但spawn发起的协程并不受影响。如果希望超时取消之前发起的协程,那么可以在捕获到异常之后调用 Greenlet.kill。


第三:gevent对可能导致当前协程挂起的函数都提供了timeout参数,用于在指定时间到达之后恢复被挂起的协程。在函数内部会捕获Timeout异常,并不会抛出。例如:


SLEEP = 6

TIMEOUT = 5

 

 

def wait():

    gevent.sleep(SLEEP)

    print('log in wait')

 

begin = time.time()

try:

    gevent.spawn(wait).join(TIMEOUT)

except Timeout:

    print('after %s catch Timeout Exception' % (time.time() - begin))

 

print 'program will exit', time.time() - begin


Event & AsyncResult:


Event用来在Greenlet之间同步,tutorial上的例子简单明了:


import gevent

from gevent.event import Event

 

'''

Illustrates the use of events

'''

 

 

evt = Event()

 

def setter():

    '''After 3 seconds, wake all threads waiting on the value of evt'''

    print('A: Hey wait for me, I have to do something')

    gevent.sleep(3)

    print("Ok, I'm done")

    evt.set()

 

 

def waiter():

    '''After 3 seconds the get call will unblock'''

    print("I'll wait for you")

    evt.wait()  # blocking

    print("It's about time")

 

def main():

    gevent.joinall([

        gevent.spawn(setter),

        gevent.spawn(waiter),

        gevent.spawn(waiter),

 

    ])

 

if __name__ == '__main__': main()


Event主要的两个方法是set和wait:wait等待事件发生,如果事件未发生那么挂起该协程;set通知事件发生,然后hub会唤醒所有wait在该事件的协程。从输出可知, 一次event触发可以唤醒所有在该event上等待的协程。AsyncResult同Event类似,只不过可以在协程唤醒的时候传值(有点类似generator的next send的区别)。接下来大致看看Event的set和wait方法。


Event.wait的核心代码在gevent.event._AbstractLinkable._wait_core,其中_AbstractLinkable是Event的基类。_wait_core源码如下:


def _wait_core(self, timeout, catch=Timeout):

        # The core of the wait implementation, handling

        # switching and linking. If *catch* is set to (),

        # a timeout that elapses will be allowed to be raised.

        # Returns a true value if the wait succeeded without timing out.

        switch = getcurrent().switch

        self.rawlink(switch)

        try:

            timer = Timeout._start_new_or_dummy(timeout)

            try:

                try:

                    result = self.hub.switch()

                    if result is not self: # pragma: no cover

                        raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, ))

                    return True

                except catch as ex:

                    if ex is not timer:

                        raise

                    # test_set_and_clear and test_timeout in test_threading

                    # rely on the exact return values, not just truthish-ness

                    return False

            finally:

                timer.cancel()

        finally:

            self.unlink(switch)


首先是将当前协程的switch加入到Event的callback列表,然后切换到hub。


接下来是set函数:


def set(self):

    self._flag = True # make event ready

    self._check_and_notify()


def _check_and_notify(self):

     # If this object is ready to be notified, begin the process.

     if self.ready():

        if self._links and not self._notifier:

            self._notifier = self.hub.loop.run_callback(self._notify_links)


_check_and_notify函数通知hub调用_notify_links, 在这个函数中将调用Event的callback列表(记录的是之前各个协程的switch函数),这样就恢复了所有wait的协程。


Semaphore & Lock


Semaphore是gevent提供的信号量,实例化为Semaphore(value), value代表了可以并发的量。当value为1,就变成了互斥锁(Lock)。Semaphore提供了两个函数,acquire(P操作)和release(V操作)。当acquire操作导致资源数量将为0之后,就会在当前协程wait,源代码如下(gevent._semaphore.Semaphore.acquire):


def acquire(self, blocking=True, timeout=None):

        

        if self.counter > 0:

            self.counter -= 1

            return True

 

        if not blocking:

            return False

 

        timeout = self._do_wait(timeout)

        if timeout is not None:

            # Our timer expired.

            return False

 

        # Neither our timer no another one expired, so we blocked until

        # awoke. Therefore, the counter is ours

        self.counter -= 1

        assert self.counter >= 0

        return True


逻辑比较简单,如果counter数量大于0,那么表示可并发。否则进入wait,_do_wait的实现与Event.wait十分类似,都是记录当前协程的switch,并切换到hub。当资源足够切换回到当前协程,此时counter一定是大于0的。由于协程的并发并不等同于线程的并发,在任意时刻,一个线程内只可能有一个协程在调度,所以上面对counter的操作也不用加锁。


Monkey-Patch


对于python这种动态语言,在运行时替换模块、类、实例的属性都是非常容易的。我们以patch_socket为例:


>>> import socket

>>> print(socket.socket)

<class 'gevent._socket2.socket'>

>>> from gevent import monkey

>>> monkey.patch_socket()

>>> print(socket.socket)

<class 'gevent._socket2.socket'>


可见在patch前后,同一个名字(socket)所指向的对象是不一样的。在python2.x环境下,patch后的socket源码在gevent._socket2.py,如果是python3.x,那么对应的源码在gevent._socket3.py.。至于为什么patch之后就让原生的socket操作可以在协程之间协作,看两个函数socket.__init__ 和 socket.recv就明白了。


__init__函数(gevent._socket2.socket.__init__):


def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, _sock=None):

        if _sock is None:

            self._sock = _realsocket(family, type, proto) # 原生的socket

            self.timeout = _socket.getdefaulttimeout()

        else:

            if hasattr(_sock, '_sock'):

                self._sock = _sock._sock

                self.timeout = getattr(_sock, 'timeout', False)

                if self.timeout is False:

                    self.timeout = _socket.getdefaulttimeout()

            else:

                self._sock = _sock

                self.timeout = _socket.getdefaulttimeout()

            if PYPY:

                self._sock._reuse()

        self._sock.setblocking(0) #设置成非阻塞

        fileno = self._sock.fileno()

        self.hub = get_hub()    # hub

        io = self.hub.loop.io

        self._read_event = io(fileno, 1) # 监听事件

        self._write_event = io(fileno, 2)


从init函数可以看到,patch后的socket还是会维护原生的socket对象,并且将原生的socket设置成非阻塞(line16),当一个socket是非阻塞时,如果读写数据没有准备好,那么会抛出EWOULDBLOCK\EAGIN异常。最后两行注册socket的可读和可写事件。再来看看recv函数(gevent._socket2.socket.recv):


def recv(self, *args):

        sock = self._sock  # keeping the reference so that fd is not closed during waiting

        while True:

            try:

                return sock.recv(*args) # 如果数据准备好了,直接返回

            except error as ex:

                if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:

                    raise

                # QQQ without clearing exc_info test__refcount.test_clean_exit fails

                sys.exc_clear()

            self._wait(self._read_event) # 等待数据可读的watcher


如果在while循环中读到了数据,那么直接返回。但实际很大概率数据并没有准备好,对于非阻塞socket,抛出EWOULDBLOCK异常(line7)。在第11行,调用wait,注册当前协程switch,并切换到hub,当read_event触发时,表示socket可读,这个时候就会切回当前协程,进入下一次while循环。


参考


  • http://sdiehl.github.io/gevent-tutorial/

  • http://www.cnblogs.com/xybaby/p/6370799.html


看完本文有收获?请转发分享给更多人

关注「Python开发者」,提升Python技能

登录查看更多
0

相关内容

Socket 是一种进程间通信机制,提供一种供应用程序访问通信协议的操作系统调用,并且通过将 Socket 与 Unix 系统文件描述符相整合,使得网络读写数据(或者服务调用)和读写本地文件一样容易。 参考: zhihu.com/question/2138
MIT新书《强化学习与最优控制》
专知会员服务
279+阅读 · 2019年10月9日
Pupy – 全平台远程控制工具
黑白之道
43+阅读 · 2019年4月26日
实战分享之专业领域词汇无监督挖掘
PaperWeekly
15+阅读 · 2019年4月16日
《小美好》短评文本情感分析+生成词云
数据挖掘入门与实战
5+阅读 · 2018年1月7日
[编程经验] CVPR2017论文全集下载代码脚本分享
机器学习和数学
9+阅读 · 2017年7月27日
Arxiv
5+阅读 · 2019年2月28日
Arxiv
4+阅读 · 2018年10月5日
VIP会员
相关VIP内容
MIT新书《强化学习与最优控制》
专知会员服务
279+阅读 · 2019年10月9日
相关资讯
Pupy – 全平台远程控制工具
黑白之道
43+阅读 · 2019年4月26日
实战分享之专业领域词汇无监督挖掘
PaperWeekly
15+阅读 · 2019年4月16日
《小美好》短评文本情感分析+生成词云
数据挖掘入门与实战
5+阅读 · 2018年1月7日
[编程经验] CVPR2017论文全集下载代码脚本分享
机器学习和数学
9+阅读 · 2017年7月27日
Top
微信扫码咨询专知VIP会员